Monday, 11 July 2016

Getting started with Apache Spark on Windows in 10 minutes

Apache Spark is an open source cluster computing framework. Originally developed at the University of California, Berkeley's AMPLab, the Spark codebase was later donated to the Apache Software Foundation, which has maintained it since. Spark provides an interface for programming entire clusters with implicit data parallelism and fault tolerance.
For more details on Apache Spark please refer –
In this post I am going to discuss the steps involved in setting up Spark on a Windows machine, and we will use Python to interact with it. The best part is that it takes about 10 minutes to get started, and no additional tools like Cygwin or Git are required.
Pre-requisites:
  1. OS: Windows 7 +
    1. Minimum 2 Core processor
    2. Minimum RAM – 4 GB
    3. Minimum Disk space – 10 GB
  2. Apache Spark (latest stable version)
  3. Python – Anaconda version (download its installer from here)
    1. I am using Python 3
  4. Spyder IDE (will be installed as part of Anaconda)
  5. Interest in Apache Spark (most important of all)
Installation
For requisite 2, navigate to the Apache Spark download page and download the latest stable version. In my case I selected the package pre-built for Hadoop 2.6, as shown below:

Once downloaded, extract the contents and place them in a folder on the C or D drive. I created a folder named ‘Spark’ on the D drive and moved the contents there, as shown below:
And that’s it, Spark is installed now.

Now, as mentioned in the previous section for requisite 3, I downloaded the Python 3 version of Anaconda and installed it on my machine running Windows 7 (64-bit), as shown below:

After the installation is complete, the whole package should appear under ‘All Programs’ in the Windows Start menu, as shown below:
We can see that Python and Spyder IDE have been installed.

Setting up Spark
Now comes the real part, where we start using Spark from Python in interactive mode. We will use an enhanced form of the PySpark shell, run through the IPython console (an enhanced Python interpreter).

Step 1: To begin, open the Spyder IDE, which will look like the figure below:
Here, the leftmost pane is the project navigation space, the middle one is the code editor, and the rightmost has an active IPython console in interactive mode. (All are labeled in the figure.)

Step 2: Create a file setUpSpark.py in your project workspace and paste the code as shown below:
# -*- coding: utf-8 -*-
"""
Ensure the code has execute privileges
----------------------------------------------------------------------
Execute this script once when Spyder is started on Windows
----------------------------------------------------------------------
"""

import os
import sys

# Work from the project workspace so that relative data paths resolve.
os.chdir("<type your workspace directory here>")
print(os.getcwd())

# Configure the environment.
# Set this to the directory where Spark is installed.
if 'SPARK_HOME' not in os.environ:
    os.environ['SPARK_HOME'] = '<Path where Spark is installed>'

# Create a variable for our root path.
SPARK_HOME = os.environ['SPARK_HOME']

# Add the following paths to the system path.
# The py4j file name depends on the Spark release; check python\lib in your install.
sys.path.insert(0, os.path.join(SPARK_HOME, "python"))
sys.path.insert(0, os.path.join(SPARK_HOME, "python", "lib"))
sys.path.insert(0, os.path.join(SPARK_HOME, "python", "lib", "pyspark.zip"))
sys.path.insert(0, os.path.join(SPARK_HOME, "python", "lib", "py4j-0.9-src.zip"))

# Import the Spark classes (possible only after the paths above are set).
from pyspark import SparkContext
from pyspark import SparkConf

# Configure Spark settings.
conf = SparkConf()
conf.set("spark.executor.memory", "1g")
conf.set("spark.cores.max", "2")
conf.setAppName("Shaz Spark")

# Initialize the SparkContext in local mode.
sc = SparkContext('local', conf=conf)

# Test with a data file; I used an auto data file.
lines = sc.textFile("data/auto-data.csv")
print(lines.count())
The code should be self-explanatory with the comments I have added. Please let me know if anything needs clarification.
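The py4j archive name changes between Spark releases, so a hardcoded file name can silently stop matching. A minimal sketch of looking it up by pattern instead; the helper name add_pyspark_to_path is my own, and it assumes the usual python\lib layout inside the Spark folder:

```python
import glob
import os
import sys


def add_pyspark_to_path(spark_home):
    """Put Spark's bundled Python sources on sys.path.

    Finds the py4j archive by pattern instead of hardcoding its version.
    Returns the paths it ensured are on sys.path.
    """
    lib_dir = os.path.join(spark_home, "python", "lib")
    py4j_zips = sorted(glob.glob(os.path.join(lib_dir, "py4j-*-src.zip")))
    if not py4j_zips:
        raise IOError("No py4j-*-src.zip found under " + lib_dir)
    paths = [os.path.join(spark_home, "python"), py4j_zips[-1]]
    for path in paths:
        if path not in sys.path:
            sys.path.insert(0, path)
    return paths
```

Calling add_pyspark_to_path(os.environ['SPARK_HOME']) before importing pyspark would stand in for the individual sys.path.insert lines.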

Step 3: We now have our Spark application up and running, with the SparkContext available as sc. You can view the application status in its web UI at http://localhost:4040/jobs/
With this we conclude this post, and I encourage you to try your hand at the different Spark capabilities. I will share some basic Spark operations in my next post. Till then, Happy Sparking.

Sunday, 10 July 2016

Apache Spark Job Tuning - Part II

Tuning Resource Allocation
The two main resources that Spark (and YARN) think about are CPU and memory. Disk and network I/O, of course, play a part in Spark performance as well, but neither Spark nor YARN currently does anything to actively manage them.
Every Spark executor in an application has the same fixed number of cores and the same fixed heap size. The number of cores can be specified with the --executor-cores flag when invoking spark-submit, spark-shell, and pyspark from the command line, or by setting the spark.executor.cores property in the spark-defaults.conf file or on a SparkConf object. Similarly, the heap size can be controlled with the --executor-memory flag or the spark.executor.memory property.
The cores property controls the number of concurrent tasks an executor can run. --executor-cores 5 means that each executor can run a maximum of five tasks at the same time. The memory property impacts the amount of data Spark can cache, as well as the maximum sizes of the shuffle data structures used for grouping, aggregations, and joins.
Dynamic allocation, enabled with the spark.dynamicAllocation.enabled property, lets a Spark application request executors when there is a backlog of pending tasks and release them when they are idle.
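To make the flags concrete, here is a small sketch that assembles spark-submit arguments for either a fixed executor count or dynamic allocation. The function names and the sample numbers are illustrative only, and pairing dynamic allocation with spark.shuffle.service.enabled is my assumption about a typical YARN setup rather than something this post configures:

```python
def spark_submit_args(executor_cores, executor_memory_gb, num_executors=None):
    """Build resource-related spark-submit arguments as a list of strings.

    If num_executors is None, dynamic allocation is requested instead of a
    fixed executor count.
    """
    args = ["--executor-cores", str(executor_cores),
            "--executor-memory", "{}G".format(executor_memory_gb)]
    if num_executors is not None:
        args += ["--num-executors", str(num_executors)]
    else:
        # Assumption: dynamic allocation on YARN usually also needs the
        # external shuffle service enabled.
        args += ["--conf", "spark.dynamicAllocation.enabled=true",
                 "--conf", "spark.shuffle.service.enabled=true"]
    return args


def max_concurrent_tasks(num_executors, executor_cores):
    """Each executor core is one task slot."""
    return num_executors * executor_cores
```

For example, spark_submit_args(5, 8, num_executors=4) asks for four executors with five cores and an 8 GB heap each, which gives max_concurrent_tasks(4, 5) = 20 task slots.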
It’s also important to think about how the resources requested by Spark will fit into what YARN has available. The relevant YARN properties are:
  • yarn.nodemanager.resource.memory-mb controls the maximum sum of memory used by the containers on each node.
  • yarn.nodemanager.resource.cpu-vcores controls the maximum sum of cores used by the containers on each node.
Asking for five executor cores will result in a request to YARN for five virtual cores. The memory requested from YARN is a little more complex, for a couple of reasons:
  • --executor-memory/spark.executor.memory controls the executor heap size, but JVMs can also use some memory off heap, for example for interned Strings and direct byte buffers. The value of the spark.yarn.executor.memoryOverhead property is added to the executor memory to determine the full memory request to YARN for each executor. It defaults to max(384, .07 * spark.executor.memory).
  • YARN may round the requested memory up a little. YARN’s yarn.scheduler.minimum-allocation-mb and yarn.scheduler.increment-allocation-mb properties control the minimum and increment request values respectively.
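The two points above can be combined into a rough estimate of what YARN is actually asked for per executor. This is a sketch of the arithmetic only, not YARN's real normalization code; the default values for the minimum and increment parameters are illustrative assumptions, so substitute your cluster's settings:

```python
import math


def yarn_container_request_mb(executor_memory_mb,
                              min_allocation_mb=1024,   # assumed value
                              increment_mb=512,         # assumed value
                              overhead_factor=0.07,
                              overhead_min_mb=384):
    """Estimate the per-executor memory request to YARN, in megabytes."""
    # Overhead follows max(384, .07 * spark.executor.memory) from the text.
    overhead_mb = max(overhead_min_mb, int(overhead_factor * executor_memory_mb))
    requested_mb = executor_memory_mb + overhead_mb
    # Round up to the next multiple of the increment, then apply the minimum.
    rounded_mb = int(math.ceil(requested_mb / float(increment_mb))) * increment_mb
    return max(min_allocation_mb, rounded_mb)
```

Under these assumptions a 19 GB heap (19456 MB) becomes a container request of 20992 MB, noticeably more than the heap alone.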
The hierarchy of memory properties in Spark and YARN:
Type ‘msinfo32.exe’ in the Windows Run prompt and hit Enter to get memory and core information, as shown in the snapshot below for a server with 32 GB of memory and 8 cores.
You can also view the same information in Task Manager -> Performance tab, as shown below.
An example to make memory and executor management concrete:
Imagine a cluster with six nodes running NodeManagers, each equipped with 16 cores and 64 GB of memory. The NodeManager capacities, yarn.nodemanager.resource.memory-mb and yarn.nodemanager.resource.cpu-vcores, should probably be set to 63 * 1024 = 64512 (megabytes) and 15 respectively. We avoid allocating 100% of the resources to YARN containers because the node needs some resources to run the OS and Hadoop daemons. In this case, we leave a gigabyte and a core for these system processes.
The likely first impulse would be to use --num-executors 6 --executor-cores 15 --executor-memory 63G. However, this is the wrong approach because:
  • 63GB + the executor memory overhead won’t fit within the 63GB capacity of the NodeManagers.
  • The application master will take up a core on one of the nodes, meaning that there won’t be room for a 15-core executor on that node.
  • 15 cores per executor can lead to bad HDFS I/O throughput.
A better option would be to use --num-executors 17 --executor-cores 5 --executor-memory 19G. Why?
  • This config results in three executors on all nodes except for the one with the AM, which will have two executors.
  • --executor-memory was derived as (63/3 executors per node) = 21. 21 * 0.07 = 1.47. 21 – 1.47 ~ 19.
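The same derivation can be written as a small calculator. This is a sketch of the rule of thumb above, with one core and 1 GB reserved per node and one executor slot given up to the application master; the function and parameter names are my own:

```python
def size_executors(nodes, cores_per_node, mem_gb_per_node, executors_per_node,
                   reserved_cores=1, reserved_mem_gb=1, overhead_factor=0.07):
    """Derive --num-executors, --executor-cores and --executor-memory (GB)."""
    usable_cores = cores_per_node - reserved_cores
    usable_mem_gb = mem_gb_per_node - reserved_mem_gb
    # Memory available to each executor's container, before overhead.
    container_gb = usable_mem_gb / float(executors_per_node)
    return {
        # One executor slot is left free for the application master.
        "num_executors": nodes * executors_per_node - 1,
        "executor_cores": usable_cores // executors_per_node,
        # Subtract the off-heap overhead and round down to whole gigabytes.
        "executor_memory_gb": int(container_gb * (1 - overhead_factor)),
    }
```

For the six-node, 16-core, 64 GB cluster with three executors per node, size_executors(6, 16, 64, 3) gives 17 executors with 5 cores and 19 GB each, matching the flags above.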
On similar terms, for our six-node cluster with 32 GB and 8 cores per node (31 GB and 7 cores usable after leaving 1 GB and one core for the OS and Hadoop daemons), candidate configs can be derived as:
Case 1 --
3 executors on each node (except the one with the AM) = 17 executors
Executor cores => 7/3 ~ 2 cores
Executor memory => 31/3 ~ 10 => 10 * 0.07 = 0.7 => 10 - 0.7 ~ 9 GB
-------------------------------------------------
Case 2 --
2 executors on each node (except the one with the AM) = 11 executors
Executor cores => 7/2 ~ 3 cores
Executor memory => 31/2 ~ 15 => 15 * 0.07 = 1.05 => 15 - 1.05 ~ 14 GB
Tuning Parallelism
Every Spark stage has a number of tasks, each of which processes data sequentially. In tuning Spark jobs, this number is probably the single most important parameter in determining performance.
How is this number determined? The number of tasks in a stage is the same as the number of partitions in the last RDD in the stage. The number of partitions in an RDD is the same as the number of partitions in the RDD on which it depends, with a couple exceptions: the coalesce transformation allows creating an RDD with fewer partitions than its parent RDD, the union transformation creates an RDD with the sum of its parents’ number of partitions, and cartesian creates an RDD with their product.
To determine the number of partitions in an RDD, you can always call rdd.partitions().size() in Java (rdd.partitions.size in Scala); in PySpark the equivalent is rdd.getNumPartitions().
If there are fewer tasks than slots available to run them in, the stage won’t be taking advantage of all the CPU available. A small number of tasks also means that more memory pressure is placed on any aggregation operations that occur in each task.
When the records destined for these aggregation operations do not easily fit in memory, some mayhem can ensue:
  • Holding many records in these data structures puts pressure on garbage collection
  • When the records do not fit in memory, Spark will spill them to disk, which causes disk I/O and sorting.
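To see whether a stage has enough tasks to keep every slot busy, a back-of-the-envelope check helps. A minimal sketch; the function name and the notion of "waves" (rounds of tasks filling all slots) are my own framing:

```python
import math


def stage_slot_usage(num_tasks, num_executors, executor_cores):
    """Return (slots, waves, idle_slots_in_last_wave) for one stage."""
    slots = num_executors * executor_cores
    # Number of rounds needed if every slot runs one task per round.
    waves = int(math.ceil(num_tasks / float(slots)))
    idle_in_last_wave = waves * slots - num_tasks
    return slots, waves, idle_in_last_wave
```

With 17 executors of 5 cores each there are 85 slots, so a stage of 40 tasks leaves 45 slots idle: stage_slot_usage(40, 17, 5) returns (85, 1, 45).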
So how do you increase the number of partitions?
1. If the stage in question is reading from Hadoop, your options are:
  • Use the repartition transformation, which will trigger a shuffle.
  • Configure your InputFormat to create more splits.
  • Write the input data out to HDFS with a smaller block size.
2. If the stage is getting its input from another stage, the transformation that triggered the stage boundary will accept a numPartitions argument, such as rdd.reduceByKey(func, numPartitions=X).
What should “X” be? The most straightforward way to tune the number of partitions is experimentation: Look at the number of partitions in the parent RDD and then keep multiplying that by 1.5 until performance stops improving.
In fact, when in doubt, it’s almost always better to err on the side of a larger number of tasks (and thus partitions). This advice is in contrast to recommendations for MapReduce, which requires you to be more conservative with the number of tasks. The difference stems from the fact that MapReduce has a high startup overhead for tasks, while Spark does not.
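The "multiply by 1.5 until it stops improving" experiment can be automated. In this sketch, measure_seconds is a hypothetical callable that you supply: it runs the job with the given partition count and returns the elapsed time.

```python
import math


def tune_num_partitions(measure_seconds, start, factor=1.5, max_rounds=10):
    """Grow the partition count by `factor` until the run time stops improving.

    measure_seconds(n) must run the job with n partitions and return its
    duration. Returns the best partition count that was tried.
    """
    best_n, best_time = start, measure_seconds(start)
    n = start
    for _ in range(max_rounds):
        n = int(math.ceil(n * factor))
        elapsed = measure_seconds(n)
        if elapsed >= best_time:
            break  # no further improvement; keep the previous best
        best_n, best_time = n, elapsed
    return best_n
```

In practice measure_seconds might wrap something like rdd.reduceByKey(func, numPartitions=n).count() between two time.time() calls. Single timings are noisy, so repeating each measurement is advisable.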
Slimming Down Your Data Structures
Data flows through Spark in the form of records. A record has two representations: a deserialized Java object representation and a serialized binary representation. In general, Spark uses the deserialized representation for records in memory and the serialized representation for records stored on disk or being transferred over the network.
The spark.serializer property controls the serializer that’s used to convert between these two representations. The Kryo serializer, org.apache.spark.serializer.KryoSerializer, is the preferred option. It is unfortunately not the default, but the Kryo serializer should always be used.
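Switching to Kryo is a one-line setting. A small sketch that merges it into a dictionary of Spark properties and applies them to a SparkConf-like object; the helper names are my own, and conf is assumed to be a pyspark SparkConf (anything with a set(key, value) method works):

```python
KRYO_SERIALIZER = "org.apache.spark.serializer.KryoSerializer"


def with_kryo(settings=None):
    """Return a copy of `settings` with the Kryo serializer selected."""
    merged = dict(settings or {})
    merged["spark.serializer"] = KRYO_SERIALIZER
    return merged


def apply_settings(conf, settings):
    """Apply each property to a SparkConf-like object and return it."""
    for key, value in sorted(settings.items()):
        conf.set(key, value)
    return conf
```

For example, apply_settings(SparkConf(), with_kryo({"spark.executor.memory": "1g"})). As far as I understand, in PySpark the Python-side objects are still pickled, so this setting mainly affects data handled inside the JVM.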
Data Formats
Whenever you have the power to make the decision about how data is stored on disk, use an extensible binary format like Avro, Parquet, Thrift, or Protobuf. Pick one of these formats and stick to it. To be clear, when one talks about using Avro, Thrift, or Protobuf on Hadoop, they mean that each record is an Avro/Thrift/Protobuf struct stored in a sequence file. JSON is just not worth it.
************************* End of Part 2***********************

Apache Spark Job Tuning - Part I

Tuning Your Apache Spark Jobs - Part 1
Learn techniques for tuning your Apache Spark jobs for optimal efficiency —
When you write Apache Spark code and page through the public APIs, you come across words like transformation, action, and RDD. Understanding Spark at this level is vital for writing Spark programs. Similarly, when things start to fail, or when you venture into the web UI to try to understand why your application is taking so long, you’re confronted with a new vocabulary of words like job, stage, and task. Understanding Spark at this level is vital for writing good Spark programs, and of course by good, I mean fast. To write a Spark program that will execute efficiently, it is very, very helpful to understand Spark’s underlying execution model.
In this post, I have tried to explain the basics of how Spark programs are actually executed on a cluster. I followed the Spark documentation and a Cloudera blog post, and devised an example based on them to take a practical approach. I hope this helps build some understanding of the topic.
How Spark Executes Your Program
A Spark application consists of a single driver process and a set of executor processes scattered across nodes on the cluster.
The driver is the process that is in charge of the high-level control flow of work that needs to be done. The executor processes are responsible for executing this work, in the form of tasks, as well as for storing any data that the user chooses to cache. A single executor has a number of slots for running tasks, and will run many concurrently throughout its lifetime. Deploying these processes on the cluster is up to the cluster manager in use (YARN, Mesos, or Spark Standalone), but the driver and executor themselves exist in every Spark application.
At the top of the execution hierarchy are jobs. Invoking an action inside a Spark application triggers the launch of a Spark job to fulfil it. To decide what this job looks like, Spark examines the graph of RDDs on which that action depends and formulates an execution plan. This plan starts with the farthest-back RDDs—that is, those that depend on no other RDDs or reference already-cached data–and culminates in the final RDD required to produce the action’s results.
The execution plan consists of assembling the job’s transformations into stages. A stage corresponds to a collection of tasks that all execute the same code, each on a different subset of the data. Each stage contains a sequence of transformations that can be completed without shuffling the full data.
What determines whether data needs to be shuffled? Recall that an RDD comprises a fixed number of partitions, each of which comprises a number of records. For the RDDs returned by so-called narrow transformations like map and filter, the records required to compute the records in a single partition reside in a single partition in the parent RDD. Each object is only dependent on a single object in the parent.
However, Spark also supports transformations with wide dependencies such as groupByKey and reduceByKey. In these dependencies, the data required to compute the records in a single partition may reside in many partitions of the parent RDD. All of the tuples with the same key must end up in the same partition, processed by the same task. To satisfy these operations, Spark must execute a shuffle, which transfers data around the cluster and results in a new stage with a new set of partitions.
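Why a wide dependency forces data movement can be seen in a toy simulation in plain Python. This is not Spark's shuffle implementation, and the partitioning function is a deliberately simple stand-in for a hash partitioner:

```python
from collections import defaultdict


def toy_partition(key, num_partitions):
    # Deterministic stand-in for a hash partitioner (NOT Spark's hash function).
    return sum(ord(ch) for ch in str(key)) % num_partitions


def toy_shuffle(parent_partitions, num_partitions):
    """Regroup (key, value) records so equal keys land in one child partition."""
    children = [defaultdict(list) for _ in range(num_partitions)]
    for records in parent_partitions:
        for key, value in records:
            children[toy_partition(key, num_partitions)][key].append(value)
    return [dict(child) for child in children]


# Key "a" appears in all three parent partitions.
parents = [[("a", 1), ("b", 1)], [("a", 1), ("c", 1)], [("b", 1), ("a", 1)]]
shuffled = toy_shuffle(parents, 2)
# shuffled == [{"b": [1, 1]}, {"a": [1, 1, 1], "c": [1]}]
```

Every child partition had to read from several parent partitions to collect all values for its keys, which is exactly what separates a wide dependency from a narrow one.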
For example, consider the following code:
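A minimal PySpark sketch of such a pipeline, assuming an existing SparkContext passed in as sc; the three helper functions and the function name count_long_words are placeholders of my own:

```python
def to_lower(line):
    return line.lower()


def split_words(line):
    return line.split()


def is_long_word(word):
    return len(word) > 3


def count_long_words(sc, path):
    """sc: an existing SparkContext; path: a text file Spark can read."""
    return (sc.textFile(path)
              .map(to_lower)         # narrow transformation
              .flatMap(split_words)  # narrow transformation
              .filter(is_long_word)  # narrow transformation
              .count())              # the single action
```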
It executes a single action, which depends on a sequence of transformations on an RDD derived from a text file. This code would execute in a single stage, because none of the outputs of these three operations depend on data that can come from different partitions than their inputs.
In contrast, this code finds how many times each character appears in all the words that appear more than 1,000 times in a text file.
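A PySpark sketch of that computation, again assuming an existing SparkContext passed in as sc; the function and helper names are my own:

```python
from operator import add


def split_on_spaces(line):
    return line.split(" ")


def to_pair(item):
    return (item, 1)


def chars_of_word(word_and_count):
    # Keep only the word from a (word, count) pair and split it into characters.
    return list(word_and_count[0])


def char_counts_of_frequent_words(sc, path, min_count=1000):
    """Count characters across all words appearing more than min_count times."""
    tokenized = sc.textFile(path).flatMap(split_on_spaces)
    word_counts = tokenized.map(to_pair).reduceByKey(add)   # first shuffle
    frequent = word_counts.filter(lambda wc: wc[1] > min_count)
    char_counts = (frequent.flatMap(chars_of_word)
                           .map(to_pair)
                           .reduceByKey(add))               # second shuffle
    return char_counts.collect()
```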
This process would break down into three stages. The reduceByKey operations result in stage boundaries, because computing their outputs requires repartitioning the data by keys.
Here is a more complicated transformation graph including a join transformation with multiple dependencies.
The pink boxes show the resulting stage graph used to execute it.
At each stage boundary, data is written to disk by tasks in the parent stages and then fetched over the network by tasks in the child stage. The number of data partitions in the parent stage may be different than the number of partitions in the child stage. Transformations that may trigger a stage boundary typically accept a numPartitions argument that determines how many partitions to split the data into in the child stage.
Just as the number of reducers is an important parameter in tuning MapReduce jobs, tuning the number of partitions at stage boundaries can often make or break an application’s performance. We’ll delve deeper into how to tune this number in a later section.
************************* End of Part 1***********************