区块链技术博客
www.b2bchain.cn

HIVE 参数调优求职学习资料

本文介绍了HIVE 参数调优求职学习资料,有助于帮助完成毕业设计以及求职,是一篇很好的资料。

对技术面试,学习经验等有一些体会,在此分享。

以下包含MR和TEZ的参数,注意区分

常用参数汇总:
— 直接设置reduce的个数
set mapred.reduce.tasks = 15;
— 设置每个reduce的大小
set hive.exec.reducers.bytes.per.reducer=500000000;

— 设置调度优先级
set mapred.job.priority=VERY_HIGH;
(取值 mapreduce 分别设置优先级VERY_HIGH,HIGH,NORMAL 。设置这个值不会精确的控制MR的顺序,只是在计算任务向集群服务申请资源的时候会起作用。)
与之有关的设置调度队列:mapreduce.job.queuename

— 设置并发
set hive.exec.parallel=true;

— 设置名称
set mapred.job.name=my_job_{DATE};
(平台化没有很成熟的时候,这个很好用)

— 设置文件合并
set abaci.is.dag.job=false;
set hive.merge.mapredfiles=true;
set mapred.combine.input.format.local.only=false;
set hive.merge.smallfiles.avgsize=100000000;

— 动态分区
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.dynamic.partition=true;

— 设置内存大小
SET mapreduce.map.memory.mb=4096;
如果是mapper oom。需要查看下:
(1)适当增大mapper内存。(设置越大,资源消耗越大)

设置map内存

set mapreduce.map.memory.mb=3000;

设置为上面参数的0.8倍左右

set mapreduce.map.java.opts=-Xms2600m -Xmx2600m;
(2)减少每个mapper处理的数据量。
mapred.max.split.size,默认是256000000,可以适当缩小(设置越小,mapper越多,资源消耗越大)

如果是reduce oom,则需要查看下:
(1)reduce数量是否过少。通常可以增大到上百个。通过降低hive.exec.reducers.bytes.per.reducer来增加reduce数量。默认值为104857600,可以降低到50000000,来增加一倍的reduce数量。
(2)如果增加reduce数量无效,需要查看下数据是否有倾斜,通过select count(), JoinKey group by JoinKey order by count() desc limit 100,来查看JoinKey的前100个数据量,从而查看的JoinKey倾斜情况。
(3)最后尝试设置增加reducer内存。(以下两个参数必须同时设置)

设置reduce内存

set mapreduce.reduce.memory.mb=4096;

设置为上面参数的0.75倍

set mapreduce.reduce.java.opts=-Xms3276m -Xmx3276m;

— 最多同时运行map任务数 (hive streaming)
set mapred.job.map.capacity=2000;
set mapred.job.reduce.capacity=2000;

===============================================
增加内存
1、tez:

  • set hive.tez.container.size=6000;

    — map或reduce oom,同事设置了map、reduce的内存大小,建议分开设置,避免资源浪费
    可以将tez container分成2阶段分别设置 如下
    tez map 阶段:
    set hive.tez.map.memory.mb=4000;
    set mapreduce.map.java.opts=-Xms3200m -Xmx3200m;
    tez reducer 阶段:
    set hive.tez.reduce.memory.mb=6144;
    set mapreduce.reduce.java.opts=-Xms4608m -Xmx4608m;

  • set tez.am.resource.memory.mb=12000;

    — am oom

2、MR:
set mapreduce.map.memory.mb=6000;
set mapreduce.map.java.opts=-Xms4500m -Xmx4500m;

set mapreduce.reduce.memory.mb=6000;
set mapreduce.reduce.java.opts=-Xms4500m -Xmx4500m;

===============================================
SET hive.auto.convert.join.noconditionaltask.size=429496729;
hive.exec.orc.split.strategy参数的说明
http://cloudsqale.com/2018/11/12/tez-internals-2-number-of-map-tasks-for-large-orc-files-with-small-stripes-in-amazon-emr/

— set hive.exec.orc.split.strategy=BI
When hive.exec.orc.split.strategy=BI is set, Hive does not read ORC stripe information to define the number of input splits. Instead it uses the following algorithm:

Define the input size – 897,099,150 bytes
Define the block size – fs.s3n.block.size=67108864 (depending on S3 URI schema it can be fs.s3.block.size or fs.s3a.block.size)
Get the number of input splits as input size/fs.s3n.block.size = 897099150/67108864 = 13
Check the input split size 897099150/13 = 69007626 to make sure it is within tez.grouping.min-size=52428800 and tez.grouping.max-size=1073741824 by default. Since the split grouping is not required, and 13 Map tasks are created.

— set hive.exec.orc.split.strategy=ETL
When hive.exec.orc.split.strategy=ETL is set, the following algorithm is applied:

Read ORC stripe information – there are 402 stripes in my sample ORC file
Combine stripes to create inputs splits within mapreduce.input.fileinputformat.split.minsize=128000000 and mapreduce.input.fileinputformat.split.maxsize=256000000 (these values are set in my EMR cluster). This gives 897099150/128000000=7 input splits.

========================================================
— set hive.fetch.task.conversion=more;
hive.fetch.task.conversion
Expects one of [none, minimal, more].
Some select queries can be converted to single FETCH task minimizing latency.
Currently the query should be single sourced not having any subquery and should not have any aggregations or distincts (which incurs RS), lateral views and joins.

  1. none : disable hive.fetch.task.conversion
  2. minimal : SELECT STAR, FILTER on partition columns, LIMIT only
  3. more : SELECT, FILTER, LIMIT only (support TABLESAMPLE and virtual columns)

问题:大表线性扫描查询速度过慢
对于大表没有聚合的明细查询,hive默认不走tez/MR job。
而大表线性扫描查询速度过于慢,原因:对于没有聚合的明细数据查询,hive默认采用无任务模式降低启动yarn作业的开销。
例如:SELECT * FROM employees;
设置set hive.fetch.task.conversion=more
在这种情况下,Hive可以简单地读取employee对应的存储目录下的文件,然后输出查询结果到控制台。不需要起MapReduce job,直接通过Fetch task获取数据

— set hive.fetch.task.conversion=none

所有查询都会执行mapreduce程序

========================================================
set hive.exec.mode.local.auto=true; 开启本地MR

========================================================

MapJoin的优势在于没有shuffle
–是否自动转换为mapjoin
set hive.auto.convert.join = true;
–小表的最大文件大小,默认为25000000,即25M
set hive.mapjoin.smalltable.filesize = 25000000;
–是否将多个mapjoin合并为一个
set hive.auto.convert.join.noconditionaltask = true;
–多个mapjoin转换为1个时,所有小表的文件大小总和的最大值。同时hive.auto.convert.join.noconditionaltask必须为true
set hive.auto.convert.join.noconditionaltask.size = 10000000;

mapjoin实现的方式:
1)在Map-Reduce的驱动程序中使用静态方法DistributedCache.addCacheFile()增加要拷贝的小表文件。 JobTracker在作业启动之前会获取这个URI列表,并将相应的文件拷贝到各个TaskTracker的本地磁盘上。
2)在Map类的setup方法中使用DistributedCache.getLocalCacheFiles()方法获取文件目录,并使用标准的文件读写API读取相应的文件。

========================================================
设置执行引擎
set hive.execution.engine=mr;
set hive.execution.engine=tez;

========================================================
提高HiveQL聚合的执行性能。
SET hive.map.aggr = true;

可以将顶层的聚合操作放在Map阶段执行,
从而减轻清洗阶段数据传输和Reduce阶段的执行时间,提升总体性能。

执行原理: https://blog.csdn.net/chybin500/article/details/80988089

