A dummy's introduction is split into 4 parts:
Consider the following original code of a batch job. It opens an input file, calls a process() function on each of its rows, and writes each processed row to an output file. At the end it also prints the number of input rows.
```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.FileReader;
import java.io.FileWriter;
import java.io.IOException;

private static final String INPUT_FILENAME = "E:\\test\\input.txt";
private static final String OUTPUT_FILENAME = "E:\\test\\output.txt";

public static void main(String[] args) throws IOException {
    FileReader fileReader = new FileReader(INPUT_FILENAME);
    FileWriter fileWriter = new FileWriter(OUTPUT_FILENAME);
    String inputline = null;
    String outputline = null;
    int inputCounter = 0;

    BufferedReader bufferedReader = new BufferedReader(fileReader);
    BufferedWriter bufferedWriter = new BufferedWriter(fileWriter);

    // Read the input line by line, process each line, write it out, and count it
    while ((inputline = bufferedReader.readLine()) != null) {
        outputline = process(inputline);
        bufferedWriter.write(outputline);
        bufferedWriter.newLine();
        inputCounter++;
    }

    bufferedReader.close();
    bufferedWriter.close();

    System.out.println(inputCounter);
}

public static String process(String input) {
    return input.toUpperCase();
}
```
If we were to try to convert this existing batch to Spark, the following points could be handy:
- Create the Spark Context
- Use it to read the whole file as an RDD
- Instead of calling process() on each line, just map the input RDD into a new processed RDD
- Since we won't be running a loop over every line, how do we calculate the input count? Well, process() is called for every record in the RDD, so we create an Accumulator and increment it each time a record passes through process().
- Finally, we save the processed RDD as a text file, and we're done!
Here's the converted code:
```java
import org.apache.spark.Accumulator;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

private static final String INPUT_FILENAME = "E:\\test\\input.txt";
private static final String OUTPUT_FILENAME = "E:\\test\\output.txt";

public static void main(String[] args) {
    // appName and master would be defined elsewhere, e.g. "BatchToSpark" and "local[*]"
    SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
    JavaSparkContext sc = new JavaSparkContext(conf);

    JavaRDD<String> input = sc.textFile(INPUT_FILENAME);
    Accumulator<Double> inputCounter = sc.doubleAccumulator(0d);

    // map() is lazy: process() only runs once the saveAsTextFile action is triggered
    JavaRDD<String> output = input.map(row -> process(row, inputCounter));
    output.saveAsTextFile(OUTPUT_FILENAME);

    System.out.println(inputCounter.value().longValue());
    // You could avoid the Accumulator and print the count with the count() function.
    // But it's not a good idea. Read below why.

    sc.stop();
}

public static String process(String input, Accumulator<Double> inputCounter) {
    inputCounter.add(1d);
    return input.toUpperCase();
}
```
Why didn't we use count() to print the number of lines? Because count() is another action, and we are already calling one action: saveAsTextFile. Once that action finishes, the RDDs it computed are wiped from memory. So if we call another action (say, output.count()), Spark rebuilds the entire lineage, re-reading and re-mapping the input, just to produce one number. To avoid processing the data once per action, we use an Accumulator instead.
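For illustration, here is a minimal sketch of what that count()-based variant would look like (not code from the article; it reuses input and OUTPUT_FILENAME from the converted example above). Each action launches its own job, so the file is read and mapped twice:

```java
// No Accumulator here: the line count comes from a second action instead
JavaRDD<String> processed = input.map(row -> row.toUpperCase());

// Action #1: reads the file, maps every line, writes the output
processed.saveAsTextFile(OUTPUT_FILENAME);

// Action #2: nothing is cached, so Spark re-reads and re-maps the entire
// input just to produce this number
System.out.println(processed.count());
```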
Some people persist the RDD in memory just so they can call count(). This is still not as good: the cached RDD blocks a sizeable chunk of memory for the rest of the job, and if it has to spill to disk, access times go up as well.
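A minimal sketch of that persist-based variant, again reusing input and OUTPUT_FILENAME from the converted example above (cache() stores the RDD at the default MEMORY_ONLY level):

```java
JavaRDD<String> processed = input.map(row -> row.toUpperCase());
processed.cache(); // keep the mapped RDD in memory across actions

// Action #1: computes the RDD, writes the output, and caches the result
processed.saveAsTextFile(OUTPUT_FILENAME);

// Action #2: answered from the cache, so nothing is recomputed; the cached
// partitions occupy executor memory until they are released
System.out.println(processed.count());

processed.unpersist(); // free the cached partitions when we're done
```

The cached partitions sit in executor memory until unpersist() is called, which is exactly the cost described above.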