Sunday, 10 July 2016

Apache Spark Job Tuning - Part II

Tuning Resource Allocation
The two main resources that Spark (and YARN) think about are CPU and memory. Disk and network I/O, of course, play a part in Spark performance as well, but neither Spark nor YARN currently does anything to actively manage them.
Every Spark executor in an application has the same fixed number of cores and same fixed heap size. The number of cores can be specified with the --executor-cores flag when invoking spark-submit, spark-shell, and pyspark from the command line, or by setting the spark.executor.cores property in the spark-defaults.conf file or on a SparkConf object. Similarly, the heap size can be controlled with the --executor-memory flag or the spark.executor.memory property.
The cores property controls the number of concurrent tasks an executor can run. --executor-cores 5 means that each executor can run a maximum of five tasks at the same time. The memory property impacts the amount of data Spark can cache, as well as the maximum sizes of the shuffle data structures used for grouping, aggregations, and joins.
Dynamic allocation, which is turned on with the spark.dynamicAllocation.enabled property, lets a Spark application request executors when there is a backlog of pending tasks and free up executors when they are idle.
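The flags and properties above can be put together as in the following minimal Python sketch, which only assembles a spark-submit command line and does not launch anything. The helper name build_spark_submit and the application file my_job.py are hypothetical, and the spark.shuffle.service.enabled setting is included on the assumption that the cluster runs the external shuffle service, which dynamic allocation on YARN generally relies on.

```python
import shlex


def build_spark_submit(app, executor_cores, executor_memory,
                       num_executors=None, dynamic_allocation=False):
    """Assemble a spark-submit argument list (hypothetical helper)."""
    argv = [
        "spark-submit",
        "--executor-cores", str(executor_cores),   # concurrent tasks per executor
        "--executor-memory", executor_memory,      # executor heap size, e.g. "19G"
    ]
    if dynamic_allocation:
        # Let Spark grow and shrink the executor count on its own.
        argv += ["--conf", "spark.dynamicAllocation.enabled=true"]
        # Assumption: the external shuffle service is available on the nodes.
        argv += ["--conf", "spark.shuffle.service.enabled=true"]
    elif num_executors is not None:
        # Fixed executor count when dynamic allocation is off.
        argv += ["--num-executors", str(num_executors)]
    argv.append(app)
    return argv


command = build_spark_submit("my_job.py", 5, "19G", dynamic_allocation=True)
print(" ".join(shlex.quote(arg) for arg in command))
```

The same settings could equally be placed in spark-defaults.conf or on a SparkConf object; the command-line form is used here only because it is the easiest to read at a glance.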
It’s also important to think about how the resources requested by Spark will fit into what YARN has available. The relevant YARN properties are:
  • yarn.nodemanager.resource.memory-mb controls the maximum sum of memory used by the containers on each node.
  • yarn.nodemanager.resource.cpu-vcores controls the maximum sum of cores used by the containers on each node.
Asking for five executor cores will result in a request to YARN for five virtual cores. The memory requested from YARN is a little more complex for a couple of reasons:
  • --executor-memory/spark.executor.memory controls the executor heap size, but JVMs can also use some memory off heap, for example for interned Strings and direct byte buffers. The value of the spark.yarn.executor.memoryOverhead property is added to the executor memory to determine the full memory request to YARN for each executor. It defaults to max(384, .07 * spark.executor.memory).
  • YARN may round the requested memory up a little. YARN’s yarn.scheduler.minimum-allocation-mb and yarn.scheduler.increment-allocation-mb properties control the minimum and increment request values respectively.
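The two points above can be sketched as a small calculation. This is a simplified model, not YARN's actual scheduler code: the function name yarn_container_mb is hypothetical, the 0.07 factor and 384 MB floor come from the default quoted above, and the minimum and increment values (1024 MB and 512 MB) are illustrative assumptions that should be replaced with your cluster's settings.

```python
import math


def yarn_container_mb(executor_memory_mb,
                      overhead_fraction=0.07,   # from the default quoted above
                      min_overhead_mb=384,      # from the default quoted above
                      min_allocation_mb=1024,   # assumed yarn.scheduler.minimum-allocation-mb
                      increment_mb=512):        # assumed yarn.scheduler.increment-allocation-mb
    """Estimate the memory YARN grants for one executor container."""
    # Off-heap overhead: max(384, .07 * spark.executor.memory)
    overhead_mb = max(min_overhead_mb, int(overhead_fraction * executor_memory_mb))
    requested_mb = executor_memory_mb + overhead_mb
    # Simplified rounding: up to the next multiple of the increment,
    # and never below the minimum allocation.
    rounded_mb = math.ceil(requested_mb / increment_mb) * increment_mb
    return max(min_allocation_mb, rounded_mb)


# A 19G heap (19456 MB) ends up as a noticeably larger container.
print(yarn_container_mb(19 * 1024))
# A small 2G heap is dominated by the 384 MB overhead floor.
print(yarn_container_mb(2 * 1024))
```

The point of the sketch is that the number passed to --executor-memory is always smaller than what each executor actually costs the NodeManager.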
[Figure: the hierarchy of memory properties in Spark and YARN]
To get memory and core information on a Windows server, type ‘msinfo32.exe’ in the Run prompt and hit Enter; the snapshot shows a server configured with 32 GB of memory and 8 cores.
You can also view the same info from Task Manager > Performance tab.
An example makes memory and executor management concrete:
Imagine a cluster with six nodes running NodeManagers, each equipped with 16 cores and 64GB of memory. The NodeManager capacities, yarn.nodemanager.resource.memory-mb and yarn.nodemanager.resource.cpu-vcores, should probably be set to 63 * 1024 = 64512 (megabytes) and 15 respectively. We avoid allocating 100% of the resources to YARN containers because the node needs some resources to run the OS and Hadoop daemons. In this case, we leave a gigabyte and a core for these system processes.
The likely first impulse would be to use --num-executors 6 --executor-cores 15 --executor-memory 63G. However, this is the wrong approach because:
  • 63GB + the executor memory overhead won’t fit within the 63GB capacity of the NodeManagers.
  • The application master will take up a core on one of the nodes, meaning that there won’t be room for a 15-core executor on that node.
  • 15 cores per executor can lead to bad HDFS I/O throughput.
A better option would be to use --num-executors 17 --executor-cores 5 --executor-memory 19G. Why?
  • This config results in three executors on all nodes except for the one with the AM, which will have two executors.
  • --executor-memory was derived as (63/3 executors per node) = 21. 21 * 0.07 = 1.47. 21 – 1.47 ~ 19.
On similar terms, for our 6-node cluster with 32GB of memory and 8 cores per node, candidate configs can be derived as follows, again leaving a gigabyte and a core on each node for system processes:
Case 1 --
3 executors on each node (except the one with the AM) = 17 executors
Executor cores => 7/3 ~ 2 cores
Executor memory => 31/3 ~ 10 => 10 * 0.07 = 0.7 => 10 - 0.7 ~ 9
-------------------------------------------------
Case 2 --
2 executors on each node (except the one with the AM) = 11 executors
Executor cores => 7/2 ~ 3 cores (4 cores each would use all 8 cores and leave none for the OS and Hadoop daemons)
Executor memory => 31/2 ~ 15 => 15 * 0.07 = 1.05 => 15 - 1.05 ~ 14
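The arithmetic in these examples can be captured in a few lines of Python. This is a sketch under stated assumptions rather than a sizing tool: the function name size_executors is hypothetical, it reserves one core and one gigabyte per node as in the 16-core example (so the 8-core nodes are treated as having 7 usable cores), it gives up one executor slot for the application master, and it uses the 0.07 overhead factor quoted earlier.

```python
def size_executors(nodes, cores_per_node, memory_gb_per_node, executors_per_node,
                   reserved_cores=1, reserved_memory_gb=1, overhead_fraction=0.07):
    """Return (num_executors, executor_cores, executor_memory_gb)."""
    # Leave a core and a gigabyte per node for the OS and Hadoop daemons.
    usable_cores = cores_per_node - reserved_cores
    usable_memory_gb = memory_gb_per_node - reserved_memory_gb

    # Split what is left evenly among the executors on a node.
    executor_cores = usable_cores // executors_per_node
    container_gb = usable_memory_gb / executors_per_node

    # Take the off-heap overhead out of the container to get the heap size.
    executor_memory_gb = int(container_gb * (1 - overhead_fraction))

    # One executor slot is given up on the node that hosts the AM.
    num_executors = nodes * executors_per_node - 1
    return num_executors, executor_cores, executor_memory_gb


print(size_executors(6, 16, 64, 3))  # the 16-core, 64GB example
print(size_executors(6, 8, 32, 3))   # 32GB / 8-core nodes, 3 executors per node
print(size_executors(6, 8, 32, 2))   # 32GB / 8-core nodes, 2 executors per node
```

Treat the result as a starting point for --num-executors, --executor-cores and --executor-memory; the right split still depends on the workload.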
Tuning Parallelism
Every Spark stage has a number of tasks, each of which processes data sequentially. In tuning Spark jobs, this number is probably the single most important parameter in determining performance.
How is this number determined? The number of tasks in a stage is the same as the number of partitions in the last RDD in the stage. The number of partitions in an RDD is the same as the number of partitions in the RDD on which it depends, with a couple of exceptions: the coalesce transformation allows creating an RDD with fewer partitions than its parent RDD, the union transformation creates an RDD with the sum of its parents’ number of partitions, and cartesian creates an RDD with their product.
To determine the number of partitions in an RDD, you can always call rdd.partitions().size().
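The partition-count rules just described can be modelled without a cluster. The sketch below is plain Python arithmetic, not Spark code, and the function names are hypothetical; it assumes coalesce is used without a shuffle, in which case it cannot raise the partition count above the parent's.

```python
def narrow_partitions(parent):
    """map, filter and similar transformations keep the parent's count."""
    return parent


def coalesce_partitions(parent, target):
    """coalesce (no shuffle) can only reduce the number of partitions."""
    return min(parent, target)


def union_partitions(*parents):
    """union yields the sum of its parents' partition counts."""
    return sum(parents)


def cartesian_partitions(left, right):
    """cartesian yields the product of its parents' partition counts."""
    return left * right


a, b = 100, 40
print(narrow_partitions(a))        # 100
print(coalesce_partitions(a, 10))  # 10
print(union_partitions(a, b))      # 140
print(cartesian_partitions(a, b))  # 4000
```

The cartesian case is the one worth watching: two modest inputs can produce a stage with thousands of tasks.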
If there are fewer tasks than slots available to run them in, the stage won’t be taking advantage of all the CPU available. A small number of tasks also means that more memory pressure is placed on any aggregation operations that occur in each task.
When the records destined for these aggregation operations do not easily fit in memory, some mayhem can ensue:
  • Holding many records in these data structures puts pressure on garbage collection
  • When the records do not fit in memory, Spark will spill them to disk, which causes disk I/O and sorting.
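To make the link between task count, slots and per-task load concrete, here is a rough Python sketch. The function name stage_profile is hypothetical, and the model assumes evenly sized partitions and that every executor core counts as one task slot, which real jobs only approximate.

```python
import math


def stage_profile(num_tasks, num_executors, executor_cores, total_records):
    """Return (slots, waves, slot_utilization, records_per_task) for a stage."""
    # One slot per executor core.
    slots = num_executors * executor_cores
    # Tasks run in "waves" of at most `slots` tasks at a time.
    waves = math.ceil(num_tasks / slots)
    # Fraction of slots kept busy while the first wave runs.
    slot_utilization = min(num_tasks, slots) / slots
    # Fewer tasks means each one has to aggregate more records.
    records_per_task = math.ceil(total_records / num_tasks)
    return slots, waves, slot_utilization, records_per_task


# 17 executors with 5 cores each give 85 slots.
few = stage_profile(40, 17, 5, total_records=10_000_000)
many = stage_profile(200, 17, 5, total_records=10_000_000)
print(few)   # 40 tasks leave more than half the slots idle
print(many)  # 200 tasks fill every slot and shrink the per-task load
```

With 40 tasks each task handles 250,000 records and 45 slots sit idle; with 200 tasks each handles 50,000, which is the kind of reduction that relieves the garbage collection and spilling problems listed above.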
So how do you increase the number of partitions?
1. If the stage in question is reading from Hadoop, your options are:
  • Use the repartition transformation, which will trigger a shuffle.
  • Configure your InputFormat to create more splits.
  • Write the input data out to HDFS with a smaller block size.
2. If the stage is getting its input from another stage, the transformation that triggered the stage boundary will accept a numPartitions argument, such as
val rdd2 = rdd1.reduceByKey(_ + _, numPartitions = X)
What should “X” be? The most straightforward way to tune the number of partitions is experimentation: Look at the number of partitions in the parent RDD and then keep multiplying that by 1.5 until performance stops improving.
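The multiply-by-1.5 experiment can be written down as a simple loop. This is only a sketch of the search procedure: tune_partitions is a hypothetical helper, and synthetic_cost is a made-up stand-in for actually running the job with a given partition count and timing it.

```python
import math


def tune_partitions(parent_partitions, measure_seconds, factor=1.5, max_rounds=10):
    """Grow the partition count by `factor` until the measured time stops improving."""
    best_n = parent_partitions
    best_time = measure_seconds(best_n)
    for _ in range(max_rounds):
        candidate = math.ceil(best_n * factor)
        elapsed = measure_seconds(candidate)
        if elapsed >= best_time:
            break  # no further improvement, keep the previous count
        best_n, best_time = candidate, elapsed
    return best_n


def synthetic_cost(num_partitions):
    """Made-up cost curve: per-task work shrinks, scheduling overhead grows."""
    return 1000 / num_partitions + 0.05 * num_partitions


print(tune_partitions(20, synthetic_cost))
```

In practice measure_seconds would submit the real job and return its wall-clock time, and since timings are noisy it is worth repeating each measurement before trusting a comparison.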
In fact, when in doubt, it’s almost always better to err on the side of a larger number of tasks (and thus partitions). This advice is in contrast to recommendations for MapReduce, which requires you to be more conservative with the number of tasks. The difference stems from the fact that MapReduce has a high startup overhead for tasks, while Spark does not.
Slimming Down Your Data Structures
Data flows through Spark in the form of records. A record has two representations: a deserialized Java object representation and a serialized binary representation. In general, Spark uses the deserialized representation for records in memory and the serialized representation for records stored on disk or being transferred over the network.
The spark.serializer property controls the serializer that’s used to convert between these two representations. The Kryo serializer, org.apache.spark.serializer.KryoSerializer, is the preferred option. It is unfortunately not the default, but the Kryo serializer should always be used.
Data Formats
Whenever you have the power to make the decision about how data is stored on disk, use an extensible binary format like Avro, Parquet, Thrift, or Protobuf. Pick one of these formats and stick to it. To be clear, when one talks about using Avro, Thrift, or Protobuf on Hadoop, they mean that each record is an Avro/Thrift/Protobuf struct stored in a sequence file. JSON is just not worth it.
************************* End of Part 2***********************