========================================================
Tez资源分配:https://cwiki.apache.org/confluence/display/TEZ/How+initial+task+parallelism+works?spm=a2c6h.13066369.0.0.47687f02dzrL4q
Apps using Tez have the ability to determine the number of tasks reading the initial external data for a job (the number of mappers in MapReduce parlance). Here is a short description of how that works.

There is an InputInitializer specified for the initial vertex reading the external input. During vertex initialization, the InputInitializer is invoked to determine the number of shards of the external data distributed across the cluster. In MapReduce parlance, these would be called input splits and would be determined by the InputFormat for that external input.
If Tez grouping is enabled for the splits, then a generic grouping logic is run on these splits to group them into larger splits. The idea is to strike a balance between how parallel the processing is and how much work is being done in each parallel process.
First, Tez tries to find out the resource availability in the cluster for these tasks. For that, YARN provides a headroom value (and in future other attributes may be used). Lets say this value is T.
Next, Tez divides T with the resource per task (say M) to find out how many tasks can run in parallel at one (ie in a single wave). W = T/M.
Next W is multiplied by a wave factor (from configuration – tez.grouping.split-waves) to determine the number of tasks to be used. Lets say this value is N.
If there are a total of X splits (input shards) and N tasks then this would group X/N splits per task. Tez then estimates the size of data per task based on the number of splits per task.
If this value is between tez.grouping.max-size & tez.grouping.min-size then N is accepted as the number of tasks. If not, then N is adjusted to bring the data per task in line with the max/min depending on which threshold was crossed.
For experimental purposes tez.grouping.split-count can be set in configuration to specify the desired number of groups. If this config is specified then the above logic is ignored and Tez tries to group splits into the specified number of groups. This is best effort.
After this the grouping algorithm is executed. It groups splits by node locality, then rack locality, while respecting the group size limits.
If the final number of splits is X then X tasks are created in the initial vertex and they are executed on the cluster to read the external data.

Here is the detailed explanation of grouping algorithm (TezSplitGrouper.getGroupedSplits).

Figure out desired number of grouped splits. This is affected by several factors. If TEZ_GROUPING_SPLIT_COUNT is set, the value will be used as initial count. Otherwise, the one passed in parameter will be used. Then the initial count will be corrected according to TEZ_GROUPING_SPLIT_MIN_SIZE and TEZ_GROUPING_SPLIT_MAX_SIZE: if the initial count causes a too small grouped split size, it will be override as total input size/TEZ_GROUPING_SPLIT_MIN_SIZE; if initial count causes too large grouped split size, it will be override as total input size/TEZ_GROUPING_SPLIT_MAX_SIZE.
Try grouping splits of same node to desired grouped split size. In this pass, we only allow splits in the same node to group together. Also, small grouped split is not allowed.
If we can no longer get a group on node level locality, there are two choices then. One is fallback to rack locality and continue to group, the other is to just allow small group on node level locality. The first one is by default, and the second one requires setting configuration TEZ_GROUPING_NODE_LOCAL_ONLY.
If there are still ungrouped splits, allow small groups anyway and continue to use locality level in last step.

如果为拆分启用了Tez分组,则会在这些拆分上运行通用分组逻辑,以将它们分组为更大的拆分。我们的想法是在处理的并行程度与每个并行流程中的工作量之间取得平衡。

首先,Tez尝试为这些任务找出集群中的资源可用性。为此,YARN提供净空值(并且将来可以使用其他属性)。让我们说这个值是T.
接下来,Tez将T与每个任务的资源(比如M)分开,以找出在一个任务中(即在单个波中)并行运行的任务数。W = T / M.
接下来W乘以波动因子(来自配置 – tez.grouping.split-waves)以确定要使用的任务数。可以说这个值是N.
如果总共有X个分裂(输入分片)和N个任务,那么这将对每个任务分组X / N分割。然后,Tez根据每个任务的拆分数量估算每个任务的数据大小。
如果此值介于tez.grouping.max-size和tez.grouping.min-size之间,则接受N作为任务数。如果不是,则调整N以使每个任务的数据与最大/最小值一致,这取决于越过哪个阈值。
出于实验目的,可以在配置中设置tez.grouping.split-count以指定所需的组数。如果指定了此配置,则忽略上述逻辑,并且Tez尝试将拆分分组到指定数量的组中。这是最好的努力。
在此之后,执行分组算法。它按节点位置分组,然后按机架局部性分组,同时遵守组大小限制。

========================================================
并发设置
— 同一个sql中可以并行执行的job会并发的执行
SET hive.exec.parallel=true;

— 控制同一个sql中可以并行的job数量,默认是8
SET hive.exec.parallel.thread.number=128;

========================================================
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
这个参数表示执行前进行小文件合并

========================================================
set mapred.max.split.size=2048000000;
set mapred.min.split.size=2048000000;
( 根据不同集群的 【dfs.block.size 】这个配置设置,建议设成这个的值的整数倍 )
原文链接:https://blog.csdn.net/zhong_han_jun/article/details/50814246

控制map的数量:
重要 【 大小顺序如下 mapred.max.split.size <= mapred.min.split.size.per.node <= mapred.min.split.size.per.rack 】
set mapred.max.split.size=256000000; — 决定每个map处理的最大的文件大小,单位为B,256M这是默认值(1B=256M)
set mapred.min.split.size.per.node=1; — 节点中可以处理的最小的文件大小
set mapred.min.split.size.per.rack=1; — 机架中可以处理的最小的文件大小

对三个参数的理解
可以简单的理解为集群对一个表分区下面的文件进行分发到各个节点,之后根据mapred.max.split.size确认要启动多少个map数,逻辑如下
  a.假设有两个文件大小分别为(256M,280M)被分配到节点A,那么会启动两个map,剩余的文件大小为10MB和35MB因为每个大小都不足241MB会先做保留
  b.根据参数set mapred.min.split.size.per.node看剩余的大小情况并进行合并,如果值为1,表示a中每个剩余文件都会自己起一个map,这里会起两个,如果设置为大于45*1024*1024则会合并成一个块,并产生一个map
  如果mapred.min.split.size.per.node为10*1024*1024,那么在这个节点上一共会有4个map,处理的大小为(245MB,245MB,10MB,10MB,10MB,10MB),余下9MB
  如果mapred.min.split.size.per.node为45*1024*1024,那么会有三个map,处理的大小为(245MB,245MB,45MB)
  实际中mapred.min.split.size.per.node无法准确地设置成45*1024*1024,会有剩余并保留带下一步进行判断处理
  c. 对b中余出来的文件与其它节点余出来的文件根据mapred.min.split.size.per.rack大小进行判断是否合并,对再次余出来的文件独自产生一个map处理

控制reduce的数量:
方法1
set mapred.reduce.tasks=10; — 设置reduce的数量
方法2
set hive.exec.reducers.bytes.per.reducer = 1073741824 — 每个reduce处理的数据量,默认1GB

补充说明:一个集群可以有多个机架,一个机架有1至多个节点,这里的集群是mapreduce不是yarn,yarn没有详细了解过,另外如果想要实现map中的数据合并需要设置下面的参数,集群默认就是这个格式 set hive.input.format = org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

需要明确的问题
 a.我们该设置多少个map多少个reduce才合适?
  map数普遍是通过执行时长来确认的,至少应当保证每个map执行时长在1分钟以上,太短的话意味着大量重复的jvm启用和销毁。具体设置要根据具体任务来处理,有些任务占用cpu大,有些占用io大。我这边的话因为大任务经常搞了上千个map,作为小集群影响还是蛮大的,所以只对监控到的hql产生过大的map和reduce进行调整,经过一些简单测试,map数保证在三四百个其实不影响执行效率。

 b.设置了上面的参数会带来什么影响?
  设置map的话,影响不是很大,可能会带来更多的集群之间的io,毕竟要做节点之间的文件合并
  设置reduce的话,如果使用mapred.reduce.tasks,这个影响就大了,至少会造成同一个session每一个mr的job的reduce都是这么多个,而且reduce个数意味着最后的文件数量的输出,如果小文件太多的话可以打开reduce端的小文件合并参数,set hive.merge.mapredfiles=true

