江南体育网页设计建设站

[Spark]Spark常用的优化方法

日期:2024-04-22 15:03 / 作者:佚名

目录

优化目的

Spark-core的优化

Yarn 模式下动态资源调度

Shuffle阶段调优

MapPartitions分区替换map计算结果

使用foreachPartitions替代foreach

设置num-executors参数

设置executor-memory参数

?设置executor-cores

注意Collect的使用

使用reduceByKey替换groupByKey

数据倾斜

将HDFS上的文本格式数据转换为Parquet格式数据

Spark-sql的优化

使用分区表

使用广播

使用重分区优化小文件

宽依赖和窄依赖

Spark Shuffle

Spark Stage的划分

yarn-cluster和yarn-client模式

Spark的Executor内存分配

目录

优化目的

Spark-core的优化

Yarn 模式下动态资源调度

Shuffle阶段调优

MapPartitions分区替换map计算结果

使用foreachPartitions替代foreach

设置num-executors参数

设置executor-memory参数

?设置executor-cores

注意Collect的使用

使用reduceByKey替换groupByKey

数据倾斜

将HDFS上的文本格式数据转换为Parquet格式数据

Spark-sql的优化

使用分区表

使用广播

使用重分区优化小文件

宽依赖和窄依赖

Spark Shuffle

Spark Stage的划分

yarn-cluster和yarn-client模式

Spark的Executor内存分配

Job的划分



Spark调优的目标是在不影响其他业务正常运行的前提下,高效的完成业务目标,通常为了达成该目标,一般需要最大限度利用集群的物理资源,如CPU、内存、磁盘IO,使其某一项达到瓶颈。

原理:

动态资源调度就是为了解决的资源浪费和资源不合理,根据当前应用任务的负载情况,实时的增减Executor个数,从而实现动态分配资源,使整个Spark系统更加健康。

适合场景:批任务。特别在使用Spark作为一个常驻的服务时候,动态资源调度将大大的提高资源的利用率。例如JDBCServer服务,大多数时间该进程并不接受JDBC请求,因此将这段空闲时间的资源释放出来,将极大的节约集群的资源

条件:必须开启Yarn External Shuffle才能使用这个功能

spark.shuffle.service.enabled=true

spark.dynamicAllocation.enabled=true //开启动态资源调度

spark.dynamicAllocation.minExecutors //最小Executor个数。

spark.dynamicAllocation.initialExecutors //初始Executor个数。

spark.dynamicAllocation.maxExecutors ? //最大executor个

spark.dynamicAllocation.executorIdleTimeout //普通Executor空闲超时时间。 ?

1)使用序列化KryoSerializer方式

spark支持使用kryo序列化机制。kryo序列化机制,比默认的java序列化机制,速度要快,序列化后的数据要更小,大概是java序列化机制的1/10,所以kryo序列化优化后,可以让网络传输的数据变少,在集群中耗费的内存资源大大减少。

第一步,在sparkconf中设置:SparkConf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer")

第二步,注册你使用到的,需要通过kryo序列化的一些自定义类: SparkConf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer").registerKryoClasses(new? Class[]{CategorySortKey.class})

2)设置合理并行度

调整并行度让任务的数量和每个任务处理的数据与机器的处理能力达到最优。 查看CPU使用情况和内存占用情况,当任务和数据不是平均分布在各节点,而是集中在个别节点时,可以增大并行度使任务和数据更均匀的分布在各个节点。增加任务的并行度,充分利用集群机器的计算能力,一般并行度设置为集群CPU总和的2-3倍。

设置平行度的方法:

(1)在会产生shuffle的操作函数内设置并行度参数,优先级最高

? RDD. groupByKey(10);

(2)在代码配置参数中设置并行度,优先级次之

SparkConf spConf=new SparkConf().setMaster("local[4]")

.set("spark.default.parallelism", "10")

(3)在spark-defaults.conf配置文件中配置,优先级最低

spark.default.parallelism=10

3)使用广播变量

原理:

Broadcast把数据集合分发到每一个节点上,Spark任务在执行过程中要使用这个数据集合时,就会在本地查找Broadcast过来的数据集合。如果不使用Broadcast,每次任务需要数据集合时,都会把数据序列化到任务里面,不但耗时,还使任务变得很大。

