TypechoJoeTheme

从百草味到三只松鼠

统计
登录
用户名
密码

Spark -submit 参数及优化

本文最后更新于2021年12月27日,已超过998天没有更新。如果文章内容或图片资源失效,请留言反馈,我会及时处理,谢谢!

spark submit 基本格式举例:

spark-submit \ --class com.lance.MyMain \ --master yarn-cluster \ --executor-memory 1G \ --num-executors 8 \ --executor-cores 2 \ --queue lance_queue hdfs:////user/lance-1.0.jar arg1 arg2

前言

不要期待修改一个参数能够像魔法一样立马得到神奇的好效果!(某些时候效果确实很棒^_^)你应当把参数看作一道菜中的调味品,能够丰富味道,但主要还是得靠原材料的质量与炒菜的技艺。

开发Spark应用时,应当先优化好你的应用代码,再来思考调参优化(必要的参数的除外)。

调参是一个比较复杂的主题,不同的环境、不同的代码都会导致同样的参数产生不同的效果。建议尽量在确定您的生产环境情况后、在优化好存在明显问题的代码后,再做调参测试。

下面会列出开发中常用的部分参数,并加以解释,以作参考

个人瞎说

其实说了这么多,我认为最快捷的调优还是看 web UI 页面查看执行的某行代码所需时间。
大部分都是数据倾斜导致,服务器的性能一般情况下都是够的。
此时我们就该考虑,是否在处理数据时没有过滤掉某些特定的脏数据,可简单取样;
过滤完记得重分区,去调节并行度~~ 有的exe里可能没的数据了, 尽量让资源充分跑
设置 coalesce()方法的参数shuffle默认设置为false,repartition()方法就是coalesce()方法shuffle为true的情况。其实这是涉及到分区前后数目大小的关系,只需记住不经过shuffle,是无法将RDDde分区数变多 即可。

或者存在某些特定的大key 所致。再加盐打散等 ...
以及若出现一些重复利用的RDD数据,可适当缓存,注意 使用checkpoint的时候可先 cache 一遍,因为截断点时会起2个任务,一个是执行当前的算子,另一个就是去截断血缘关系并销毁rdd,此时提前一步缓存可适当提升运行效率。
以及在操作DB时 ,合理使用算子等(把数据库整崩了可就真的从删库到跑路啦~~ 运维兄弟得去翻 binlog 慢慢恢复 ... )

简单示例

一个Spark任务提交示例

spark-submit \

--queue test_queue \

--master yarn \

--deploy-mode cluster \

--num-executors 10 \ 指定启动10个Executor进程

--executor-memory 8G \ 每个Executor分配8G内存

--executor-cores 4 \ 每个Executor分配4个core,一个core对应一个线程

--driver-memory 4G --driver-cores 2 \ 为Driver进程分配4G内存

--conf spark.network.timeout=300 \ 设置集群内网络通信延迟为300s

--conf spark.locality.wait=9s \ 设置数据本地化加载等待时间

--class com.skey.spark.app.MyApp /home/jerry/spark-demo.jar

参数解释:

–queue test_queue 将任务提交到YARN上面的test_queue队列

–master yarn 将任务运行在YARN上面

–deploy-mode cluster 指定cluster模式运行

–num-executors 10 指定启动10个Executor进程

–executor-memory 8G 每个Executor分配8G内存

–executor-cores 4 每个Executor分配4个core,一个core对应一个线程

–driver-memory 4G 为Driver进程分配4G内存

–driver-cores 2 为Driver分配2个core,一个core对应一个线程

–conf spark.network.timeout=300s 设置集群内网络通信延迟为300s

–conf spark.locality.wait=9s 设置数据本地化加载等待时间

–class com.skey.spark.app.MyApp 指定需要运行的spark应用类

/home/jerry/spark-demo.jar 指定需要运行的jar包

常用参数

下面列出的参数以Spark 2.4.3的configuration为标准

spark.memory.fraction

默认值: 0.6

解释: 新版内存管理(UnifiedMemoryManager)模式中,execution和storage的总可用内存比例(它们之间可以相互借用内存)。内存值 = (总内存值 - 300M) * spark.memory.fraction(0.6)。

建议: 如果采用默认的新版内存管理模式的话,适当调大该值,可以有效的减少磁盘spill和踢除block的概率

spark.memory.storageFraction

默认值: 0.5

解释:新版内存管理(UnifiedMemoryManager)模式中,storage的内存比例,剩余的则是execution的内存。

建议:execution更多,例如shuffle更多,那么调小该值,性能更好。如果缓存较多,可以调大该值。

spark.memory.offHeap.enabled与spark.memory.offHeap.size

默认值: false与0

