December 10, 2017

Apache Spark - A dummy's introduction - 2 (Data Structure)

A dummy's introduction is split in 4 parts:
  1. Architecture
  2. Data Structure
  3. Execution
  4. Example

Data Structures in Spark
Since we will use Java, of course we can use all our traditional data structures in the program. But, for Spark operations, we use special structures created for Spark. I'll introduce the most commonly used ones:

RDDs - Resilient Distributed Datasets

RDDs are the fundamental units of Spark. All of Spark processing revolves around RDDs. The complicated name implies:
Resilience: If data in the memory is lost, this data structure can be recreated
Distributed: An RDD is processed across the cluster
Dataset: It can be loaded from a file/database or can be created programatically using Collections/other data structures
You can imagine an RDD to be a set of records of any object type/ data type. Note that RDDs are immutable (just like Strings are).
Let's try to create an RDD of Strings from a List and imagine how it looks like in an RDD. 


JavaRDD myRddOfFruits = sc.parallelize(Arrays.asList("Apples", "Oranges", "Bananas", "Grapes"));
//sc is the SparkContext which is required to create any Spark data structure


Similarly let's try to create an RDD of Strings from a file as below:


JavaRDD myRddOfFruits = sc.textFile("/path/to/input.txt");


With this, we have learnt the first step of our batches: reading input.

Basic Operations on RDDs

RDDs are immutable and cannot be modified, however we can create new RDDs from existing RDDs. In this way we can proceed to the second step of a batch: process the input. We will take the previous read input RDD and operate on it to create a new RDD which will be our processed RDD. We can also operate on our input RDD to create other values like integers instead of creating another RDD.
There are two major operations on RDDs:
  1. Actions: These operations always return a value.

    Some common examples are:
    1. count()
    2. saveAsTextFile()
  2. Transformations: These operations create a new RDD out of the previous RDD. It is easy to note that transformations are the key for our second step in batches: processing.

    Mainly the following transformations are used: 
    1. map
    2. filter
Let's look at each with examples.

count()

This is an action performed on an RDD. Count function, as name suggests just returns the number of elements in a given RDD. (The value is the count of elements in this case). Let's try to call count function on our RDD as before.


//Read the file
JavaRDD myRddOfFruits = sc.textFile("/path/to/input.txt");
//Find out how many rows are in this file
long noOfFruits = myRddOfFruits.count();


If we were to imagine, it would look like this:

saveAsTextFile()

Let's say we want to read from a Collection and write it into a file. How would you do it? You'd create the RDD from the database. But how do you save the contents of an RDD into a file? This function is used. Let's say we want to save our fruits collection in a file from before. 


//Read the collection
JavaRDD myRddOfFruits = sc.parallelize(Arrays.asList("Apples", "Oranges", "Bananas", "Grapes"));
//Save this RDD in a file
myRddOfFruits.saveAsTextFile("/path/to/output.txt");


The value returned above was a text file. This is an action performed on an RDD. 
With this we just learnt our third step of the batch: writing to output. Further, we'll see how the second step i.e. processing works.

map

Map is a tranformation performed on an RDD. It performs a particular transformation/function on every record of the RDD. This becomes the key of our second step i.e. processing. What is processing anyway? We take each record and perform a function on it, right? So map is exactly that. Let's try to transform our previous RDD of fruits as an example.


//Read the collection
JavaRDD myRddOfFruits = sc.parallelize(Arrays.asList("Apples", "Oranges", "Bananas", "Grapes"));
//Convert all Strings to Upper case
myRddOfFruits.map(fruitString -> fruitString.toUpperCase());

We already know that myRddOfFruits is an RDD of Strings. This means that the data type of each record is a String. So to represent each record in this RDD, we used a variable name inside this map function - fruitString. Thus fruitString is each record in the RDD.
Just in one line a lot happened! In the fourth line of the code, we called the map function on our RDD. Let me further explain the functional syntax inside the map function.
Further, inside the map, we have defined what our new transformed RDD should be. We have instructed, that for each fruitString, we want fruitString.toUpperCase(). The toUpperCase() also returns a String, so the data types match!
Spark will create a new RDD, take each record of myRddOfFruits and make it upper case using the function provided by String class. This new RDD will be pointed by the variable myRddOfFruits. 
Imagination in Spark is essential, this is how we can think of the code above:
Note:
Let's say we called the count() function on myRddOfFruits before the map, we would get value as 5, right? What if we called count() after the map? It would also return 5. The map function runs for every record in the RDD. This means the count after map will always remain the same.

filter

