Supercomputing for Big Data – Lab Manual
R.P. Hes

T.C. Leliveld

Contents

Introduction
    Before You Start
    Goal of this Lab

Lab 1
    Scala
    Apache Spark
        Resilient Distributed Datasets
        Dataframe and Dataset
    SBT
    The GDelt Project
    Deliverables
    Questions

Introduction
In this lab we will put the concepts that are central to Supercomputing with Big
Data in some practical context. We will analyze a large open data set and identify a
way of processing it efficiently using Apache Spark and Amazon Web Services
(AWS). The data set in question is the GDelt 2.0 Global Knowledge Graph (GKG),
which indexes persons, organizations, companies, locations, themes, and even
emotions from live news reports in print, broadcast and internet sources all over
the world. We will use this data to construct a histogram of the topics that are
most popular on a given day, hopefully giving us some interesting insights into
the most important themes in recent history.
Feedback is appreciated! The lab files will be hosted on GitHub. Feel free to make
issues and/or pull requests to suggest or implement improvements.


Before You Start
The complete data set we will be looking at in lab 2 weighs in at several terabytes,
so we need some kind of compute and storage infrastructure to run the pipeline.
In this lab we will use Amazon Web Services (AWS) to facilitate this. As a student
you are eligible for credits on this platform. We would like you to register for the
GitHub Student Developer Pack as soon as you decide to take this course. This
gives you access to around 100 dollars worth of credits, which should be ample to
complete lab 2. Note that you need a credit card to apply (in case you don't have
one: in previous years, students have used prepaid credit cards, available online,
to register). Don't forget to register on AWS using the referral link from GitHub.
Make sure you register for these credits as soon as possible! You can always
send an email to the TAs if you run into any trouble.
Before the end of the first week (Sunday 09/09/18), please send your TU Delft
email address to the TAs to register for the lab.

Goal of this Lab
The goal of this lab is to:
• familiarize yourself with Apache Spark, the MapReduce programming
model, and Scala as a programming language;
• learn how to characterize your big data problem analytically and practically
and what machines best fit this profile;
• get hands-on experience with cloud-based systems;
• learn about the existing infrastructure for big data and the difficulties that
come with it; and
• learn how an existing application should be modified to function in a streaming data context.
You will work in groups of two. In this lab manual we will introduce a big data
pipeline for identifying important events from the GDelt Global Knowledge
Graph (GKG).
In lab 1, you will start by writing a Spark application that processes the GDelt
dataset. You will run this application on a small subset of data on your local
computer. You will use this to
1. get familiar with the Spark APIs,
2. analyze the application’s scaling behavior, and
3. draw some conclusions on how to run it efficiently in the cloud.
It is up to you how you want to define "efficiently": it can be in terms of performance, cost, or a combination of the two.
You may have noticed that the first lab does not contain any supercomputing, let
alone big data. For lab 2, you will deploy your code on AWS, in an actual big data
cluster, in an effort to scale up your application to process the complete dataset,
which measures several terabytes. It is up to you to find the configuration that
will get you the most efficiency, as per your definition in lab 1.
For the final lab, we will modify the code from lab 1 to work in a streaming data
context. You will attempt to rewrite the application to process events in real-time,
in a way that is still scalable over many machines.

Lab 1
In this lab, we will design and develop the code in Spark to process GDelt data,
which will be used in lab 2 to scale the analysis to the entire dataset. We will first
give a brief introduction to the various technologies used in this lab.

Scala
Apache Spark, our big data framework of choice for this lab, is implemented in
Scala, a compiled language on the JVM that supports a mix of functional and
object-oriented programming. It is compatible with Java libraries. Some reasons why Spark was written in Scala are:
1. Compiling to the JVM makes the codebase extremely portable and deploying applications as easy as sending the Java bytecode (typically packaged in
a Java ARchive format, or JAR). This simplifies deploying to cloud provider
big data platforms as we don’t need specific knowledge of the operating
system, or even the underlying architecture.
2. Compared to Java, Scala has some advantages in supporting more complex
types, type inference, and anonymous functions (since Java 8, Java also supports
anonymous functions, or lambda expressions, but that version had not yet been
released when Spark first appeared). Matei Zaharia, Apache Spark's original
author, said the following in a Reddit AMA about why Spark was implemented in Scala:
At the time we started, I really wanted a PL that supports a
language-integrated interface (where people write functions
inline, etc), because I thought that was the way people would
want to program these applications after seeing research systems
that had it (specifically Microsoft’s DryadLINQ). However, I
also wanted to be on the JVM in order to easily interact with the
Hadoop filesystem and data formats for that. Scala was the only
somewhat popular JVM language then that offered this kind of
functional syntax and was also statically typed (letting us have
some control over performance), so we chose that. Today there
might be an argument to make the first version of the API in Java
with Java 8, but we also benefitted from other aspects of Scala in
Spark, like type inference, pattern matching, actor libraries, etc.

Apache Spark provides interfaces to Scala, R, Java and Python, but we will be
using Scala to program in this lab. An introduction to Scala can be found on the
Scala language site. You can have a brief look at it, but you can also pick up topics
as you go through the lab.
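If you want a first taste, the snippet below (a minimal sketch you can paste into any Scala REPL; the names are only illustrative) shows three features you will use throughout the lab: type inference, anonymous functions, and pattern matching.

// Type inference: xs is inferred to be a List[Int]
val xs = List(1, 2, 3)
// Anonymous function passed to a higher-order method
val incremented = xs.map(x => x + 1) // List(2, 3, 4)
// Pattern matching on a tuple
def describe(t: (Char, Int)): String = t match {
  case (c, n) => s"$c occurs $n times"
}
println(describe(('a', 2))) // prints: a occurs 2 times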

Apache Spark
Apache Spark provides a programming model for resilient distributed shared
memory. To elaborate: Spark allows you to program against a unified view of
memory (an RDD or DataFrame), while the processing is distributed over multiple
nodes/machines/computers/servers and the system is able to compensate for
failures of these nodes.
This allows us to define a computation and scale it over multiple machines without having to think about communication, distribution of data, and potential failures of nodes. This advantage comes at a cost: all applications have to comply
with Spark's (restricted) programming model.
The programming model Spark exposes is based around the MapReduce
paradigm. This is an important consideration when deciding whether to use
Spark: does my problem fit into this paradigm?
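To make the paradigm concrete, the canonical MapReduce example is word count. Below is a minimal sketch (assuming a SparkContext sc, as provided for instance by the Spark shell introduced later in this section, and a hypothetical text file input.txt):

val counts = sc.textFile("input.txt")    // one element per line
  .flatMap(line => line.split(" "))      // "map" phase: split lines into words
  .map(word => (word, 1))                // key each word with a count of 1
  .reduceByKey(_ + _)                    // "reduce" phase: sum the counts per word
counts.take(10).foreach(println)         // action: trigger the computation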
Modern Spark exposes two APIs around this programming model:
1. Resilient Distributed Datasets
2. Spark SQL Dataframe/Datasets
We will consider both here shortly.

Resilient Distributed Datasets
RDDs are the original data abstraction used in Spark. Conceptually one can think
of these as a large, unordered list of Java/Scala/Python objects; let's call these
objects elements. This list of elements is divided into partitions (which may still
contain multiple elements), and these partitions can reside on different machines. One can operate on these elements with a number of operations, which can be subdivided into
narrow and wide dependencies, see tbl. 1. An illustration of the RDD abstraction
can be seen in fig. 1.
RDDs are immutable, which means that the elements cannot be altered without
creating a new RDD. Furthermore, the application of transformations (wide or
narrow) is lazily evaluated, meaning that the actual computation will be delayed
until results are requested (an action in Spark terminology). When applying transformations, these form a directed acyclic graph (DAG) that instructs workers
which operations to perform on which elements to produce a specific result. This can
be seen in fig. 1 as the arrows between elements.


[Figure 1: Illustration of the RDD abstraction of an RDD with a tuple of characters and
integers as elements. Four partitions are spread over two machines; the map
rdd.map((a,b) => (a, b+1)) from RDD A to RDD B is a narrow dependency, while
rdd.reduceByKey(_ + _) from RDD B to RDD C is a wide dependency.]
Table 1: List of wide and narrow dependencies for (pair) RDD operations

Narrow dependencies: map, mapValues, flatMap, filter, mapPartitions, mapPartitionsWithIndex, join with sorted keys

Wide dependencies: coGroup, groupByKey, reduceByKey, combineByKey, distinct, join, intersection, repartition, coalesce, sort

Now that you have an idea of what the abstraction is about, let's demonstrate
some example code with the Spark shell. If you want to paste pieces of code into the
Spark shell from this guide, it might be useful to copy from the GitHub version, and use the
:paste command in the Spark shell to paste the code. Hit ctrl+D to stop pasting.
$ spark-shell
2018-08-29 12:56:07 WARN NativeCodeLoader:62 - Unable to load native-hadoop...
Setting default log level to "WARN".
For SparkR,...
To adjust logging level use sc.setLogLevel(newLevel).
Spark context Web UI available at http://
Spark context available as 'sc' (master = local[*], app id = local-1535540172727).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.3.1
      /_/

Going back to our shell, let's first create some sample data around which we can
demonstrate the RDD API. Here we create an infinite list of repeating characters from
'a' to 'z'.
scala> val charsOnce = ('a' to 'z').toStream
charsOnce: scala.collection.immutable.Stream[Char] = Stream(a, ?)
scala> val chars: Stream[Char] = charsOnce #::: chars
chars: Stream[Char] = Stream(a, ?)

Now we build a collection with the first 200000 integers, zipped with the character
stream. We display the first 30 results.
scala> val rdd = sc.parallelize(chars.zip(1 to 200000), numSlices=20)
rdd: org.apache.spark.rdd.RDD[(Char, Int)] =
ParallelCollectionRDD[0] at parallelize at :26
scala> rdd.take(30)
res2: Array[(Char, Int)] = Array((a,1), (b,2), (c,3), (d,4), (e,5), (f,6),
(g,7), (h,8), (i,9), (j,10), (k,11), (l,12), (m,13), (n,14), (o,15), (p,16),
(q,17), (r,18), (s,19), (t,20), (u,21), (v,22), (w,23), (x,24), (y,25), (z,26),
(a,27), (b,28), (c,29), (d,30))

Let’s dissect what just happened. We created a Scala object that is a list of tuples of Chars and Ints in the statement (chars).zip(1 to 200000). With
sc.parallelize we are transforming a Scala sequence into an RDD. This allows
us to enter Spark's programming model. With the optional parameter numSlices
we indicate into how many partitions we want to subdivide the sequence.
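You can check the resulting partitioning yourself (a small aside; with numSlices=20 this should report 20 partitions):

// Number of partitions the RDD is divided into
rdd.getNumPartitions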
Let’s apply some (lazily evaluated) transformations to this RDD.
scala> val mappedRDD = rdd.map({case (chr, num) => (chr, num+1)})
mappedRDD: org.apache.spark.rdd.RDD[(Char, Int)] =
MapPartitionsRDD[5] at map at :25

We apply a map to the RDD, applying a function to all the elements in the RDD. The
function we apply pattern matches over the elements as being a tuple of (Char,
Int), and adds one to the integer. Scala's syntax can be a bit foreign, so if this is
confusing, spend some time looking at tutorials and messing around in the Scala
interpreter.
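For reference, the same pattern match works on a single tuple in a plain Scala REPL, outside of Spark:

val t: (Char, Int) = ('a', 1)
val t2 = t match { case (chr, num) => (chr, num + 1) } // t2 == ('a', 2)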
You might have noticed that the transformation completed awfully fast. This is
Spark’s lazy evaluation in action. No computation will be performed until an
action is applied.
scala> val reducedRDD = rdd.reduceByKey(_ + _)
reducedRDD: org.apache.spark.rdd.RDD[(Char, Int)] =
ShuffledRDD[6] at reduceByKey at :25


Now we apply a reduceByKey operation, grouping all of the identical keys together and merging the results with the specified function, in this case the + operator.
Next we perform an action, which will trigger the computation of the transformations on the data. We will use the collect action, which gathers all
the results to the master, taking us out of the Spark programming model and back to a
Scala sequence. How many elements do you expect there to be in this sequence
after the previous transformations?
scala> reducedRDD.collect
res3: Array[(Char, Int)] = Array((d,769300000), (x,769253844), (e,769307693),
(y,769261536), (z,769269228), (f,769315386), (g,769323079), (h,769330772),
(i,769138464), (j,769146156), (k,769153848), (l,769161540), (m,769169232),
(n,769176924), (o,769184616), (p,769192308), (q,769200000), (r,769207692),
(s,769215384), (t,769223076), (a,769276921), (u,769230768), (b,769284614),
(v,769238460), (w,769246152), (c,769292307))
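If you are curious about the DAG mentioned earlier, you can print the lineage of an RDD with toDebugString (a sketch; the exact output depends on your Spark version):

// Print the lineage (DAG) of the RDD; it lists the ShuffledRDD produced by
// reduceByKey and the ParallelCollectionRDD it was derived from.
println(reducedRDD.toDebugString)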

Typically, we don’t build the data first, but we actually load it from a database or
file system. Say we have some data in (multiple) files in a specific format. As an
example consider sensordata.csv (in the example folder). We can load it as
follows
// sc.textFile can take multiple files as argument!
scala> val raw_data = sc.textFile("sensordata.csv")
raw_data: org.apache.spark.rdd.RDD[String] =
sensordata.csv MapPartitionsRDD[1] at textFile at :24
scala> raw_data.take(10).foreach(println)
COHUTTA,3/10/14:1:01,10.27,1.73,881,1.56,85,1.94
COHUTTA,3/10/14:1:02,9.67,1.731,882,0.52,87,1.79
COHUTTA,3/10/14:1:03,10.47,1.732,882,1.7,92,0.66
COHUTTA,3/10/14:1:05,9.56,1.734,883,1.35,99,0.68
COHUTTA,3/10/14:1:06,9.74,1.736,884,1.27,92,0.73
COHUTTA,3/10/14:1:08,10.44,1.737,885,1.34,93,1.54
COHUTTA,3/10/14:1:09,9.83,1.738,885,0.06,76,1.44
COHUTTA,3/10/14:1:11,10.49,1.739,886,1.51,81,1.83
COHUTTA,3/10/14:1:12,9.79,1.739,886,1.74,82,1.91
COHUTTA,3/10/14:1:13,10.02,1.739,886,1.24,86,1.79

We can process this data to filter only measurements on 3/10/14:1:01.
scala> val filterRDD = raw_data.map(_.split(","))
.filter(x => x(1) == "3/10/14:1:01")
filterRDD: org.apache.spark.rdd.RDD[Array[String]] =
MapPartitionsRDD[11] at filter at :25
scala> filterRDD.foreach(a => println(a.mkString(" ")))
COHUTTA 3/10/14:1:01 10.27 1.73 881 1.56 85 1.94
LAGNAPPE 3/10/14:1:01 9.59 1.602 777 0.09 88 1.78


NANTAHALLA 3/10/14:1:01 10.47 1.712 778 1.96 76 0.78
CHER 3/10/14:1:01 10.17 1.653 777 1.89 96 1.57
THERMALITO 3/10/14:1:01 10.24 1.75 777 1.25 80 0.89
ANDOUILLE 3/10/14:1:01 10.26 1.048 777 1.88 94 1.66
BUTTE 3/10/14:1:01 10.12 1.379 777 1.58 83 0.67
MOJO 3/10/14:1:01 10.47 1.828 967 0.36 77 1.75
CARGO 3/10/14:1:01 9.93 1.903 778 0.55 76 1.44
BBKING 3/10/14:1:01 10.03 0.839 967 1.17 80 1.28

You might have noticed that this is a bit tedious to work with: we have to convert
everything to Scala objects, and aggregations rely on having a pair RDD, which is
fine when we have a single key, but juggling more complex aggregations quickly
becomes cumbersome.

Dataframe and Dataset
Our previous example is quite a typical use case for Spark. We have a big data
store of some structured (tabular) format (be it csv, JSON, parquet, or something
else) that we would like to analyse, typically in some SQL-like fashion. Manually
applying operations to rows like this is both labour intensive and inefficient, because
we have knowledge of the 'schema' of the data that Spark could exploit. This is where
DataFrames originate from. Spark has an optimized SQL query engine that can optimize the compute
path as well as provide a more efficient representation of the rows when given a
schema. From the Spark SQL, DataFrames and Datasets Guide:
Spark SQL is a Spark module for structured data processing. Unlike
the basic Spark RDD API, the interfaces provided by Spark SQL provide Spark with more information about the structure of both the data
and the computation being performed. Internally, Spark SQL uses
this extra information to perform extra optimizations. There are several ways to interact with Spark SQL including SQL and the Dataset
API. When computing a result the same execution engine is used, independent of which API/language you are using to express the computation. This unification means that developers can easily switch
back and forth between different APIs based on which provides the
most natural way to express a given transformation.
Under the hood, these are still immutable distributed collections of data (with the
same compute graph semantics), only now Spark can apply extra optimizations
because of the structured format.
Let’s do the same analysis as last time using this API. First we will define a schema.
Let’s take a look at a single row of the csv:
COHUTTA,3/10/14:1:01,10.27,1.73,881,1.56,85,1.94

So first a string field, then a date and time (parsed as a single timestamp), and some
numeric information. We can thus define the schema as such:


val schema =
  StructType(
    Array(
      StructField("sensorname", StringType, nullable=false),
      StructField("timestamp", TimestampType, nullable=false),
      StructField("numA", DoubleType, nullable=false),
      StructField("numB", DoubleType, nullable=false),
      StructField("numC", LongType, nullable=false),
      StructField("numD", DoubleType, nullable=false),
      StructField("numE", LongType, nullable=false),
      StructField("numF", DoubleType, nullable=false)
    )
  )

If we import types first, and then enter this in our interactive shell we get the
following:
scala> :paste
// Entering paste mode (ctrl-D to finish)
import org.apache.spark.sql.types._
val schema =
StructType(
Array(
StructField("sensorname", StringType, nullable=false),
StructField("timestamp", TimestampType, nullable=false),
StructField("numA", DoubleType, nullable=false),
StructField("numB", DoubleType, nullable=false),
StructField("numC", LongType, nullable=false),
StructField("numD", DoubleType, nullable=false),
StructField("numE", LongType, nullable=false),
StructField("numF", DoubleType, nullable=false)
)
)

// Exiting paste mode, now interpreting.
import org.apache.spark.sql.types._
schema: org.apache.spark.sql.types.StructType =
StructType(StructField(sensorname,StringType,false),
StructField(timestamp,TimestampType,false), StructField(numA,DoubleType,false),
StructField(numB,DoubleType,false), StructField(numC,LongType,false),
StructField(numD,DoubleType,false), StructField(numE,LongType,false),
StructField(numF,DoubleType,false))

An overview of the different Spark SQL types can be found online. For the timestamp field we need to specify the format according to the Java date format—in
our case MM/dd/yy:hh:mm. Tying this all together we can build a Dataframe like
so.
scala> :paste
// Entering paste mode (ctrl-D to finish)
val df = spark.read
.schema(schema)
.option("timestampFormat", "MM/dd/yy:hh:mm")
.csv("./sensordata.csv")
// Exiting paste mode, now interpreting.
df: org.apache.spark.sql.DataFrame =
[sensorname: string, timestamp: date ... 6 more fields]
scala> df.printSchema
root
|-- sensorname: string (nullable = true)
|-- timestamp: timestamp (nullable = true)
|-- numA: double (nullable = true)
|-- numB: double (nullable = true)
|-- numC: long (nullable = true)
|-- numD: double (nullable = true)
|-- numE: long (nullable = true)
|-- numF: double (nullable = true)
scala> df.take(10).foreach(println)
[COHUTTA,2014-03-10 01:01:00.0,10.27,1.73,881,1.56,85,1.94]
[COHUTTA,2014-03-10 01:02:00.0,9.67,1.731,882,0.52,87,1.79]
[COHUTTA,2014-03-10 01:03:00.0,10.47,1.732,882,1.7,92,0.66]
[COHUTTA,2014-03-10 01:05:00.0,9.56,1.734,883,1.35,99,0.68]
[COHUTTA,2014-03-10 01:06:00.0,9.74,1.736,884,1.27,92,0.73]
[COHUTTA,2014-03-10 01:08:00.0,10.44,1.737,885,1.34,93,1.54]
[COHUTTA,2014-03-10 01:09:00.0,9.83,1.738,885,0.06,76,1.44]
[COHUTTA,2014-03-10 01:11:00.0,10.49,1.739,886,1.51,81,1.83]
[COHUTTA,2014-03-10 01:12:00.0,9.79,1.739,886,1.74,82,1.91]
[COHUTTA,2014-03-10 01:13:00.0,10.02,1.739,886,1.24,86,1.79]

We can perform the same filtering operation as before in a couple of ways. We can
use rather error-prone SQL queries (not recommended unless you absolutely love
SQL and like debugging these command strings; this took me about 20 minutes
to get right).
scala> df.createOrReplaceTempView("sensor")
scala> val dfFilter = spark.sql("SELECT * FROM sensor
WHERE timestamp=TIMESTAMP(\"2014-03-10 01:01:00\")")
// I think the newline in the multiline string breaks it if you paste it
dfFilter: org.apache.spark.sql.DataFrame =
[sensorname: string, timestamp: timestamp ... 6 more fields]


scala> dfFilter.collect.foreach(println)
[COHUTTA,2014-03-10 01:01:00.0,10.27,1.73,881,1.56,85,1.94]
[NANTAHALLA,2014-03-10 01:01:00.0,10.47,1.712,778,1.96,76,0.78]
[THERMALITO,2014-03-10 01:01:00.0,10.24,1.75,777,1.25,80,0.89]
[BUTTE,2014-03-10 01:01:00.0,10.12,1.379,777,1.58,83,0.67]
[CARGO,2014-03-10 01:01:00.0,9.93,1.903,778,0.55,76,1.44]
[LAGNAPPE,2014-03-10 01:01:00.0,9.59,1.602,777,0.09,88,1.78]
[CHER,2014-03-10 01:01:00.0,10.17,1.653,777,1.89,96,1.57]
[ANDOUILLE,2014-03-10 01:01:00.0,10.26,1.048,777,1.88,94,1.66]
[MOJO,2014-03-10 01:01:00.0,10.47,1.828,967,0.36,77,1.75]
[BBKING,2014-03-10 01:01:00.0,10.03,0.839,967,1.17,80,1.28]

A slightly more sane and type-safe way would be to do the following.
scala> val dfFilter = df.filter("timestamp = TIMESTAMP(\"2014-03-10 01:01:00\")")
dfFilter: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] =
[sensorname: string, timestamp: timestamp ... 6 more fields]
scala> dfFilter.collect.foreach(println)
[COHUTTA,2014-03-10 01:01:00.0,10.27,1.73,881,1.56,85,1.94]
[NANTAHALLA,2014-03-10 01:01:00.0,10.47,1.712,778,1.96,76,0.78]
[THERMALITO,2014-03-10 01:01:00.0,10.24,1.75,777,1.25,80,0.89]
[BUTTE,2014-03-10 01:01:00.0,10.12,1.379,777,1.58,83,0.67]
[CARGO,2014-03-10 01:01:00.0,9.93,1.903,778,0.55,76,1.44]
[LAGNAPPE,2014-03-10 01:01:00.0,9.59,1.602,777,0.09,88,1.78]
[CHER,2014-03-10 01:01:00.0,10.17,1.653,777,1.89,96,1.57]
[ANDOUILLE,2014-03-10 01:01:00.0,10.26,1.048,777,1.88,94,1.66]
[MOJO,2014-03-10 01:01:00.0,10.47,1.828,967,0.36,77,1.75]
[BBKING,2014-03-10 01:01:00.0,10.03,0.839,967,1.17,80,1.28]
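As an aside, if you want to see what Spark actually plans to execute for such a query, you can ask for the query plan with explain (output omitted here; explain(true) also prints the parsed, analyzed, and optimized logical plans):

// Print the physical plan Spark will execute for this filter
dfFilter.explain()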

But this is still quite error-prone, as these query strings get no typechecking.
That is not a big deal when writing queries in an interactive environment on
a small dataset, but it can be quite costly when a typo at the end of
a long-running job means two hours of your (and the cluster's) time is wasted.
This is why the Spark community developed the Dataset abstraction. It is a sort of
middle ground between Dataframes and RDDs, where you get some of the type
safety of RDDs by operating on a case class (also known as a product type). This
allows us to use compile-time typechecking on the product types, whilst still
allowing Spark to optimize the query and storage of the data by making use of
schemas.
Let's dive into some code. First we need to define a product type for a row.
scala> import java.sql.Timestamp
import java.sql.Timestamp
scala> :paste
// Entering paste mode (ctrl-D to finish)


case class SensorData (
sensorName: String,
timestamp: Timestamp,
numA: Double,
numB: Double,
numC: Long,
numD: Double,
numE: Long,
numF: Double
)
// Exiting paste mode, now interpreting.
defined class SensorData

Now we can convert a Dataframe (which actually is just an untyped Dataset) to a
typed Dataset using the as method.
scala> :paste
// Entering paste mode (ctrl-D to finish)
val ds = spark.read
.schema(schema)
.option("timestampFormat", "MM/dd/yy:hh:mm")
.csv("./sensordata.csv")
.as[SensorData]
// Exiting paste mode, now interpreting.
ds: org.apache.spark.sql.Dataset[SensorData] =
[sensorname: string, timestamp: timestamp ... 6 more fields]

Now we can apply compile time type-checked operations.
scala> val dsFilter = ds.filter(a => a.timestamp ==
new Timestamp(2014 - 1900, 2, 10, 1, 1, 0, 0))
dsFilter: org.apache.spark.sql.Dataset[SensorData] =
[sensorname: string, timestamp: timestamp ... 6 more fields]
scala> dsFilter.collect.foreach(println)
SensorData(COHUTTA,2014-03-10 01:01:00.0,10.27,1.73,881,1.56,85,1.94)
SensorData(NANTAHALLA,2014-03-10 01:01:00.0,10.47,1.712,778,1.96,76,0.78)
SensorData(THERMALITO,2014-03-10 01:01:00.0,10.24,1.75,777,1.25,80,0.89)
SensorData(BUTTE,2014-03-10 01:01:00.0,10.12,1.379,777,1.58,83,0.67)
SensorData(CARGO,2014-03-10 01:01:00.0,9.93,1.903,778,0.55,76,1.44)
SensorData(LAGNAPPE,2014-03-10 01:01:00.0,9.59,1.602,777,0.09,88,1.78)
SensorData(CHER,2014-03-10 01:01:00.0,10.17,1.653,777,1.89,96,1.57)
SensorData(ANDOUILLE,2014-03-10 01:01:00.0,10.26,1.048,777,1.88,94,1.66)


SensorData(MOJO,2014-03-10 01:01:00.0,10.47,1.828,967,0.36,77,1.75)
SensorData(BBKING,2014-03-10 01:01:00.0,10.03,0.839,967,1.17,80,1.28)

This provides us with more guarantees that our queries are valid (at least on a type
level).
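As an illustration (a hypothetical typo, not from the lab code): misspelling a field name on a Dataset is rejected by the compiler, whereas the equivalent string-based DataFrame query compiles fine and only fails once the job runs.

// Does not compile: value timestmap is not a member of SensorData
// ds.filter(a => a.timestmap == new Timestamp(2014 - 1900, 2, 10, 1, 1, 0, 0))

// Compiles, but throws an AnalysisException at runtime
// df.filter("timestmap = TIMESTAMP(\"2014-03-10 01:01:00\")")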
This was a brief overview of the two (or three) different Spark APIs. You can
always find more information in the programming guides for RDDs and
Dataframes/Datasets and in the Spark documentation.

SBT
We showed how to run Spark in interactive mode. Now we will explain how to
build applications that can be submitted using the spark-submit command.
First, we will explain how to structure a Scala project, using the SBT build tool.
The typical project structure is
├── build.sbt
├── project
│   └── build.properties
└── src
    └── main
        └── scala
            └── example.scala

This is typical for JVM languages. More directories are added under the scala
folder to resemble the package structure.
The project's name, dependencies, and versioning are defined in the build.sbt
file. An example build.sbt file is
ThisBuild / scalaVersion := "2.11.12"

lazy val example = (project in file("."))
  .settings(
    name := "Example project",
  )

This specifies the Scala version of the project (2.11.12) and the name of the project.
If you run sbt in this folder it will generate the project directory and
build.properties. The build.properties file contains the SBT version that is
used to build the project, for backwards compatibility.
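For reference, project/build.properties is a one-line file pinning the SBT version; the version number below is just an example and will match whatever SBT you ran:

sbt.version=1.2.1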
Open example.scala and add the following
package example

object Example {
  def main(args: Array[String]) {
    println("Hello world!")
  }
}

Run sbt in the root folder (the one where build.sbt is located). This puts you
in interactive mode of SBT. We can compile the sources by writing the compile
command.
$ sbt
[info] Loading project definition from ...
[info] Loading settings for project hello from build.sbt ...
[info] Set current project to Example project ...
[info] sbt server started at ...
sbt:Example project> compile
[success] Total time: 0 s, completed Sep 3, 2018 1:56:10 PM

We can try to run the application by typing run.
sbt:Example project> run
[info] Running example.Example
Hello world!
[success] Total time: 1 s, completed Sep 3, 2018 2:00:08 PM

Now let’s add a function to example.scala.
object Example {
  def addOne(tuple: (Char, Int)): (Char, Int) = tuple match {
    case (chr, int) => (chr, int + 1)
  }
  def main(args: Array[String]) {
    println("Hello world!")
    println(addOne('a', 1))
  }
}

In your SBT session you can prepend any command with a tilde (~) to make it
run automatically on source changes.
sbt:Example project> ~run
[info] Compiling 1 Scala source to ...
[info] Done compiling.
[info] Packaging ...
[info] Done packaging.
[info] Running example.Example
Hello world!


(a,2)
[success] Total time: 1 s, completed Sep 3, 2018 2:02:48 PM
1. Waiting for source changes in project hello... (press enter to interrupt)

We can also open an interactive session using SBT.
sbt:Example project> console
[info] Starting scala interpreter...
Welcome to Scala 2.11.12 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_102).
Type in expressions for evaluation. Or try :help.
scala> example.Example.addOne('a', 1)
res1: (Char, Int) = (a,2)
scala> println("Interactive environment")
Interactive environment

To build Spark applications with SBT we need to include dependencies (most
notably Spark) to build the project. Modify your build.sbt file like so
ThisBuild / scalaVersion := "2.11.12"

lazy val example = (project in file("."))
  .settings(
    name := "Example project",
    libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.1",
    libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.1"
  )

We can now use Spark in the script. Modify example.scala.
package example

import org.apache.spark.sql.types._
import org.apache.spark.sql._
import java.sql.Timestamp

object ExampleSpark {
  case class SensorData (
    sensorName: String,
    timestamp: Timestamp,
    numA: Double,
    numB: Double,
    numC: Long,
    numD: Double,
    numE: Long,
    numF: Double
  )
  def main(args: Array[String]) {
    val schema =
      StructType(
        Array(
          StructField("sensorname", StringType, nullable=false),
          StructField("timestamp", TimestampType, nullable=false),
          StructField("numA", DoubleType, nullable=false),
          StructField("numB", DoubleType, nullable=false),
          StructField("numC", LongType, nullable=false),
          StructField("numD", DoubleType, nullable=false),
          StructField("numE", LongType, nullable=false),
          StructField("numF", DoubleType, nullable=false)
        )
      )
    val spark = SparkSession
      .builder
      .appName("Example")
      .getOrCreate()
    val sc = spark.sparkContext // If you need the SparkContext object
    import spark.implicits._
    val ds = spark.read
      .schema(schema)
      .option("timestampFormat", "MM/dd/yy:hh:mm")
      .csv("./sensordata.csv")
      .as[SensorData]
    val dsFilter = ds.filter(a => a.timestamp ==
      new Timestamp(2014 - 1900, 2, 10, 1, 1, 0, 0))
    dsFilter.collect.foreach(println)
    spark.stop
  }
}

You can build a JAR using the package command in SBT. The JAR will be located
at target/scala-version/project_name_version.jar.
You can run the JAR via spark-submit (which will run it in local mode).
$ spark-submit target/scala-2.11/example-project_2.11-0.1.0-SNAPSHOT.jar
INFO:...
SensorData(COHUTTA,2014-03-10 01:01:00.0,10.27,1.73,881,1.56,85,1.94)
SensorData(NANTAHALLA,2014-03-10 01:01:00.0,10.47,1.712,778,1.96,76,0.78)
SensorData(THERMALITO,2014-03-10 01:01:00.0,10.24,1.75,777,1.25,80,0.89)


SensorData(BUTTE,2014-03-10 01:01:00.0,10.12,1.379,777,1.58,83,0.67)
SensorData(CARGO,2014-03-10 01:01:00.0,9.93,1.903,778,0.55,76,1.44)
SensorData(LAGNAPPE,2014-03-10 01:01:00.0,9.59,1.602,777,0.09,88,1.78)
SensorData(CHER,2014-03-10 01:01:00.0,10.17,1.653,777,1.89,96,1.57)
SensorData(ANDOUILLE,2014-03-10 01:01:00.0,10.26,1.048,777,1.88,94,1.66)
SensorData(MOJO,2014-03-10 01:01:00.0,10.47,1.828,967,0.36,77,1.75)
SensorData(BBKING,2014-03-10 01:01:00.0,10.03,0.839,967,1.17,80,1.28)
INFO:...

By default, Spark's logging is quite verbose. You can change the log level to
WARN to reduce the output.
For development purposes you can also try running the application from SBT
using the run command. This is a bit iffy, as Spark starts a number of threads that
don't exit gracefully when SBT closes its main thread. This can be solved by
running the application in a forked process, which can be enabled by setting fork
in run := true in build.sbt (see the sketch further below). You will then also have
to change the log levels programmatically, if desired.
import org.apache.log4j.{Level, Logger}
...

def main(args: Array[String]) {
...
Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
...
}

You can also use this logger to log your application, which might be helpful for
debugging on the AWS cluster later on.
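For completeness, the forking option mentioned above is a single line in build.sbt (a sketch):

// Run the application in a separate JVM so Spark's threads
// do not keep the SBT session hanging after the run finishes
fork in run := true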

The GDelt Project
Now that we have introduced the different technologies, we can start talking about
the goal of the first lab. In this lab you will write a Spark application that
analyzes GDelt and constructs the 10 most talked-about topics per day. For the
first lab you will write a prototype that you check on your local machine.
We will use the GDelt version 2 GKG files. These files are in tab-separated-values
format. The exact specification of each column and further details can be found
in the GKG codebook. The schema of the files can be read from headers.csv in the
data folder. The columns that are most relevant are the “date” column and the
“allNames” column.
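To give an idea of what working with these files looks like, here is a minimal sketch that extracts just those two columns with the RDD API. The column indices are placeholders (look up the real positions in headers.csv), and the path assumes you downloaded segments with the get_data script described below.

// Hypothetical column positions; check data/headers.csv for the real ones
val DATE_COL = 1
val ALLNAMES_COL = 23

val segments = sc.textFile("data/segment/*.gkg.csv")
val dateAndNames = segments
  .map(_.split("\t", -1))              // GKG files are tab separated
  .filter(_.length > ALLNAMES_COL)     // skip malformed or short lines
  .map(cols => (cols(DATE_COL), cols(ALLNAMES_COL)))
dateAndNames.take(5).foreach(println)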
In the data folder you will also find a script called get_data. This script will
download a number of sample files to your computer, and generate a document
with the paths to all these files.


$./get_data 4
...
wget downloading
...
$cat local_index.txt
/path/to/this/repo/SBD-2018/data/segment/20150218230000.gkg.csv
/path/to/this/repo/SBD-2018/data/segment/20150218231500.gkg.csv
/path/to/this/repo/SBD-2018/data/segment/20150218233000.gkg.csv
/path/to/this/repo/SBD-2018/data/segment/20150218234500.gkg.csv

The script will put all downloaded files in the segment folder. wget timestamps
the downloads, so files that were already downloaded will not be fetched again
when you generate a local index for, say, 20 files after previously downloading 10.
You can use these local files for the first lab assignment to test your application,
and build some understanding of the scaling behaviour on a single machine.
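One simple way to get numbers for this scaling analysis is SparkSession.time, which evaluates an expression and prints how long it took (a sketch, assuming your pipeline produces a Dataset or DataFrame called result):

// Prints "Time taken: ... ms" for evaluating the action
spark.time(result.collect())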
An example output of this system based on 10 segments would be.
DateResult(2015-02-19,List((United States,1497), (Islamic State,1233), (New
York,1058), (United Kingdom,735), (White House,723), (Los Angeles,620), (New
Zealand,590), (Associated Press,498), (San Francisco,479), (Practice Wrestling
Room,420)))
DateResult(2015-02-18,List((Islamic State,1787), (United States,1210), (New
York,727), (White House,489), (Los Angeles,424), (Associated Press,385), (New
Zealand,353), (United Kingdom,325), (Jeb Bush,298), (Practice Wrestling
Room,280)))

Or in JSON.
{"data":"2015-02-19","result":[{"topic":"United
States","count":1497},{"topic":"Islamic State","count":1233},{"topic":"New
York","count":1058},{"topic":"United Kingdom","count":735},{"topic":"White
House","count":723},{"topic":"Los Angeles","count":620},{"topic":"New
Zealand","count":590},{"topic":"Associated Press","count":498},{"topic":"San
Francisco","count":479},{"topic":"Practice Wrestling Room","count":420}]}
{"data":"2015-02-18","result":[{"topic":"Islamic
State","count":1787},{"topic":"United States","count":1210},{"topic":"New
York","count":727},{"topic":"White House","count":489},{"topic":"Los
Angeles","count":424},{"topic":"Associated Press","count":385},{"topic":"New
Zealand","count":353},{"topic":"United Kingdom","count":325},{"topic":"Jeb
Bush","count":298},{"topic":"Practice Wrestling Room","count":280}]}

The exact counts can vary depending on how you count: for instance, if a name
is mentioned multiple times per article, do you count it once or multiple times?
Something in between? Do you filter out some names that are false positives (“ParentCategory” seems to be a particularly common one)? You are free to implement
it however you think is best, and you are encouraged to experiment with this. Document your choices in your report.

Deliverables
The deliverables for the first lab are:
1. An RDD-based implementation of the GDelt analysis,
2. A Dataframe/Dataset-based implementation of the GDelt analysis,
3. A report containing:
1. Outline of your implementation and approach (½–1 page);
2. Pointwise answers to the questions listed below.
Your report and code will be discussed in a brief oral examination during the lab,
the schedule of which will be posted on Brightspace.
The deadline of this lab will be announced on Brightspace.

Questions
General questions:
1. In typical use, what kind of operation would be more expensive, a narrow
dependency or a wide dependency? Why?
2. What is the shuffle operation and why is it such an important topic in Spark
optimization?
3. In what way can Dataframes and Datasets improve performance, both in
compute and in the distribution of data, compared to RDDs? Will
Dataframes and Datasets always perform better than RDDs?
4. Consider the following scenario. You are running a Spark program on a
big data cluster with 10 worker nodes and a single master node. One of the
worker nodes fails. In what way does Spark’s programming model help you
recover the lost work? (Think about the directed acyclic graph!)
5. Can you think of a problem/computation that does not fit Spark's
MapReduce-esque programming model efficiently?
6. Why do you think the MapReduce paradigm is such a widely utilized abstraction for distributed shared memory processing and fault-tolerance?
Implementation analysis questions:
1. Do you expect and observe big performance differences between the RDD
and Dataframe/Dataset implementation of the GDelt analysis?
2. How will your application scale when increasing the number of analyzed
segments? What do you expect the progression in execution time to be
for 100, 1000, and 10000 files?
3. If you extrapolate the scaling behavior on your machine (for instance for 10,
50, 100 segments) to the entire dataset, how much time will it take to process
the entire dataset? Is this extrapolation reasonable for a single machine?
4. Now suppose you had a cluster of identical machines to perform the analysis on. How many machines do you think you would need
to process the entire dataset in under an hour? Do you think this is a valid
extrapolation?

5. Suppose you would run this analysis for a company. What do you think
would be an appropriate way to measure the performance? Would it be
the time it takes to execute? The amount of money it takes to perform the
analysis on the cluster? A combination of these two, or something else? Pick
something you think would be an interesting metric, as this is the metric you
will be optimizing in the 2nd lab!