调节map、reduce数:
1、对MR任务:若mappers个数超过10w可适当调大参数

  • set mapred.max.split.size=2048000000 (这个值比较合适) 代表每个mapper处理的数据量,默认值为256M。可以根据历史值适当调大该参数值,减小mapper数量。

2、若reduces个数超过5k,可适当调小参数

  • set hive.exec.reducers.max=1009 代表实际运行的reduce个数,默认值为1009,可根据历史值适当调小该参数值,减小reduces数量。 或调大参数
  • set hive.exec.reducers.bytes.per.reducer=1073741824 代表每个reduce处理的数据量,默认值为1GB,可根据历史值适当调大该参数值,减小reduces数量。

3、对TEZ任务:若map task个数超过10w,除了上述参数外可以通过调大参数
set tez.grouping.min-size=1024000000;
set tez.grouping.max-size=2048000000;
代表Map每个task处理的数据量,可以根据历史值适当调大该参数值,减小map task数量。
若reduce task个数超过5k可以参考MR引擎配置进行修改。
调整该参数之后对split-size会出现影响,影响切分数据块的的大小,减小mr数量

========================================================
HQL任务生成文件数过多整改措施

Step 1
1、对会导致目标表生成过多小文件的任务加上参数
set hive.merge.filenum.limit=-1;
存在的风险:会导致fileMergeTask运行时间变长。同时,由于生成文件数量的减少,可以降低下流任务split oom的概率。

Step 2
若以上参数无法减少生成的文件数量,请使用以下方式:
1、设置参数
set hive.merge.size.per.task=256000000
代表每个task file merge后生成文件的大小,默认为256M,可根据历史值适当增加生成文件大小,减少文件数量。(单位:byte)

2、如果是由于生成任务的reduce task个数太多而导致,则适当减少reduce task个数。如何减少reduce个数请参考 计算引擎任务限制 中 “Parallelism” 条目的整改措施。
如果存在动态分区可以尝试增加参数
distribute by <parttiion_column>, <平均分布列> % <取模值>
“<partition_column>”即为动态分区字段,“<平均分布列>%<取模值>”是为了防止动态分区的分区之间数据分布不均匀而出现数据倾斜的情况,请根据实际情况酌情添加。

对TEZ任务:若map task个数超过10w,除了上述参数外可以通过调大参数

========================================================
set hive.insert.into.multilevel.dirs = false
这里唯一作用就是限制SQL中指定存放结果文件不存在的目录的深度不能大于1.


TEZ参数

任务进程重用是否开启,默认是true
tez.am.container.reuse.enabled


数据倾斜

1、set hive.optimize.skewjoin = true; — 默认是false

2、加一列hash值打散
CAST(11 * rand() AS INT) AS rn

CONCAT(goods_id,pmod(hash(CONCAT(cookie,stat_time)),10000))

3、空值处理
IF(goods_id = 0 OR goods_id IS NULL,-pmod(hash(concat(cookie,stat_time)),10000),goods_id) AS join_goods_id — 打散goods_id is null


mapjoin

set hive.auto.convert.join = false;

经典实用场景:维表join


设置默认文件格式

SET hive.default.fileformat=Orc


distribute by

1、动态分区的使用例子:将相同key分到同一个reduce任务
https://blog.csdn.net/mhtian2015/article/details/79898169?utm_medium=distribute.pc_relevant.none-task-blog-baidujs-6

2、
如果表a只有一个文件,大小为120M,但包含几千万的记录,如果用1个map去完成这个任务,肯定是比较耗时的,这种情况下,我们要考虑将这一个文件合理的拆分成多个,这样就可以用多个map任务去完成。

  set mapred.reduce.tasks=10;

  create table a_1 as select * from a distribute by rand(123);

这样会将a表的记录,随机的分散到包含10个文件的a_1表中,再用a_1代替上面sql中的a表,则会用10个map任务去完成。每个map任务处理大于12M(几百万记录)的数据,效率肯定会好很多。


shuffle 阶段 OOM:
https://blog.csdn.net/houzhizhen/article/details/84773884?utm_medium=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-1.nonecase&depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-1.nonecase


MR源码 环形缓冲区:
http://bigdatadecode.club/MapReduce%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90–%E7%8E%AF%E5%BD%A2%E7%BC%93%E5%86%B2%E5%8C%BA.html

========================================================

高内存场景主要包括以下几种:

  • MAPJOIN:开启MAPJOIN之后,小表的数据会加载进MAP,在MAP端和大表进行JOIN,需要占用较大的MAP内存。
  • 使用UDF/UDAF/UDTF:比如collect_list(concat_ws(‘,’,name))实现字符串拼接函数,通过聚合拼接字符串,很容易造成单条数据较大,消耗较多内存。
  • 自定义UDF和jar包:自定义UDF或jar包实现方式导致在计算过程中需要较大内存。
  • 动态分区:内存使用随动态分区个数的增加而提高。
  • 单条数据过大:数据中大的map字段或者json字段,导致单条数据很大,处理过程需要较大内存。

以下包含MR和TEZ的参数,注意区分

常用参数汇总:
— 直接设置reduce的个数
set mapred.reduce.tasks = 15;
— 设置每个reduce的大小
set hive.exec.reducers.bytes.per.reducer=500000000;

— 设置调度优先级
set mapred.job.priority=VERY_HIGH;
(取值 mapreduce 分别设置优先级VERY_HIGH,HIGH,NORMAL 。设置这个值不会精确的控制MR的顺序,只是在计算任务向集群服务申请资源的时候会起作用。)
与之有关的设置调度队列:mapreduce.job.queuename

— 设置并发
set hive.exec.parallel=true;

— 设置名称
set mapred.job.name=my_job_{DATE};
(平台化没有很成熟的时候,这个很好用)

— 设置文件合并
set abaci.is.dag.job=false;
set hive.merge.mapredfiles=true;
set mapred.combine.input.format.local.only=false;
set hive.merge.smallfiles.avgsize=100000000;

— 动态分区
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.dynamic.partition=true;

— 设置内存大小
SET mapreduce.map.memory.mb=4096;
如果是mapper oom。需要查看下:
(1)适当增大mapper内存。(设置越大,资源消耗越大)

设置map内存

set mapreduce.map.memory.mb=3000;

设置为上面参数的0.8倍左右

set mapreduce.map.java.opts=-Xms2600m -Xmx2600m;
(2)减少每个mapper处理的数据量。
mapred.max.split.size,默认是256000000,可以适当缩小(设置越小,mapper越多,资源消耗越大)

如果是reduce oom,则需要查看下:
(1)reduce数量是否过少。通常可以增大到上百个。通过降低hive.exec.reducers.bytes.per.reducer来增加reduce数量。默认值为104857600,可以降低到50000000,来增加一倍的reduce数量。
(2)如果增加reduce数量无效,需要查看下数据是否有倾斜,通过select count(), JoinKey group by JoinKey order by count() desc limit 100,来查看JoinKey的前100个数据量,从而查看的JoinKey倾斜情况。
(3)最后尝试设置增加reducer内存。(以下两个参数必须同时设置)

设置reduce内存

set mapreduce.reduce.memory.mb=4096;

设置为上面参数的0.75倍

set mapreduce.reduce.java.opts=-Xms3276m -Xmx3276m;

— 最多同时运行map任务数 (hive streaming)
set mapred.job.map.capacity=2000;
set mapred.job.reduce.capacity=2000;