解释:新版内存管理(UnifiedMemoryManager)模式中,是否开启堆外内存、堆外内存的大小

建议:开启堆外内存可以有效的避免GC,建议尝试开启,给一个适当的值,例如堆内存的10%。需要注意的是堆内、堆外内存是分开算的。

spark.memory.useLegacyMode

默认值: false

解释: 该参数用于控制是否使用遗留的内存管理模式(StaticMemoryManager,也就是钨丝计划之前的内存模型,即1.6版本以前)。遗留模式下,内存由spark.storage.memoryFraction(0.6) + spark.shuffle.memoryFraction(0.2) + 系统默认(0.2) 组成。

建议: 刚开始开发应用时,可以不开启,直接采用默认的内存管理器,进行动态分配即可。一旦开发完成,可以尝试调试shuffle、storage的比例,如果能够确定其值时,可以改为true,以减少动态分配、spill、剔除block的概率。另外动态分配模式下execution从storage借来的内存不管还需不需要用都不会还给storage,可能会导致部分问题。

spark.storage.memoryFraction

默认值: 0.6

解释: spark.memory.useLegacyMode为true时有效,用于控制缓存部分的内存

建议: 如果缓存数据比较大,可以调大该参数

spark.shuffle.memoryFraction

默认值: 0.2

解释: spark.memory.useLegacyMode为true时有效,用于控制shuffle部分

建议: 如果频繁发生spill(溢写),可以调大该参数。如果你的应用缓存用的很少,请使劲调大该参数^_^

spark.storage.unrollFraction

默认值: 0.2

解释: spark.memory.useLegacyMode为true时有效,用于缓存的数据的序列化/反序列化,占spark.storage.memoryFraction的20%

建议: 可以不调

spark.executor.memoryOverhead

默认值: executorMemory * 0.10,最低384M

解释: 设置每个Executor的堆外内存,主要用于JVM自身,字符串, NIO Buffer等开销。YARN模式下,等于 Container内存 - Executor内存。(没在官网找到该参数,源码中存在该参数)

建议: Spark使用的是基于nio的,nio使用直接内存区效率非常高,可以适当的分配部分内存到堆外区。

spark.locality.wait

默认值: 3s

解释: 用于指定数据本地化等待时长,包括3个子参数(spark.locality.wait.node、spark.locality.wait.process、spark.locality.wait.rack)用于更细致的调节。

建议: 通常统一调整spark.locality.wait即可。可以适当加长等待时间,特别是数据量比较大时,如果能够本地化处理,效果更佳。

spark.network.timeout

默认值: 120s

解释: spark内存通信的网络延时

建议: 如果spark应用处理比较耗时,那么可以适当调大该参数(例如300s),防止延时导致的报错

spark.default.parallelism

默认值: 无

解释: 用于指定RDD的shuffle操作的分区数,例如reduceByKey、join等。调用shuffle算子时,优先使用算子指定的分区数,否则使用spark.default.parallelism的值,如果还是没值,则使用父RDD的分区数的值。对sql中的shuffle无效。

建议: 如果你想精细化控制,愿意为每个shuffle算子添加parallelism值,那么不用设置^_^。建议可以先设置一个值(例如总core的2-3倍),然后代码中有需要调整的就向shuffle算子传参,覆盖本次的默认值。(不要误信网上瞎说的什么并行度)

spark.sql.shuffle.partitions

默认值: 200

解释: 用于指定SparkSQL执行shuffle时的分区数。(没在官网找到该参数)

建议: SQL在执行shuffle时,如果默认200小于你的总core数,就会浪费资源了。

建议设置为总core的2-3倍。

spark.jars

默认值: 无

解释: 指定jar包(用逗号分隔),会将其传到Driver、Executor端

建议: 除了spark lib下的包,其他额外引用的库建议都指定。(client模式下,只有Driver会用到的库,可以不传)

spark.jars.excludes

默认值: 无

解释: 排除不要的包,防止依赖冲突。格式 groupId:artifactId

建议: …

spark.executor.userClassPathFirst与spark.driver.userClassPathFirst

默认值: false

解释: 用于指定是否优先加载用户指定的jar

建议: 因为存在用户指定的jar与spark默认库下jar包可能冲突的问题,所以当冲突时,如果你能确定你的jar没问题,那么可以设置为true。(之前遇到一个华为的bug就是这样)

spark.driver.maxResultSize

默认值: 1g

解释: 数据传到Driver端的最大量,例如collect、take操作

建议: 有时候可能需要直接拉取大量的数据,可以根据数据量来调整该参数。

spark.reducer.maxSizeInFlight

默认值: 48m

解释: 每个reduce任务拉取数据的最大量

