December 10, 2017

Apache Spark - A dummy's introduction - 4 (Example)

A dummy's introduction is split in 4 parts:
  1. Architecture
  2. Data Structure
  3. Execution
  4. Example

Consider the following original code of one batch. It opens an input file, calls a process() function on each of its rows, and writes each processed row to an output file. It also prints the number of input rows at the end.


import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;

public class BatchJob {  // class name is illustrative; the post shows only the class body

    private static final String INPUT_FILENAME = "E:\\test\\input.txt";
    private static final String OUTPUT_FILENAME = "E:\\test\\output.txt";

    public static void main(String[] args) throws IOException {

        FileReader fileReader = new FileReader(INPUT_FILENAME);
        FileWriter fileWriter = new FileWriter(OUTPUT_FILENAME);
        BufferedReader bufferedReader = new BufferedReader(fileReader);
        BufferedWriter bufferedWriter = new BufferedWriter(fileWriter);
        String inputline;
        String outputline;
        int inputCounter = 0;

        // Read each line, process it, write it out and count it
        while ((inputline = bufferedReader.readLine()) != null) {
            outputline = process(inputline);
            bufferedWriter.write(outputline);
            bufferedWriter.newLine();
            inputCounter++;
        }
        System.out.println(inputCounter);

        bufferedReader.close();
        bufferedWriter.close();
    }

    public static String process(String input) {
        return input.toUpperCase();
    }
}




If we were to try to convert this existing batch to Spark, the following points could be handy:
  1. Create the Spark Context
  2. Use it to read the whole file as an RDD
  3. Instead of calling process() on each line, just map the input RDD into a new processed RDD
  4. Since we won't be running a loop for every line, how do we calculate the input count? Well, the process() function is called for every record in the RDD, so we create an Accumulator and increment its value every time process() is called.
  5. Finally, we save the processed RDD as a text file, and we're done!
Here's the converted code:


import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class SparkBatchJob {  // class name is illustrative; the post shows only the class body

    private static final String INPUT_FILENAME = "E:\\test\\input.txt";
    private static final String OUTPUT_FILENAME = "E:\\test\\output.txt";

    public static void main(String[] args) {
        // App name and master are illustrative choices
        SparkConf conf = new SparkConf().setAppName("MyBatchApplication").setMaster("local[*]");
        JavaSparkContext sc = new JavaSparkContext(conf);

        JavaRDD<String> input = sc.textFile(INPUT_FILENAME);
        Accumulator<Double> inputCounter = sc.doubleAccumulator(0);

        // map() creates a new, processed RDD; the accumulator counts records as they pass through
        JavaRDD<String> output = input.map(row -> process(row, inputCounter));

        output.saveAsTextFile(OUTPUT_FILENAME);
        System.out.println(inputCounter.value().longValue());

        // You could avoid the Accumulator and print the count with the count() function,
        // but it's not a good idea. Read below why.

        sc.stop();
    }

    public static String process(String input, Accumulator<Double> inputCounter) {
        inputCounter.add(1d);
        return input.toUpperCase();
    }
}


Why didn't we use count() to print the number of lines? Because count() is another action. We are already calling one action, saveAsTextFile(). Once that action has run, the input RDD is wiped from memory. So if we call another action (say, input.count()), Spark will re-create the entire input RDD just to answer that one count. To avoid processing the data multiple times for multiple actions, we use an Accumulator instead.
Some people persist the RDD so that count() can reuse it. This is still not as good: a persisted RDD occupies a sizable part of the executors' memory (or, if persisted to disk, adds disk I/O), and access time increases too.
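
For completeness, here is a rough sketch (not from the original post) of the persist-and-count alternative the paragraph above warns about, using the one-argument process() from the original batch. cache() keeps the processed RDD in executor memory so that count() does not recompute it:


JavaRDD<String> output = input.map(row -> process(row));
output.cache();                          // mark the RDD to be kept in memory
output.saveAsTextFile(OUTPUT_FILENAME);  // first action: computes and caches the RDD
System.out.println(output.count());      // second action: served from the cache, no re-read


The trade-off is exactly the one described above: you avoid recomputation, but the cached RDD sits in memory for the lifetime of the job.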

Apache Spark - A dummy's introduction - 3 (Execution)

A dummy's introduction is split in 4 parts:
  1. Architecture
  2. Data Structure
  3. Execution
  4. Example

Introduction

Just knowing the data structure and its operations is not enough to understand how Spark operates. The order of execution matters. In fact, it is a concept a developer has to keep in mind at all times while writing code, to avoid silly mistakes.

Spark is lazy in execution

Always remember - Spark is lazy. Lazy is the keyword while writing code. What exactly do we mean?
We can perform either actions or transformations on RDDs. A transformation only creates another RDD, and that's it; once the application finishes, these transformed RDDs are dumped, and the RDD variables have no scope outside the application. Actions, however, are different. They return values - maybe files or integers - something that has a real purpose outside the scope of the application.
So when we say Spark is lazy, we mean Spark doesn't execute its tasks until an action is performed. Spark knows that transformations by themselves yield no value, so it does nothing until it sees that an action is actually performed on the RDD.
Example: If we create an RDD of fruits, then we call the following functions:
  1. map - convert to upper case
  2. filter - keep only the fruits starting with 'B'
  3. save result as a file
The code for the above example looks like this:


JavaRDD<String> input = sc.textFile("E:\\test\\input.txt");
JavaRDD<String> upperCased = input.map(inputRecord -> inputRecord.toUpperCase());
JavaRDD<String> filtered = upperCased.filter(inputRecord -> inputRecord.startsWith("B"));
filtered.saveAsTextFile("E:\\test\\output.txt");


Steps 1 and 2 are transformations, while step 3 is an action. Since Spark is lazy, nothing will really happen until Spark reads step 3.
What we expect is each step executing as soon as it is written. But what actually happens is that steps 1 and 2 are merely recorded, and no work is done until Spark reaches step 3.
This is a beautiful feature of this framework - the executors don't do any processing until they see there's something valuable coming out of it. Isn't that similar to human behavior? If we never called step 3, i.e. we never saved that file, why would we even bother doing steps 1 and 2? It would be a complete waste of time and resources to perform transformations without an action (it's useless to perform transformations when the result won't be remembered in any way).
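
To make the laziness concrete, here is a small sketch (the output path and the fruit values are just illustrative). The print inside the map produces no output at all until the action on the last line runs:


JavaRDD<String> fruits = sc.parallelize(Arrays.asList("Apples", "Oranges", "Bananas", "Grapes"));
JavaRDD<String> shouted = fruits.map(fruit -> {
    System.out.println("processing " + fruit);   // not printed yet - the map is only recorded
    return fruit.toUpperCase();
});
// Only now, when the action runs, does the map above actually execute:
shouted.saveAsTextFile("E:\\test\\lazy-demo");
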

Pipelining Execution

While Spark is lazy, Spark is lazy-efficient - perhaps more efficient than we developers are. Like how Bill Gates believes that lazy people find efficient ways to do things.
In the above example, we might imagine Spark working like a batch job - read all the inputs, then process them all, then write them all at once - but it may really not be so. Spark can pipeline the sequence of transformations element by element, so no intermediate data needs to be stored.
So in our above example, the actual execution can happen record by record: a line is read, mapped, filtered and written out before the next line is touched.
In which case Spark behaves which way, we cannot tell. What we can surely tell is that we can trust Spark with the approach it chooses - it will always be efficient.

Partition Handling

I think it's a good time now to understand how partitions are handled in Spark.
We may have several executors in our cluster - that is completely up to us and how we configure Spark. What is not up to us is how the data is partitioned. To cite an example, let's say we want to read an input file of roughly 10,000 bytes. We can only specify the minimum number of partitions the input file should be divided into; Spark then makes a judgement and decides how many partitions of the original file are actually made. If we specified a minimum of 2 partitions, we'd expect Spark to divide the file into two 5,000-byte parts. But if Spark calculates that partitioning the file into 10 partitions will lead to a more optimized run of the application, it will create 10 parts of 1,000 bytes each. And you as a developer can do nothing about it. You have to leave certain things to the framework, and this is one of them.
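
As a rough sketch of this (the file path and the value 2 are just illustrative), the second argument of textFile() is only a minimum - Spark may still decide on more partitions:


JavaRDD<String> input = sc.textFile("E:\\test\\input.txt", 2);  // ask for at least 2 partitions
System.out.println(input.getNumPartitions());                   // Spark may report 2, or more
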
Similarly, RDDs are also partitioned in reality. If we process an input file by loading it into an RDD, each executor reads its partition and creates its "part" of the RDD. Which means that, in reality, the whole RDD as we imagine it is distributed across the different executors.
It is important to remember at this point that each executor is isolated from the others: each executor has its own set of variables. So when the input file is to be read in partitions, the Spark Application just decides the size of each partition. It sends each executor a copy of the entire file along with the start and end points of its partition, and the executor reads that particular part of the file. Note how each executor even has a copy of the whole file!
While writing the output file, each executor creates its own partition of the output, and the Spark Application later maintains these parts as one output.
You also cannot control individual executors in any way. What you can control is how all executors behave - and you do that through what you write with the Spark API.

Maintaining Order

The Spark Application partitions the data - that much we have understood. But is the order of the records lost anywhere?
The two transformations map and filter never change the order of rows. This means the nth line of the input is the nth record of the RDD, and when mapped into another RDD it still remains the nth record.
However, there are other, heavier transformations which do not maintain the order. I haven't introduced them so far - groupBy and join. You can read more about them later.

Apache Spark - A dummy's introduction - 2 (Data Structure)

A dummy's introduction is split in 4 parts:
  1. Architecture
  2. Data Structure
  3. Execution
  4. Example

Data Structures in Spark
Since we will use Java, of course we can use all our traditional data structures in the program. But, for Spark operations, we use special structures created for Spark. I'll introduce the most commonly used ones:

RDDs - Resilient Distributed Datasets

RDDs are the fundamental units of Spark. All of Spark's processing revolves around RDDs. The complicated name breaks down as:
Resilient: if data in memory is lost, the data structure can be recreated
Distributed: an RDD is processed across the cluster
Dataset: it can be loaded from a file/database or created programmatically using Collections/other data structures
You can imagine an RDD as a set of records of any object type/data type. Note that RDDs are immutable (just like Strings are).
Let's try to create an RDD of Strings from a List and imagine what it looks like as an RDD.


JavaRDD<String> myRddOfFruits = sc.parallelize(Arrays.asList("Apples", "Oranges", "Bananas", "Grapes"));
//sc is the SparkContext, which is required to create any Spark data structure


Similarly let's try to create an RDD of Strings from a file as below:


JavaRDD<String> myRddOfFruits = sc.textFile("/path/to/input.txt");


With this, we have learnt the first step of our batches: reading input.

Basic Operations on RDDs

RDDs are immutable and cannot be modified, but we can create new RDDs from existing ones. This is how we proceed to the second step of a batch: processing the input. We take the input RDD we just read and operate on it to create a new RDD - our processed RDD. We can also operate on the input RDD to produce other values, like integers, instead of another RDD.
There are two major operations on RDDs:
  1. Actions: These operations always return a value.

    Some common examples are:
    1. count()
    2. saveAsTextFile()
  2. Transformations: These operations create a new RDD out of the previous RDD. It is easy to note that transformations are the key for our second step in batches: processing.

    Mainly the following transformations are used: 
    1. map
    2. filter
Let's look at each with examples.

count()

This is an action performed on an RDD. The count function, as the name suggests, just returns the number of elements in a given RDD (the returned value is the count of elements in this case). Let's call count on our RDD from before.


//Read the file
JavaRDD<String> myRddOfFruits = sc.textFile("/path/to/input.txt");
//Find out how many rows are in this file
long noOfFruits = myRddOfFruits.count();


If we were to picture it, every line of the file is one record in the RDD, and count() returns the number of records.

saveAsTextFile()

Let's say we want to read from a Collection and write it into a file. How would you do it? You'd create the RDD from the Collection. But how do you save the contents of an RDD into a file? That's what this function is for. Let's save our fruits collection from before into a file.


//Read the collection
JavaRDD<String> myRddOfFruits = sc.parallelize(Arrays.asList("Apples", "Oranges", "Bananas", "Grapes"));
//Save this RDD in a file
myRddOfFruits.saveAsTextFile("/path/to/output.txt");


The "value" produced above is a text file. This is an action performed on an RDD.
With this we have just learnt the third step of the batch: writing the output. Next, we'll see how the second step, processing, works.

map

Map is a transformation performed on an RDD. It applies a particular function to every record of the RDD. This is the key to our second step, processing. What is processing anyway? We take each record and perform a function on it, right? Map is exactly that. Let's transform our previous RDD of fruits as an example.


//Read the collection
JavaRDD<String> myRddOfFruits = sc.parallelize(Arrays.asList("Apples", "Oranges", "Bananas", "Grapes"));
//Convert all Strings to upper case
myRddOfFruits = myRddOfFruits.map(fruitString -> fruitString.toUpperCase());

We already know that myRddOfFruits is an RDD of Strings, which means the data type of each record is a String. To represent each record inside the map function, we used a variable name - fruitString. Thus fruitString stands for each record in the RDD.
A lot happened in just one line! In the fourth line of the code, we called the map function on our RDD. Let me explain the functional syntax inside the map.
Inside the map, we define what our new transformed RDD should be: for each fruitString, we want fruitString.toUpperCase(). The toUpperCase() also returns a String, so the data types match!
Spark will create a new RDD by taking each record of myRddOfFruits and making it upper case using the function provided by the String class. This new RDD is then pointed to by the variable myRddOfFruits.
Imagination is essential in Spark: picture each record of the RDD being turned into its upper-case version, one by one.
Note:
Let's say we called the count() function on myRddOfFruits before the map - we would get 4, right? What if we called count() after the map? It would also return 4. The map function runs for every record in the RDD, so the count after a map always remains the same.
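
Here is a small sketch of this note, reusing the four-fruit RDD from above:


JavaRDD<String> myRddOfFruits = sc.parallelize(Arrays.asList("Apples", "Oranges", "Bananas", "Grapes"));
System.out.println(myRddOfFruits.count());                         // 4
myRddOfFruits = myRddOfFruits.map(fruit -> fruit.toUpperCase());
System.out.println(myRddOfFruits.count());                         // still 4 - map never changes the count
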

filter

Filter is, again, a transformation on an RDD. As the name suggests, it filters out some records and keeps only some. Hence, unlike map, the count after a filter may be equal to or less than that of the original RDD. Let's filter our fruits RDD as an example:


//Read the file as an RDD
JavaRDD<String> myRddOfFruits = sc.textFile("/path/to/input.txt");
//Filter such that only fruits starting with 'B' remain
myRddOfFruits = myRddOfFruits.filter(record -> record.startsWith("B"));


Just like with map, we passed a function into the filter method. This time we chose to call each record of the RDD record. Note that we don't have to declare record's type anywhere; it is inferred from the RDD. We then filtered our RDD so that only fruits starting with the letter 'B' remain.
When we try to picture it, the new filtered RDD contains only those fruits.
Notice how the count() has changed!
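
To make the change in count() concrete, here is a sketch using the fruit list from earlier instead of the file:


JavaRDD<String> myRddOfFruits = sc.parallelize(Arrays.asList("Apples", "Oranges", "Bananas", "Grapes"));
JavaRDD<String> startingWithB = myRddOfFruits.filter(fruit -> fruit.startsWith("B"));
System.out.println(myRddOfFruits.count());    // 4
System.out.println(startingWithB.count());    // 1 - only "Bananas" starts with 'B'
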

Conclusion

In short, RDDs are the fundamental unit on which Spark operates. We can perform actions and transformations on these RDDs to achieve our desired result. Actions return a value, while transformations create a new RDD. The map and filter transformations follow a functional programming syntax where we operate on each record of the RDD using a lambda function.

Apache Spark - A dummy's introduction - 1 (Architecture)

A dummy's introduction is split in 4 parts:
  1. Architecture
  2. Data Structure
  3. Execution
  4. Example


I'll introduce the main components of the architecture in order as follows.

Spark Application

Spark follows a master-slave model: a set of workers (more on them below) do the actual work, and there is a manager of these workers, called the Driver Program. Many times this is referred to as the Spark Application itself. You just have to submit your application (the batch, in our case) to the Spark Application and it takes care of the rest. It divides the whole application into tasks (e.g. read input, process input, write output). It is also responsible for splitting the data into groups, or partitions (the more correct technical term). The driver is lazy (more on that in part 3).

Spark Context

Spark is a framework - in order to use it, we need to create an instance of a Spark application. To do this, we create an object of the class SparkContext. It is crucial, as it establishes the connection to the Spark cluster. The special data structures that Spark uses cannot be created on the cluster without a SparkContext. Thus each application has to have one.
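
As a minimal sketch (the app name and the "local[*]" master are illustrative choices, and in Java the context class is JavaSparkContext, as used in the code in part 4), setting it up looks like this:


SparkConf conf = new SparkConf()
        .setAppName("MyBatchApplication")   // illustrative application name
        .setMaster("local[*]");             // run locally on all available cores
JavaSparkContext sc = new JavaSparkContext(conf);
// ...create RDDs and run the job using sc...
sc.stop();
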

Cluster Manager (Yarn)

Then there is a Cluster Manager (e.g. Apache Mesos or YARN). The Cluster Manager exists for resource management: it physically assigns resources to the workers. Thus it works in coordination with the Spark Application and the workers, but the Spark Application is abstracted from the Cluster Manager's workings. As developers, we might never need to look into its details; an operations person, however, might. The Cluster Manager also runs the Spark UI, where we can study the performance of the application: it lets you monitor running jobs and view statistics and configuration.

Executor

The worker we have been referring to above is called an executor. It is the slave in Spark's master-slave architecture. It receives tasks from the Spark Application, and its job is to execute them as per the schedule. It is important to note that each executor has its own set of resources (as allocated by the Cluster Manager), which means that, to start with, your application (the batch's jar/war file) is copied onto every executor. Executors have their own variables, cache, etc. In effect, you can imagine that each executor works independently.