Filter is again, a transformation on an RDD. However as the name suggests, it filters out some of the records and only keeps some. Hence, unlike map, the count after a filter may be equal or less than that of the original RDD. Let's try to filter our fruits RDD as an example:


//Read the file as an RDD
JavaRDD myRddOfFruits = sc.textFile("/path/to/input.txt");
//Filter such that only fruits starting with 'B' remain
myRddOfFruits.filter(record -> record.startsWith('B'));


Just like map, we passed a function inside the filter method. This time, we chose to call each record of the RDD as a variable named record. Note that we don't have to worry about the specification of the record. Then, we filtered our RDD such that only fruits starting with the letter 'B' remained in the RDD. 
When we try to picture, the new filtered RDD looks like this:
Notice how the count() has changed!

Conclusion

In short, RDDs are the fundamental unit in which Spark operates. We can perform actions and transformations on these RDDs to achieve our desired result. The actions return a value while transformations create a new RDD. The map and filter transformations follow a functional programming syntax where we operate on each record of the RDD using the lambda function.

Apache Spark - A dummy's introduction - 1 (Architecture)

A dummy's introduction is split in 4 parts:
  1. Architecture
  2. Data Structure
  3. Execution
  4. Example


I'll introduce the main components of the architecture in order as follows.

Spark Application

There is a manager of these workers, called the Driver Program. Many times this is referred to as the Spark Application itself. You just have to submit your application (the batch, in our case) to the Spark Application and it will take care of the rest. It will divide the whole application into tasks (e.g.: read input, process input, write output). It is also responsible for creating the groups of batches, or partitions (which is a more correct technical term). The driver is lazy (more on that in part 3).

Spark Context

Spark is a framework - in order to use this framework, we need to create an instance of Spark Application. In order to do this, we create an object of the class SparkContext. It is crucial as it establishes the connection to the spark cluster. The special data structures that Spark uses cannot be created on the cluster without SparkContext. Thus each application has to have the SparkContext.

Cluster Manager (Yarn)

Then there is a Cluster Manager (e.g.: Apache Mesos or Yarn). The Cluster Manager exists for resource management. It physically assigns all resources to all workers. Thus it works in coordination with the Spark Application and the workers. However the Spark Application is abstracted from the Cluster Manager's working. As a developer, we might never care to look into more details. However an operations person might be interested to look into its details. The Cluster Manager also runs the Spark UI where we can study the performance of the application. It lets you monitor running jobs, and view statistics and configuration.

Executor