使用场景:

  1. 每个任务分片在执行中都需要同一份数据集合时,就可以把公共数据集Broadcast到每个节点,让每个节点在本地都保存一份。
  2. 大表和小表做join操作时可以把小表Broadcast到各个节点,从而就可以把join操作转变成普通的操作,减少了shuffle操作。

ArrayList list= new ArrayList();

? list.add("test");

? Broadcast bc= javaSparkContext.broadcast(list);

4)使用缓存

如果在应用程序中多次使用同一个RDD,可以将该RDD缓存起来,也就是把中间计算结果缓存起来,避免每次迭代重计算。

常用的缓存方式:

MEMORY_ONLY_SER

MEMORY_ONLY

MEMORY_AND_DISK

DISK_ONLY

rdd.persist(StorageLevel.MEMORY_ONLY);

rdd.unpersist;

5)使用Checkpoint

checkpoint在spark中主要有两块应用:一块是在spark core中对RDD做checkpoint,可以切断做checkpoint RDD的依赖关系,将RDD数据保存到可靠存储(如HDFS)以便数据恢复;另外一块是应用在spark streaming中,使用checkpoint用来保存DStreamGraph以及相关配置信息,以便在Driver崩溃重启的时候能够接着之前进度继续进行处理(如之前waiting batch的job会在重启后继续处理)。

rdd.checkpoint() 并不会触发计算,只有在遇到action方法后,才会触发计算,在job执行完毕后,会启动checkpoint计算,对这个rdd再次触发一个job执行checkpoint计算。所以在checkpoint前,对rdd做cache,可以避免checkpoint计算过程中重新根据rdd依赖链计算。

javaSparkContext.setCheckpointDir(pathName);

rdd.cache();

rdd.checkpoint();

6)设置spark.shuffle.memoryFraction

该参数用于设置shuffle过程中一个task拉取到上个stage的task的输出后,进行聚合操作时能够使用的Executor内存的比例,默认是0.2。也就是说,Executor默认只有20%的内存用来进行该操作。shuffle操作在进行聚合时,如果发现使用的内存超出了这个20%的限制,那么多余的数据就会溢写到磁盘文件中去,此时就会极大地降低性能。

7)开启consolidateFiles优化

shuffle read的拉取过程是一边拉取一边进行聚合的。每个shuffle read task都会有一个自己的buffer缓冲,每次都只能拉取与buffer缓冲(这个缓存大小可以通过上面的参数来设定)相同大小的数据,然后通过内存中的一个Map进行聚合等操作。聚合完一批数据后,再拉取下一批数据,并放到buffer缓冲中进行聚合操作。一直循环,直到最后将所有数据到拉取完,并得到最终的结果。

开启consolidate机制之后,在shuffle write过程中,task就不是为下游stage的每个task创建一个磁盘文件了。此时会出现shuffleFileGroup的概念,每个shuffleFileGroup会对应一批磁盘文件,磁盘文件的数量与下游stage的task数量是相同的。一个Executor上有多少个CPU core,就可以并行执行多少个task。而第一批并行执行的每个task都会创建一个shuffleFileGroup,并将数据写入对应的磁盘文件内。

当Executor的CPU core执行完一批task,接着执行下一批task时,下一批task就会复用之前已有的shuffleFileGroup,包括其中的磁盘文件。也就是说,此时task会将数据写入已有的磁盘文件中,而不会写入新的磁盘文件中。因此,consolidate机制允许不同的task复用同一批磁盘文件,这样就可以有效将多个task的磁盘文件进行一定程度上的合并,从而大幅度减少磁盘文件的数量,进而提升shuffle write的性能。

?new SparkConf().set("spark.shuffle.consolidateFiles", "true") 默认为false。

使用mapPartitions,按每个分区计算结果

原理类似于“使用mapPartitions替代map”,也是一次函数调用处理一个partition的所有数据,而不是一次函数调用处理一条数据。在实践中发现,foreachPartitions类的算子,对性能的提升还是很有帮助的。比如在foreach函数中,将RDD中所有数据写Oracle,那么如果是普通的foreach算子,就会一条数据一条数据地写,每次函数调用可能就会创建一个数据库连接,此时就势必会频繁地创建和销毁数据库连接,性能是非常低下;但是如果用foreachPartitions算子一次性处理一个partition的数据,那么对于每个partition,只要创建一个数据库连接即可,然后执行批量插入操作,此时性能是比较高的。