===============================================
增加内存
1、tez:

  • set hive.tez.container.size=6000;

    — map或reduce oom,同事设置了map、reduce的内存大小,建议分开设置,避免资源浪费
    可以将tez container分成2阶段分别设置 如下
    tez map 阶段:
    set hive.tez.map.memory.mb=4000;
    set mapreduce.map.java.opts=-Xms3200m -Xmx3200m;
    tez reducer 阶段:
    set hive.tez.reduce.memory.mb=6144;
    set mapreduce.reduce.java.opts=-Xms4608m -Xmx4608m;

  • set tez.am.resource.memory.mb=12000;

    — am oom

2、MR:
set mapreduce.map.memory.mb=6000;
set mapreduce.map.java.opts=-Xms4500m -Xmx4500m;

set mapreduce.reduce.memory.mb=6000;
set mapreduce.reduce.java.opts=-Xms4500m -Xmx4500m;

===============================================
SET hive.auto.convert.join.noconditionaltask.size=429496729;
hive.exec.orc.split.strategy参数的说明
http://cloudsqale.com/2018/11/12/tez-internals-2-number-of-map-tasks-for-large-orc-files-with-small-stripes-in-amazon-emr/

— set hive.exec.orc.split.strategy=BI
When hive.exec.orc.split.strategy=BI is set, Hive does not read ORC stripe information to define the number of input splits. Instead it uses the following algorithm:

Define the input size – 897,099,150 bytes
Define the block size – fs.s3n.block.size=67108864 (depending on S3 URI schema it can be fs.s3.block.size or fs.s3a.block.size)
Get the number of input splits as input size/fs.s3n.block.size = 897099150/67108864 = 13
Check the input split size 897099150/13 = 69007626 to make sure it is within tez.grouping.min-size=52428800 and tez.grouping.max-size=1073741824 by default. Since the split grouping is not required, and 13 Map tasks are created.

— set hive.exec.orc.split.strategy=ETL
When hive.exec.orc.split.strategy=ETL is set, the following algorithm is applied:

Read ORC stripe information – there are 402 stripes in my sample ORC file
Combine stripes to create inputs splits within mapreduce.input.fileinputformat.split.minsize=128000000 and mapreduce.input.fileinputformat.split.maxsize=256000000 (these values are set in my EMR cluster). This gives 897099150/128000000=7 input splits.

========================================================
— set hive.fetch.task.conversion=more;
hive.fetch.task.conversion
Expects one of [none, minimal, more].
Some select queries can be converted to single FETCH task minimizing latency.
Currently the query should be single sourced not having any subquery and should not have any aggregations or distincts (which incurs RS), lateral views and joins.

  1. none : disable hive.fetch.task.conversion
  2. minimal : SELECT STAR, FILTER on partition columns, LIMIT only
  3. more : SELECT, FILTER, LIMIT only (support TABLESAMPLE and virtual columns)

问题:大表线性扫描查询速度过慢
对于大表没有聚合的明细查询,hive默认不走tez/MR job。
而大表线性扫描查询速度过于慢,原因:对于没有聚合的明细数据查询,hive默认采用无任务模式降低启动yarn作业的开销。
例如:SELECT * FROM employees;
设置set hive.fetch.task.conversion=more
在这种情况下,Hive可以简单地读取employee对应的存储目录下的文件,然后输出查询结果到控制台。不需要起MapReduce job,直接通过Fetch task获取数据

— set hive.fetch.task.conversion=none

所有查询都会执行mapreduce程序

========================================================
set hive.exec.mode.local.auto=true; 开启本地MR

========================================================

MapJoin的优势在于没有shuffle
–是否自动转换为mapjoin
set hive.auto.convert.join = true;
–小表的最大文件大小,默认为25000000,即25M
set hive.mapjoin.smalltable.filesize = 25000000;
–是否将多个mapjoin合并为一个
set hive.auto.convert.join.noconditionaltask = true;
–多个mapjoin转换为1个时,所有小表的文件大小总和的最大值。同时hive.auto.convert.join.noconditionaltask必须为true
set hive.auto.convert.join.noconditionaltask.size = 10000000;

mapjoin实现的方式:
1)在Map-Reduce的驱动程序中使用静态方法DistributedCache.addCacheFile()增加要拷贝的小表文件。 JobTracker在作业启动之前会获取这个URI列表,并将相应的文件拷贝到各个TaskTracker的本地磁盘上。
2)在Map类的setup方法中使用DistributedCache.getLocalCacheFiles()方法获取文件目录,并使用标准的文件读写API读取相应的文件。

========================================================
设置执行引擎
set hive.execution.engine=mr;
set hive.execution.engine=tez;

========================================================
提高HiveQL聚合的执行性能。
SET hive.map.aggr = true;

可以将顶层的聚合操作放在Map阶段执行,
从而减轻清洗阶段数据传输和Reduce阶段的执行时间,提升总体性能。

执行原理: https://blog.csdn.net/chybin500/article/details/80988089

========================================================
Tez资源分配:https://cwiki.apache.org/confluence/display/TEZ/How+initial+task+parallelism+works?spm=a2c6h.13066369.0.0.47687f02dzrL4q
Apps using Tez have the ability to determine the number of tasks reading the initial external data for a job (the number of mappers in MapReduce parlance). Here is a short description of how that works.

There is an InputInitializer specified for the initial vertex reading the external input. During vertex initialization, the InputInitializer is invoked to determine the number of shards of the external data distributed across the cluster. In MapReduce parlance, these would be called input splits and would be determined by the InputFormat for that external input.
If Tez grouping is enabled for the splits, then a generic grouping logic is run on these splits to group them into larger splits. The idea is to strike a balance between how parallel the processing is and how much work is being done in each parallel process.
First, Tez tries to find out the resource availability in the cluster for these tasks. For that, YARN provides a headroom value (and in future other attributes may be used). Lets say this value is T.
Next, Tez divides T with the resource per task (say M) to find out how many tasks can run in parallel at one (ie in a single wave). W = T/M.
Next W is multiplied by a wave factor (from configuration – tez.grouping.split-waves) to determine the number of tasks to be used. Lets say this value is N.
If there are a total of X splits (input shards) and N tasks then this would group X/N splits per task. Tez then estimates the size of data per task based on the number of splits per task.
If this value is between tez.grouping.max-size & tez.grouping.min-size then N is accepted as the number of tasks. If not, then N is adjusted to bring the data per task in line with the max/min depending on which threshold was crossed.
For experimental purposes tez.grouping.split-count can be set in configuration to specify the desired number of groups. If this config is specified then the above logic is ignored and Tez tries to group splits into the specified number of groups. This is best effort.
After this the grouping algorithm is executed. It groups splits by node locality, then rack locality, while respecting the group size limits.
If the final number of splits is X then X tasks are created in the initial vertex and they are executed on the cluster to read the external data.

Here is the detailed explanation of grouping algorithm (TezSplitGrouper.getGroupedSplits).

Figure out desired number of grouped splits. This is affected by several factors. If TEZ_GROUPING_SPLIT_COUNT is set, the value will be used as initial count. Otherwise, the one passed in parameter will be used. Then the initial count will be corrected according to TEZ_GROUPING_SPLIT_MIN_SIZE and TEZ_GROUPING_SPLIT_MAX_SIZE: if the initial count causes a too small grouped split size, it will be override as total input size/TEZ_GROUPING_SPLIT_MIN_SIZE; if initial count causes too large grouped split size, it will be override as total input size/TEZ_GROUPING_SPLIT_MAX_SIZE.
Try grouping splits of same node to desired grouped split size. In this pass, we only allow splits in the same node to group together. Also, small grouped split is not allowed.
If we can no longer get a group on node level locality, there are two choices then. One is fallback to rack locality and continue to group, the other is to just allow small group on node level locality. The first one is by default, and the second one requires setting configuration TEZ_GROUPING_NODE_LOCAL_ONLY.
If there are still ungrouped splits, allow small groups anyway and continue to use locality level in last step.