The worker we have been referring to above is called an executor. It is the slave in Spark's master-slave architecture. It receives tasks from the Spark Application and it's job is to execute them as per the schedule. It is important to note that each executor has its own set of resources (as allocated by the Cluster Manager). Which means, to start with, your application (the batch's jar/war file) is copied on to every executor. They have their own variables, cache etc. In effect, you can imagine each executor works independently.

October 30, 2017

Determination

There is this fish in the Great Barrier Reef of Australia - tuskfish. 

He wakes up every morning and swims to the edge of the reef. Then he digs the corals and the rubble searching for something. It looks unnatural to watch something dig with its fins, no hands. But it continues, and then finally he finds a clam. He manages to hold it in its mouth and swim all the way back, except this time with the clam in his mouth. 

He arrives at his lunch table, its kind of a hollow coral with walls. Like a shallow well. It wastes no time, and it throws the clam’s hard white shell across the walls in an attempt to smash it. Nothing happens. He repeats, he keeps throwing the clam on the same spot with high accuracy. Not giving up. 

Finally there’s a small crack, after a hundred attempts. He throws it once more, with the same intensity and there, it breaks open. A small squiggly meat comes out and the fish devours it. The camera zooms out, you see inside the bowl shaped rock, so many - so so many of such broken shells, accumulating over days. The fish does this everyday, all its life. Its a mere fish.

What do I have to complain about? There's so much hard work to do.



Courtesy: http://www.bbc.co.uk/ Blue Planet II

September 26, 2017

Why Apache Spark and what is parallel processing?

Recently at Rakuten we successfully renewed our mainframe from COBOL to Java (an old Fujitsu to Oracle Exalogic) - press release. It was almost a 3 year long project and towards the end, the entire company was working on it. I transitioned into the development team for the mainframe, to rewrite several batches from Cobol to Java.

Now of course I cannot read COBOL (although its not that different). We had a machine translation software convert the cobol to java but honestly, that was hardly anything. The converted code was not smart, it used zero OOP concepts, it used customized stacks to remember function calls (haha, imagine writing your own implementation to remember function calls) and it looked like COBOL just changed its clothes. As a result, these batches performed very slow - their performance times significantly higher than their COBOL counterparts for the same data.

So the job was simple - to make these batches faster, in fact, much faster than their COBOL counterparts, otherwise what's the point?

My team chose Apache Spark as a framework to work parallelly with data. In this post, I am trying to explore why they made this decision and how Spark looks like compared to traditional batch processing.

Background of our batches:


For most of the batch processing we did, there was a fixed pattern to be noticed:

  1. Read data from database and/or file
  2. Process that data for each row of the database table/ each record of the file
  3. Write to database and/or file.

Assume we have an input file which needs to be processed and written into an output file. Assume we have 100 records in the input file.
Traditional Processing
If we were to use traditional processing, we would read the first record, process it, write it to output. Then we would read next record, process it, write to output. We would keep repeating this until all 100 rows. This is a single flow - and as we can guess, it would take us a long time to complete the whole operation. Here's how it can be represented:

Batch Processing

However we already use batch processing. We divide our input records into groups. Let's say groups of 20. That makes a total of 5 groups.
Here we read the first group together (i.e. 20 records), process them, write them to output. Then we would read the next group, process them, write them. We would keep repeating this for all 5 groups. Of course it is much faster to do this than traditional processing, since we save a lot of time switching between tasks and dealing with files. Something like this:

Parallel Processing

However,
What if, we use that same batch processing model as above but with a twist. Let's say you had 5 different machines. You could give each group to one machine. The total time taken would be one fifth that of the batch processing model. This approach is called parallel processing. Roughly, instead of one worker, you have multiple workers who are working in batches:
This is also the approach that Spark takes to process our batches. The working of Spark is slightly different than the diagram above but we will come to the details later, in the next post.

So, why Spark?

In conclusion, we can say that indeed batch processing is much faster than traditional processing. Further, the reason why parallel processing is faster than batch processing is because you operate with multiple groups at the same time instead of just one group at a time. Of course this means we need more resources to achieve this (e.g.: the number of workers required increases in the above diagrams).
Batches which were being run on our mainframe, had very little logic to them. It was about feeding data from one process to another based on a bunch of conditions. Of course what I say is a very very simplified version of how it really looks like. But in summary, there was more work to do in getting the input and writing the output than there was in processing the data in between. Spark fit this choice because, even if we just got rid of the bloated processing of the converted code, we would have practically made no progress with the I/O operation. The logic processing would have pretty much remained the same. Apache Spark blessed us with the power of forgetting about the logic, and concentrating only on the I/O to speed up the processing times. It came with its own challenges, like writing the custom file readers and writers but that's the story of another post!


July 07, 2017

Everyday Motivation

Nights like these which never cease
When the warm summer air refuses to hug
Tickles of sweat shining on forearms
The noise of the silence across the neighborhood
An array of mess waiting to be cleared
Bullet things to do piling on top of other
No drop of saliva wetting a mum mouth
Dishes lying unwashed, laundry won't do itself
A half eaten dinner won't fill up an empty heart

Misses the mere presence of you

April 06, 2017

Weather small talks

From
Hibernating under the heater
Dragging through the dusky days
Wriggling out of winter's wrath
Pulling back from depressing emotions
Crawling out of the sleepy snow
And sobbing into the sleepless froids
To
Stepping into sunny springs
Awaiting the pink blossoms
Drinking beer to live a laugh
Finding reasons to go to the beach
Shedding off the excess fur
Dressing up in hats and gowns
Cheers to longer days and warmer spirits!

March 12, 2017

Immunity

As young kids to learn about roaches
how they've survived
thousands of years
Much before the humans evaded
those days when the dinosaurs thrived
they lived,
in their tiny little bodies with hard rock armors
The ultimate survivors of long lasting famines
when food was scarce, they ate their own
cannibals such
Those pesticides and medicines
they swallowed them all
nothing can destroy them
remnants of repellents they are
In need of no homes,
no particular demands
ugly looking creatures, as strong they are
In awe we remained
of the yet scary resistant mortals
mysterious experienced and immune.

As adults to realize
the cockroaches aren't scary no more
they've emerged far more victorious
than science textbooks care to mention
In men lie their horcrux
in their ability to advance
despite worlds falling apart
Shields of ignorance strengthened by recurrence
attention spans too short to notice embellishments
desires so selfish and hearts merciless
the added audacity to take advantage of the weaker rest
Abusing the abstract, defining their own
men of powerful stature lurking in homes
Not found in corners of pipes but
relaxing in their immune chairs.

Popular Posts