建议: 每个输出端都需要创建一个buffer,消耗比较大。如果内存比较大,可以提高该值,拉取速度会加快。

spark.shuffle.file.buffer

默认值: 32k

解释: shuffle时,数据输出到文件的buffer大小,写满buffer后,数据会溢写到磁盘。

建议: 调大该值,可以降低溢写到磁盘的次数(减少I/O次数),提高性能。

内存充足的话,可以调大,例如64k。

spark.shuffle.io.maxRetries

默认值: 3

解释: shuffle时,read端从write端拉取数据的重试次数

建议: 因为GC、网络延迟等问题可能会导致拉取失败,可以适当提高重试次数,防止意外。

spark.shuffle.io.retryWait

默认值: 5s

解释: spark.shuffle.io.maxRetries每次重试需要等多久

建议: 可以加大,提高稳定性

spark.shuffle.manager

默认值: sort

解释: 用于管理shuflle文件输出到磁盘的方式。建议百度看看详细流程。1.2版以前默认HashShuffleManager, 之后默认是SortShuffleManager。Spark 2后不再有该参数,直接是SortShuffleManager,默认会对数据进行排序。

建议: 可以通过spark.shuffle.sort.bypassMergeThreshold调整是否排序。

spark.shuffle.sort.bypassMergeThreshold

默认值: 200

解释: shffle时,如果read task数小于200(并且是非预聚合的Shuffle,例如reduceByKey是预聚合型),会启用bypass机制:不会进行排序操作,最后会合并task产生的文件,并创建索引

建议: 不需要排序时,可以提高该参数,以大于你的shuffle read task数。(会有不错的效率提升,我的一个应用降低了20%时间)

spark.kryo.registrationRequired

默认值: false

解释: 是否强制kryo序列化

建议: 如果为false,kyro需要为每个对象写未注册类的类名,会造成显著的性能开销。建议设置为true。(某些朋友对于JVM报出的有的类不知道怎么注册,在这里建议复制报错的类,使用Class.forName(“类”))

spark.rdd.compress

默认值: false

解释: 是否压缩序列化的RDD数据

建议: 开启可以减少内存,但是解压会增加CPU消耗时间

spark.scheduler.mode

默认值: FIFO

解释: 同一个SparkContext内的调度机制,包括FIFO、FAIR

建议: 一般使用较少。FIFO,先进先出,优先执行先提交的,有空闲的再给后面的job。FAIR,公平分配资源,为每个可以并行执行的job平均分配计算资源。

spark.streaming.backpressure.enabled

默认值: false

解释: 是否启用背压机制,控制SparkStream接收数据的速率。

建议: 流式处理中,处理速度如果较慢,会导致来的数据不断积压。启用后,Spark可以自己动态根据处理能力调整接收数据的量。如果存在积压情况,建议启用。另外还有几个参数,用于细节控制,不建议调整。

spark.streaming.blockInterval

默认值: 200ms

解释: 每批处理的task数 = 批处理间隔 / blockInterval

建议: blockInterval越大,task则越少,会导致部分core没有使用。可以根据你的core的量,适当降低该参数。(官方建议blockInterval最小值约为50ms)

spark.streaming.concurrentJobs

默认值: 1

解释: 一个SparkStream应用内可以同时运行多少个job。(没在官网找到该参数)

建议: 如果你分配的core比较多,每批的task数比较少(还可能处理时间比较长),空闲的core比较浪费,那么可以调高该参数,同时运行后面的job(如果2个job之间没有前后关联的话)

spark.driver.extraJavaOptions与spark.executor.extraJavaOptions

默认值: 无

解释: 用于指定JVM参数

建议: 看JVM调参部分

JVM调参

一般不要先调JVM,开发中的大多数问题都是代码质量不好导致的,先去看代码、业务逻辑是否有问题,优化、优化、再优化……^_^

然后,确定运行环境,再来调整JVM参数

注意,运行时默认值会随着JVM版本、系统环境而改变,请用 java -XX:+PrintFlagsFinal -version命令查看JVM最终参数

查看GC情况

查看每个节点的GC

添加-XX:PrintGCDetails,让每个节点打印GC日志,需要在每个节点分别查看。On YARN的话,可以点击WebUI界面查看每个节点日志。

想看Spark应用整体的吞吐量?

每个应用的Spark WebUI有展示,直接在这儿看就可以了。如果GC时间超过10%,那么说明你的应用需要优化了!

关于GC的选择(Java 8)

批处理,需要吞吐量较高?用 -XX:+UseParallelGC

流式处理,需要数据具有较高的一致性?用-XX:+UseConcMarkSweepGC 或 G1

琢磨不定?选 ParallelGC

