How I began learning Apache Spark in Java

Introduction

This is the first of three articles sharing my experience learning Apache Spark. Here I will go over the QuickStart Tutorial and JavaWordCount Example, including some of the setup, fixes and resources.

  1. https://spark.apache.org/docs/latest/quick-start.html
  2. https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java

Audience

For Java programmers who are interested in learning Apache Spark in Java. No Spark or Hadoop experience is required, and this will not cover Hadoop.

I’m running my examples on Windows, so my notes may be most useful to Windows users; however, Spark runs normally on Linux, and Apache provides Linux documentation. Similarly, I’m learning Spark in Java, but Spark examples are also available in Scala and Python on the Apache Spark site.

What is Spark?

The Apache Spark project has this to say about itself: “Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general execution graphs. Spark also supports a rich set of higher-level tools including Spark SQL for SQL and structured data processing, MLlib for machine learning, GraphX for graph processing, and Spark Streaming.”
https://spark.apache.org

The Spark Ecosystem

How Spark fits into the Big Data ecosystem is outside the scope of this post, but the following diagram shows Spark’s position in a larger system of components.

Figure 1: Spark Ecosystem

What does Spark do?

Spark runs map-reduce tasks and manages memory, I/O, data transformation jobs, data filtering, data partitioning, streaming, parallelization, and highly distributed computation. Spark also has an API for processing regular SQL, as well as the ability to parse many data input formats, such as JSON.

What to expect

Here I will help you through the Apache Spark QuickStart Tutorial and JavaWordCount Example, including some of the setup, fixes and resources.

The Spark ecosystem is daunting.  I talked to a Spark developer, and he said the Apache Spark examples are complicated.

No worries. I will break the code down and share the solutions to problems I came across.

* If you download the Apache Spark examples in Java, you may find that they don’t all compile. Some Java code examples were probably translated from Scala and never tested. I will point out some of the modifications necessary to make the code run; for instance, in some Apache Java examples you may have to fix the code by changing the DataFrame type to Dataset<Row>.

Before you begin

Prerequisites

  • A Java 8 or later JDK/JRE.
    Confirm the JDK is functional by compiling and running a test application.
  • Apache Maven (for building Java projects).
    Test Maven in your IDE by creating a Maven project.
  • An integrated development environment (IDE), which will make it easier to work with Java code (Eclipse and IntelliJ are popular).
    Make sure your IDE and projects use the correct JDK, 1.8 or later.
  • The readme file, which will be read by the application:
    https://github.com/apache/spark/blob/master/README.md

Operating Systems

I will install on Windows. The main Apache Spark instructions are for Linux. You can run Spark on Windows, or run Linux on a virtual machine.

Hadoop Software Ecosystem

There are a few Hadoop-related components and conventions in the Spark installation and configuration, but we are not installing Hadoop, so don’t worry about that for now.

Part 1: Download / Set up Spark

Do steps 1-6 to prepare Spark Environment

To perform the procedure in this section, you must have installed Java 8 and Maven. See Before You Begin, above.

Operating System Note: These notes are for Windows because the Windows setup is not documented by Apache. Linux and Mac users, your path is straightforward because the online Apache Spark installation instructions are for *nix systems.

1. Download the latest

Download Spark 2.4.0 (pre-built for Hadoop 2.7), then extract it using a zip tool that can open TGZ files. Here’s the direct link to the Spark 2.4.0 download file:

https://www.apache.org/dyn/closer.lua/spark/spark-2.4.0/spark-2.4.0-bin-hadoop2.7.tgz

2. Set your environment variables

Create a variable: HADOOP_HOME set to:

<INSTALL_PATH>\spark-2.4.0-bin-hadoop2.7

Figure 2: Create the HADOOP_HOME environment variable

To run Spark applications easily, add the Spark bin folder to your PATH variable:

%HADOOP_HOME%\bin

3. Download Hadoop winutils (Windows)

http://public-repo-1.hortonworks.com/hdp-win-alpha/winutils.exe

4. Save WinUtils.exe (Windows)

Install in <INSTALL_PATH>\spark-2.4.0-bin-hadoop2.7\bin\

5. Set up the Hadoop Scratch directory

Create the following folder: C:\tmp\hive

6. Set the Hadoop Hive directory permissions

Open Command console. Navigate to your Spark executables in

<INSTALL_PATH>\spark-2.4.0-bin-hadoop2.7\bin\

Set permissions by typing winutils.exe chmod -R 777 C:\tmp\hive

Windows OS Spark Setup Sources

https://wiki.apache.org/hadoop/WindowsProblems

https://stackoverflow.com/questions/25481325/how-to-set-up-spark-on-windows

https://jaceklaskowski.gitbooks.io/mastering-apache-spark/spark-tips-and-tricks-running-spark-windows.html

Part 2: QuickStart tutorial – Spark console Intro

Now that we’ve installed Spark, we will test that it works and do a first program in the Scala command console.

The Apache Spark QuickStart tutorial consists of Interactive Analysis with the Spark Shell and Self-Contained Applications (regular, basic Java applications). In this section you will first start the Spark Shell, then write a few lines of code in the Spark console in the Scala language.

Preparation

  1. If you haven’t had the chance to do so, read through the Apache Quick Start tutorial at https://spark.apache.org/docs/latest/quick-start.html
  2. Download this file to read in as a data source for the tutorial examples: https://github.com/apache/spark/blob/master/README.md

Start the Spark Shell

  1. Open a cmd console
  2. Navigate to your Spark installation bin folder <INSTALL_PATH>\spark-2.4.0-bin-hadoop2.7\bin\
  3. Run the Spark Shell by typing “spark-shell.cmd” and pressing Enter. (Windows)
  4. Spark takes some time to load. You will see the following screen in your console confirming that Spark has loaded.

Figure 3: Starting the Spark Shell


Spark didn’t start?

  1. Check setup of your development environment, and look at your environment variables and path (Step 1:1-6).
  2. Confirm the Spark install directory and that the Spark startup script spark-shell.cmd is in the “bin” folder
  3. Make sure Java is working and that your version is at least 1.8 (prerequisites)
  4. Verify that you have no Java version conflicts from multiple JDKs/JREs. Run “where java” on Windows (“which java” on Linux) to see whether a different Java is running than you think, and make sure stray versions are not referenced in the Path.

Code Spark interactive analytics in Scala

Prepare by reading through the Apache Spark Quick Start tutorial: https://spark.apache.org/docs/latest/quick-start.html

  1. Get the readme file if you haven’t already at https://github.com/apache/spark/blob/master/README.md
  2. Check out the screenshot below.
  3. Notice that a SparkSession object named “spark” is provided for free
  4. Try the first two lines of code. You should get a number like 105, or maybe another number; check the file itself for reasonability.
    val textFile = spark.read.text("<readme file path>/README.md")
    textFile.count()

Figure 4: Interactive analytics in Spark Shell


  1. Try completing the rest of the Interactive Analytics example, referencing the code you see in the Figure above and the Apache Quick Start tutorial at https://spark.apache.org/docs/latest/quick-start.html

Part 3: QuickStart tutorial – Coding an Application

Now we move on to coding a Spark Java application. Start by generating the framework for the project with Maven, which will also download and build the Spark library dependencies. Finally, we fix, compile, and run a simple Java Spark application.

Prerequisites

  1. If you haven’t had the chance, read through the Apache Quick Start tutorial at https://spark.apache.org/docs/latest/quick-start.html
  2. Set up a new Java Maven project with a Simple or Quickstart archetype. Run a Java HelloWorld as Maven application and get a clean build.

Steps to Build the Spark Maven application

  1. Open your new Maven project configuration file, pom.xml
  2. Copy the Quick Start tutorial Maven configuration code below and paste it over your <project> element, or just add the Spark <dependency> section to your pom.xml file
<project>
  <groupId>edu.berkeley</groupId>
  <artifactId>simple-project</artifactId>
  <modelVersion>4.0.0</modelVersion>
  <name>Simple Project</name>
  <packaging>jar</packaging>
  <version>1.0</version>
  <dependencies>
    <dependency> <!-- Spark dependency -->
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>2.4.0</version>
    </dependency>
  </dependencies>
</project>

 

  3. Download your Spark libraries automatically, before you try to compile your code, by running Maven – Update Project.
  4. Confirm that your Maven application still works by running your HelloWorld.java test again.
  5. Create a new Java class called SimpleApp.
  6. Copy the Java code from the QuickStart tutorial and paste it over your empty SimpleApp class.
  7. Run Maven – Update Project to compile the project and update any libraries.
  8. Run your application as a “Java Application” and see what happens. It will break, unless you’ve already figured the next part out.

Fix and Run SimpleApp

Prepare your code to run with steps 1-3

The code needs a couple of modifications to run: set the file path to the Spark readme file, and configure Spark for a single workstation.

1. First, your imports in your Java class must compile. This confirms that your Spark libraries are installed properly.

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.Dataset;

2. Download the Readme file if you haven’t already at https://github.com/apache/spark/blob/master/README.md

Copy the file to your project’s resources directory, or some other convenient location. You need to reference the path from the code in SimpleApp, to the readme file on your system.

String logFile = "YOUR_SPARK_HOME/README.md";

3. The Spark Application first needs a Spark Session, which is manually configured. Configure the SimpleApp java code Spark Session to run on your local workstation by setting the master URL to local. This modification may need to be done in most Spark example/tutorial applications. Copy this and keep it handy.

a) Notice that this code is missing from the Spark Session creation statement in SimpleApp:

.config("spark.master", "local")

b) Here is the current code in SimpleApp, which you will modify:

SparkSession spark = SparkSession.builder()
.appName("Simple Application").getOrCreate();

c) Copy the code below and overwrite the code above in SimpleApp.

SparkSession spark = SparkSession.builder()
    .appName("SimpleApp")
    .config("spark.master", "local")
    .getOrCreate();

Now compile/build and run SimpleApp as a Java Application.

Study the Code

Core Spark applications are basically constructed around three main steps.

  1. Ingest – read from a data source and store in a Dataset (extract). The data source and storage can be anything: a Hadoop file (HDFS), a CSV file, a database, memory, or a stream.
  2. Transform – if desired, modify the data, to fit another data model, or do Map Reduce. (In the Wordcount example we will do Map Reduce but won’t transform the data itself.)
  3. Take Action – do something with the data, for example count or store.

Ingesting the file (step 1 above)

This means reading a file (the readme document) into SimpleApp and storing it in the Spark Dataset data structure (in memory). Dataset is built on the Resilient Distributed Dataset (RDD), a high-performance, read-only, distributed data structure used by Spark.

Dataset<String> logData = spark.read().textFile(logFile).cache();

The above code loads Spark’s Readme.md file into a Dataset data structure called logData, so it can then be transformed, analysed and acted upon.

Read about the Dataset API here: https://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/sql/Dataset.html

Taking Action: Counting

There is no Transformation in this tutorial, because we don’t manipulate the Dataset or create a new Dataset. Instead we’ll be counting. The SimpleApp application counts the rows in the file containing the letter “a”, and also counts the rows containing the letter “b”. Note that it counts rows, not the actual a’s or b’s.

The logData object is a container for String objects, with type Dataset<String>. In the code below, the lambda variable “s” refers to a row. In this way the entire file is filtered row by row, and each line containing the letter “a” is counted.

The lambda s -> s.contains("a") is passed to filter(), and the filtered rows are then counted, without coding a loop, all in one line of code. Under the surface at runtime Spark still has to loop through all the rows (like array elements) checking for the letter “a” or “b”!

long numAs = logData.filter(s -> s.contains("a")).count();

Follow along with the tutorial and code the same thing to count the number of rows with the letter “b”.
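If you want to sanity-check the filter-and-count logic without a Spark cluster, the same idea can be sketched with plain Java streams. The sample lines below are hypothetical stand-ins for the readme file:

```java
import java.util.List;

public class FilterCountSketch {
    public static void main(String[] args) {
        // Hypothetical stand-in for the lines of README.md
        List<String> lines = List.of(
                "Apache Spark",
                "is a fast cluster computing system",
                "built on the JVM");

        // Same shape as logData.filter(s -> s.contains("a")).count()
        long numAs = lines.stream().filter(s -> s.contains("a")).count();
        long numBs = lines.stream().filter(s -> s.contains("b")).count();

        System.out.println("Lines with a: " + numAs + ", lines with b: " + numBs);
    }
}
```

Conceptually, Spark’s Dataset.filter behaves the same way; the difference is that Spark can distribute the filtering work across a cluster.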

Clean up your resources

Close the Spark session, which otherwise continues to hold memory.

spark.stop();

Run the Spark application

Run the SimpleApp as a Java Application, since that is what it is. The console should show the Spark log and the SimpleApp count output as well (your numbers may differ from mine):

Lines with a: 7, lines with b: 3

Part 4: Spark JavaWordCount Map-Reduce Example

The WordCount example is just one of many Spark learning and reference examples on the Apache Spark Github repository.

https://github.com/apache/spark/blob/master/examples/src/main/java/org/apache/spark/examples/JavaWordCount.java

Preparation

Set up a source file to load and count the words in, preferably a text file with rows of words. You can download all the Apache examples, including the resource files, from their GitHub site:

https://github.com/apache/spark/tree/master/examples/src/main/java/org/apache/spark/examples

Fixing the JavaWordCount class

I modify the code to read from a hard-coded file path instead of a command-line argument, and I also add the master-url configuration as described above to run on a single workstation.

1. Read from a file path, not a command-line argument

Remove the argument check below; we’ll hard-code the file path instead:

if (args.length < 1) {
System.err.println("Usage: JavaWordCount <file>");
System.exit(1);
}

2. Modify the Spark Session configuration

Here is the original configuration, which you’ll modify. Note that it is missing the Spark master URL setting.

SparkSession spark = SparkSession
.builder()
.appName("JavaWordCount")
.getOrCreate();

Replace the Spark Session code above with the code below.

SparkSession spark = SparkSession.builder()
    .appName("JavaWordCount")
    .config("spark.master", "local")
    .getOrCreate();

3. Set up a file to be read

The files are in the Spark examples resources folder. This is the WordCount exercise, so we want words to count, and we want to read them from a file. You can copy all the resource files from your Spark examples folder into a folder called resources in your Maven Java application, or download the entire Spark examples codebase from GitHub.

Once I have the file in place, I code a path String in the form path-to-file + filename.ext.

String file = "YOUR_SPARK_HOME/README.md";

4. Update the code

Modify the data source. The original code reads the file named by the first command-line argument, args[0]. Since I’m reading from a hard-coded file path instead, I update this line of code:

JavaRDD<String> lines = spark.read().textFile(args[0]).javaRDD();

I recode it to accept a file as the parameter.

JavaRDD<String> lines = spark.read().textFile(file).javaRDD();

5. Run your Spark Application

Confirm that you’re getting an accurate word-count based upon your input file.

Analysing WordCount application

Analyzing the word count of an input file consists of four tasks:

  1. Ingest
  2. Transform (Parse)
  3. Transform (Map)
  4. Transform (Reduce)

The following sections describe each of these tasks in detail.

1. Ingest

First the code reads in a file and parses the lines into a Spark datatype called a JavaRDD, which will be the structure for storing lines of text in this example. Note the method at the end of the statement, javaRDD(), which returns the data structure object. JavaRDD is short for Java Resilient Distributed Dataset.

JavaRDD<String> lines = spark.read().textFile(file).javaRDD();

2. Transform (Parse)

The row data set will be transformed into a word data set. This statement calls the flatMap function and tokenizes the lines into a list of words, splitting on spaces. Here is the lambda idiom for tokenizing the file, where SPACE refers to a precompiled Pattern for a single space (" ") used to perform the split. The rows are iterated through and the word lists are flattened into another JavaRDD data structure.

JavaRDD<String> words = lines.flatMap(s -> Arrays.asList(SPACE.split(s)).iterator());
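To build intuition for what this flatMap does, here is the same tokenization sketched with plain Java streams, no Spark required (the input lines are hypothetical sample data):

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class TokenizeSketch {
    // Same role as the SPACE constant in JavaWordCount
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) {
        List<String> lines = List.of("hello spark", "hello world");

        // flatMap splits each line into words, then flattens
        // the per-line word lists into one stream of words
        List<String> words = lines.stream()
                .flatMap(s -> Arrays.stream(SPACE.split(s)))
                .collect(Collectors.toList());

        System.out.println(words); // prints [hello, spark, hello, world]
    }
}
```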

3. Transform (Map)

This is a critical Map stage. Every word gets a 1 assigned to it, to account for itself when the words are counted. You can see that at the far end of the statement, the code is putting a 1 into a Tuple2. What’s a Tuple for? It’s a container for a fixed number of things; a Tuple2 holds two objects.

Renaming the ones variable to wordToOneMap is more descriptive. Go ahead, rename your variables if it helps you understand your code!

Variable name: ones

JavaPairRDD<String, Integer> ones = words.mapToPair(s -> new Tuple2<>(s, 1));

I suggest a better variable name: wordToOneMap

JavaPairRDD<String, Integer> wordToOneMap = words.mapToPair(s -> new Tuple2<>(s, 1));
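To see the shape of these (word, 1) pairs without Spark, here is a plain-Java sketch in which java.util.Map.Entry plays the role of Spark’s Tuple2 (the word list is hypothetical sample data):

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class PairSketch {
    public static void main(String[] args) {
        List<String> words = List.of("spark", "java", "spark");

        // Same shape as words.mapToPair(s -> new Tuple2<>(s, 1));
        // each word is paired with a 1 so it can count itself later
        List<Map.Entry<String, Integer>> wordToOneMap = words.stream()
                .map(s -> Map.entry(s, 1))
                .collect(Collectors.toList());

        wordToOneMap.forEach(e ->
                System.out.println(e.getKey() + " -> " + e.getValue()));
    }
}
```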

4. Transform (Reduce)

In the Reduce stage, each word is ultimately paired up with the total count for that word.

The reduceByKey function is applied to the wordToOneMap. The value of each word key is 1, so when they are added up the word is mapped to the count sum, instead of 1.

I also suggest the variable counts should be changed to wordToCountMap.

Old object name: counts

JavaPairRDD<String, Integer> counts = ones.reduceByKey((i1, i2) -> i1 + i2);

New, better object name: wordToCountMap

JavaPairRDD<String, Integer> wordToCountMap = wordToOneMap.reduceByKey((i1, i2) -> i1 + i2);

The WordCount Map Reduce breakdown

  1. The code starts out parsing lines into rows with the read() function.
  2. Each row becomes a list of words with the flatMap() function, which then flattens those per-row lists into a single collection of words. Since we are counting words, we don’t even care which row a word was in.
  3. All the words go into a Tuple pair structure with the mapToPair() function
  4. Finally, in the reduceByKey() step the count is stored. For example, instead of twenty instances of the pair (spark, 1) we now have one instance of the pair (spark, 20). The data is compressed.
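The four steps above can be mirrored in plain Java streams, which is a useful sanity check of the logic before running it on Spark. Here groupingBy with counting stands in for the mapToPair/reduceByKey pair, and the input lines are hypothetical sample data:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

public class WordCountSketch {
    private static final Pattern SPACE = Pattern.compile(" ");

    public static void main(String[] args) {
        // 1. Ingest: stand-in for the lines read from the file
        List<String> lines = List.of("spark is fast", "spark is distributed");

        // 2. Parse: flatMap the lines into words
        // 3-4. Map and Reduce: group identical words and count them,
        //      collapsing many (word, 1) pairs into one (word, count) pair
        Map<String, Long> wordToCountMap = lines.stream()
                .flatMap(s -> Arrays.stream(SPACE.split(s)))
                .collect(Collectors.groupingBy(w -> w, Collectors.counting()));

        System.out.println(wordToCountMap); // e.g. {spark=2, is=2, fast=1, distributed=1} (order may vary)
    }
}
```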

More Apache Spark Java Examples

More Spark examples can be found on the Apache Spark web site and GitHub. There are also more specialized Spark APIs, like Streaming, SQL and Machine Learning.

https://github.com/apache/spark/tree/master/examples/src/main/java/org/apache/spark/examples

Conclusion

In conclusion, starting out with Spark is pretty easy if you know what the pitfalls are. Spark has APIs for Scala, Python, Java and R. There are lots of resources to help you get going. In this article we looked at working with Spark on local, standalone machines. Standalone mode works for learning; however, it misses the point of Spark, which is to run highly parallel processing jobs on distributed clusters.

This was the first of three posts sharing my experience learning Apache Spark. Stay tuned for my next article in the series, running Spark on virtual machines and the cloud.

#analytics #bigdata #programmertutorial #howto #hadoop #spark #streaming #mapreduce