如果为拆分启用了Tez分组,则会在这些拆分上运行通用分组逻辑,以将它们分组为更大的拆分。我们的想法是在处理的并行程度与每个并行流程中的工作量之间取得平衡。

首先,Tez尝试为这些任务找出集群中的资源可用性。为此,YARN提供净空值(并且将来可以使用其他属性)。让我们说这个值是T.
接下来,Tez将T与每个任务的资源(比如M)分开,以找出在一个任务中(即在单个波中)并行运行的任务数。W = T / M.
接下来W乘以波动因子(来自配置 – tez.grouping.split-waves)以确定要使用的任务数。可以说这个值是N.
如果总共有X个分裂(输入分片)和N个任务,那么这将对每个任务分组X / N分割。然后,Tez根据每个任务的拆分数量估算每个任务的数据大小。
如果此值介于tez.grouping.max-size和tez.grouping.min-size之间,则接受N作为任务数。如果不是,则调整N以使每个任务的数据与最大/最小值一致,这取决于越过哪个阈值。
出于实验目的,可以在配置中设置tez.grouping.split-count以指定所需的组数。如果指定了此配置,则忽略上述逻辑,并且Tez尝试将拆分分组到指定数量的组中。这是最好的努力。
在此之后,执行分组算法。它按节点位置分组,然后按机架局部性分组,同时遵守组大小限制。

========================================================
并发设置
— 同一个sql中可以并行执行的job会并发的执行
SET hive.exec.parallel=true;

— 控制同一个sql中可以并行的job数量,默认是8
SET hive.exec.parallel.thread.number=128;

========================================================
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
这个参数表示执行前进行小文件合并

========================================================
set mapred.max.split.size=2048000000;
set mapred.min.split.size=2048000000;
( 根据不同集群的 【dfs.block.size 】这个配置设置,建议设成这个的值的整数倍 )
原文链接:https://blog.csdn.net/zhong_han_jun/article/details/50814246

控制map的数量:
重要 【 大小顺序如下 mapred.max.split.size <= mapred.min.split.size.per.node <= mapred.min.split.size.per.rack 】
set mapred.max.split.size=256000000; — 决定每个map处理的最大的文件大小,单位为B,256M这是默认值(1B=256M)
set mapred.min.split.size.per.node=1; — 节点中可以处理的最小的文件大小
set mapred.min.split.size.per.rack=1; — 机架中可以处理的最小的文件大小

对三个参数的理解
可以简单的理解为集群对一个表分区下面的文件进行分发到各个节点,之后根据mapred.max.split.size确认要启动多少个map数,逻辑如下
  a.假设有两个文件大小分别为(256M,280M)被分配到节点A,那么会启动两个map,剩余的文件大小为10MB和35MB因为每个大小都不足241MB会先做保留
  b.根据参数set mapred.min.split.size.per.node看剩余的大小情况并进行合并,如果值为1,表示a中每个剩余文件都会自己起一个map,这里会起两个,如果设置为大于45*1024*1024则会合并成一个块,并产生一个map
  如果mapred.min.split.size.per.node为10*1024*1024,那么在这个节点上一共会有4个map,处理的大小为(245MB,245MB,10MB,10MB,10MB,10MB),余下9MB
  如果mapred.min.split.size.per.node为45*1024*1024,那么会有三个map,处理的大小为(245MB,245MB,45MB)
  实际中mapred.min.split.size.per.node无法准确地设置成45*1024*1024,会有剩余并保留带下一步进行判断处理
  c. 对b中余出来的文件与其它节点余出来的文件根据mapred.min.split.size.per.rack大小进行判断是否合并,对再次余出来的文件独自产生一个map处理

控制reduce的数量:
方法1
set mapred.reduce.tasks=10; — 设置reduce的数量
方法2
set hive.exec.reducers.bytes.per.reducer = 1073741824 — 每个reduce处理的数据量,默认1GB

补充说明:一个集群可以有多个机架,一个机架有1至多个节点,这里的集群是mapreduce不是yarn,yarn没有详细了解过,另外如果想要实现map中的数据合并需要设置下面的参数,集群默认就是这个格式 set hive.input.format = org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

需要明确的问题
 a.我们该设置多少个map多少个reduce才合适?
  map数普遍是通过执行时长来确认的,至少应当保证每个map执行时长在1分钟以上,太短的话意味着大量重复的jvm启用和销毁。具体设置要根据具体任务来处理,有些任务占用cpu大,有些占用io大。我这边的话因为大任务经常搞了上千个map,作为小集群影响还是蛮大的,所以只对监控到的hql产生过大的map和reduce进行调整,经过一些简单测试,map数保证在三四百个其实不影响执行效率。

 b.设置了上面的参数会带来什么影响?
  设置map的话,影响不是很大,可能会带来更多的集群之间的io,毕竟要做节点之间的文件合并
  设置reduce的话,如果使用mapred.reduce.tasks,这个影响就大了,至少会造成同一个session每一个mr的job的reduce都是这么多个,而且reduce个数意味着最后的文件数量的输出,如果小文件太多的话可以打开reduce端的小文件合并参数,set hive.merge.mapredfiles=true

调节map、reduce数:
1、对MR任务:若mappers个数超过10w可适当调大参数

  • set mapred.max.split.size=2048000000 (这个值比较合适) 代表每个mapper处理的数据量,默认值为256M。可以根据历史值适当调大该参数值,减小mapper数量。

2、若reduces个数超过5k,可适当调小参数

  • set hive.exec.reducers.max=1009 代表实际运行的reduce个数,默认值为1009,可根据历史值适当调小该参数值,减小reduces数量。 或调大参数
  • set hive.exec.reducers.bytes.per.reducer=1073741824 代表每个reduce处理的数据量,默认值为1GB,可根据历史值适当调大该参数值,减小reduces数量。

3、对TEZ任务:若map task个数超过10w,除了上述参数外可以通过调大参数
set tez.grouping.min-size=1024000000;
set tez.grouping.max-size=2048000000;
代表Map每个task处理的数据量,可以根据历史值适当调大该参数值,减小map task数量。
若reduce task个数超过5k可以参考MR引擎配置进行修改。
调整该参数之后对split-size会出现影响,影响切分数据块的的大小,减小mr数量

========================================================
HQL任务生成文件数过多整改措施

Step 1
1、对会导致目标表生成过多小文件的任务加上参数
set hive.merge.filenum.limit=-1;
存在的风险:会导致fileMergeTask运行时间变长。同时,由于生成文件数量的减少,可以降低下流任务split oom的概率。

Step 2
若以上参数无法减少生成的文件数量,请使用以下方式:
1、设置参数
set hive.merge.size.per.task=256000000
代表每个task file merge后生成文件的大小,默认为256M,可根据历史值适当增加生成文件大小,减少文件数量。(单位:byte)

2、如果是由于生成任务的reduce task个数太多而导致,则适当减少reduce task个数。如何减少reduce个数请参考 计算引擎任务限制 中 “Parallelism” 条目的整改措施。
如果存在动态分区可以尝试增加参数
distribute by <parttiion_column>, <平均分布列> % <取模值>
“<partition_column>”即为动态分区字段,“<平均分布列>%<取模值>”是为了防止动态分区的分区之间数据分布不均匀而出现数据倾斜的情况,请根据实际情况酌情添加。

对TEZ任务:若map task个数超过10w,除了上述参数外可以通过调大参数

========================================================
set hive.insert.into.multilevel.dirs = false
这里唯一作用就是限制SQL中指定存放结果文件不存在的目录的深度不能大于1.


TEZ参数

任务进程重用是否开启,默认是true
tez.am.container.reuse.enabled