该参数用于设置Spark作业总共要用多少个Executor进程来执行。Driver在向YARN集群管理器申请资源时,YARN集群管理器会尽可能按照设置来在集群的各个工作节点上,启动相应数量的Executor进程。这个参数非常之重要,如果不设置的话,默认只会给你启动少量的Executor进程,此时你的Spark作业的运行速度是非常慢的。

该参数设置的太少,无法充分利用集群资源;设置的太多的话,大部分队列可能无法给予充分的资源。建议该参数设置1-5。

该参数用于设置每个Executor进程的内存。Executor内存的大小,很多时候直接决定了Spark作业的性能,而且跟常见的JVM OOM异常也有直接的关联。

针对数据交换的业务场景,建议本参数设置在512M及以下。

该参数用于设置每个Executor进程的CPU core数量。这个参数决定了每个Executor进程并行执行task线程的能力。因为每个CPU core同一时间只能执行一个task线程,因此每个Executor进程的CPU core数量越多,越能够快速地执行完分配给自己的所有task线程。

collect操作会将Executor的数据发送到Driver端,因此使用collect前需要确保Driver端内存足够,以免Driver进程发生OutOfMemory异常。当不确定数据量大小时,可使用saveAsTextFile等操作把数据写入HDFS中。只有在能够大致确定数据大小且driver内存充足的时候,才能使用collect。

reduceByKey会在Map端做本地聚合,使得Shuffle过程更加平缓,而groupByKey等Shuffle操作不会在Map端做聚合。因此能使用reduceByKey的地方尽量使用该算子,避免出现groupByKey。

当数据发生倾斜,虽然没有GC(Gabage Collection,垃圾回收),但是task执行时间严重不一致。

  1. 需要重新设计key,以更小粒度的key使得task大小合理化。
  2. 修改并行度。

列式存储布局查询中只涉及到部分列,所以只需读取这些列对应的数据块,而不需要读取整个表的数据,从而减少I/O开销。Parquet还支持灵活的压缩选项,可以显著减少磁盘上的存储。

数据量在1GB以上的大表之间相互关联,或者对大表进行聚合操作前,可以在建表时先对大表根据关联字段或聚合字段进行分区。这样可以避免Shuffle操作,提高性能。

将小表BroadCast到各个节点上,从而转变成非shuffle操作,提高任务执行性能。

spark.sql.autoBroadcastJoin.Threshold= 10485760? //-1表示不广播

df.repartition(5).write.mode(SaveMode.Append).saveAsTable("t1");

1)宽依赖:是指1个父RDD分区对应多个子RDD的分区

如:groupByKey,reduceByKey,sortByKey,join 即使shuffle操作算子一般属于宽依赖。

2)窄依赖:是指一个或多个父RDD分区对应一个子RDD分区

如:map,filter,union,co-partioned join

即使:宽依赖就是1对多,窄依赖就是一对一或者多对一

在Spark的中,两个Stage之间就是shuffle,负责shuffle过程的执行、计算和处理的组件主要就是ShuffleManager,也即shuffle管理器。ShuffleManager随着Spark的发展有两种实现的方式,分别为HashShuffleManager和SortShuffleManager,因此spark的Shuffle有Hash Shuffle和Sort Shuffle两种。

在Spark 1.2以前,默认的shuffle计算引擎是HashShuffleManager,Spark 1.2以后的版本中,默认的ShuffleManager改成了SortShuffleManager。HashShuffleManager会产生很多中间小文件,SortShuffleManager相较于HashShuffleManager来说,有了一定的改进。主要就在于,每个Task在进行shuffle操作时,虽然也会产生较多的临时磁盘文件,但是最后会将所有的临时文件合并(merge)成一个磁盘文件,因此每个Task就只有一个磁盘文件。在下一个stage的shuffle read task拉取自己的数据时,只要根据索引读取每个磁盘文件中的部分数据即可。