批处理时想用G1?可以试试,但是吞吐量没有ParallelGC好(Java 8)

并行垃圾收集器参数(ParallelGC)

以吞吐量优先为准则

-XX:+UseParallelGC 设置年轻代使用ParallelGC,早期版本中老年代会默认使用SerialGC。

-XX:+UseParallelOldGC 设置老年代也使用ParallelOldGC,Java1.6后开始支持(我的系统中是默认开启的,你的不一定)

-XX:ParallelGCThreads=8 设置并行收集垃圾的线程数为8,一般同CPU核个数

-XX:MaxGCPauseMillis=100 设置ParallelGC在年轻代单次回收的最长耗时为100毫秒。如果耗时超过该值,JVM会自动调整年轻代内存大小,以适应该值。可以调大该值,例如500,以保证吞吐量。

-XX:GCTimeRatio=99 设置垃圾回收时间占总时间的百分比最高为1%,公式为1/(1+99),即吞吐量为99%。默认为99。

-XX:+UseAdaptiveSizePolicy 启用自适应策略。JVM会自动调整年轻代Eden区与Survivor区大小的比例,以适应GCTimeRatio的值。建议开启。

并发垃圾收集器参数(ConcMarkSweepGC)

以响应时间优先为准则

-XX:+UseConcMarkSweepGC 设置老年代使用ConcMarkSweepGC,年轻代默认使用ParNewGC。

-XX:+UseParNewGC 设置年轻代为ParNewGC。JDK5以上使用CMS时,默认年轻代会采用ParNewGC。

-XX:+UseCMSCompactAtFullCollection 开启压缩整理(默认),压缩整理用于消除内存碎片化问题(因为CMS采用的标记清除算法,不会进行压缩整理,所以要单独开启)

-XX:CMSFullGCsBeforeCompaction=2 设置进行2次FullGC后(默认0次),开始对内存进行压缩整理。

-XX:CMSInitiatingOccupancyFraction=70 设置老年代内存使用70%后开始进行并发垃圾收集

内存调整常用参数

常用比例

-Xmx4G 设置JVM最大堆内存为4G

-Xms4G 设置JVM初始堆内存为4G,一般设为与-Xmx同样大即可(避免重新分配内存)

-Xmn1G 设置年轻代内存大小为1G。堆内存 = 老年代 + 年轻代,因此年轻代与老年代之间要根据GC情况取得一个平衡。例如MinorGC较多,可以调大年轻代,MajorGC较多可以调大老年代(即调小年轻代)。

-XX:NewSize=512M 设置年轻代初始大小为512M,一般设为与MaxNewSize同样大即可。(不建议使用,直接用-Xmn)

-XX:MaxNewSize=1024M 设置年轻代最大大小为1024M。(不建议使用,直接用-Xmn)

-XX:MetaspaceSize=128M 设置元信息区(永久代)初始大小为256M,一般设为与MaxMetaspaceSize同样大即可。(Java8以前叫做-XX:PermSize)

-XX:MaxMetaspaceSize=256M 设置元信息区(永久代)最大大小为256M。(Java8以前叫做-XX:MaxPermSize)

-XX:NewRatio=2 设置年轻代与老年代的大小比例为 1 : 2 (Java8默认为2)

-XX:SurvivorRatio=8 设置年轻代中Eden区与Survivor的大小比例为 8 : 1 : 1 (enden : survivor0 : survivor1)

-Xss256K 设置每个线程私有栈的的大小为256K(Java8默认为1M)。线程越多内存占用越大,需要启用超多线程时,可以调低该参数,例如128k。

-XX:MaxDirectMemory=100M 设置最大堆外内存

其他常用参数

-XX:+PrintGCDetails 打印GC详细信息

-XX:+PrintGCTimeStamps 打印GC时的时间戳

-Xloggc:/home/jerry/logs/gc.log 设置GC日志输出目录

-XX:+HeapDumpOnOutOfMemoryError 当发生内存溢出异常时,dump出堆信息

-XX:HeapDumpPath=./my_java.hprof 指定dump堆信息的输出路径

-XX:MaxTenuringThreshold=8 设置对象在年轻代经历垃圾回收仍然存活8次后进入老年代。最大值15(因为JVM用4bit表示该值),默认值15(使用CMS时,默认为6)

-XX:+PrintFlagsFinal 打印JVM最终参数(例如 java -XX:+PrintFlagsFinal -version)

-XX:+PrintFlagsInitial 打印JVM初始参数(最终值会随环境改变)

未完待续 .

调优-官方文档

Tuning Spark:

https://spark.apache.org/docs/latest/tuning.html

SparkStream Performance Tuning:

https://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning

赞(2)
评论 (0)
苏ICP备2021053031号-1