数据倾斜

1、set hive.optimize.skewjoin = true; — 默认是false

2、加一列hash值打散
CAST(11 * rand() AS INT) AS rn

CONCAT(goods_id,pmod(hash(CONCAT(cookie,stat_time)),10000))

3、空值处理
IF(goods_id = 0 OR goods_id IS NULL,-pmod(hash(concat(cookie,stat_time)),10000),goods_id) AS join_goods_id — 打散goods_id is null


mapjoin

set hive.auto.convert.join = false;

经典实用场景:维表join


设置默认文件格式

SET hive.default.fileformat=Orc


distribute by

1、动态分区的使用例子:将相同key分到同一个reduce任务
https://blog.csdn.net/mhtian2015/article/details/79898169?utm_medium=distribute.pc_relevant.none-task-blog-baidujs-6

2、
如果表a只有一个文件,大小为120M,但包含几千万的记录,如果用1个map去完成这个任务,肯定是比较耗时的,这种情况下,我们要考虑将这一个文件合理的拆分成多个,这样就可以用多个map任务去完成。

  set mapred.reduce.tasks=10;

  create table a_1 as select * from a distribute by rand(123);

这样会将a表的记录,随机的分散到包含10个文件的a_1表中,再用a_1代替上面sql中的a表,则会用10个map任务去完成。每个map任务处理大于12M(几百万记录)的数据,效率肯定会好很多。


shuffle 阶段 OOM:
https://blog.csdn.net/houzhizhen/article/details/84773884?utm_medium=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-1.nonecase&depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-1.nonecase


MR源码 环形缓冲区:
http://bigdatadecode.club/MapReduce%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90–%E7%8E%AF%E5%BD%A2%E7%BC%93%E5%86%B2%E5%8C%BA.html

========================================================

高内存场景主要包括以下几种:

  • MAPJOIN:开启MAPJOIN之后,小表的数据会加载进MAP,在MAP端和大表进行JOIN,需要占用较大的MAP内存。
  • 使用UDF/UDAF/UDTF:比如collect_list(concat_ws(‘,’,name))实现字符串拼接函数,通过聚合拼接字符串,很容易造成单条数据较大,消耗较多内存。
  • 自定义UDF和jar包:自定义UDF或jar包实现方式导致在计算过程中需要较大内存。
  • 动态分区:内存使用随动态分区个数的增加而提高。
  • 单条数据过大:数据中大的map字段或者json字段,导致单条数据很大,处理过程需要较大内存。

以下包含MR和TEZ的参数,注意区分

常用参数汇总:
— 直接设置reduce的个数
set mapred.reduce.tasks = 15;
— 设置每个reduce的大小
set hive.exec.reducers.bytes.per.reducer=500000000;

— 设置调度优先级
set mapred.job.priority=VERY_HIGH;
(取值 mapreduce 分别设置优先级VERY_HIGH,HIGH,NORMAL 。设置这个值不会精确的控制MR的顺序,只是在计算任务向集群服务申请资源的时候会起作用。)
与之有关的设置调度队列:mapreduce.job.queuename

— 设置并发
set hive.exec.parallel=true;

— 设置名称
set mapred.job.name=my_job_{DATE};
(平台化没有很成熟的时候,这个很好用)

— 设置文件合并
set abaci.is.dag.job=false;
set hive.merge.mapredfiles=true;
set mapred.combine.input.format.local.only=false;
set hive.merge.smallfiles.avgsize=100000000;

— 动态分区
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.dynamic.partition=true;

— 设置内存大小
SET mapreduce.map.memory.mb=4096;
如果是mapper oom。需要查看下:
(1)适当增大mapper内存。(设置越大,资源消耗越大)

设置map内存

set mapreduce.map.memory.mb=3000;

设置为上面参数的0.8倍左右

set mapreduce.map.java.opts=-Xms2600m -Xmx2600m;
(2)减少每个mapper处理的数据量。
mapred.max.split.size,默认是256000000,可以适当缩小(设置越小,mapper越多,资源消耗越大)

如果是reduce oom,则需要查看下:
(1)reduce数量是否过少。通常可以增大到上百个。通过降低hive.exec.reducers.bytes.per.reducer来增加reduce数量。默认值为104857600,可以降低到50000000,来增加一倍的reduce数量。
(2)如果增加reduce数量无效,需要查看下数据是否有倾斜,通过select count(), JoinKey group by JoinKey order by count() desc limit 100,来查看JoinKey的前100个数据量,从而查看的JoinKey倾斜情况。
(3)最后尝试设置增加reducer内存。(以下两个参数必须同时设置)

设置reduce内存

set mapreduce.reduce.memory.mb=4096;

设置为上面参数的0.75倍

set mapreduce.reduce.java.opts=-Xms3276m -Xmx3276m;

— 最多同时运行map任务数 (hive streaming)
set mapred.job.map.capacity=2000;
set mapred.job.reduce.capacity=2000;

===============================================
增加内存
1、tez:

  • set hive.tez.container.size=6000;

    — map或reduce oom,同事设置了map、reduce的内存大小,建议分开设置,避免资源浪费
    可以将tez container分成2阶段分别设置 如下
    tez map 阶段:
    set hive.tez.map.memory.mb=4000;
    set mapreduce.map.java.opts=-Xms3200m -Xmx3200m;
    tez reducer 阶段:
    set hive.tez.reduce.memory.mb=6144;
    set mapreduce.reduce.java.opts=-Xms4608m -Xmx4608m;

  • set tez.am.resource.memory.mb=12000;

    — am oom

2、MR:
set mapreduce.map.memory.mb=6000;
set mapreduce.map.java.opts=-Xms4500m -Xmx4500m;

set mapreduce.reduce.memory.mb=6000;
set mapreduce.reduce.java.opts=-Xms4500m -Xmx4500m;

===============================================
SET hive.auto.convert.join.noconditionaltask.size=429496729;
hive.exec.orc.split.strategy参数的说明
http://cloudsqale.com/2018/11/12/tez-internals-2-number-of-map-tasks-for-large-orc-files-with-small-stripes-in-amazon-emr/

— set hive.exec.orc.split.strategy=BI
When hive.exec.orc.split.strategy=BI is set, Hive does not read ORC stripe information to define the number of input splits. Instead it uses the following algorithm:

Define the input size – 897,099,150 bytes
Define the block size – fs.s3n.block.size=67108864 (depending on S3 URI schema it can be fs.s3.block.size or fs.s3a.block.size)
Get the number of input splits as input size/fs.s3n.block.size = 897099150/67108864 = 13
Check the input split size 897099150/13 = 69007626 to make sure it is within tez.grouping.min-size=52428800 and tez.grouping.max-size=1073741824 by default. Since the split grouping is not required, and 13 Map tasks are created.

— set hive.exec.orc.split.strategy=ETL
When hive.exec.orc.split.strategy=ETL is set, the following algorithm is applied:

Read ORC stripe information – there are 402 stripes in my sample ORC file
Combine stripes to create inputs splits within mapreduce.input.fileinputformat.split.minsize=128000000 and mapreduce.input.fileinputformat.split.maxsize=256000000 (these values are set in my EMR cluster). This gives 897099150/128000000=7 input splits.

========================================================
— set hive.fetch.task.conversion=more;
hive.fetch.task.conversion
Expects one of [none, minimal, more].
Some select queries can be converted to single FETCH task minimizing latency.
Currently the query should be single sourced not having any subquery and should not have any aggregations or distincts (which incurs RS), lateral views and joins.

  1. none : disable hive.fetch.task.conversion
  2. minimal : SELECT STAR, FILTER on partition columns, LIMIT only
  3. more : SELECT, FILTER, LIMIT only (support TABLESAMPLE and virtual columns)

