December 10, 2017

Apache Spark - A dummy's introduction - 4 (Example)

A dummy's introduction is split in 4 parts:
  1. Architecture
  2. Data Structure
  3. Execution
  4. Example

Consider the following the original code of one batch. It opens an input file, calls a process function on each of its rows and writes each of the processed input to an output file. It also prints the number of input rows in the end.


1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
private static final String INPUT_FILENAME = "E:\\test\\input.txt";
    private static final String OUTPUT_FILENAME = "E:\\test\\output.txt";

    public static void main() {

        FileReader fileReader = new FileReader(INPUT_FILENAME);
        FileWriter fileWriter = new FileWriter(OUTPUT_FILENAME);
        String inputline = null;
        String outputline = null;
  int inputCounter = 0;
        BufferedReader bufferedReader = new BufferedReader(fileReader);
        BufferedWriter bufferedWriter = new BufferedWriter(fileWriter);

        while ((inputline = bufferedReader.readLine()) != null) {
            outputline = process(inputline);
            bufferedWriter.write(outputline);
   inputCounter++;
        }
  System.out.println(inputCounter);
    }

    public static String process(String input) {
        return input.toUpperCase();
    }




If we were to try to convert this existing batch to Spark, the following points could be handy:
  1. Create the Spark Context
  2. Use it to read the whole file as an RDD
  3. Instead of calling process() on each line, just map the input RDD into a new processed RDD
  4. Since we won't be running a loop for every line, how we calculate input count? Well, the process() function is called for every record in the RDD, so we create an Accumulator and increment its value every time it goes in the process() function.
  5. Finally, we save the input as a text file, and done!
Here's converted code:


1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
private static final String INPUT_FILENAME = "E:\\test\\input.txt";
    private static final String OUTPUT_FILENAME = "E:\\test\\output.txt";

    public static void main() {
  SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
  JavaSparkContext sc = new JavaSparkContext(conf);
        
  JavaRDD input = sc.textFile(INPUT_FILENAME);
        Accumulator inputCounter = sc.doubleAccumulator(0);
        
        input.map(row -> process(row, inputCounter));
        
        input.saveAsTextFile(OUTPUT_FILENAME );
        System.out.println(inputCounter.value().longValue());

        //You could avoid the Accumulator and print the count by the count() function. 
  //But its not a good idea. Read below why
    }

    public static String process(String input, Accumulator inputCounter) {
        inputCounter.add(1d);
        return input.toUpperCase();
    }


Why didn't we use count() for printing the count of all lines? Because it is another action. We are already calling one action - saveAsTextFile. Once this action is performed, the input RDD is wiped off the memory. So if we write another action (say, input.count() ) then Spark will re-create the entire input RDD just to call one count function. To avoid multiple processing for multiple actions, we just use Accumulators instead.
Some people persist the RDD in the memory for the count() function. This is still not as good, because persisting the RDD in the disk will block a sizable part of the memory and the access time will increase too. 

Apache Spark - A dummy's introduction - 3 (Execution)

A dummy's introduction is split in 4 parts:
  1. Architecture
  2. Data Structure
  3. Execution
  4. Example

Introduction

Just knowing the data structure and its operations is not enough to understand how Spark operates. The order of execution matters. In fact, it is a concept which a developer has to always remember while writing the code so as to avoid any silly mistakes.

Spark is lazy in execution

Always remember - Spark is lazy. Lazy is the keyword while writing code. What exactly do we mean?
Now, we can either perform actions or transformations on the RDDs. The transformations always created another RDD and that's it. Let's say after the application finished, these newly transformed RDDs would be dumped. There is no scope of these RDD variables outside the application. However actions, they are different. They return values. Maybe files or integers but they return something which has a real purpose outside the scope of the application. 
So when we say Spark is lazy, Spark doesn't execute it's tasks until an action is performed. Meaning, Spark is aware that transformations yield no value and it does nothing until it sees that an action is really performed on the RDD. 
Example: If we create an RDD of fruits, then we call the following functions:
  1. map - convert to upper case
  2. filter - keep only starting with 'B'
  3. save result as a file