对RDD的操作分为transformation和action两类,真正的作业提交运行发生在action之后,调用action之后会将对原始输入数据的所有transformation操作封装成作业并向集群提交运行。这个过程大致可以如下描述:

  1. 由DAGScheduler对RDD之间的依赖性进行分析,通过DAG来分析各个RDD之间的转换依赖关系
  2. 根据DAGScheduler分析得到的RDD依赖关系将Job划分成多个stage
  3. 每个stage会生成一个TaskSet并提交给TaskScheduler,调度权转交给TaskScheduler,由它来负责分发task到worker执行

stage的划分:

stage的划分基于DAG确定依赖关系,将依赖链断开,每个stage内部可以并行运行,整个作业按照stage顺序依次执行,最终完成整个Job。Spark利用依赖关系,调度器从DAG图末端出发,逆向遍历整个依赖关系链,遇到ShuffleDependency(宽依赖关系的一种叫法)就断开,遇到NarrowDependency(窄依赖)就将其加入到当前stage。stage中task数目由stage末端的RDD分区个数来决定,RDD转换是基于分区的一种粗粒度计算,一个stage执行的结果就是这几个分区构成的RDD。

yarn-cluster和yarn-client模式的区别其实就是Application Master进程的区别,yarn-cluster模式下,driver运行在AM(Application Master)中,它负责向YARN申请资源,并监督作业的运行状况。当用户提交了作业之后,就可以关掉Client,作业会继续在YARN上运行。然而yarn-cluster模式不适合运行交互类型的作业。而yarn-client模式下,Application Master仅仅向YARN请求executor,client会和请求的container通信来调度他们工作,也就是说Client不能离开。

yarn-cluster适用于生产环境;而yarn-client适用于交互和调试。

Spark在Executor上的内存分配

spark.serializer (default org.apache.spark.serializer.JavaSerializer )

? 建议设置为 org.apache.spark.serializer.KryoSerializer,因为KryoSerializer比JavaSerializer快,但是有可能会有些Object会序列化失败,这个时候就需要显示的对序列化失败的类进行KryoSerializer的注册,这个时候要配置spark.kryo.registrator参数

Spark Executor有两种内存:

堆内内存:受JVM管理

堆外内存:不受jvm管理

Executo堆内内存:

Spark在一个Executor中的内存分为三块,一块是execution内存,一块是storage内存,一块是other内存。

execution和storage是Spark Executor中内存的大户,other占用内存相对少很多。在spark-1.6.0以前的版本,execution和storage的内存分配是固定的,使用的参数配置分别是spark.shuffle.memoryFraction(execution内存占Executor总内存大小,default 0.2)和spark.storage.memoryFraction(storage内存占Executor内存大小,default 0.6),因为是1.6.0以前这两块内存是互相隔离的,这就导致了Executor的内存利用率不高,而且需要根据Application的具体情况,使用者自己来调节这两个参数才能优化Spark的内存使用。在spark-1.6.0以上的版本,execution内存和storage内存可以相互借用,提高了内存的Spark中内存的使用率,同时也减少了OOM的情况。

  1. execution内存是执行内存,文档中说join,aggregate都在这部分内存中执行,shuffle的数据也会先缓存在这个内存中,满了再写入磁盘,能够减少IO。其实map过程也是在这个内存中执行的。默认总内存的0.2,由Spark应用程序启动时的–executor-memory或spark.executor.memory参数配置。
  2. storage内存是存储broadcast,cache,persist数据的地方。默认总内存的0.6,通过spark.storage.storageFraction参数设置。

other内存是程序执行时预留给自己的内存。默认总内存的0.2。

Executo堆外内存:

在默认情况下,堆外内存并不启用,可通过配置spark.memory.offHeap.enabled参数启用,并由spark.memory.offHeap.size参数设定堆外空间的大小。堆外内存主要存储经过序列化的二进制数据。

堆外的空间分配较为简单,除了没有 other空间,存储内存、执行内存的大小同样是固定的,所有运行中的并发任务共享存储内存和执行内存。

1、Application :

应用,创建一个SparkContext可以认为创建了一个Application

2、Job;

在一个app中每执行一次行动算子就会创建一个Job,一个application会有多个job

3、stage;

阶段,每碰到一个shuffle算子,会产生一个新的stage,一个Job中可以包含多个stage;

4、task

任务,表示阶段执行的时候的并行度,一个stage会有多个task;

平台注册入口