问题:大表线性扫描查询速度过慢
对于大表没有聚合的明细查询,hive默认不走tez/MR job。
而大表线性扫描查询速度过于慢,原因:对于没有聚合的明细数据查询,hive默认采用无任务模式降低启动yarn作业的开销。
例如:SELECT * FROM employees;
设置set hive.fetch.task.conversion=more
在这种情况下,Hive可以简单地读取employee对应的存储目录下的文件,然后输出查询结果到控制台。不需要起MapReduce job,直接通过Fetch task获取数据

— set hive.fetch.task.conversion=none

所有查询都会执行mapreduce程序

========================================================
set hive.exec.mode.local.auto=true; 开启本地MR

========================================================

MapJoin的优势在于没有shuffle
–是否自动转换为mapjoin
set hive.auto.convert.join = true;
–小表的最大文件大小,默认为25000000,即25M
set hive.mapjoin.smalltable.filesize = 25000000;
–是否将多个mapjoin合并为一个
set hive.auto.convert.join.noconditionaltask = true;
–多个mapjoin转换为1个时,所有小表的文件大小总和的最大值。同时hive.auto.convert.join.noconditionaltask必须为true
set hive.auto.convert.join.noconditionaltask.size = 10000000;

mapjoin实现的方式:
1)在Map-Reduce的驱动程序中使用静态方法DistributedCache.addCacheFile()增加要拷贝的小表文件。 JobTracker在作业启动之前会获取这个URI列表,并将相应的文件拷贝到各个TaskTracker的本地磁盘上。
2)在Map类的setup方法中使用DistributedCache.getLocalCacheFiles()方法获取文件目录,并使用标准的文件读写API读取相应的文件。

========================================================
设置执行引擎
set hive.execution.engine=mr;
set hive.execution.engine=tez;

========================================================
提高HiveQL聚合的执行性能。
SET hive.map.aggr = true;

可以将顶层的聚合操作放在Map阶段执行,
从而减轻清洗阶段数据传输和Reduce阶段的执行时间,提升总体性能。

执行原理: https://blog.csdn.net/chybin500/article/details/80988089

========================================================
Tez资源分配:https://cwiki.apache.org/confluence/display/TEZ/How+initial+task+parallelism+works?spm=a2c6h.13066369.0.0.47687f02dzrL4q
Apps using Tez have the ability to determine the number of tasks reading the initial external data for a job (the number of mappers in MapReduce parlance). Here is a short description of how that works.

There is an InputInitializer specified for the initial vertex reading the external input. During vertex initialization, the InputInitializer is invoked to determine the number of shards of the external data distributed across the cluster. In MapReduce parlance, these would be called input splits and would be determined by the InputFormat for that external input.
If Tez grouping is enabled for the splits, then a generic grouping logic is run on these splits to group them into larger splits. The idea is to strike a balance between how parallel the processing is and how much work is being done in each parallel process.
First, Tez tries to find out the resource availability in the cluster for these tasks. For that, YARN provides a headroom value (and in future other attributes may be used). Lets say this value is T.
Next, Tez divides T with the resource per task (say M) to find out how many tasks can run in parallel at one (ie in a single wave). W = T/M.
Next W is multiplied by a wave factor (from configuration – tez.grouping.split-waves) to determine the number of tasks to be used. Lets say this value is N.
If there are a total of X splits (input shards) and N tasks then this would group X/N splits per task. Tez then estimates the size of data per task based on the number of splits per task.
If this value is between tez.grouping.max-size & tez.grouping.min-size then N is accepted as the number of tasks. If not, then N is adjusted to bring the data per task in line with the max/min depending on which threshold was crossed.
For experimental purposes tez.grouping.split-count can be set in configuration to specify the desired number of groups. If this config is specified then the above logic is ignored and Tez tries to group splits into the specified number of groups. This is best effort.
After this the grouping algorithm is executed. It groups splits by node locality, then rack locality, while respecting the group size limits.
If the final number of splits is X then X tasks are created in the initial vertex and they are executed on the cluster to read the external data.

Here is the detailed explanation of grouping algorithm (TezSplitGrouper.getGroupedSplits).

Figure out desired number of grouped splits. This is affected by several factors. If TEZ_GROUPING_SPLIT_COUNT is set, the value will be used as initial count. Otherwise, the one passed in parameter will be used. Then the initial count will be corrected according to TEZ_GROUPING_SPLIT_MIN_SIZE and TEZ_GROUPING_SPLIT_MAX_SIZE: if the initial count causes a too small grouped split size, it will be override as total input size/TEZ_GROUPING_SPLIT_MIN_SIZE; if initial count causes too large grouped split size, it will be override as total input size/TEZ_GROUPING_SPLIT_MAX_SIZE.
Try grouping splits of same node to desired grouped split size. In this pass, we only allow splits in the same node to group together. Also, small grouped split is not allowed.
If we can no longer get a group on node level locality, there are two choices then. One is fallback to rack locality and continue to group, the other is to just allow small group on node level locality. The first one is by default, and the second one requires setting configuration TEZ_GROUPING_NODE_LOCAL_ONLY.
If there are still ungrouped splits, allow small groups anyway and continue to use locality level in last step.

如果为拆分启用了Tez分组,则会在这些拆分上运行通用分组逻辑,以将它们分组为更大的拆分。我们的想法是在处理的并行程度与每个并行流程中的工作量之间取得平衡。

首先,Tez尝试为这些任务找出集群中的资源可用性。为此,YARN提供净空值(并且将来可以使用其他属性)。让我们说这个值是T.
接下来,Tez将T与每个任务的资源(比如M)分开,以找出在一个任务中(即在单个波中)并行运行的任务数。W = T / M.
接下来W乘以波动因子(来自配置 – tez.grouping.split-waves)以确定要使用的任务数。可以说这个值是N.
如果总共有X个分裂(输入分片)和N个任务,那么这将对每个任务分组X / N分割。然后,Tez根据每个任务的拆分数量估算每个任务的数据大小。
如果此值介于tez.grouping.max-size和tez.grouping.min-size之间,则接受N作为任务数。如果不是,则调整N以使每个任务的数据与最大/最小值一致,这取决于越过哪个阈值。
出于实验目的,可以在配置中设置tez.grouping.split-count以指定所需的组数。如果指定了此配置,则忽略上述逻辑,并且Tez尝试将拆分分组到指定数量的组中。这是最好的努力。
在此之后,执行分组算法。它按节点位置分组,然后按机架局部性分组,同时遵守组大小限制。

========================================================
并发设置
— 同一个sql中可以并行执行的job会并发的执行
SET hive.exec.parallel=true;

— 控制同一个sql中可以并行的job数量,默认是8
SET hive.exec.parallel.thread.number=128;

========================================================
set hive.input.format=org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
这个参数表示执行前进行小文件合并

========================================================
set mapred.max.split.size=2048000000;
set mapred.min.split.size=2048000000;
( 根据不同集群的 【dfs.block.size 】这个配置设置,建议设成这个的值的整数倍 )
原文链接:https://blog.csdn.net/zhong_han_jun/article/details/50814246

控制map的数量:
重要 【 大小顺序如下 mapred.max.split.size <= mapred.min.split.size.per.node <= mapred.min.split.size.per.rack 】
set mapred.max.split.size=256000000; — 决定每个map处理的最大的文件大小,单位为B,256M这是默认值(1B=256M)
set mapred.min.split.size.per.node=1; — 节点中可以处理的最小的文件大小
set mapred.min.split.size.per.rack=1; — 机架中可以处理的最小的文件大小