The code for the above example looks like this:


JavaRDD input = sc.textFile("E:\\test\\input.txt");
input.map(inputRecord -> inputRecord.toUpperCase());
input.filter(inputRecord -> inputRecord.startsWith('B'));
input.saveAsTextFile("E:\\test\\output.txt");


The steps 1 and 2, are transformations while step 3 is an action. Since Spark is lazy, nothing will really happen until Spark reads step 3. 
What we expect is:
But what happens is:
This is a beautiful feature about this framework - the executors don't perform any process until they see there's something valuable coming out of it. Isn't that similar to human behavior? If we never called the step 3, i.e. we never saved that file, why would we even bother to do steps 1 and 2? It would be a complete waste of time and resources to perform transformations without actions (Its useless to perform transformations when the result won't be remembered in any way).

Pipelining Execution

While Spark is lazy, Spark is lazy-efficient. Perhaps more efficient than we as developers are. Like how Bill Gates believes that lazy people find efficient ways to do things. 
In the above example, while we imagined spark to be working like a batch job, i.e. read inputs then process them all and then write them all at once, it may really not be so. Spark will perform sequences of transformations by element so no data is stored. 
So in our above example, the actual execution occurs as follows:
So in which case Spark behaves how? We cannot tell. But what we can surely tell is that we can trust Spark with the approach it chooses. It will always be efficient.

Partition Handling

I think its a good time now to understand how partitions are handled in Spark.
Now, we may have several executors in our cluster. That is completely upto us, how we configure spark. But what is not upto us, is how data is partitioned in Spark. To cite an example, let's say we want to read input file of roughly size 10,000 bytes. We can only specify the minimum number of partitions that the input file can be divided into. Spark takes a judgement and decides how many actual paritions of the original file will be made. If we specified a minimum parition of 2, we'd expect Spark to divide the file into two 5000 bytes parts. But if Spark calculates that partitioning the file into 10 paritions will lead to optimized run of the application, it will create 10 parts of 1000 bytes each. And you as a developer can do nothing about it. You have to leave certain things to the framework and this is one of them.
Similarly, the RDDs are also partitioned in reality. If we want to process an input file by loading it into an RDD, each file reads its partition, and created its "part" of the RDD. Which means, in reality, the whole RDD as we imagine it, is distributed across different executors.
It is important to remember at this point that each executor is immune from the others. This means each executor has its own set of variables. So when the input file is to be read in partitions, the Spark Application just decides the size of the partition. It sends each executor a copy of the entire file and their starting and end point of the partition. Then that executor reads its particular part of the file. Note how each executor even has a copy of the whole file!
Now, while writing into an output file, each executor will create its own partition of the output file and later the Spark Application will maintain it as one file.
You also cannot control individual executors in any way. You can control however how all executors behave. How that would be? Well by implementing the spark API.

Maintaining Order

The Spark Application partitions the data, something we have understood. But is the order of the records lost anywhere?
The two transformations of map and filter will never change the order of rows. This means the nth line of the input is the nth record of the RDD and when mapped into another RDD, it still remains the nth record.
However there are other transformations which are heavier and donot maintain the order. I haven't introduced them so far - groupBy and Join. You can read more about them later.

Apache Spark - A dummy's introduction - 2 (Data Structure)

A dummy's introduction is split in 4 parts:
  1. Architecture
  2. Data Structure
  3. Execution
  4. Example

Data Structures in Spark
Since we will use Java, of course we can use all our traditional data structures in the program. But, for Spark operations, we use special structures created for Spark. I'll introduce the most commonly used ones:

RDDs - Resilient Distributed Datasets

