dongyl
Spark -submit 参数及优化
spark submit 基本格式举例:
spark-submit \ --class com.lance.MyMain \ --master yarn-cluster \ --executor-memory 1G \ --num-executors 8 \ --executor-cores 2 \ --queue lance_queue hdfs:////user/lance-1.0.jar arg1 arg2
前言
不要期待修改一个参数能够像魔法一样立马得到神奇的好效果!(某些时候效果确实很棒^_^)你应当把参数看作一道菜中的调味品,能够丰富味道,但主要还是得靠原材料的质量与炒菜的技艺。
开发Spark应用时,应当先优化好你的应用代码,再来思考调参优化(必要的参数的除外)。
调参是一个比较复杂的主题,不同的环境、不同的代码都会导致同样的参数产生不同的效果。建议尽量在确定您的生产环境情况后、在优化好存在明显问题的代码后,再做调参测试。
下面会列出开发中常用的部分参数,并加以解释,以作参考
个人瞎说
其实说了这么多,我认为最快捷的调优还是看 web UI 页面查看执行的某行代码所需时间。
大部分都是数据倾斜导致,服务器的性能一般情况下都是够的。
此时我们就该考虑,是否在处理数据时没有过滤掉某些特定的脏数据,可简单取样;
过滤完记得重分区,去调节并行度~~ 有的exe里可能没的数据了, 尽量让资源充分跑
设置 coalesce()方法的参数shuffle默认设置为false,repartition()方法就是coalesce()方法shuffle为true的情况。其实这是涉及到分区前后数目大小的关系,只需记住不经过shuffle,是无法将RDDde分区数变多 即可。
或者存在某些特定的大key 所致。再加盐打散等 ...
以及若出现一些重复利用的RDD数据,可适当缓存,注意 使用checkpoint的时候可先 cache 一遍,因为截断点时会起2个任务,一个是执行当前的算子,另一个就是去截断血缘关系并销毁rdd,此时提前一步缓存可适当提升运行效率。
以及在操作DB时 ,合理使用算子等(把数据库整崩了可就真的从删库到跑路啦~~ 运维兄弟得去翻 binlog 慢慢恢复 ... )
简单示例
一个Spark任务提交示例
spark-submit \
--queue test_queue \
--master yarn \
--deploy-mode cluster \
--num-executors 10 \ 指定启动10个Executor进程
--executor-memory 8G \ 每个Executor分配8G内存
--executor-cores 4 \ 每个Executor分配4个core,一个core对应一个线程
--driver-memory 4G --driver-cores 2 \ 为Driver进程分配4G内存
--conf spark.network.timeout=300 \ 设置集群内网络通信延迟为300s
--conf spark.locality.wait=9s \ 设置数据本地化加载等待时间
--class com.skey.spark.app.MyApp /home/jerry/spark-demo.jar
参数解释:
–queue test_queue 将任务提交到YARN上面的test_queue队列
–master yarn 将任务运行在YARN上面
–deploy-mode cluster 指定cluster模式运行
–num-executors 10 指定启动10个Executor进程
–executor-memory 8G 每个Executor分配8G内存
–executor-cores 4 每个Executor分配4个core,一个core对应一个线程
–driver-memory 4G 为Driver进程分配4G内存
–driver-cores 2 为Driver分配2个core,一个core对应一个线程
–conf spark.network.timeout=300s 设置集群内网络通信延迟为300s
–conf spark.locality.wait=9s 设置数据本地化加载等待时间
–class com.skey.spark.app.MyApp 指定需要运行的spark应用类
/home/jerry/spark-demo.jar 指定需要运行的jar包
常用参数
下面列出的参数以Spark 2.4.3的configuration为标准
spark.memory.fraction
默认值: 0.6
解释: 新版内存管理(UnifiedMemoryManager)模式中,execution和storage的总可用内存比例(它们之间可以相互借用内存)。内存值 = (总内存值 - 300M) * spark.memory.fraction(0.6)。
建议: 如果采用默认的新版内存管理模式的话,适当调大该值,可以有效的减少磁盘spill和踢除block的概率
spark.memory.storageFraction
默认值: 0.5
解释:新版内存管理(UnifiedMemoryManager)模式中,storage的内存比例,剩余的则是execution的内存。
建议:execution更多,例如shuffle更多,那么调小该值,性能更好。如果缓存较多,可以调大该值。
spark.memory.offHeap.enabled与spark.memory.offHeap.size
默认值: false与0
解释:新版内存管理(UnifiedMemoryManager)模式中,是否开启堆外内存、堆外内存的大小
建议:开启堆外内存可以有效的避免GC,建议尝试开启,给一个适当的值,例如堆内存的10%。需要注意的是堆内、堆外内存是分开算的。
spark.memory.useLegacyMode
默认值: false
解释: 该参数用于控制是否使用遗留的内存管理模式(StaticMemoryManager,也就是钨丝计划之前的内存模型,即1.6版本以前)。遗留模式下,内存由spark.storage.memoryFraction(0.6) + spark.shuffle.memoryFraction(0.2) + 系统默认(0.2) 组成。
建议: 刚开始开发应用时,可以不开启,直接采用默认的内存管理器,进行动态分配即可。一旦开发完成,可以尝试调试shuffle、storage的比例,如果能够确定其值时,可以改为true,以减少动态分配、spill、剔除block的概率。另外动态分配模式下execution从storage借来的内存不管还需不需要用都不会还给storage,可能会导致部分问题。
spark.storage.memoryFraction
默认值: 0.6
解释: spark.memory.useLegacyMode为true时有效,用于控制缓存部分的内存
建议: 如果缓存数据比较大,可以调大该参数
spark.shuffle.memoryFraction
默认值: 0.2
解释: spark.memory.useLegacyMode为true时有效,用于控制shuffle部分
建议: 如果频繁发生spill(溢写),可以调大该参数。如果你的应用缓存用的很少,请使劲调大该参数^_^
spark.storage.unrollFraction
默认值: 0.2
解释: spark.memory.useLegacyMode为true时有效,用于缓存的数据的序列化/反序列化,占spark.storage.memoryFraction的20%
建议: 可以不调
spark.executor.memoryOverhead
默认值: executorMemory * 0.10,最低384M
解释: 设置每个Executor的堆外内存,主要用于JVM自身,字符串, NIO Buffer等开销。YARN模式下,等于 Container内存 - Executor内存。(没在官网找到该参数,源码中存在该参数)
建议: Spark使用的是基于nio的,nio使用直接内存区效率非常高,可以适当的分配部分内存到堆外区。
spark.locality.wait
默认值: 3s
解释: 用于指定数据本地化等待时长,包括3个子参数(spark.locality.wait.node、spark.locality.wait.process、spark.locality.wait.rack)用于更细致的调节。
建议: 通常统一调整spark.locality.wait即可。可以适当加长等待时间,特别是数据量比较大时,如果能够本地化处理,效果更佳。
spark.network.timeout
默认值: 120s
解释: spark内存通信的网络延时
建议: 如果spark应用处理比较耗时,那么可以适当调大该参数(例如300s),防止延时导致的报错
spark.default.parallelism
默认值: 无
解释: 用于指定RDD的shuffle操作的分区数,例如reduceByKey、join等。调用shuffle算子时,优先使用算子指定的分区数,否则使用spark.default.parallelism的值,如果还是没值,则使用父RDD的分区数的值。对sql中的shuffle无效。
建议: 如果你想精细化控制,愿意为每个shuffle算子添加parallelism值,那么不用设置^_^。建议可以先设置一个值(例如总core的2-3倍),然后代码中有需要调整的就向shuffle算子传参,覆盖本次的默认值。(不要误信网上瞎说的什么并行度)
spark.sql.shuffle.partitions
默认值: 200
解释: 用于指定SparkSQL执行shuffle时的分区数。(没在官网找到该参数)
建议: SQL在执行shuffle时,如果默认200小于你的总core数,就会浪费资源了。
建议设置为总core的2-3倍。
spark.jars
默认值: 无
解释: 指定jar包(用逗号分隔),会将其传到Driver、Executor端
建议: 除了spark lib下的包,其他额外引用的库建议都指定。(client模式下,只有Driver会用到的库,可以不传)
spark.jars.excludes
默认值: 无
解释: 排除不要的包,防止依赖冲突。格式 groupId:artifactId
建议: …
spark.executor.userClassPathFirst与spark.driver.userClassPathFirst
默认值: false
解释: 用于指定是否优先加载用户指定的jar
建议: 因为存在用户指定的jar与spark默认库下jar包可能冲突的问题,所以当冲突时,如果你能确定你的jar没问题,那么可以设置为true。(之前遇到一个华为的bug就是这样)
spark.driver.maxResultSize
默认值: 1g
解释: 数据传到Driver端的最大量,例如collect、take操作
建议: 有时候可能需要直接拉取大量的数据,可以根据数据量来调整该参数。
spark.reducer.maxSizeInFlight
默认值: 48m
解释: 每个reduce任务拉取数据的最大量
建议: 每个输出端都需要创建一个buffer,消耗比较大。如果内存比较大,可以提高该值,拉取速度会加快。
spark.shuffle.file.buffer
默认值: 32k
解释: shuffle时,数据输出到文件的buffer大小,写满buffer后,数据会溢写到磁盘。
建议: 调大该值,可以降低溢写到磁盘的次数(减少I/O次数),提高性能。
内存充足的话,可以调大,例如64k。
spark.shuffle.io.maxRetries
默认值: 3
解释: shuffle时,read端从write端拉取数据的重试次数
建议: 因为GC、网络延迟等问题可能会导致拉取失败,可以适当提高重试次数,防止意外。
spark.shuffle.io.retryWait
默认值: 5s
解释: spark.shuffle.io.maxRetries每次重试需要等多久
建议: 可以加大,提高稳定性
spark.shuffle.manager
默认值: sort
解释: 用于管理shuflle文件输出到磁盘的方式。建议百度看看详细流程。1.2版以前默认HashShuffleManager, 之后默认是SortShuffleManager。Spark 2后不再有该参数,直接是SortShuffleManager,默认会对数据进行排序。
建议: 可以通过spark.shuffle.sort.bypassMergeThreshold调整是否排序。
spark.shuffle.sort.bypassMergeThreshold
默认值: 200
解释: shffle时,如果read task数小于200(并且是非预聚合的Shuffle,例如reduceByKey是预聚合型),会启用bypass机制:不会进行排序操作,最后会合并task产生的文件,并创建索引
建议: 不需要排序时,可以提高该参数,以大于你的shuffle read task数。(会有不错的效率提升,我的一个应用降低了20%时间)
spark.kryo.registrationRequired
默认值: false
解释: 是否强制kryo序列化
建议: 如果为false,kyro需要为每个对象写未注册类的类名,会造成显著的性能开销。建议设置为true。(某些朋友对于JVM报出的有的类不知道怎么注册,在这里建议复制报错的类,使用Class.forName(“类”))
spark.rdd.compress
默认值: false
解释: 是否压缩序列化的RDD数据
建议: 开启可以减少内存,但是解压会增加CPU消耗时间
spark.scheduler.mode
默认值: FIFO
解释: 同一个SparkContext内的调度机制,包括FIFO、FAIR
建议: 一般使用较少。FIFO,先进先出,优先执行先提交的,有空闲的再给后面的job。FAIR,公平分配资源,为每个可以并行执行的job平均分配计算资源。
spark.streaming.backpressure.enabled
默认值: false
解释: 是否启用背压机制,控制SparkStream接收数据的速率。
建议: 流式处理中,处理速度如果较慢,会导致来的数据不断积压。启用后,Spark可以自己动态根据处理能力调整接收数据的量。如果存在积压情况,建议启用。另外还有几个参数,用于细节控制,不建议调整。
spark.streaming.blockInterval
默认值: 200ms
解释: 每批处理的task数 = 批处理间隔 / blockInterval
建议: blockInterval越大,task则越少,会导致部分core没有使用。可以根据你的core的量,适当降低该参数。(官方建议blockInterval最小值约为50ms)
spark.streaming.concurrentJobs
默认值: 1
解释: 一个SparkStream应用内可以同时运行多少个job。(没在官网找到该参数)
建议: 如果你分配的core比较多,每批的task数比较少(还可能处理时间比较长),空闲的core比较浪费,那么可以调高该参数,同时运行后面的job(如果2个job之间没有前后关联的话)
spark.driver.extraJavaOptions与spark.executor.extraJavaOptions
默认值: 无
解释: 用于指定JVM参数
建议: 看JVM调参部分
JVM调参
一般不要先调JVM,开发中的大多数问题都是代码质量不好导致的,先去看代码、业务逻辑是否有问题,优化、优化、再优化……^_^
然后,确定运行环境,再来调整JVM参数
注意,运行时默认值会随着JVM版本、系统环境而改变,请用 java -XX:+PrintFlagsFinal -version命令查看JVM最终参数
查看GC情况
查看每个节点的GC
添加-XX:PrintGCDetails,让每个节点打印GC日志,需要在每个节点分别查看。On YARN的话,可以点击WebUI界面查看每个节点日志。
想看Spark应用整体的吞吐量?
每个应用的Spark WebUI有展示,直接在这儿看就可以了。如果GC时间超过10%,那么说明你的应用需要优化了!
关于GC的选择(Java 8)
批处理,需要吞吐量较高?用 -XX:+UseParallelGC
流式处理,需要数据具有较高的一致性?用-XX:+UseConcMarkSweepGC 或 G1
琢磨不定?选 ParallelGC
批处理时想用G1?可以试试,但是吞吐量没有ParallelGC好(Java 8)
并行垃圾收集器参数(ParallelGC)
以吞吐量优先为准则
-XX:+UseParallelGC 设置年轻代使用ParallelGC,早期版本中老年代会默认使用SerialGC。
-XX:+UseParallelOldGC 设置老年代也使用ParallelOldGC,Java1.6后开始支持(我的系统中是默认开启的,你的不一定)
-XX:ParallelGCThreads=8 设置并行收集垃圾的线程数为8,一般同CPU核个数
-XX:MaxGCPauseMillis=100 设置ParallelGC在年轻代单次回收的最长耗时为100毫秒。如果耗时超过该值,JVM会自动调整年轻代内存大小,以适应该值。可以调大该值,例如500,以保证吞吐量。
-XX:GCTimeRatio=99 设置垃圾回收时间占总时间的百分比最高为1%,公式为1/(1+99),即吞吐量为99%。默认为99。
-XX:+UseAdaptiveSizePolicy 启用自适应策略。JVM会自动调整年轻代Eden区与Survivor区大小的比例,以适应GCTimeRatio的值。建议开启。
并发垃圾收集器参数(ConcMarkSweepGC)
以响应时间优先为准则
-XX:+UseConcMarkSweepGC 设置老年代使用ConcMarkSweepGC,年轻代默认使用ParNewGC。
-XX:+UseParNewGC 设置年轻代为ParNewGC。JDK5以上使用CMS时,默认年轻代会采用ParNewGC。
-XX:+UseCMSCompactAtFullCollection 开启压缩整理(默认),压缩整理用于消除内存碎片化问题(因为CMS采用的标记清除算法,不会进行压缩整理,所以要单独开启)
-XX:CMSFullGCsBeforeCompaction=2 设置进行2次FullGC后(默认0次),开始对内存进行压缩整理。
-XX:CMSInitiatingOccupancyFraction=70 设置老年代内存使用70%后开始进行并发垃圾收集
内存调整常用参数
常用比例
-Xmx4G 设置JVM最大堆内存为4G
-Xms4G 设置JVM初始堆内存为4G,一般设为与-Xmx同样大即可(避免重新分配内存)
-Xmn1G 设置年轻代内存大小为1G。堆内存 = 老年代 + 年轻代,因此年轻代与老年代之间要根据GC情况取得一个平衡。例如MinorGC较多,可以调大年轻代,MajorGC较多可以调大老年代(即调小年轻代)。
-XX:NewSize=512M 设置年轻代初始大小为512M,一般设为与MaxNewSize同样大即可。(不建议使用,直接用-Xmn)
-XX:MaxNewSize=1024M 设置年轻代最大大小为1024M。(不建议使用,直接用-Xmn)
-XX:MetaspaceSize=128M 设置元信息区(永久代)初始大小为256M,一般设为与MaxMetaspaceSize同样大即可。(Java8以前叫做-XX:PermSize)
-XX:MaxMetaspaceSize=256M 设置元信息区(永久代)最大大小为256M。(Java8以前叫做-XX:MaxPermSize)
-XX:NewRatio=2 设置年轻代与老年代的大小比例为 1 : 2 (Java8默认为2)
-XX:SurvivorRatio=8 设置年轻代中Eden区与Survivor的大小比例为 8 : 1 : 1 (enden : survivor0 : survivor1)
-Xss256K 设置每个线程私有栈的的大小为256K(Java8默认为1M)。线程越多内存占用越大,需要启用超多线程时,可以调低该参数,例如128k。
-XX:MaxDirectMemory=100M 设置最大堆外内存
其他常用参数
-XX:+PrintGCDetails 打印GC详细信息
-XX:+PrintGCTimeStamps 打印GC时的时间戳
-Xloggc:/home/jerry/logs/gc.log 设置GC日志输出目录
-XX:+HeapDumpOnOutOfMemoryError 当发生内存溢出异常时,dump出堆信息
-XX:HeapDumpPath=./my_java.hprof 指定dump堆信息的输出路径
-XX:MaxTenuringThreshold=8 设置对象在年轻代经历垃圾回收仍然存活8次后进入老年代。最大值15(因为JVM用4bit表示该值),默认值15(使用CMS时,默认为6)
-XX:+PrintFlagsFinal 打印JVM最终参数(例如 java -XX:+PrintFlagsFinal -version)
-XX:+PrintFlagsInitial 打印JVM初始参数(最终值会随环境改变)
未完待续 .
调优-官方文档
Tuning Spark:
https://spark.apache.org/docs/latest/tuning.html
SparkStream Performance Tuning:
https://spark.apache.org/docs/latest/streaming-programming-guide.html#performance-tuning