对三个参数的理解
可以简单的理解为集群对一个表分区下面的文件进行分发到各个节点,之后根据mapred.max.split.size确认要启动多少个map数,逻辑如下
  a.假设有两个文件大小分别为(256M,280M)被分配到节点A,那么会启动两个map,剩余的文件大小为10MB和35MB因为每个大小都不足241MB会先做保留
  b.根据参数set mapred.min.split.size.per.node看剩余的大小情况并进行合并,如果值为1,表示a中每个剩余文件都会自己起一个map,这里会起两个,如果设置为大于45*1024*1024则会合并成一个块,并产生一个map
  如果mapred.min.split.size.per.node为10*1024*1024,那么在这个节点上一共会有4个map,处理的大小为(245MB,245MB,10MB,10MB,10MB,10MB),余下9MB
  如果mapred.min.split.size.per.node为45*1024*1024,那么会有三个map,处理的大小为(245MB,245MB,45MB)
  实际中mapred.min.split.size.per.node无法准确地设置成45*1024*1024,会有剩余并保留带下一步进行判断处理
  c. 对b中余出来的文件与其它节点余出来的文件根据mapred.min.split.size.per.rack大小进行判断是否合并,对再次余出来的文件独自产生一个map处理

控制reduce的数量:
方法1
set mapred.reduce.tasks=10; — 设置reduce的数量
方法2
set hive.exec.reducers.bytes.per.reducer = 1073741824 — 每个reduce处理的数据量,默认1GB

补充说明:一个集群可以有多个机架,一个机架有1至多个节点,这里的集群是mapreduce不是yarn,yarn没有详细了解过,另外如果想要实现map中的数据合并需要设置下面的参数,集群默认就是这个格式 set hive.input.format = org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;

需要明确的问题
 a.我们该设置多少个map多少个reduce才合适?
  map数普遍是通过执行时长来确认的,至少应当保证每个map执行时长在1分钟以上,太短的话意味着大量重复的jvm启用和销毁。具体设置要根据具体任务来处理,有些任务占用cpu大,有些占用io大。我这边的话因为大任务经常搞了上千个map,作为小集群影响还是蛮大的,所以只对监控到的hql产生过大的map和reduce进行调整,经过一些简单测试,map数保证在三四百个其实不影响执行效率。

 b.设置了上面的参数会带来什么影响?
  设置map的话,影响不是很大,可能会带来更多的集群之间的io,毕竟要做节点之间的文件合并
  设置reduce的话,如果使用mapred.reduce.tasks,这个影响就大了,至少会造成同一个session每一个mr的job的reduce都是这么多个,而且reduce个数意味着最后的文件数量的输出,如果小文件太多的话可以打开reduce端的小文件合并参数,set hive.merge.mapredfiles=true

调节map、reduce数:
1、对MR任务:若mappers个数超过10w可适当调大参数

  • set mapred.max.split.size=2048000000 (这个值比较合适) 代表每个mapper处理的数据量,默认值为256M。可以根据历史值适当调大该参数值,减小mapper数量。

2、若reduces个数超过5k,可适当调小参数

  • set hive.exec.reducers.max=1009 代表实际运行的reduce个数,默认值为1009,可根据历史值适当调小该参数值,减小reduces数量。 或调大参数
  • set hive.exec.reducers.bytes.per.reducer=1073741824 代表每个reduce处理的数据量,默认值为1GB,可根据历史值适当调大该参数值,减小reduces数量。

3、对TEZ任务:若map task个数超过10w,除了上述参数外可以通过调大参数
set tez.grouping.min-size=1024000000;
set tez.grouping.max-size=2048000000;
代表Map每个task处理的数据量,可以根据历史值适当调大该参数值,减小map task数量。
若reduce task个数超过5k可以参考MR引擎配置进行修改。
调整该参数之后对split-size会出现影响,影响切分数据块的的大小,减小mr数量

========================================================
HQL任务生成文件数过多整改措施

Step 1
1、对会导致目标表生成过多小文件的任务加上参数
set hive.merge.filenum.limit=-1;
存在的风险:会导致fileMergeTask运行时间变长。同时,由于生成文件数量的减少,可以降低下流任务split oom的概率。

Step 2
若以上参数无法减少生成的文件数量,请使用以下方式:
1、设置参数
set hive.merge.size.per.task=256000000
代表每个task file merge后生成文件的大小,默认为256M,可根据历史值适当增加生成文件大小,减少文件数量。(单位:byte)

2、如果是由于生成任务的reduce task个数太多而导致,则适当减少reduce task个数。如何减少reduce个数请参考 计算引擎任务限制 中 “Parallelism” 条目的整改措施。
如果存在动态分区可以尝试增加参数
distribute by <parttiion_column>, <平均分布列> % <取模值>
“<partition_column>”即为动态分区字段,“<平均分布列>%<取模值>”是为了防止动态分区的分区之间数据分布不均匀而出现数据倾斜的情况,请根据实际情况酌情添加。

对TEZ任务:若map task个数超过10w,除了上述参数外可以通过调大参数

========================================================
set hive.insert.into.multilevel.dirs = false
这里唯一作用就是限制SQL中指定存放结果文件不存在的目录的深度不能大于1.


TEZ参数

任务进程重用是否开启,默认是true
tez.am.container.reuse.enabled


数据倾斜

1、set hive.optimize.skewjoin = true; — 默认是false

2、加一列hash值打散
CAST(11 * rand() AS INT) AS rn

CONCAT(goods_id,pmod(hash(CONCAT(cookie,stat_time)),10000))

3、空值处理
IF(goods_id = 0 OR goods_id IS NULL,-pmod(hash(concat(cookie,stat_time)),10000),goods_id) AS join_goods_id — 打散goods_id is null


mapjoin

set hive.auto.convert.join = false;

经典实用场景:维表join


设置默认文件格式

SET hive.default.fileformat=Orc


distribute by

1、动态分区的使用例子:将相同key分到同一个reduce任务
https://blog.csdn.net/mhtian2015/article/details/79898169?utm_medium=distribute.pc_relevant.none-task-blog-baidujs-6

2、
如果表a只有一个文件,大小为120M,但包含几千万的记录,如果用1个map去完成这个任务,肯定是比较耗时的,这种情况下,我们要考虑将这一个文件合理的拆分成多个,这样就可以用多个map任务去完成。

  set mapred.reduce.tasks=10;

  create table a_1 as select * from a distribute by rand(123);

这样会将a表的记录,随机的分散到包含10个文件的a_1表中,再用a_1代替上面sql中的a表,则会用10个map任务去完成。每个map任务处理大于12M(几百万记录)的数据,效率肯定会好很多。


shuffle 阶段 OOM:
https://blog.csdn.net/houzhizhen/article/details/84773884?utm_medium=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-1.nonecase&depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromMachineLearnPai2-1.nonecase


MR源码 环形缓冲区:
http://bigdatadecode.club/MapReduce%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90–%E7%8E%AF%E5%BD%A2%E7%BC%93%E5%86%B2%E5%8C%BA.html

========================================================

高内存场景主要包括以下几种:

  • MAPJOIN:开启MAPJOIN之后,小表的数据会加载进MAP,在MAP端和大表进行JOIN,需要占用较大的MAP内存。
  • 使用UDF/UDAF/UDTF:比如collect_list(concat_ws(‘,’,name))实现字符串拼接函数,通过聚合拼接字符串,很容易造成单条数据较大,消耗较多内存。
  • 自定义UDF和jar包:自定义UDF或jar包实现方式导致在计算过程中需要较大内存。
  • 动态分区:内存使用随动态分区个数的增加而提高。
  • 单条数据过大:数据中大的map字段或者json字段,导致单条数据很大,处理过程需要较大内存。

部分转自互联网,侵权删除联系

赞(0) 打赏
部分文章转自网络,侵权联系删除b2bchain区块链学习技术社区 » HIVE 参数调优求职学习资料
分享到: 更多 (0)

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

b2b链

联系我们联系我们