RDDs are the fundamental units of Spark. All of Spark processing revolves around RDDs. The complicated name implies:
Resilience: If data in the memory is lost, this data structure can be recreated
Distributed: An RDD is processed across the cluster
Dataset: It can be loaded from a file/database or can be created programatically using Collections/other data structures
You can imagine an RDD to be a set of records of any object type/ data type. Note that RDDs are immutable (just like Strings are).
Let's try to create an RDD of Strings from a List and imagine how it looks like in an RDD. 


JavaRDD myRddOfFruits = sc.parallelize(Arrays.asList("Apples", "Oranges", "Bananas", "Grapes"));
//sc is the SparkContext which is required to create any Spark data structure


Similarly let's try to create an RDD of Strings from a file as below:


JavaRDD myRddOfFruits = sc.textFile("/path/to/input.txt");


With this, we have learnt the first step of our batches: reading input.

Basic Operations on RDDs

RDDs are immutable and cannot be modified, however we can create new RDDs from existing RDDs. In this way we can proceed to the second step of a batch: process the input. We will take the previous read input RDD and operate on it to create a new RDD which will be our processed RDD. We can also operate on our input RDD to create other values like integers instead of creating another RDD.
There are two major operations on RDDs:
  1. Actions: These operations always return a value.

    Some common examples are:
    1. count()
    2. saveAsTextFile()
  2. Transformations: These operations create a new RDD out of the previous RDD. It is easy to note that transformations are the key for our second step in batches: processing.

    Mainly the following transformations are used: 
    1. map
    2. filter
Let's look at each with examples.

count()

This is an action performed on an RDD. Count function, as name suggests just returns the number of elements in a given RDD. (The value is the count of elements in this case). Let's try to call count function on our RDD as before.


//Read the file
JavaRDD myRddOfFruits = sc.textFile("/path/to/input.txt");
//Find out how many rows are in this file
long noOfFruits = myRddOfFruits.count();


If we were to imagine, it would look like this:

saveAsTextFile()

Let's say we want to read from a Collection and write it into a file. How would you do it? You'd create the RDD from the database. But how do you save the contents of an RDD into a file? This function is used. Let's say we want to save our fruits collection in a file from before. 


//Read the collection
JavaRDD myRddOfFruits = sc.parallelize(Arrays.asList("Apples", "Oranges", "Bananas", "Grapes"));
//Save this RDD in a file
myRddOfFruits.saveAsTextFile("/path/to/output.txt");


The value returned above was a text file. This is an action performed on an RDD. 
With this we just learnt our third step of the batch: writing to output. Further, we'll see how the second step i.e. processing works.

map

Map is a tranformation performed on an RDD. It performs a particular transformation/function on every record of the RDD. This becomes the key of our second step i.e. processing. What is processing anyway? We take each record and perform a function on it, right? So map is exactly that. Let's try to transform our previous RDD of fruits as an example.


//Read the collection
JavaRDD myRddOfFruits = sc.parallelize(Arrays.asList("Apples", "Oranges", "Bananas", "Grapes"));
//Convert all Strings to Upper case
myRddOfFruits.map(fruitString -> fruitString.toUpperCase());

We already know that myRddOfFruits is an RDD of Strings. This means that the data type of each record is a String. So to represent each record in this RDD, we used a variable name inside this map function - fruitString. Thus fruitString is each record in the RDD.
Just in one line a lot happened! In the fourth line of the code, we called the map function on our RDD. Let me further explain the functional syntax inside the map function.
Further, inside the map, we have defined what our new transformed RDD should be. We have instructed, that for each fruitString, we want fruitString.toUpperCase(). The toUpperCase() also returns a String, so the data types match!
Spark will create a new RDD, take each record of myRddOfFruits and make it upper case using the function provided by String class. This new RDD will be pointed by the variable myRddOfFruits. 
Imagination in Spark is essential, this is how we can think of the code above:
Note:
Let's say we called the count() function on myRddOfFruits before the map, we would get value as 5, right? What if we called count() after the map? It would also return 5. The map function runs for every record in the RDD. This means the count after map will always remain the same.

filter

Filter is again, a transformation on an RDD. However as the name suggests, it filters out some of the records and only keeps some. Hence, unlike map, the count after a filter may be equal or less than that of the original RDD. Let's try to filter our fruits RDD as an example:


//Read the file as an RDD
JavaRDD myRddOfFruits = sc.textFile("/path/to/input.txt");
//Filter such that only fruits starting with 'B' remain
myRddOfFruits.filter(record -> record.startsWith('B'));


Just like map, we passed a function inside the filter method. This time, we chose to call each record of the RDD as a variable named record. Note that we don't have to worry about the specification of the record. Then, we filtered our RDD such that only fruits starting with the letter 'B' remained in the RDD. 
When we try to picture, the new filtered RDD looks like this:
Notice how the count() has changed!

Conclusion

In short, RDDs are the fundamental unit in which Spark operates. We can perform actions and transformations on these RDDs to achieve our desired result. The actions return a value while transformations create a new RDD. The map and filter transformations follow a functional programming syntax where we operate on each record of the RDD using the lambda function.

Apache Spark - A dummy's introduction - 1 (Architecture)

A dummy's introduction is split in 4 parts:
  1. Architecture
  2. Data Structure
  3. Execution
  4. Example


I'll introduce the main components of the architecture in order as follows.

Spark Application

There is a manager of these workers, called the Driver Program. Many times this is referred to as the Spark Application itself. You just have to submit your application (the batch, in our case) to the Spark Application and it will take care of the rest. It will divide the whole application into tasks (e.g.: read input, process input, write output). It is also responsible for creating the groups of batches, or partitions (which is a more correct technical term). The driver is lazy (more on that in part 3).

Spark Context

Spark is a framework - in order to use this framework, we need to create an instance of Spark Application. In order to do this, we create an object of the class SparkContext. It is crucial as it establishes the connection to the spark cluster. The special data structures that Spark uses cannot be created on the cluster without SparkContext. Thus each application has to have the SparkContext.

Cluster Manager (Yarn)

Then there is a Cluster Manager (e.g.: Apache Mesos or Yarn). The Cluster Manager exists for resource management. It physically assigns all resources to all workers. Thus it works in coordination with the Spark Application and the workers. However the Spark Application is abstracted from the Cluster Manager's working. As a developer, we might never care to look into more details. However an operations person might be interested to look into its details. The Cluster Manager also runs the Spark UI where we can study the performance of the application. It lets you monitor running jobs, and view statistics and configuration.

Executor

The worker we have been referring to above is called an executor. It is the slave in Spark's master-slave architecture. It receives tasks from the Spark Application and it's job is to execute them as per the schedule. It is important to note that each executor has its own set of resources (as allocated by the Cluster Manager). Which means, to start with, your application (the batch's jar/war file) is copied on to every executor. They have their own variables, cache etc. In effect, you can imagine each executor works independently.

October 30, 2017

Determination

There is this fish in the Great Barrier Reef of Australia - tuskfish. 

He wakes up every morning and swims to the edge of the reef. Then he digs the corals and the rubble searching for something. It looks unnatural to watch something dig with its fins, no hands. But it continues, and then finally he finds a clam. He manages to hold it in its mouth and swim all the way back, except this time with the clam in his mouth. 

He arrives at his lunch table, its kind of a hollow coral with walls. Like a shallow well. It wastes no time, and it throws the clam’s hard white shell across the walls in an attempt to smash it. Nothing happens. He repeats, he keeps throwing the clam on the same spot with high accuracy. Not giving up. 

Finally there’s a small crack, after a hundred attempts. He throws it once more, with the same intensity and there, it breaks open. A small squiggly meat comes out and the fish devours it. The camera zooms out, you see inside the bowl shaped rock, so many - so so many of such broken shells, accumulating over days. The fish does this everyday, all its life. Its a mere fish.

What do I have to complain about? There's so much hard work to do.



Courtesy: http://www.bbc.co.uk/ Blue Planet II

September 26, 2017

Why Apache Spark and what is parallel processing?

Recently at Rakuten we successfully renewed our mainframe from COBOL to Java (an old Fujitsu to Oracle Exalogic) - press release. It was almost a 3 year long project and towards the end, the entire company was working on it. I transitioned into the development team for the mainframe, to rewrite several batches from Cobol to Java.

Now of course I cannot read COBOL (although its not that different). We had a machine translation software convert the cobol to java but honestly, that was hardly anything. The converted code was not smart, it used zero OOP concepts, it used customized stacks to remember function calls (haha, imagine writing your own implementation to remember function calls) and it looked like COBOL just changed its clothes. As a result, these batches performed very slow - their performance times significantly higher than their COBOL counterparts for the same data.

So the job was simple - to make these batches faster, in fact, much faster than their COBOL counterparts, otherwise what's the point?

My team chose Apache Spark as a framework to work parallelly with data. In this post, I am trying to explore why they made this decision and how Spark looks like compared to traditional batch processing.

Background of our batches:


For most of the batch processing we did, there was a fixed pattern to be noticed:

  1. Read data from database and/or file
  2. Process that data for each row of the database table/ each record of the file
  3. Write to database and/or file.

Assume we have an input file which needs to be processed and written into an output file. Assume we have 100 records in the input file.
Traditional Processing
If we were to use traditional processing, we would read the first record, process it, write it to output. Then we would read next record, process it, write to output. We would keep repeating this until all 100 rows. This is a single flow - and as we can guess, it would take us a long time to complete the whole operation. Here's how it can be represented:

Batch Processing

However we already use batch processing. We divide our input records into groups. Let's say groups of 20. That makes a total of 5 groups.
Here we read the first group together (i.e. 20 records), process them, write them to output. Then we would read the next group, process them, write them. We would keep repeating this for all 5 groups. Of course it is much faster to do this than traditional processing, since we save a lot of time switching between tasks and dealing with files. Something like this:

Parallel Processing

However,
What if, we use that same batch processing model as above but with a twist. Let's say you had 5 different machines. You could give each group to one machine. The total time taken would be one fifth that of the batch processing model. This approach is called parallel processing. Roughly, instead of one worker, you have multiple workers who are working in batches:
This is also the approach that Spark takes to process our batches. The working of Spark is slightly different than the diagram above but we will come to the details later, in the next post.

So, why Spark?

In conclusion, we can say that indeed batch processing is much faster than traditional processing. Further, the reason why parallel processing is faster than batch processing is because you operate with multiple groups at the same time instead of just one group at a time. Of course this means we need more resources to achieve this (e.g.: the number of workers required increases in the above diagrams).
Batches which were being run on our mainframe, had very little logic to them. It was about feeding data from one process to another based on a bunch of conditions. Of course what I say is a very very simplified version of how it really looks like. But in summary, there was more work to do in getting the input and writing the output than there was in processing the data in between. Spark fit this choice because, even if we just got rid of the bloated processing of the converted code, we would have practically made no progress with the I/O operation. The logic processing would have pretty much remained the same. Apache Spark blessed us with the power of forgetting about the logic, and concentrating only on the I/O to speed up the processing times. It came with its own challenges, like writing the custom file readers and writers but that's the story of another post!


July 07, 2017

Everyday Motivation

Nights like these which never cease
When the warm summer air refuses to hug
Tickles of sweat shining on forearms
The noise of the silence across the neighborhood
An array of mess waiting to be cleared
Bullet things to do piling on top of other
No drop of saliva wetting a mum mouth
Dishes lying unwashed, laundry won't do itself
A half eaten dinner won't fill up an empty heart

Misses the mere presence of you

Popular Posts