Big Data SMACK A Guide To Apache Spark, Mesos, Akka, Cassandra, And Kafka



Big Data
SMACK
A Guide to Apache Spark, Mesos,
Akka, Cassandra, and Kaa
Raul Estrada
Isaac Ruiz
Big Data SMACK
A Guide to Apache Spark, Mesos, Akka,
Cassandra, and Kafka
Raul Estrada
Isaac Ruiz
Big Data SMACK: A Guide to Apache Spark, Mesos, Akka, Cassandra, and Kafka
Raul Estrada Isaac Ruiz
Mexico City Mexico City
Mexico Mexico
ISBN-13 (pbk): 978-1-4842-2174-7 ISBN-13 (electronic): 978-1-4842-2175-4
DOI 10.1007/978-1-4842-2175-4
Library of Congress Control Number: 2016954634
Copyright © 2016 by Raul Estrada and Isaac Ruiz
This work is subject to copyright. All rights are reserved by the Publisher, whether the whole or part of the
material is concerned, specifically the rights of translation, reprinting, reuse of illustrations, recitation,
broadcasting, reproduction on microfilms or in any other physical way, and transmission or information
storage and retrieval, electronic adaptation, computer software, or by similar or dissimilar methodology now
known or hereafter developed.
Trademarked names, logos, and images may appear in this book. Rather than use a trademark symbol with
every occurrence of a trademarked name, logo, or image we use the names, logos, and images only in an
editorial fashion and to the benefit of the trademark owner, with no intention of infringement of the trademark.
The use in this publication of trade names, trademarks, service marks, and similar terms, even if they are
not identified as such, is not to be taken as an expression of opinion as to whether or not they are subject to
proprietary rights.
While the advice and information in this book are believed to be true and accurate at the date of publication,
neither the authors nor the editors nor the publisher can accept any legal responsibility for any errors or
omissions that may be made. The publisher makes no warranty, express or implied, with respect to the
material contained herein.
Managing Director: Welmoed Spahr
Acquisitions Editor: Susan McDermott
Developmental Editor: Laura Berendson
Technical Reviewer: Rogelio Vizcaino
Editorial Board: Steve Anglin, Pramila Balen, Laura Berendson, Aaron Black, Louise Corrigan,
Jonathan Gennick, Robert Hutchinson, Celestin Suresh John, Nikhil Karkal, James Markham,
Susan McDermott, Matthew Moodie, Natalie Pao, Gwenan Spearing
Coordinating Editor: Rita Fernando
Copy Editor: Kim Burton-Weisman
Compositor: SPi Global
Indexer: SPi Global
Cover Image: Designed by Harryarts - Freepik.com
Distributed to the book trade worldwide by Springer Science+Business Media New York, 233 Spring Street,
6th Floor, New York, NY 10013. Phone 1-800-SPRINGER, fax (201) 348-4505, e-mail orders-ny@springer-sbm.com,
or visit www.springer.com. Apress Media, LLC is a California LLC and the sole member (owner) is Springer
Science+Business Media Finance Inc (SSBM Finance Inc). SSBM Finance Inc is a Delaware corporation.
For information on translations, please e-mail rights@apress.com, or visit www.apress.com.
Apress and friends of ED books may be purchased in bulk for academic, corporate, or promotional use.
eBook versions and licenses are also available for most titles. For more information, reference our Special
Bulk Sales–eBook Licensing web page at www.apress.com/bulk-sales.
Any source code or other supplementary materials referenced by the author in this text is available to
readers at www.apress.com. For detailed information about how to locate your book’s source code, go to
www.apress.com/source-code/.
Printed on acid-free paper
I dedicate this book to my mom and all the masters out there.
—Raúl Estrada
For all Binnizá people.
—Isaac Ruiz
Contents at a Glance
About the Authors ...................................................................................................xix
About the Technical Reviewer ................................................................................xxi
Acknowledgments ................................................................................................xxiii
Introduction ...........................................................................................................xxv
Part I: Introduction ................................................................................................ 1
Chapter 1: Big Data, Big Challenges ...................................................................... 3
Chapter 2: Big Data, Big Solutions ......................................................................... 9
Part II: Playing SMACK ........................................................................................ 17
Chapter 3: The Language: Scala .......................................................................... 19
Chapter 4: The Model: Akka ................................................................................ 41
Chapter 5: Storage: Apache Cassandra ............................................................... 67
Chapter 6: The Engine: Apache Spark ................................................................. 97
Chapter 7: The Manager: Apache Mesos ........................................................... 131
Chapter 8: The Broker: Apache Kafka ................................................................ 165
Part III: Improving SMACK ................................................................................. 205
Chapter 9: Fast Data Patterns ............................................................................ 207
Chapter 10: Data Pipelines ................................................................................ 225
Chapter 11: Glossary ......................................................................................... 251
Index ..................................................................................................................... 259
Contents
About the Authors ...................................................................................................xix
About the Technical Reviewer ................................................................................xxi
Acknowledgments ................................................................................................xxiii
Introduction ...........................................................................................................xxv
Part I: Introduction ................................................................................................ 1
Chapter 1: Big Data, Big Challenges ...................................................................... 3
Big Data Problems ............................................................................................................ 3
Infrastructure Needs ........................................................................................................ 3
ETL ................................................................................................................................... 4
Lambda Architecture ........................................................................................................ 5
Hadoop ............................................................................................................................. 5
Data Center Operation ...................................................................................................... 5
The Open Source Reign ..........................................................................................................................6
The Data Store Diversification ................................................................................................................6
Is SMACK the Solution? .................................................................................................... 7
Chapter 2: Big Data, Big Solutions ......................................................................... 9
Traditional vs. Modern (Big) Data ..................................................................................... 9
SMACK in a Nutshell ....................................................................................................... 11
Apache Spark vs. MapReduce ........................................................................................ 12
The Engine......................................................................................................................14
The Model ....................................................................................................................... 15
The Broker ...................................................................................................................... 15
The Storage ....................................................................................................................16
The Container ................................................................................................................. 16
Summary ........................................................................................................................16
Part II: Playing SMACK ........................................................................................ 17
Chapter 3: The Language: Scala .......................................................................... 19
Functional Programming ................................................................................................ 19
Predicate ..............................................................................................................................................19
Literal Functions ...................................................................................................................................20
Implicit Loops .......................................................................................................................................20
Collections Hierarchy ..................................................................................................... 21
Sequences ............................................................................................................................................21
Maps .....................................................................................................................................................22
Sets.......................................................................................................................................................23
Choosing Collections ...................................................................................................... 23
Sequences ............................................................................................................................................23
Maps .....................................................................................................................................................24
Sets.......................................................................................................................................................25
Traversing ....................................................................................................................... 25
foreach .................................................................................................................................................25
for .........................................................................................................................................................26
Iterators ................................................................................................................................................27
Mapping ......................................................................................................................... 27
Flattening ....................................................................................................................... 28
Filtering .......................................................................................................................... 29
Extracting ....................................................................................................................... 30
Splitting .......................................................................................................................... 31
Unicity ............................................................................................................................ 32
Merging .......................................................................................................................... 32
Lazy Views ...................................................................................................................... 33
Sorting ............................................................................................................................ 34
Streams .......................................................................................................................... 35
Arrays ............................................................................................................................. 35
ArrayBuffers ...................................................................................................................36
Queues ........................................................................................................................... 37
Stacks ............................................................................................................................ 38
Ranges ........................................................................................................................... 39
Summary ........................................................................................................................40
Chapter 4: The Model: Akka ................................................................................ 41
The Actor Model ............................................................................................................. 41
Threads and Labyrinths ........................................................................................................................42
Actors 101 ............................................................................................................................................42
Installing Akka ................................................................................................................44
Akka Actors ....................................................................................................................51
Actors ...................................................................................................................................................51
Actor System ........................................................................................................................................53
Actor Reference ....................................................................................................................................53
Actor Communication ...........................................................................................................................54
Actor Lifecycle ......................................................................................................................................56
Starting Actors ......................................................................................................................................58
Stopping Actors ....................................................................................................................................60
Killing Actors .........................................................................................................................................61
Shutting down the Actor System ..........................................................................................................62
Actor Monitoring ...................................................................................................................................62
Looking up Actors .................................................................................................................................63
Actor Code of Conduct ..........................................................................................................................64
Summary ........................................................................................................................66
Chapter 5: Storage: Apache Cassandra ............................................................... 67
Once Upon a Time... ........................................................................................................ 67
Modern Cassandra................................................................................................................................67
NoSQL Everywhere ......................................................................................................... 67
The Memory Value .......................................................................................................... 70
Key-Value and Column .........................................................................................................................70
Why Cassandra? ............................................................................................................. 71
The Data Model .....................................................................................................................................72
Cassandra 101 ............................................................................................................... 73
Installation ............................................................................................................................................73
Beyond the Basics .......................................................................................................... 82
Client-Server ........................................................................................................................................82
Other Clients .........................................................................................................................................83
Apache Spark-Cassandra Connector ....................................................................................................87
Installing the Connector ........................................................................................................................87
Establishing the Connection .................................................................................................................89
More Than One Is Better ................................................................................................. 91
cassandra.yaml ....................................................................................................................................92
Setting the Cluster ................................................................................................................................93
Putting It All Together ..................................................................................................... 95
Chapter 6: The Engine: Apache Spark ................................................................. 97
Introducing Spark ........................................................................................................... 97
Apache Spark Download ......................................................................................................................98
Let’s Kick the Tires ...............................................................................................................................99
Loading a Data File .............................................................................................................................100
Loading Data from S3 .........................................................................................................................100
Spark Architecture........................................................................................................ 101
SparkContext ......................................................................................................................................102
Creating a SparkContext .....................................................................................................................102
SparkContext Metadata ......................................................................................................................103
SparkContext Methods .......................................................................................................................103
Working with RDDs....................................................................................................... 104
Standalone Apps .................................................................................................................................106
RDD Operations ..................................................................................................................................108
Spark in Cluster Mode .................................................................................................. 112
Runtime Architecture ..........................................................................................................................112
Driver ..................................................................................................................................................113
Executor ..............................................................................................................................................114
Cluster Manager .................................................................................................................................115
Program Execution .............................................................................................................................115
Application Deployment ......................................................................................................................115
Running in Cluster Mode ....................................................................................................................117
Spark Standalone Mode .....................................................................................................................117
Running Spark on EC2 ........................................................................................................................120
Running Spark on Mesos ....................................................................................................................122
Submitting Our Application .................................................................................................................122
Configuring Resources .......................................................................................................123
High Availability ..................................................................................................................................123
Spark Streaming .......................................................................................................... 123
Spark Streaming Architecture ............................................................................................................124
Transformations ..................................................................................................................................125
24/7 Spark Streaming ........................................................................................................................129
Checkpointing .....................................................................................................................................129
Spark Streaming Performance ...........................................................................................................129
Summary ...................................................................................................................... 130
Chapter 7: The Manager: Apache Mesos ........................................................... 131
Divide et Impera (Divide and Rule) ............................................................................... 131
Distributed Systems ..................................................................................................... 134
Why Are They Important? ....................................................................................................................135
It Is Difficult to Have a Distributed System ................................................................... 135
Ta-dah!! Apache Mesos ................................................................................................ 137
Mesos Framework ........................................................................................................ 138
Architecture ........................................................................................................................................138
Mesos 101 .................................................................................................................... 140
Installation ..........................................................................................................................................140
Teaming ..............................................................................................................................................146
Let’s Talk About Clusters .............................................................................................. 156
Apache Mesos and Apache Kafka.......................................................................................................157
Mesos and Apache Spark ...................................................................................................................161
The Best Is Yet to Come ......................................................................................................................163
Summary ............................................................................................................................................163
Chapter 8: The Broker: Apache Kafka ................................................................ 165
Kafka Introduction ........................................................................................................ 165
Born in the Fast Data Era ....................................................................................................................167
Use Cases ...........................................................................................................................................168
Kafka Installation .......................................................................................................... 169
Installing Java .....................................................................................................................................169
Installing Kafka ...................................................................................................................................170
Importing Kafka ..................................................................................................................................170
Kafka in Cluster ............................................................................................................ 171
Single Node–Single Broker Cluster ....................................................................................................171
Single Node–Multiple Broker Cluster .................................................................................................175
Multiple Node–Multiple Broker Cluster ...............................................................................................176
Broker Properties ................................................................................................................................177
Kafka Architecture ........................................................................................................ 178
Log Compaction ..................................................................................................................................180
Kafka Design ......................................................................................................................................180
Message Compression .......................................................................................................................180
Replication ..........................................................................................................................................181
Kafka Producers ........................................................................................................... 182
Producer API .......................................................................................................................................182
Scala Producers ..................................................................................................................................182
Producers with Custom Partitioning ...................................................................................................186
Producer Properties ............................................................................................................................189
Kafka Consumers ......................................................................................................... 190
Consumer API .....................................................................................................................................190
Simple Scala Consumers ....................................................................................................................191
Multithread Scala Consumers ............................................................................................................194
Consumer Properties ..........................................................................................................................197
Kafka Integration .......................................................................................................... 198
Integration with Apache Spark ...........................................................................................................198
Kafka Administration .................................................................................................... 199
Cluster Tools .......................................................................................................................................199
Adding Servers ...................................................................................................................................200
Summary ...................................................................................................................... 203
Part III: Improving SMACK ................................................................................. 205
Chapter 9: Fast Data Patterns ............................................................................ 207
Fast Data ...................................................................................................................... 207
Fast Data at a Glance..........................................................................................................................208
Beyond Big Data .................................................................................................................................209
Fast Data Characteristics ...................................................................................................................209
Fast Data and Hadoop ........................................................................................................................210
Data Enrichment .................................................................................................................................211
Queries ...............................................................................................................................................211
ACID vs. CAP ................................................................................................................. 212
ACID Properties ...................................................................................................................................212
CAP Theorem ......................................................................................................................................213
Consistency ........................................................................................................................................213
CRDT ...................................................................................................................................................214
Integrating Streaming and Transactions ...................................................................... 214
Pattern 1: Reject Requests Beyond a Threshold .................................................................................214
Pattern 2: Alerting on Predicted Trends Variation ...............................................................................215
When Not to Integrate Streaming and Transactions ...........................................................................215
Aggregation Techniques .....................................................................................................................215
Streaming Transformations .......................................................................................... 216
Pattern 3: Use Streaming Transformations to Avoid ETL.....................................................................216
Pattern 4: Connect Big Data Analytics to Real-Time Stream Processing ............................................217
Pattern 5: Use Loose Coupling to Improve Reliability .........................................................................218
Points to Consider...............................................................................................................................218
Fault Recovery Strategies ............................................................................................ 219
Pattern 6: At-Most-Once Delivery .......................................................................................................219
Pattern 7: At-Least-Once Delivery ......................................................................................................220
Pattern 8: Exactly-Once Delivery ........................................................................................................220
Tag Data Identifiers ...................................................................................................... 220
Pattern 9: Use Upserts over Inserts ....................................................................................................221
Pattern 10: Tag Data with Unique Identifiers ..........................................................................................221
Pattern 11: Use Kafka Offsets as Unique Identifiers ...............................................................................222
When to Avoid Idempotency ...............................................................................................................223
Example: Switch Processing...............................................................................................................223
Summary ...................................................................................................................... 224
Chapter 10: Data Pipelines ................................................................................ 225
Data Pipeline Strategies and Principles ....................................................................... 225
Asynchronous Message Passing ........................................................................................................226
Consensus and Gossip ........................................................................................................................226
Data Locality .......................................................................................................................................226
Failure Detection ................................................................................................................................226
Fault Tolerance/No Single Point of Failure ..........................................................................................226
Isolation ..............................................................................................................................................227
Location Transparency ........................................................................................................................227
Parallelism ..........................................................................................................................................227
Partition for Scale ...............................................................................................................................227
Replay for Any Point of Failure ...........................................................................................................227
Replicate for Resiliency ......................................................................................................................228
Scalable Infrastructure .......................................................................................................................228
Share Nothing/Masterless ..................................................................................................................228
Dynamo Systems Principles ...............................................................................................................228
Spark and Cassandra ................................................................................................... 229
Spark Streaming with Cassandra .......................................................................................................230
Saving Data ........................................................................................................................................232
Saving Datasets to Cassandra ............................................................................................................232
Saving a Collection of Tuples ..............................................................................................................233
Saving a Collection of Objects ............................................................................................................234
Modifying CQL Collections ..................................................................................................................234
Saving Objects of Cassandra User-Defined Types ..................................................................................235
Converting Scala Options to Cassandra Options ................................................................................236
Saving RDDs as New Tables ...............................................................................................................237
Akka and Kafka ............................................................................................................ 238
Akka and Cassandra ..................................................................................................... 241
Writing to Cassandra ..........................................................................................................................241
Reading from Cassandra ....................................................................................................................242
Connecting to Cassandra ....................................................................................................................244
Scanning Tweets .................................................................................................................................245
Testing TweetScannerActor ................................................................................................................246
Akka and Spark ............................................................................................................ 248
Kafka and Cassandra ................................................................................................... 249
CQL Types Supported ..........................................................................................................................250
Cassandra Sink ...................................................................................................................................250
Summary ...................................................................................................................... 250
Chapter 11: Glossary ......................................................................................... 251
ACID .............................................................................................................................. 251
agent ............................................................................................................................ 251
API ................................................................................................................................ 251
BI .................................................................................................................................. 251
big data ........................................................................................................................ 251
CAP ............................................................................................................................... 251
CEP ............................................................................................................................... 252
client-server ................................................................................................................. 252
cloud............................................................................................................................. 252
cluster .......................................................................................................................... 252
column family ............................................................................................................... 252
coordinator ................................................................................................................... 252
CQL ............................................................................................................................... 252
CQLS ............................................................................................................................. 252
concurrency.................................................................................................................. 253
commutative operations ............................................................................................... 253
CRDTs ........................................................................................................................... 253
dashboard .................................................................................................................... 253
data feed ...................................................................................................................... 253
DBMS............................................................................................................................ 253
determinism ................................................................................................................. 253
dimension data ............................................................................................................. 254
distributed computing. ................................................................................................. 254
driver ............................................................................................................................ 254
ETL ............................................................................................................................... 254
exabyte ......................................................................................................................... 254
exponential backoff ...................................................................................................... 254
failover ......................................................................................................................... 254
fast data ....................................................................................................................... 255
gossip ........................................................................................................................... 255
graph database ............................................................................................................ 255
HDFS............................................................................................................................. 255
HTAP ............................................................................................................................. 255
IaaS .............................................................................................................................. 255
idempotence ................................................................................................................ 256
IMDG ............................................................................................................................. 256
IoT ................................................................................................................................ 256
key-value ...................................................................................................................... 256
keyspace ...................................................................................................................... 256
latency .......................................................................................................................... 256
master-slave................................................................................................................. 256
metadata ...................................................................................................................... 256
NoSQL ........................................................................................................................... 257
operational analytics .................................................................................................... 257
RDBMS ......................................................................................................................... 257
real-time analytics ....................................................................................................... 257
replication .................................................................................................................... 257
PaaS ............................................................................................................................. 257
probabilistic data structures ........................................................................................ 258
SaaS ............................................................................................................................. 258
scalability ..................................................................................................................... 258
shared nothing ............................................................................................................. 258
Spark-Cassandra Connector ........................................................................................ 258
streaming analytics ...................................................................................................... 258
synchronization ............................................................................................................ 258
unstructured data ......................................................................................................... 258
Index ..................................................................................................................... 259
About the Authors
Raul Estrada has been a programmer since 1996 and a Java developer since the year 2000. He loves
functional languages like Elixir, Scala, Clojure, and Haskell. With more than 12 years of experience in high
availability and enterprise software, he has designed and implemented architectures since 2003. He has
been an enterprise architect for BEA Systems and Oracle Inc., but he also enjoys mobile programming and
game development. Now he is focused on open source projects related to data pipelining, like Apache Spark,
Apache Kafka, Apache Flink, and Apache Beam.
Isaac Ruiz has been a Java programmer since 2001, and a consultant and an architect since 2003. He has
participated in projects in different areas and varied scopes (education, communications, retail, and others).
He specializes in systems integration, particularly in the financial sector. Ruiz is a supporter of free software
and he likes to experiment with new technologies (frameworks, languages, and methods).
About the Technical Reviewer
Rogelio Vizcaino has been programming professionally for ten years, and hacking a little longer than that.
Currently he is a JEE and solutions architect on a consultancy basis for one of the major banking institutions
in his country. Educated as an electronic systems engineer, he regards performance and footprint as more than
“desirable traits” in software. Ironically, the once disliked tasks in database maintenance became his
mainstay skills through much effort in the design and development of both relational and non-relational
databases since the start of his professional practice—and the good luck of finding great masters to work
with during the journey. With most of his experience in the enterprise financial sector, Vizcaino’s heart is
with the Web. He keeps track of web-related technologies and standards, where he discovered the delights of
programming back in the late 1990s. Vizcaino considers himself a programmer before an architect, engineer,
or developer; “programming” is an all-encompassing term and should be used with pride. Above all, he likes
to learn, to create new things, and to fix broken ones.
Acknowledgments
We want to say thanks to our acquisitions editor, Susan McDermott, who believed in this project from the
beginning; without her help, it would not have started.
We also thank Rita Fernando and Laura Berendson; without their effort and patience, it would not have
been possible to write this book.
We want to thank our technical reviewer, Rogelio Vizcaino; without him, the project would not have
been a success.
We also want to thank all the heroes who contribute to open source projects, specifically to Spark,
Mesos, Akka, Cassandra, and Kafka, with special recognition to those who develop the open source
connectors between these technologies.
We also thank all the people who have educated us and shown us the way throughout our lives.
Introduction
During 2014, 2015, and 2016, surveys showed that among all software developers, those with the highest
wages are the data engineers, the data scientists, and the data architects.
This is because there is a huge demand for technical professionals in data; unfortunately for large
organizations, and fortunately for developers, the supply is very low.
Traditionally, large volumes of information have been handled by specialized scientists and people
with a PhD from the most prestigious universities, owing to the popular belief that not all of us have
access to large volumes of corporate data or large enterprise production environments.
Apache Spark is disrupting the data industry for two reasons. The first is because it is an open source
project. In the last century, companies like IBM, Microsoft, SAP, and Oracle were the only ones capable of
handling large volumes of data, and today there is so much competition between them that disseminating
designs or platform algorithms is strictly forbidden. Thus, the benefits of open source become stronger
because the contributions of so many people make free tools more powerful than the proprietary ones.
The second reason is that you do not need a production environment with large volumes of data or
large laboratories to develop in Apache Spark. Apache Spark can be installed on a laptop easily and the
development made there can be exported easily to enterprise environments with large volumes of data.
Apache Spark also makes data development free and accessible to startups and small companies.
If you are reading this book, it is for one of two reasons: either you want to be among the best paid IT
professionals, or you already are and you want to learn how today’s trends will become requirements in the
not too distant future.
In this book, we explain how to master the SMACK stack, which is also called Spark++, because it
seems to be the open stack that will most likely succeed in the near future.
PART I
Introduction
© Raul Estrada and Isaac Ruiz 2016
R. Estrada and I. Ruiz, Big Data SMACK, DOI 10.1007/978-1-4842-2175-4_1
CHAPTER 1
Big Data, Big Challenges
In this chapter, we expose the modern architecture challenges that the SMACK stack (Apache Spark,
Mesos, Akka, Cassandra, and Kafka) faces. We also present dynamic processing environment problems to see
which conditions are suitable and which are not.
This chapter covers the following:
Why we need a pipeline architecture for big data
The Lambda Architecture concept
ETL and its dark side
Big Data Problems
We live in the information era, where almost everything is data. In modern organizations, there is a clear
difference between data engineers and data architects. Data engineers are experts who know the inner
workings and handling of the data engines perfectly. The data architect understands all the data sources—
internal and external. Internal sources are usually owned systems. External sources are systems outside the
organization. The first big data problem is that the number of data sources increases with time.
A few years ago, a big company’s IT department could survive without data architects or data engineers.
Today’s challenge is to find good architects. The main purpose of architecture is always resilience. If the data
architect doesn’t have a data plan, then the data sources and the data size will become unmanageable.
The second problem is obtaining a data sample. When you are a data analyst (the person charged
with the compilation and analysis of numerical information), you need data samples—that is, data from
production environments. If the size of the data and/or the number of data sources increases, then obtaining
data samples becomes a herculean task.
The third big data problem is that the validity of an analysis becomes obsolete as time progresses.
Today, we have all the information. The true value of data is related to time. Within minutes, a
recommendation, an analysis, or a prediction can become useless.
The fourth problem is related to the return on investment of an analysis. The analysis velocity is directly
proportional to the return on investment. If the data analyst can’t get data in a timely way, then analysis costs
increase and the earnings decrease.
Electronic supplementary material The online version of this chapter (doi: 10.1007/978-1-4842-2175-4_1 )
contains supplementary material, which is available to authorized users.
Infrastructure Needs
Modern companies require a scalable infrastructure. The costs of your data center are always in accordance
with your business size. There is expensive hardware and costly software. And nowadays, when it comes to
open source software, people’s first thoughts are the high costs of consulting or the developer’s price tag. But
there is good news: today, big data solutions are not exclusive to large budgets.
Technologies must be distributed. Nowadays, when we talk about distributed software, we are no longer
talking about multiple processors; instead, we are talking about multiple data centers. This is the same
system, geographically dispersed.
If your business grows, your data should fit those needs. This is scalability. Most people are afraid of
the term big data, and spend valuable economic resources to tackle a problem that they don’t have. In
a traditional way, your business growth implies your data volumes’ growth. Here, the good news is that you
can scale linearly with cheap hardware and inexpensive software.
Faster processing speed is not related to processor cycles per second, but to the speed of all your
enterprise processes. The now is everything, opportunities are unique, and few situations are repeatable.
When we talk about complex processing, we are not talking about the “Big O” of an algorithm. This is
related to the number of actors involved in one process.
The data flow is constant. The days when businesses could store everything in warehouses are gone.
The businesses that deliver responses the next day are dying. The now is everything. Data warehouses are
dying because stored data becomes rotten, and data expires sooner every day. The costs associated with
a warehouse are not affordable today.
And finally, there is visible and reproducible analysis. As we have mentioned, data analysts need fresh
and live data to satisfy their needs. If data becomes opaque, the business experiences a lack of management.
ETL
ETL stands for extract, transform, load. And it is, even today, a very painful process. The design and
maintenance of an ETL process are risky and difficult. Contrary to what many enterprises believe, they serve
the ETL and the ETL doesn’t serve anyone. It is not a requirement; it is a set of unnecessary steps.
Each step in ETL has its own risk and introduces errors. Sometimes, the time spent debugging the ETL
result is longer than the ETL process itself. ETL always introduces errors. Everyone dedicated to ETL knows
that having no errors is an error. In addition, everyone dedicated to ETL knows that applying ETL onto
sensitive data is playing with the company’s stability.
Everybody knows that when there is a failure in an ETL process, data duplication odds are high.
Expensive debugging processes (human and technological) should be applied after an ETL failure. This
means looking for duplicates and restoring information.
The tools usually cost millions of dollars. Big companies know that ETL is good business for them, but not
for the client. The human race has invested a lot of resources (temporal and economic) in making ETL tools.
The ETL decreases throughput. The performance of the entire company decreases when the ETL
process is running, because the ETL process demands resources: network, database, disk space, processors,
humans, and so forth.
The ETL increases complexity. Few computational processes are as common and as complicated. When
a process requires ETL, the consultants know that the process will be complex, because ETL rarely adds
value to a business’s “line of sight” and requires multiple actors, steps, and conditions.
ETL requires writing intermediary files. Yes, as if computational resources were infinite, costless, and easily
replaceable. In today’s economy, the concept of big intermediary files is an aberration that should be removed.
The ETL involves parsing and reparsing text files. Yes, the lack of appropriate data structures leads to
unnecessary parsing processes. And when they finish, the result must be reparsed to ensure the consistency
and integrity of the generated files.
Finally, the ETL pattern should be duplicated over all our data centers. The number doesn’t matter;
the ETL should be replicated in every data center.
The good news is that ETL pipelines are typically not built on the SMACK stack. ETL is the opposite of
high availability, resiliency, and distribution. As a rule of thumb, if you write a lot of intermediary files, you
suffer from ETL, as if your resources—computational and economic—were infinite.
The first step is to remove the extract phase. Today we have very powerful tools (for example, Scala) that
can work with binary data preserved under strongly typed schemas (instead of using big text dumps parsed
among several heterogeneous systems). Thus, it is an elegant weapon for a more civilized big data age.
The second step is to remove the load phase. Today, your data collection can be done with a modern
distributed messaging system (for example, Kafka) and you can make the distribution to all your clients in
real time. There is no need to batch “load.”
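A minimal sketch of the idea, written in plain Java for brevity: a producer publishes strongly typed records that a consumer reads in real time, with an in-memory queue standing in for a distributed broker such as Kafka. The `Measurement` record and the `publish`/`consume` helpers are purely illustrative assumptions, not production code or any library's API.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class TypedPipeline {
    // A strongly typed record replaces a parsed line of a text dump:
    // no intermediary files, no parsing, no reparsing.
    public record Measurement(String sensorId, long timestamp, double value) {}

    // "Producer" side: publish a typed record to the topic.
    public static void publish(BlockingQueue<Measurement> topic, Measurement m)
            throws InterruptedException {
        topic.put(m);
    }

    // "Consumer" side: take the next record, already structured.
    public static Measurement consume(BlockingQueue<Measurement> topic)
            throws InterruptedException {
        return topic.take();
    }

    public static void main(String[] args) throws InterruptedException {
        // The queue stands in for a distributed log such as a Kafka topic.
        BlockingQueue<Measurement> topic = new ArrayBlockingQueue<>(16);
        publish(topic, new Measurement("sensor-1", 1000L, 21.5));
        Measurement m = consume(topic);
        System.out.println(m.sensorId() + " -> " + m.value()); // sensor-1 -> 21.5
    }
}
```

The point of the sketch is the type: consumers receive a `Measurement`, not a string to be split, validated, and reparsed downstream.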
Lambda Architecture
Lambda Architecture is a data processing architecture designed to handle massive quantities of data by
taking advantage of both batch and stream processing methods. As you saw in previous sections, today’s
challenge is to have the batch and streaming at the same time.
One of the best options is Spark. This wonderful framework allows batch and stream data processing in the
same application at the same time. Unlike many Lambda solutions, SMACK satisfies these two requirements: it
can handle a data stream in real time and handle disparate data models from multiple data sources.
In SMACK, we persist in Cassandra the analytics data produced by Spark, so we guarantee access
to historical data as requested. In case of failure, Cassandra has the resiliency to replay our data before the
error. Spark is not the only tool that allows both behaviors at the same time, but we believe that Apache
Spark is the best.
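The batch/speed split behind the Lambda Architecture can be sketched in a few lines of plain Java: a batch view precomputed over the master data set, a small speed view over recent events, and a query that merges both. The event names and the three tiny functions below are illustrative assumptions; a real deployment would use Spark jobs and streams instead of in-memory lists.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class LambdaSketch {
    // Batch layer: recompute a view over the (large) master data set.
    public static Map<String, Long> batchView(List<String> masterData) {
        Map<String, Long> view = new HashMap<>();
        for (String event : masterData) view.merge(event, 1L, Long::sum);
        return view;
    }

    // Speed layer: maintain a view over recent events only,
    // covering the window the batch layer has not yet processed.
    public static Map<String, Long> speedView(List<String> recentEvents) {
        Map<String, Long> view = new HashMap<>();
        for (String event : recentEvents) view.merge(event, 1L, Long::sum);
        return view;
    }

    // Serving layer: a query merges both views at read time.
    public static long query(String key, Map<String, Long> batch, Map<String, Long> speed) {
        return batch.getOrDefault(key, 0L) + speed.getOrDefault(key, 0L);
    }

    public static void main(String[] args) {
        Map<String, Long> batch = batchView(List.of("click", "click", "view"));
        Map<String, Long> speed = speedView(List.of("click"));
        System.out.println(query("click", batch, speed)); // 3
    }
}
```

Spark's appeal, as described above, is that both layers can live in the same application instead of being two separate systems glued together.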
Hadoop
Apache Hadoop is an open-source software framework written in Java for distributed storage and the
distributed processing of very large data sets on computer clusters built from commodity hardware.
There are two main components associated with Hadoop: Hadoop MapReduce and the Hadoop
Distributed File System (HDFS). These components were inspired by Google’s MapReduce and the Google File System.
We could talk more about Hadoop, but there are lots of books specifically written on this topic. Hadoop was
designed in a context where size, scope, and data completeness are more important than speed of response.
And here you face a crucial decision: if the issue that you need to solve is more like data
warehousing and batch processing, Apache Hadoop could be your solution. On the other hand, if the issue
is the speed of response and the amount of information is measured in speed units instead of data size units,
Apache Spark is your solution.
Data Center Operation
Let's take this space to briefly reflect on how data center operation has changed.
Yesterday, everything scaled up; today, everything scales out. A few years ago, the term data center
meant proprietary use of specialized and expensive supercomputers. Today’s challenge is to be competitive
using commodity computers connected with a non-expensive network.
The total cost of ownership determines everything. Business determines the cost and size of the data center.
Modern startups always start from a small data center. Buying or renting an expensive data center just to see
if your startup is a good idea makes no sense in the modern economy.
The M in SMACK is a good solution to all your data center needs. With Apache Mesos, you can
“abstract” all the resources from all the interconnected small computers to build a supercomputer with the
linear sum of each machine’s resources: CPU cores, memory, disk, and network.
CHAPTER 1 BIG DATA, BIG CHALLENGES
The Open Source Reign
A few years ago, dependency on a vendor was a double-edged sword. On one hand, large companies hired
proprietary software firms to later blame the manufacturer for any failure in their systems. But, on the other
hand, this dependence—all the processes, development, and maintenance—became slow and all the issues
were discussed with a contract in hand.
Many large companies don’t implement open source solutions for fear that no one else can provide the
same support as large manufacturers. But weighing both options, vendor lock-in and external bug
fixing are typically more expensive than open source solutions.
In the past, the big three-letter monopolies dictated the game rules. Today, the rules are made “by and
for” the developers, and transparency is guaranteed by APIs defined by the community itself. Some groups—
like the Apache Software Foundation and the Eclipse Foundation—provide guides, infrastructure, and tools
for sustainable and fair development of these technologies.
Obviously, nothing is free in this life; companies must invest in training their staff on open source
technologies.
The Data Store Diversification
Few people see this, but this is the beginning of the decline of the relational database era. Since 2010,
with the emergence of NoSQL and NoETL, there has been tough criticism of traditional systems, which is
redefining the leader board.
Due to modern business needs, having everything stored in a relational database will go from being the
standard way to the old-fashioned and obsolete way. Simple daily problems like recording the data, multiple
store synchronization, and expensive store size are promoting NoSQL and NoETL solutions.
When moving data, gravity and location matter. Data gravity is related to the costs associated with
moving a huge amount of data from one point to another. Sometimes, the simple everyday task of restoring a
backup can be a daunting task in terms of time and money.
Data allocation is a modern concept related to moving the computation resources to where the data is
located, rather than moving the data to where the computation is. It sounds simple, but thanks to the hardware
(re)evolution, performing complex calculations on new and powerful client machines no longer hurts the
customer's perception of the whole system's performance.
DevOps (development operations) is a term coined by Andrew Clay Shafer and Patrick Debois at the
Agile Conference in 2008.[1] Since then, DevOps has become a movement, a culture, and a lifestyle where
software developers and information technology professionals charged with data center operation can live
and work in harmony. How is this achieved? Easy: by dissolving the differences between them.
Today DevOps is one of the most profitable IT specializations. Modern tools like Docker and Spark
simplify the movement between testing and production environments. Developers can access production
data easily, and testing environments are almost mirrors of the production environments.
As you will see in Chapter 7, today's tendency is to containerize the pipeline from
development to production.
[1] http://ieeexplore.ieee.org/xpl/mostRecentIssue.jsp?punumber=4599439
Is SMACK the Solution?
Even today, there are very few companies fully using SMACK. That is, many major companies use a flavor
of SMACK—just one, two, or three letters of the SMACK stack. As previously mentioned, Spark has
many advantages over Hadoop. Spark also solves problems that Hadoop cannot. However, there are some
environments where Hadoop has deep roots and where workflow is completely batch based. In these
instances, Hadoop is usually a better choice.
Several SMACK letters have become a requirement for companies that are in pilot stages and aim
to capitalize on their investment in big data tools and training. The purpose of this book is to give you options.
The goal is not to make a full pipeline architecture installation of all the five technologies.
However, there are many alternatives to the SMACK stack technologies. For example, Yarn may be an
alternative to Mesos. For batch processing, Apache Flink can be an alternative to Spark. The SMACK stack
axiom is to build an end-to-end pipeline and have the right component in the correct position, so that
integration can be done quickly and naturally, instead of having expensive tools that require a lot of effort to
coexist.
© Raul Estrada and Isaac Ruiz 2016
R. Estrada and I. Ruiz, Big Data SMACK, DOI 10.1007/978-1-4842-2175-4_2
CHAPTER 2
Big Data, Big Solutions
In Chapter 1, we answered the Why. In this chapter, we answer the How. When you understand the
Why, the How is only a matter of time.
This chapter covers the following topics:
Traditional vs. modern (big) data
SMACK in a nutshell
Spark, the engine
Mesos, the container
Akka, the model
Cassandra, the storage
Kafka, the broker
Traditional vs. Modern (Big) Data
Is time quantized? Is there an indivisible amount of time that cannot be divided? Until now, the correct
answer to these questions was “Nobody knows.” The only certain thing is that on a human scale, life doesn’t
happen in batch mode.
Many systems are monitoring a continuous stream of events: weather events, GPS signals, vital signs,
logs, device metrics…. The list is endless. The natural way to collect and analyze this information is as a
stream of data.
Handling data as streams is the correct way to model this behavior, but until recently, this methodology
was very difficult to do well. The previous rates of messages were in the range of thousands of messages per
second—the new technologies discussed in this book can deliver rates of millions of messages per second.
The point is this: streaming data is not a matter for very specialized computer science projects;
stream-based data is becoming the rule for data-driven companies.
Table 2-1 compares the three approaches: traditional data, traditional big data, and modern big data.
Table 2-1. Traditional Data, Traditional Big Data, and Modern Big Data Approaches
(each row: Traditional Data / Traditional Big Data / Modern Big Data)

Person: IT oriented / IT oriented / Business oriented
Roles: Developer, data architect / Data engineer, data scientist / Business user
Data Sources: Relational, files, message queues / Relational, files, message queues, data service / Relational, files, message queues, data service, NoSQL
Data Processing: Application server, ETL / Application server, ETL, Hadoop / Application server, ETL, Hadoop, Spark
Metadata: Limited by IT / Limited by model / Automatically generated, context enriched, business oriented, dictionary based
User Interface: Self-made, developer skills required / Self-made, developer skills required / Self-made, built by business users, tools guided
Use Cases: Data migration, data movement, replication / Data lakes, data hubs, data warehouse offloading / Self-service, Internet of Things, Data as a Service
Open Source Technologies: Fully embraced / Minimal / TCO rules
Tools Maturity: High, enterprise / Medium, enterprise / Low, evolving
Business Agility: Low / Medium / Extremely high
Automation Level: Low / Medium / High
Governance: IT governed / Business governed / End-user governed
Problem Resolution: IT personnel solved / IT personnel solved / Timely or die
Collaboration: Medium / Low / Extremely high
Productivity/Time to Market: Slower / Slower / Highly productive
Modern technologies and architectures allow you to build systems more easily and efficiently, and
to produce a better model of the way business processes take place. We will explain the real value of a
streaming architecture. The possibilities are vast.
Apache Spark is not a replacement for Hadoop. Spark is a computing engine, whereas Hadoop is a
complete stack for storage, cluster management, and computing tools. Spark runs well over Hadoop.
Hadoop is a ten-year-old technology. Today, we see the rise of many deployments that are not on
Hadoop, including deployments on NoSQL stores (like Cassandra) and deployments directly against cloud
storage (e.g., Amazon S3). In this respect, Spark is reaching a broader audience than Hadoop.
SMACK in a Nutshell
If you poll several IT people, they will agree on a few things, including that we are always searching for a new acronym.
SMACK, as you already know, stands for Spark, Mesos, Akka, Cassandra, and Kafka. They are all open
source technologies and all are Apache software projects, except Akka. The SMACK acronym was coined by
Mesosphere, a company that, in collaboration with Cisco, bundled these technologies together in a product
called Infinity, which was designed to solve some big data challenges where streaming is fundamental.[1]
Big data architecture is required in the daily operation of many companies, but there are a lot of sources
talking about each technology separately.
Let’s discuss the full stack and how to make the integration.
This book is a cookbook on how to integrate each technology in the most successful big data stack. We talk
about the five main concepts of big data architecture and how to integrate/replace/reinforce every technology:
Spark: The engine
Mesos: The container
Akka: The model
Cassandra: The storage
Kafka: The message broker
Figure 2-1 represents the reference diagram for the whole book.
Table 2-1. (continued)
(each row: Traditional Data / Traditional Big Data / Modern Big Data)

Productivity/Time to Market (Modern Big Data, continued): Faster time to market
Integration Analysis: Minimal / Medium / Modeled by analytical transformations
Real-Time: Minimal real time / Minimal real time / In real time or die
Data Access: Primarily batch / Batch / Micro batch
[1] https://mesosphere.com/
Apache Spark vs. MapReduce
MapReduce is a programming model for processing large data sets with a parallel and distributed algorithm
on a cluster.
As we will see later, in functional programming there are two basic methods: map(), which is dedicated
to filtering and sorting, and reduce(), which is dedicated to performing an operation on the data. As an example, to serve a
group of people at a service window, you must first queue them (map) and then attend to them (reduce).
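The flavor of this idea can be sketched with plain Scala collections (our own illustration with made-up names, not Hadoop MapReduce code):

```scala
// Plain Scala sketch of the map/reduce idea (not Hadoop MapReduce itself).
val words = List("Spark", "Mesos", "Akka", "Cassandra", "Kafka")

// map: transform every element (here, each word becomes its length)
val lengths = words.map(_.length)

// reduce: combine all the elements into a single result
val total = lengths.reduce(_ + _)
```

Here map produces List(5, 5, 4, 9, 5) and reduce folds those lengths into their sum.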
The term MapReduce was coined in 1995, when the Message Passing Interface was used to solve
programming issues, as we will discuss later. Obviously, when Google made its implementation, it had only
one use case in mind: web search.
It is important to note that Hadoop was born in 2006 and grew up in an environment where MapReduce
reigned. MapReduce was born with two characteristics that marked its life: high latency and batch mode; both
make it incapable of meeting modern challenges.
As you can see in Table 2-2, Spark is different.
Figure 2-1. SMACK at a glance
Table 2-2. Apache Spark / MapReduce Comparison

Written in
  Spark: Scala/Akka.
  MapReduce: Java.
Languages supported
  Spark: Java, Scala, Python, and R are first-class citizens.
  MapReduce: everything should be written using Java.
Storage model
  Spark: keeps things in memory.
  MapReduce: keeps things on disk; it takes a long time to write things to disk and read them back, making it slow and laborious.
I/O model
  Spark: keeps things in memory without I/O and operates on the same data quickly.
  MapReduce: requires a lot of I/O activity over disk.
Recovery
  Spark: reruns the same task in seconds or minutes; restart is not a problem.
  MapReduce: records everything on disk, allowing restart after failure.
Knowledge
  Spark: the abstraction is high; coding is intuitive.
  MapReduce: jobs can be written intelligently to avoid overusing resources, but this requires specialized knowledge of the platform.
Focus
  Spark: code describes how to process data; implementation details are hidden.
  MapReduce: Apache Hive programming goes into code to avoid running too many MapReduce jobs.
Efficiency
  Spark: abstracts all the implementation to run it as efficiently as possible.
  MapReduce: programmers write complex code to optimize each MapReduce job.
Abstraction
  Spark: abstracts things like a good high-level programming language; it is a powerful and expressive environment.
  MapReduce: code is hard to maintain over time.
Libraries
  Spark: adds libraries for machine learning, streaming, graph manipulation, and SQL.
  MapReduce: programmers need third-party tools and libraries, which makes work complex.
Streaming
  Spark: real-time stream processing out of the box.
  MapReduce: frameworks like Apache Storm are needed, increasing complexity.
Source code size
  Spark: Scala programs have dozens of lines of code (LOC).
  MapReduce: Java programs have hundreds of LOC.
Machine learning
  Spark: Spark ML.
  MapReduce: you have to separately integrate Mahout, H2O, or Onyx; you have to learn how it works and how to build on it.
Graphs
  Spark: Spark GraphX.
  MapReduce: you have to select from Giraph, TitanDB, Neo4J, or some other technologies; integration is not seamless.
Apache Spark has these advantages:
Spark speeds up application development by 10 to 100 times, making applications
portable and extensible.
Scala can read Java code. Java code can be rewritten in Scala in a much smaller form
factor that is much easier to read, repurpose, and maintain.
When the Apache Spark core is improved, all the machine learning and graphs
libraries are improved too.
Integration is easier: the applications are easier to maintain and costs go down.
If an enterprise bets on one foundation, Spark is the best choice today.
Databricks (a company founded by the Apache Spark creators) lists the following use cases for Spark:
ETL and data integration
Business intelligence and interactive analytics
Advanced analytics and machine learning
Batch computation for high performance
Real-time stream processing
Some of the new use cases are just the old use cases done faster; although some use cases are totally
new. There are some scenarios that just can’t be done with acceptable performance on MapReduce.
The Engine
It is important to recall that Spark is better at OLAP (online analytical processing), which involves batch jobs
and data mining. Spark is not suitable for OLTP (online transaction processing), such as numerous atomic
transactions; for this type of processing, we strongly recommend Erlang (a beautiful language inspired by the
actor model).
Apache Spark has five main components:
Spark Core
Spark SQL
Spark Streaming
Spark MLlib
Spark GraphX
Each Spark library typically has an entire book dedicated to it. In this book, we try to simply tackle the
Apache Spark essentials to meet the SMACK stack.
The role of Apache Spark on the SMACK stack is to act as the processor and provide real-time data
analysis. It addresses the aggregation and analysis layers.
There are few open source alternatives to Spark. As we’ve mentioned, Apache Hadoop is the classic
approach. The strongest modern adversary is the Apache Flink project, which is good to keep in mind.
The Model
Akka is a model, a toolkit, and a runtime for building distributed, resilient, and highly concurrent
message-driven applications on the Java virtual machine. In 2009, the Akka toolkit was released as open
source. Language bindings exist for both Java and Scala. We need to first analyze Akka in order to understand
the Spark architecture. Akka was designed based on the actor concurrency model, with these characteristics:
Actors are arranged hierarchically
Asynchronous message (data) passing
Fault tolerant
Customizable failure and detection strategies
Hierarchical supervision
Adaptive, predictive
Parallelized
Load balance
There are many Akka competitors; we make a special mention of Reactor. The actor model is the
foundation of many frameworks and languages. The main functional languages in this family are
Lisp, Scheme, Erlang, Haskell, and more recently, Scala, Clojure, F#, and Elixir
(a modern language built on Erlang's virtual machine).
The Broker
Apache Kafka is a publish/subscribe message broker redesigned as a distributed commit log. In SMACK,
Kafka is the data ingestion point, mainly on the application layer. Kafka takes data from applications and
streams and feeds it into the stack. Kafka is a distributed messaging system with high throughput. It
handles massive data loads and floods. It is the valve that regulates the pressure.
Apache Kafka inspects the incoming data volume, which is fundamental for partitioning and distribution
among the cluster nodes. Apache Kafka's features include the following:
Automatic broker failover
Very high performance distributed messaging
Partitioning and distribution across the cluster nodes
Data pipeline decoupling
A massive number of consumers are supported
Massive data load handling
Kafka is the champion among a lot of competitors in MOM (message-oriented middleware). In the
MQ family, this includes ActiveMQ, ZeroMQ, IronMQ, and RabbitMQ. The best of all is RabbitMQ, which is
made with Erlang.
The best alternative to Kafka is Apache Storm, which has a lot of integration with Apache Hadoop. Keep
it in mind. Apache Kafka is here to stay.
The Storage
Apache Cassandra is a distributed database. It is the perfect choice when you need to scale and need
hyper-high availability with no sacrifice in performance. Cassandra was originally used at Facebook in 2008
to handle large amounts of data. It became a top-level Apache project in 2010. Cassandra handles the stack’s
operational data. Cassandra can also be used to expose data to the application layer.
The following are the main features of Apache Cassandra:
Extremely fast and scalable
Multi data center, no single point of failure
Survives the failure of multiple nodes
Easy to operate
Flexible data modeling
Automatic and configurable replication
Ideal for real-time ingestion
Has a great Apache-based community
There are a lot of Cassandra competitors, including DynamoDB (powered by Amazon; it’s contending
in the NoSQL battlefield), Apache HBase (the best-known database implementation of Hadoop), Riak
(made by the Basho samurais; it’s a powerful Erlang database), CouchBase, Apache CouchDB, MongoDB,
Cloudant, and Redis.
The Container
Apache Mesos is a distributed systems kernel that is easy to build and effective to run. Mesos is an
abstraction layer over all computer resources (CPU, memory, storage) on the machines (physical or
virtual), enabling elastic distributed systems and fault tolerance. Mesos was designed with the Linux kernel
principles at a higher abstraction level. It was first presented as Nexus in 2009. In 2011, it was relaunched by
Matei Zaharia under its current name. Mesos is the base of three frameworks:
Apache Aurora
Chronos
Marathon
In SMACK, Mesos orchestrates components and manages resources. It is the secret to horizontal
cluster scaling. Usually, Apache Mesos is combined with Kubernetes (the competitor used by the Google
Cloud Platform) or with Docker (as you will see, more than a competitor, it is a complement to Mesos). The
equivalent in Hadoop is Apache Yarn.
Summary
This chapter, like the previous one, was full of theory. We reviewed the fundamental SMACK diagram as
well as Spark’s advantages over traditional big data technologies such as Hadoop and MapReduce. We also
visited every technology in the SMACK stack, briefly presented each tool’s potential, and most importantly,
we discussed the actual alternatives for each technology. The upcoming chapters go into greater depth
on each of these technologies. We will explore the connectors, integration practices, and linking
techniques, as well as alternatives for every situation.
PART II
Playing SMACK
CHAPTER 3
The Language: Scala
The main part of the SMACK stack is Spark, but sometimes the S is for Scala. You can develop in Spark in
four languages: Java, Scala, Python, and R. Because Apache Spark is written in Scala, and this book is focused
on streaming architecture, we are going to show examples in only the Scala language.
Other Apache Spark books present their examples in the four languages, but for the SMACK stack ,
simply discussing Scala is enough to develop a robust streaming pipeline. It is important to mention that
Java programs interoperate seamlessly with Scala, since both run on the JVM.
If you came here without previous Scala knowledge, welcome to the crash course. It is always good to
learn a new programming language. We are not going to study Scala as the first programming language,
however. This chapter is organized as a series of exercises in the language. If you already know Scala, try to
follow the exercises to improve your knowledge.
As said by many, programming is just about algorithms and data structures. This chapter covers all the
Scala data structures. The next chapter covers the algorithms—that is, the Akka actor model.
Functional Programming
Our goal in this chapter is not to learn Scala, but to reach fully functional thinking in all of its pure
expression. It is an open secret that each SMACK technology is independent and autonomous from the
others. However, each could be developed (replaced) in Java or Scala.
The truth is that each and every one of the SMACK technologies can be developed ad hoc. Yes, the sun
shines for everyone in the streaming pipeline world. You can develop from scratch any SMACK technology
or replace one as your project needs.
How to write an entire Akka project is beyond this book’s scope, but you should understand
how it works to make good architectural decisions.
You need to be clear on these rules:
Scala collections and Java collections are different
Spark collections and Scala collections are different
There are three fundamentals (among many others) in functional programming :
Predicates
Literal functions
Implicit loops
Predicate
A predicate is a function with any number of parameters that returns just one Boolean value.
This is an example (with body definition):
def isEven (i: Int) = if (i % 2 == 0) true else false
Here is another example (without body definition):
def isPrime (p: Long)
Note that a predicate without parameters would be weird. If a function doesn’t receive an input, then
this implies it is obtaining its data from a global variable or a shared context; both are strongly discouraged
(even prohibited) in functional programming. Yes, we know that it could take a random number or the
system time to make its decisions, but these are special cases.
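A predicate can have more than one parameter, as long as the result is a single Boolean. A small sketch (the divides name is our own, not from the text):

```scala
// A two-parameter predicate: several inputs, exactly one Boolean output.
def divides(d: Int, n: Int): Boolean = n % d == 0

// Predicates plug directly into collection methods such as filter.
val multiplesOfThree = List.range(1, 20).filter(n => divides(3, n))
```

Here filter keeps exactly the numbers for which the predicate answers true.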
Literal Functions
In functional programming, functions are first-class citizens. In the 21st century it may sound archaic, but
programming languages that discriminate against functions still exist, usually because they are low-level
languages.
The rule of thumb is to think of it as algebra. In algebra, functions can be composed; you can perform
operations with functions and pass functions as other functions' parameters. If you have problems with algebra,
then sorry, this book (and programming) is not for you.... Just kidding. In this case, you can think of functions
as traditional object-oriented programming (OOP) objects. So following that idea, you define a higher-order
function in mathematics and computer science as a function that does at least one of the following:
Takes functions as arguments (as parameters)
Returns a function as a result
For example, the isEven function could be rewritten as this:
(i: Int) => i % 2 == 0
In this code, the => symbol should be thought of as a transformer .
This is a literal (anonymous) function: exactly the kind of value that a higher-order function takes or returns. Simple, isn’t it?
Yes, in mathematics, as in life, definitions are difficult but necessary to support and generalize our
theories. With examples, everything is clear.
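Here is a sketch showing both higher-order behaviors at once; the negate name is our own invention for illustration:

```scala
// negate takes a function as an argument AND returns a function as a result.
def negate(p: Int => Boolean): Int => Boolean = (i: Int) => !p(i)

val isEven = (i: Int) => i % 2 == 0  // the literal function from above
val isOdd  = negate(isEven)          // a brand-new function, built at runtime
```

No isOdd logic was ever written by hand; it was composed from isEven, which is the algebra-like manipulation of functions described above.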
Implicit Loops
As a final step, the isEven function could be rewritten as this:
_ % 2 == 0
The _ symbol denotes the parameter, or the thing (object, function, entity) to be used as input.
Combined with the filter method over a list, we find expressions like these:
scala> val oneToTen = List.range(1, 10 )
oneToTen: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9)
scala> val evens = oneToTen.filter(_ % 2 == 0)
evens: List[Int] = List(2, 4, 6, 8)
The third line contains an implicit loop. Yes, in functional programming we try to avoid loops. If your
code has a lot of for and while loops, it can probably be simplified.
Functional style is elegant and concise but, of course, there are some memory optimizations that are easier
to express with structured programming. Throughout history, code readability has proved to be more
effective in economic terms (time and money) than hardware optimization, which has become cheaper.
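For instance, a computation that structured programming would write with an explicit while loop collapses into a chain of implicit loops (our own sketch, assuming the range 1 to 10):

```scala
// 1 to 10 (List.range's upper bound is exclusive)
val nums = List.range(1, 11)

// No explicit for or while: filter, map, and sum each loop implicitly.
val sumOfEvenSquares = nums.filter(_ % 2 == 0).map(n => n * n).sum
```

Three implicit loops in one readable line: keep the evens (2, 4, 6, 8, 10), square them, and add them up.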
Collections Hierarchy
At the top of the Scala collections hierarchy there is the Traversable class (as shown in Figure 3-1 ).
All Traversable trait children have the implementation for this method:
def foreach[U](f: Elem => U)
The Iterable trait has implementation in terms of an iterator:
def foreach[U](f: Elem => U): Unit = {
val ite = iterator
while (ite.hasNext) f(ite.next())
}
As you can see, the Iterable trait has three children: Seq , Set , and Map .
Sequences
The Seq trait represents sequences .
As shown in Figure 3-2, Seq has three children: IndexedSeq , LinearSeq , and Buffer .
Figure 3-1. The Scala collections top hierarchy
A sequence is an iterable that has a length and whose elements start from zero and have fixed index
positions.
LinearSeq and IndexedSeq don’t add any new operations, but each has different performance characteristics.
LinearSeq is the list. As you know from functional programming, it has head , tail , and isEmpty
operations, and it is very efficient with them.
IndexedSeq is the array. As you know from structured programming, it has the index operations and is
very efficient with apply, length, and update. So, if you have an array of rooms, and you write Room(101) ,
you access the 101st room.
Buffer is an important mutable sequence. Buffers allow you to update existing elements and to insert,
remove, and add new elements at the end.
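A minimal sketch of the three flavors side by side (the variable names are ours):

```scala
import scala.collection.mutable.ArrayBuffer

val linear  = List(10, 20, 30)       // LinearSeq: efficient head, tail, isEmpty
val rest    = linear.tail            // everything after the head

val indexed = Vector("a", "b", "c")  // IndexedSeq: efficient indexed access
val third   = indexed(2)             // apply by position

val buffer  = ArrayBuffer(1, 2)      // Buffer: mutable; grows at the end
buffer += 3                          // append in place
```

The immutable List and Vector never change; the ArrayBuffer is updated in place, which is precisely what makes it a Buffer.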
Maps
A map is an iterable consisting of pairs. Each pair consists of a key and a value (also called mappings or
associations ). The Map family is shown in Figure 3-3.
Scala offers an implicit conversion that lets you write key -> value as an alternate syntax for (key, value).
For example, Map("uno" -> 1, "dos" -> 2, "tres" -> 3) is the same as Map(("uno", 1), ("dos", 2),
("tres", 3)) , but is easier to read.
Figure 3-2. The Seq children
Figure 3-3. The Map family
Sets
A set is an iterable that contains no duplicate elements. As you can see in Figure 3-4 , the Set hierarchy is
similar to the Map family.
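The "no duplicates" rule in one line (our own example):

```scala
// Duplicate elements are silently dropped on construction.
val letters = Set('S', 'M', 'A', 'C', 'K', 'A', 'K')
```

Although seven characters were supplied, the set keeps only the five distinct ones.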
Choosing Collections
Many programmers argue that the Scala type system is difficult and cumbersome. In fact, as you saw, you
have to choose only one of these three types:
Sequence
Map
Set
The actual decision is to choose between the mutable and immutable versions.
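The practical difference in one sketch: immutable operations return a new collection, while mutable ones update in place.

```scala
import scala.collection.mutable

val imm  = Set(1, 2)          // immutable (the default Set)
val imm2 = imm + 3            // a NEW set; imm itself is untouched

val mut = mutable.Set(1, 2)   // the mutable variant
mut += 3                      // the very same set, modified in place
```

Prefer the immutable versions by default; reach for the mutable ones only when in-place updates are a measured necessity.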
Sequences
There are only two sequences: the LinearSeq (list) and the IndexedSeq (array). The true effort is to learn the
names used, not the hierarchy itself (see Table 3-1).
Immutable Sequences
LinearSeq
List : The list as we know from the functional world.
Queue : The FIFO data structure of the traditional computer science books.
Stack : The LIFO data structure of the traditional computer science books.
Stream : Infinite, lazy and persistent; our everyday flow.
Figure 3-4. The Set family
Table 3-1. The Sequence Collections

             Immutable   Mutable
IndexedSeq   Vector      ArrayBuffer
LinearSeq    List        ListBuffer
IndexedSeq
Range : A limited list of integers.
String : The well-known and limited char sequence.
Vector : Immutable, indexed, the sedan model of the lists.
Mutable Sequences
LinearSeq
LinkedList : Those traditionally used as an introduction to the C/C++ pointers.
DoubleLinkedList : LinkedList with the “previous” method implemented.
ListBuffer : The List version of the indexed Array.
MutableList : A list for those non-functional rebels.
Queue : The FIFO for non-functional guys.
Stack : The LIFO for non-functional fellas.
IndexedSeq
Array : A list whose length is constant but whose elements are not.
ArrayBuffer : An indexed array that always fits memory needs.
ArrayStack : LIFO implementation when performance matters.
StringBuilder : Efficient string manipulation for those with a limited memory budget.
Maps
You have to choose either a mutable map or an immutable map.
Mutable maps
HashMap : A map whose internal implementation is a hash table.
LinkedHashMap : Elements are returned as they were inserted.
ListMap : Elements are returned as the inverse of how they were inserted.
Map : The map as everybody knows it; key-value pairs.
Immutable maps
HashMap : A map whose internal implementation is a tree.
ListMap : Elements are returned as the inverse of how they were inserted.
Map : The map as everybody knows it; key-value pairs.
SortedMap : The keys are stored in a sorted order.
TreeMap : A sorted map; the red-black tree of the traditional computer science books.
Sets
You have to choose either a mutable set or an immutable set.
Mutable sets
BitSet : Used to save memory, but only integers are allowed.
HashSet : A set implemented using a hash table.
LinkedHashSet : The elements are returned as they were inserted.
TreeSet : The AVL tree of the traditional computer science books.
Set : The mutable vanilla set.
SortedSet : The mutable TreeSet, but ordered.
Immutable sets
BitSet : To save (more) memory, only integers are allowed.
HashSet : A set implemented using a tree.
ListSet : A set for the public; a list for those who know it.
TreeSet : An immutable set implemented using a tree.
Set : The immutable vanilla set.
SortedSet : The immutable TreeSet but ordered.
Traversing
foreach is the standard method for traversing collections in Scala. Its complexity is O(n); that is, the
computation time has a linear relation with the number of elements in the input. We also have the
traditional for and the iterators, as in Java.
foreach
In Scala, the foreach method takes a function as argument. This function must have only one parameter and
it doesn’t return anything (this is called a procedure ). It operates in every element of the collection, one at a
time. The parameter type of the function must match the type of every element in the collection.
scala> val zahlen = Vector("Eins", "Zwei", "Drei")
zahlen: scala.collection.immutable.Vector[String] = Vector(Eins, Zwei, Drei)
scala> zahlen.foreach(s => print(s))
EinsZweiDrei
This function takes one character and prints it:
scala> def printAChar(c: Char) { print(c) }
printAChar: (c: Char)Unit
The function is applied to a string (a sequence of chars):
scala> "SMACK".foreach( c => printAChar(c) )
SMACK
The type inference is a useful tool in these modern times:
scala> "SMACK".foreach( printAChar )
SMACK
This is the same as the preceding example, but with a literal function:
scala> "SMACK".foreach( (c: Char) => print(c) )
SMACK
This is the same as the preceding example, but relies on type inference:
scala> "SMACK".foreach( print )
SMACK
This example uses an implicit loop; split traverses the string internally:
scala> "SMACK: Spark Mesos Akka Cassandra Kafka".split(" ")
Array[String] = Array(SMACK:, Spark, Mesos, Akka, Cassandra, Kafka)
for
As in all modern functional programming languages, we can explore all the elements of a collection with a
for loop .
Remember, foreach and for are not designed to produce new collections. If you want a new collection,
use the for / yield combo.
As we stated earlier, if it’s iterable, then it’s traversable (inheritance 101):
scala> val smack = Traversable("Spark", "Mesos", "Akka", "Cassandra", "Kafka")
smack: Traversable[String] = List(Spark, Mesos, Akka, Cassandra, Kafka)
scala> for (f <- smack) println(f)
Spark
Mesos
Akka
Cassandra
Kafka
scala> for (f <- smack) println( f.toUpperCase )
SPARK
MESOS
AKKA
CASSANDRA
KAFKA
To build a new collection, use the for/yield construct:
scala> val smack = Array("Spark", "Mesos", "Akka", "Cassandra", "Kafka")
smack: Array[java.lang.String] = Array(Spark, Mesos, Akka, Cassandra, Kafka)
scala> val upSmack = for (s <- smack) yield s.toUpperCase
upSmack: Array[java.lang.String] = Array(SPARK, MESOS, AKKA, CASSANDRA, KAFKA)
This for/yield construct is called a for comprehension .
Now, let’s iterate a map with a for loop:
scala> val smack = Map("S" ->"Spark", "M" -> "Mesos", "A" -> "Akka", "C" ->"Cassandra", "K"
-> "Kafka")
smack: scala.collection.immutable.Map[String,String] = Map(A -> Akka, M -> Mesos, C ->
Cassandra, K -> Kafka, S -> Spark)
scala> for ((k,v) <- smack) println(s"letter: $k, means: $v")
letter: A, means: Akka
letter: M, means: Mesos
letter: C, means: Cassandra
letter: K, means: Kafka
letter: S, means: Spark
Iterators
To iterate a collection in Java, you use hasNext() and next() . In Scala, you rarely call them directly,
because the map and foreach methods cover most cases.
You only use iterators in Scala when reading very large streams; a file is the most common example. As
a rule of thumb, you use iterators when it’s not convenient to load all the data structure in memory.
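Here is a sketch of the file case (the file name and contents are invented for the example; a small temp file stands in for a genuinely large one):

```scala
import java.nio.file.Files
import scala.io.Source

// create a small sample file; imagine it is too big to fit in memory
val path = Files.createTempFile("smack", ".txt")
Files.write(path, "Spark\nMesos\nAkka\nCassandra\nKafka\n".getBytes("UTF-8"))

val source = Source.fromFile(path.toFile)
// getLines returns an Iterator[String]; lines are read lazily,
// so the whole file is never loaded into memory at once
val longNames =
  try source.getLines().filter(_.length > 4).toList
  finally source.close()

println(longNames)   // List(Spark, Mesos, Cassandra, Kafka)
```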
Once it has been used, an iterator remains “exhausted,” as shown in the following:
scala> val iter = Iterator("S","M","A","C","K")
iter: Iterator[String] = non-empty iterator
scala> iter.foreach(println)
S
M
A
C
K
scala> iter.foreach(println)
As you can see, the last line didn’t produce any output, because the iterator is exhausted.
Mapping
Another way to transform collections, different from for/yield, is to call the map method with a
function as argument, as follows:
scala> val smack = Vector("spark", "mesos", "akka", "cassandra", "kafka")
smack: scala.collection.immutable.Vector[String] = Vector(spark, mesos, akka, cassandra, kafka)
// the long way
scala> val cap = smack.map(e => e.capitalize)
cap: scala.collection.immutable.Vector[String] = Vector(Spark, Mesos, Akka, Cassandra,
Kafka)
// the short way
scala> val cap = smack.map(_.capitalize)
cap: scala.collection.immutable.Vector[String] = Vector(Spark, Mesos, Akka, Cassandra,
Kafka)
//producing a Vector of Int
scala> val lens = smack.map(_.size)
lens: scala.collection.immutable.Vector[Int] = Vector(5, 5, 4, 9, 5)
//producing a Vector of XML elements
scala> val elem = smack.map(smack => <li>{smack}</li>)
elem: scala.collection.immutable.Vector[scala.xml.Elem] = Vector(<li>spark</li>, <li>mesos</
li>, <li>akka</li>, <li>cassandra</li>, <li>kafka</li>)
Because Scala has type inference, there is no general rule for the collection type returned after a mapping
operation; it depends on the collection you start from.
You can call yourself a seasoned Scala functional programmer when you can identify which comprehension
to use: for / yield or map .
scala> val smack = List("spark", "mesos", "akka", "cassandra", "kafka")
smack: List[String] = List(spark, mesos, akka, cassandra, kafka)
// capitalize with map
scala> val m = smack.map(_.capitalize)
m: List[String] = List(Spark, Mesos, Akka, Cassandra, Kafka)
// capitalize with for/yield
scala> val y = for (s <- smack) yield s.capitalize
y: List[String] = List(Spark, Mesos, Akka, Cassandra, Kafka)
Flattening
In functional programming, the flattening process occurs when you convert a list of lists (also called sequence
of sequences or multilist ) into one list. The following is an example:
scala> val allies = List(List("Java","Scala"), List("Javascript","PHP"))
allies: List[List[String]] = List(List(Java, Scala), List(Javascript, PHP))
scala> val languages = allies.flatten
languages: List[String] = List(Java, Scala, Javascript, PHP)
The power of functional programming lies in its expressiveness and simplicity. Here we flatten, uppercase,
and sort, all in one expression:
scala> val jargon = allies.flatten.map(_.toUpperCase).sorted
jargon: List[String] = List(JAVA, JAVASCRIPT, PHP, SCALA)
When you work with connected nodes, flattening helps with the network:
val webFriends = List("Java", "JS")
val javaFriends = List("Scala", "Clojure", "Ceylon")
val jsFriends = List("PHP", "Ceylon")
val friendsOfFriends = List( javaFriends, jsFriends)
scala> val uniqueFriends = friendsOfFriends.flatten.distinct
uniqueFriends: List[String] = List(Scala, Clojure, Ceylon, PHP)
As you may guess, flattening a string produces a list of its chars:
scala> val stuff = List("SMACK", "Scala")
stuff: List[String] = List(SMACK, Scala)
scala> stuff.flatten
List[Char] = List(S, M, A, C, K, s, c, a, l, a)
If a collection contains Option elements, flattening removes the None values and unwraps the Some values:
scala> val boxes = Vector(Some("Something"), None, Some(3.14), None)
boxes: scala.collection.immutable.Vector[Option[Any]] = Vector(Some(Something), None,
Some(3.14), None)
scala> boxes.flatten
res1: scala.collection.immutable.Vector[Any] = Vector(Something, 3.14)
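A natural companion (not shown above) is flatMap, which behaves like map followed by flatten; a brief sketch reusing the allies list:

```scala
val allies = List(List("Java", "Scala"), List("Javascript", "PHP"))

// flatMap = map + flatten in a single call
val upper = allies.flatMap(_.map(_.toUpperCase))
println(upper)   // List(JAVA, SCALA, JAVASCRIPT, PHP)

// the equivalent two-step version produces the same result
val same = allies.map(_.map(_.toUpperCase)).flatten
println(upper == same)   // true
```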
Filtering
In functional programming, filtering traverses a collection and builds a new collection with the elements that
match specific criteria. The criterion must be a predicate: a function applied to each collection element that
returns true or false. For example:
scala> val dozen = List.range(1, 13)
dozen: List[Int] = List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
scala> val multiplesOf3 = dozen.filter(_ % 3 == 0)
multiplesOf3: List[Int] = List(3, 6, 9, 12)
scala> val languages = Set("Java", "Scala", "Clojure", "Ceylon")
languages: scala.collection.immutable.Set[String] = Set(Java, Scala, Clojure, Ceylon)
scala> val c = languages.filter(_.startsWith("C"))
c: scala.collection.immutable.Set[String] = Set(Clojure, Ceylon)
scala> val s = languages.filter(_.length < 6)
s: scala.collection.immutable.Set[String] = Set(Java, Scala)
Filtering has the following two rules:
1. The filter doesn’t modify the collection. You must keep the result in a new one.
2. Only the elements whose predicate returns true are kept.
Extracting
In this section, we are going to examine the methods to extract subsequences. The following are examples.
// We declare an array of Int from 0 to 9
scala> val magic = (0 to 9).toArray
magic: Array[Int] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
// Without the first N elements
scala> val d = magic.drop(3)
d: Array[Int] = Array(3, 4, 5, 6, 7, 8, 9)
// Without the elements matching a predicate
scala> val dw = magic.dropWhile(_ < 4)
dw: Array[Int] = Array(4, 5, 6, 7, 8, 9)
// Without the last N elements
scala> val dr = magic.dropRight(4)
dr: Array[Int] = Array(0, 1, 2, 3, 4, 5)
// Just the first N elements
scala> val t = magic.take(5)
t: Array[Int] = Array(0, 1, 2, 3, 4)
// Just the first elements matching a predicate (from the left)
scala> val tw = magic.takeWhile(_ < 4)
tw: Array[Int] = Array(0, 1, 2, 3)
// Just the last N elements
scala> val tr = magic.takeRight(3)
tr: Array[Int] = Array(7, 8, 9)
// the subsequence between the index A and B
scala> val sl = magic.slice(1,7)
sl: Array[Int] = Array(1, 2, 3, 4, 5, 6)
The following List methods help to achieve functional purity.
// head, the first element
scala> val h = magic.head
h: Int = 0
// the head boxed (to prevent errors)
scala> val hb = magic.headOption
hb: Option[Int] = Some(0)
// the list without the last element
scala> val in = magic.init
in: Array[Int] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8)
// the last element
scala> val ta = magic.last
ta: Int = 9
// the last boxed (to prevent errors)
scala> val lo = magic.lastOption
lo: Option[Int] = Some(9)
// all the list without the first element (known as tail)
scala> val t = magic.tail
t: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
Splitting
For fans of the database perspective, there are methods to partition lists. We split a sample into two
groups, as follows.
// Here, a sample list
scala> val sample = List(-12, -9, -3, 12, 18, 15)
sample: List[Int] = List(-12, -9, -3, 12, 18, 15)
// lets separate our sample in two groups
scala> val teens = sample.groupBy(_ > 10)
teens: scala.collection.immutable.Map[Boolean,List[Int]] = Map(false -> List(-12, -9, -3),
true -> List(12, 18, 15))
// to access the generated groups
scala> val t = teens(true)
t: List[Int] = List(12, 18, 15)
scala> val f = teens(false)
f: List[Int] = List(-12, -9, -3)
// partition does the same as groupBy but it returns a List with two Lists
scala> val teens = sample.partition(_ > 10)
teens: (List[Int], List[Int]) = (List(12, 18, 15),List(-12, -9, -3))
// span returns the longest prefix satisfying the predicate, plus the rest
scala> val negs = sample.span(_ < 0)
negs: (List[Int], List[Int]) = (List(-12, -9, -3),List(12, 18, 15))
// splitAt generates two lists: the first N elements, and the rest
scala> val splitted = sample.splitAt(2)
splitted: (List[Int], List[Int]) = (List(-12, -9),List(-3, 12, 18, 15))
// partition can assign the result to a Tuple
scala> val (foo, bar) = sample.partition(_ > 10)
foo: List[Int] = List(12, 18, 15)
bar: List[Int] = List(-12, -9, -3)
Unicity
To remove duplicates from a collection, keeping only the unique elements, you have two options. The
following are some examples.
scala> val duplicated = List("A", "Y", "Y", "X", "X", "Z")
duplicated: List[String] = List(A, Y, Y, X, X, Z)
// The first option is using distinct
scala> val u = duplicated.distinct
u: List[String] = List(A, Y, X, Z)
// the second is converting the Collection to a Set, where duplicates are not allowed
scala> val s = duplicated.toSet
s: scala.collection.immutable.Set[String] = Set(A, Y, X, Z)
Merging
For merging and subtracting collections, use ++ and -- . The following are some examples.
// The ++= method could be used in any mutable collection
scala> val nega = collection.mutable.ListBuffer(-30, -20, -10)
nega: scala.collection.mutable.ListBuffer[Int] = ListBuffer(-30, -20, -10)
// The result is assigned to the original collection, and it is mutable
scala> nega ++= Seq(10, 20, 30)
res0: nega.type = ListBuffer(-30, -20, -10, 10, 20, 30)
scala> val tech1 = Array("Scala", "Spark", "Mesos")
tech1: Array[String] = Array(Scala, Spark, Mesos)
scala> val tech2 = Array("Akka", "Cassandra", "Kafka")
tech2: Array[String] = Array(Akka, Cassandra, Kafka)
// The ++ method merge two collections and return a new variable
scala> val smack = tech1 ++ tech2
smack: Array[String] = Array(Scala, Spark, Mesos, Akka, Cassandra, Kafka)
We have the classic Set operations from Set Theory.
scala> val lang1 = Array("Java", "Scala", "Ceylon")
lang1: Array[String] = Array(Java, Scala, Ceylon)
scala> val lang2 = Array("Java", "JavaScript", "PHP")
lang2: Array[String] = Array(Java, JavaScript, PHP)
// intersection, the elements in both collections
scala> val inter = lang1.intersect(lang2)
inter: Array[String] = Array(Java)
// union, the elements of either collection (duplicates preserved)
scala> val addition = lang1.union(lang2)
addition: Array[String] = Array(Java, Scala, Ceylon, Java, JavaScript, PHP)
// to remove duplicates we use distinct
scala> val substraction = lang1.union(lang2).distinct
substraction: Array[String] = Array(Java, Scala, Ceylon, JavaScript, PHP)
The diff method results depend on which sequence it’s called on (in set theory, A-B is different from B-A):
// difference, the elements in one set that are not in the other
scala> val dif1 = lang1 diff lang2
dif1: Array[String] = Array(Scala, Ceylon)
scala> val dif2 = lang2 diff lang1
dif2: Array[String] = Array(JavaScript, PHP)
Lazy Views
In functional programming, we call something “lazy” when it is not computed until it is needed. A lazy view is
a version of a collection that is computed and returned only when it is actually needed.
By contrast, in Java, all the memory is allocated immediately when the collection is created.
The difference between these two lines could save a lot of memory:
scala> 0 to 25
res0: scala.collection.immutable.Range.Inclusive = Range(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10,
11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25)
scala> (0 to 25).view
res1: scala.collection.SeqView[Int,scala.collection.immutable.IndexedSeq[Int]] = SeqView(...)
To force the memory allocation of a view, use the force method:
scala> val v = (0 to 25).view
v: scala.collection.SeqView[Int,scala.collection.immutable.IndexedSeq[Int]] = SeqView(...)
scala> val f = v.force
f: scala.collection.immutable.IndexedSeq[Int] = Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11,
12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25)
Mixing views with the map method can significantly improve the performance of your programs. In the
first of the following expressions, increasing the upper bound makes your CPU struggle; the view version defers the work.
scala> (0 to 100).map { _ * 3 }
res0: scala.collection.immutable.IndexedSeq[Int] = Vector(0, 3, 6, 9, 12, 15, 18, 21, 24,
27, 30, 33, 36, 39, 42, 45, 48, 51, 54, 57, 60, 63, 66, 69, 72...
scala> (0 to 100).view.map { _ * 3 }
res1: scala.collection.SeqView[Int,Seq[_]] = SeqViewM(...)
Good programmers (functional or SQL) know the benefits of views well:
Performance (the reason that you’re reading this book)
The data structure is similar to database views
Database views were created to allow modifications on big result sets and tables without compromising
performance.
// lets create an array
scala> val bigData = Array("B", "I", "G", "-", "D", "A", "T", "A")
bigData: Array[String] = Array(B, I, G, -, D, A, T, A)
// and a view over the first elements
scala> val view = bigData.view.slice(0, 4)
view: scala.collection.mutable.IndexedSeqView[String,Array[String]] = SeqViewS(...)
// we modify the VIEW
scala> view(0) = "F"
scala> view(1) = "A"
scala> view(2) = "S"
scala> view(3) = "T"
// voilà, our original array was modified
scala> bigData
res0: Array[String] = Array(F, A, S, T, D, A, T, A)
Sorting
To sort, you use the sorted method, or sortWith with the < , <= , > , and >= operators. The following are some examples.
// sorting Strings
scala> val foo = List("San Francisco", "London", "New York", "Tokio").sorted
foo: List[String] = List(London, New York, San Francisco, Tokio)
// sorting numbers
scala> val bar = List(10, 1, 8, 3.14, 5).sorted
bar: List[Double] = List(1.0, 3.14, 5.0, 8.0, 10.0)
// ascending
scala> List(10, 1, 8, 3.14, 5).sortWith(_ < _)
res0: List[Double] = List(1.0, 3.14, 5.0, 8.0, 10.0)
// descending
scala> List(10, 1, 8, 3.14, 5).sortWith(_ > _)
res0: List[Double] = List(10.0, 8.0, 5.0, 3.14, 1.0)
// ascending alphabetically
scala> List("San Francisco", "London", "New York", "Tokio").sortWith(_ < _)
res0: List[String] = List(London, New York, San Francisco, Tokio)
// descending alphabetically
scala> List("San Francisco", "London", "New York", "Tokio").sortWith(_ > _)
res0: List[String] = List(Tokio, San Francisco, New York, London)
// ascending by length
scala> List("San Francisco", "London", "New York", "Tokio").sortWith(_.length < _.length)
res0: List[String] = List(Tokio, London, New York, San Francisco)
// descending by length
scala> List("San Francisco", "London", "New York", "Tokio").sortWith(_.length > _.length)
res0: List[String] = List(San Francisco, New York, London, Tokio)
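Besides sorted and sortWith, there is also sortBy, which sorts by a key-extraction function; here is a brief sketch using the same city names:

```scala
val cities = List("San Francisco", "London", "New York", "Tokio")

// sortBy extracts a key from each element and sorts by it
val byLength = cities.sortBy(_.length)
println(byLength)   // List(Tokio, London, New York, San Francisco)

// a tuple key sorts by length first, then alphabetically
val byLenThenName = cities.sortBy(c => (c.length, c))
println(byLenThenName)
```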
Streams
Just as views are the lazy version of collections, streams are the lazy version of lists. Here we taste some
stream power:
scala> val torrent = (0 to 900000000).toStream
torrent: scala.collection.immutable.Stream[Int] = Stream(0, ?)
scala> torrent.head
res0: Int = 0
scala> torrent.tail
res1: scala.collection.immutable.Stream[Int] = Stream(1, ?)
scala> torrent.take(3)
res2: scala.collection.immutable.Stream[Int] = Stream(0, ?)
scala> torrent.filter(_ < 100)
res3: scala.collection.immutable.Stream[Int] = Stream(0, ?)
scala> torrent.filter(_ > 100)
res4: scala.collection.immutable.Stream[Int] = Stream(101, ?)
scala> torrent.map{_ * 2}
res5: scala.collection.immutable.Stream[Int] = Stream(0, ?)
scala> torrent(5)
res6: Int = 5
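Because streams are lazy, they can even be infinite. The classic Fibonacci definition below is our own sketch (not from the text); each element is computed only when asked for:

```scala
// an infinite stream: #:: is the lazy cons operator
lazy val fib: Stream[BigInt] =
  BigInt(0) #:: BigInt(1) #:: fib.zip(fib.tail).map { case (a, b) => a + b }

// only the first eight elements are ever computed here
println(fib.take(8).toList)   // List(0, 1, 1, 2, 3, 5, 8, 13)
```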
Arrays
Scala is a strongly typed language. It infers the array type if it’s not specified.
// in numeric, the biggest data type determines the Collection type
scala> Array(6.67e-11, 3.1415, 333F, 666L)
res0: Array[Double] = Array(6.67E-11, 3.1415, 333.0, 666.0)
// we can force the type manually
scala> Array[Number] (6.67e-11, 3.1415, 333F, 666L)
res0: Array[Number] = Array(6.67E-11, 3.1415, 333.0, 666)
There are several ways to create and initialize arrays:
// from Range
scala> val r = Array.range(0, 16)
r: Array[Int] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15)
// from Range with step
scala> val rs = Array.range(-16, 16, 3)
rs: Array[Int] = Array(-16, -13, -10, -7, -4, -1, 2, 5, 8, 11, 14)
// with fill
scala> val f = Array.fill(3)("ha")
f: Array[String] = Array(ha, ha, ha)
// with tabulate
scala> val t = Array.tabulate(9)(n => n * n)
t: Array[Int] = Array(0, 1, 4, 9, 16, 25, 36, 49, 64)
// from List
scala> val a = List("Spark", "Mesos", "Akka", "Cassandra", "Kafka").toArray
a: Array[String] = Array(Spark, Mesos, Akka, Cassandra, Kafka)
// from String
scala> val s = "ELONGATION".toArray
s: Array[Char] = Array(E, L, O, N, G, A, T, I, O, N)
// Scala Arrays correspond to Java Arrays
scala> val bigData = Array("B", "I", "G", "-", "D", "A", "T", "A")
bigData: Array[String] = Array(B, I, G, -, D, A, T, A)
scala> bigData(0) = "F"
scala> bigData(1) = "A"
scala> bigData(2) = "S"
scala> bigData(3) = "T"
scala> bigData
bigData: Array[String] = Array(F, A, S, T, D, A, T, A)
ArrayBuffers
An ArrayBuffer is an array with dynamic size. The following are some examples.
// initialization with some elements
val cities = collection.mutable.ArrayBuffer("San Francisco", "New York")
// += to add one element
cities += "London"
// += to add multiple elements
cities += ("Tokio", "Beijing")
// ++= to add another collection
cities ++= Seq("Paris", "Berlin")
// append, to add multiple elements
cities.append("Sao Paulo", "Mexico")
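The buffer also shrinks dynamically; here is a sketch of the removal operations (the numbers are our own example):

```scala
val nums = collection.mutable.ArrayBuffer(1, 2, 3, 4, 5)

// -= removes the first occurrence of a value
nums -= 3
println(nums)   // ArrayBuffer(1, 2, 4, 5)

// remove takes an index (and optionally a count)
nums.remove(0)
println(nums)   // ArrayBuffer(2, 4, 5)

// --= removes every element of another collection
nums --= Seq(4, 5)
println(nums)   // ArrayBuffer(2)
```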
Queues
A queue is a first-in, first-out (FIFO) data structure. The following are some examples.
// to use it we need to import it from collection mutable
scala> import scala.collection.mutable.Queue
import scala.collection.mutable.Queue
// here we create a Queue of Strings
scala> var smack = new Queue[String]
smack: scala.collection.mutable.Queue[String] = Queue()
// += operator, to add an element
scala> smack += "Spark"
res0: scala.collection.mutable.Queue[String] = Queue(Spark)
// += operator, to add multiple elements
scala> smack += ("Mesos", "Akka")
res1: scala.collection.mutable.Queue[String] = Queue(Spark, Mesos, Akka)
// ++= operator, to add a Collection
scala> smack ++= List("Cassandra", "Kafka")
res2: scala.collection.mutable.Queue[String] = Queue(Spark, Mesos, Akka, Cassandra, Kafka)
// the Queue power: enqueue
scala> smack.enqueue("Scala")
scala> smack
res3: scala.collection.mutable.Queue[String] =
Queue(Spark, Mesos, Akka, Cassandra, Kafka, Scala)
// its counterpart, dequeue
scala> smack.dequeue
res4: String = Spark
// dequeue removes the first element of the queue
scala> smack
res5: scala.collection.mutable.Queue[String] = Queue(Mesos, Akka, Cassandra, Kafka, Scala)
// dequeue will take the next element
scala> val next = smack.dequeue
next: String = Mesos
// we verify that everything ran as the book says
scala> smack
res6: scala.collection.mutable.Queue[String] = Queue(Akka, Cassandra, Kafka, Scala)
The dequeueFirst and dequeueAll methods dequeue the elements matching a predicate.
scala> val smack = Queue("Spark", "Mesos", "Akka", "Cassandra", "Kafka")
smack: scala.collection.mutable.Queue[String] = Queue(Spark, Mesos, Akka, Cassandra, Kafka)
// remove the first element containing a k
scala> smack.dequeueFirst(_.contains("k"))
res0: Option[String] = Some(Spark)
scala> smack
res1: scala.collection.mutable.Queue[String] = Queue(Mesos, Akka, Cassandra, Kafka)
// remove all the elements beginning with A
scala> smack.dequeueAll(_.startsWith("A"))
res2: scala.collection.mutable.Seq[String] = ArrayBuffer(Akka)
scala> smack
res3: scala.collection.mutable.Queue[String] = Queue(Mesos, Cassandra, Kafka)
Stacks
A stack is a last-in, first-out (LIFO) data structure. The following are some examples.
// to use it we need to import it from collection mutable
scala> import scala.collection.mutable.Stack
import scala.collection.mutable.Stack
// here we create a Stack of Strings
scala> var smack = Stack[String]()
smack: scala.collection.mutable.Stack[String] = Stack()
// push, to add elements at the top
scala> smack.push("Spark")
res0: scala.collection.mutable.Stack[String] = Stack(Spark)
scala> smack.push("Mesos")
res1: scala.collection.mutable.Stack[String] = Stack(Mesos, Spark)
// push, to add multiple elements
scala> smack.push("Akka", "Cassandra", "Kafka")
res2: scala.collection.mutable.Stack[String] = Stack(Kafka, Cassandra, Akka, Mesos, Spark)
// pop, to take the last element inserted
scala> val top = smack.pop
top: String = Kafka
scala> smack
res3: scala.collection.mutable.Stack[String] = Stack(Cassandra, Akka, Mesos, Spark)
// top, to access the last element without extracting it
scala> smack.top
res4: String = Cassandra
// "Cassandra" is still on the top
scala> smack
res5: scala.collection.mutable.Stack[String] = Stack(Cassandra, Akka, Mesos, Spark)
// size, the Seq method to know the number of elements
scala> smack.size
res6: Int = 4
// isEmpty, another Seq method
scala> smack.isEmpty
res7: Boolean = false
// clear, to empty the whole stack at once
scala> smack.clear
scala> smack
res9: scala.collection.mutable.Stack[String] = Stack()
Ranges
Ranges are most commonly used with loops, as shown in the following examples.
// to, to make a range from a to b (upper limit is included)
scala> 0 to 6
res0: scala.collection.immutable.Range.Inclusive = Range(0, 1, 2, 3, 4, 5, 6)
// until, to make a range from 0 to 6 (upper limit not included)
scala> 0 until 6
res1: scala.collection.immutable.Range.Inclusive = Range(0, 1, 2, 3, 4, 5)
// by, to specify a step (in this case, every 3)
scala> 0 to 21 by 3
res2 scala.collection.immutable.Range = Range(0, 3, 6, 9, 12, 15, 18, 21)
// to also works with chars
scala> 'a' to 'k'
res3: scala.collection.immutable.NumericRange.Inclusive[Char] = NumericRange(a, b, c, d, e,
f, g, h, i, j, k)
// a Range toList
scala> val l = (0 to 16).toList
l: List[Int] = List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16)
// a Range toArray
scala> val a = (0 to 16).toArray
a: Array[Int] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16)
// a Range toSet
scala> val s = (0 to 10).toSet
s: scala.collection.immutable.Set[Int] = Set(0, 5, 10, 1, 6, 9, 2, 7, 3, 8, 4)
// Array has a range method (upper limit excluded)
scala> val a = Array.range(0, 17)
a: Array[Int] = Array(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16)
// Vector has a range method (upper limit excluded)
scala> val v = Vector.range(0, 10)
v: collection.immutable.Vector[Int] = Vector(0, 1, 2, 3, 4, 5, 6, 7, 8, 9)
// List has a range method (upper limit excluded)
scala> val l = List.range(0, 17)
l: List[Int] = List(0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16)
// A list with numbers in a range with a step of 5
scala> val l = List.range(0, 50, 5)
l: List[Int] = List(0, 5, 10, 15, 20, 25, 30, 35, 40, 45)
// An ArrayBuffer with characters in a range
scala> val ab = collection.mutable.ArrayBuffer.range('a', 'f')
ab: scala.collection.mutable.ArrayBuffer[Char] = ArrayBuffer(a, b, c, d, e)
// An old fashioned for loop using a range
scala> for (i <- 1 to 5) println(i)
1
2
3
4
5
Summary
Since all the examples in this book are in Scala, we need to reinforce it before beginning our study. This
chapter provided a review of Scala. We studied the fundamental parts of the language. Programming is about
data structures and algorithms. In this chapter, we discussed the Scala type system (the data structures) and
the principal concepts of functional programming.
The object-oriented programming (OOP) of past decades was an era of reusable software
components. Things no longer work that way. Now components interoperate by exchanging immutable data
structures (lists, maps, and sets), which is closer to functional programming.
In the next chapter, we review an actor model implementation called Akka. To fully understand the
examples, you need to know the Scala programming language.
© Raul Estrada and Isaac Ruiz 2016
R. Estrada and I. Ruiz, Big Data SMACK, DOI 10.1007/978-1-4842-2175-4_4
CHAPTER 4
The Model: Akka
Welcome to the chapter on the SMACK stack model . The A stands for Akka. If the previous chapter’s
objective was to develop functional thinking, this chapter’s objective is to develop actor model thinking.
The chapter on Scala was focused on moving your mind from a structured programming paradigm to
functional programming thinking. This chapter shifts from the object-oriented paradigm to actor-based
programming.
This chapter has three parts:
Actor model
Actor communication
Actor lifecycle
The actor model is fundamental to understanding the SMACK operation. So, by the end of this chapter,
we hope that you can model in terms of actors.
The Actor Model
The Sámi people were the first to inhabit northern Scandinavia. Until the Middle Ages, their culture and way of
life (fishing, hunting, and trading) dominated the north of Sweden, Norway, Finland, and the Kola Peninsula
in Russia. In Sámi mythology, the goddess Akka represented beauty and goodness in the world. According
to the Sámi people, Akka’s representation on Earth is a beautiful mountain in Laponia, located in northern
Sweden.
In the platform’s context, the letters A and K stand for actor kernel . It is for this reason that the platform
is called Akka and its symbol is the Akka mountain (see Figure 4-1).
Figure 4-1. The original and modern Akka logos, representing the Akka mountain
The actor model is a mathematical model developed by Carl Hewitt, Peter Bishop, and Richard Steiger at
MIT in 1973 and presented in a paper called “A Universal Modular Actor Formalism for Artificial Intelligence.”1
So, you may ask: if the actor model is more than 40 years old, why have we been dealing with
other paradigms all this time? The answer is not simple. When the actor model was developed, hardware
(and memory) was very expensive. Today, it’s just the opposite: hardware is dirt cheap and programmers
are expensive.
To ground this idea, consider computer science history. When hardware is very expensive, programming
means optimizing and dealing with low-level concepts and implementations tied to the hardware. You have
to think in terms of interruptions, assembly language, and pointers to (physical) memory locations.
As programming languages became higher level, we could ignore the details related to hardware and start talking
in terms of abstraction rather than implementation. Think of concepts such as a recursive call
or function composition, which are hard to reach if you have to deal with low-level hardware implementations.
Threads and Labyrinths
Between 1980 and 2003, we experienced the rise and dominance of object-oriented languages. These years
were the dark ages of functional programming. Functional languages were spoken only in academic and
scientific environments, barely related to industry.
An interesting problem arose with object-oriented programming: the implementation of concurrency
and parallelism. These two concepts are the Achilles’ heel of structured and object-oriented programming.
Imagine an implementation of threads in C++ or Java; the complexity is vast and the potential for error is enormous.
Concurrency is not easy; doing more than one thing in a program means dealing with race
conditions, semaphores, mutexes, locks, shared data, and all the other machinery of multithreading. This
includes basic questions: determining precisely what a program with several threads is doing, what the value of
a variable accessed from multiple threads is at a given point in time, or whether two threads in standby
will ever be released (and when), or whether they are deadlocked.
Unfortunately, thread-based concurrency raises more challenges than it solves.
Today there is a lot of technical debt in homegrown thread-based implementations of concurrency.
No one wants to touch these huge systems because the code is complex and the chances of a change breaking
everything are very high.
In this context, functional programming experienced a rebirth. With the release of Scala in 2003, F#
in 2005, Clojure in 2007, and Elixir in 2012, the actor model approach emerged as the winner in solving
concurrency issues.
Actors 101
Actors are objects. An actor is an object that sends and receives messages. According to the actor
specification, the order of the received messages is not relevant; but in Akka, there is an implementation
called a mailbox , a queue from which messages are consumed (FIFO by default).
What the actor does with a received message depends on how you solve a specific problem. The actor
could handle the message internally, it could send a message to another actor, it could create another actor,
or it could take an action with the message. An Akka actor is a high-level abstraction.
The following are the main comparison points between OOP and actors:
Unit . In OOP, the smallest processing unit is the object; in the actor model, it is the
actor. We already know that in Akka, the actors are objects but an actor is a more
bounded representation of reality than an object.
1 Carl Hewitt, Peter Bishop, and Richard Steiger, A Universal Modular Actor Formalism for Artificial Intelligence
(http://dl.acm.org/citation.cfm?id=1624804, 1973).
Encapsulation . In both models, the smallest processing unit encapsulates state and
behavior. In OOP, the state is determined by the value of the attributes in a given
time, and behavior is ruled by the class methods. In the actor model, the state is
determined by the messages; if there are no messages in the mailbox, the actor will
wait indefinitely for messages.
Access . In OOP, executing object methods from outside of the object is allowed
(but not recommended), as well as access and modify object fields from outside
of the object. In the actor model, access to the actor's methods or fields is strictly
prohibited; all the communication must be done through messages.
Globals . In OOP, for example, there is the Singleton pattern, which is a class with a
single instance. Global variables and class variables exist, but they are discouraged. In
the actor model, global variables don’t exist. A shared global state doesn’t exist either.
Messages . In OOP, the messages between objects could be mutable. In the actor
model, the messages between actors are strictly immutable.
Exceptions . In OOP exists the traditional and well-known try-catch approach,
which is the most complex way to handle exceptions because you have to manage
all the possible values of the variables involved. In the actor model, the “let it crash
approach exists; if something fails, let it fail. The exception scenario could affect only
the actor involved, not the complete environment.
Concurrency and parallelism . In OOP, the most used approach is the thread model,
which is a complex solution to the problem. In the actor model, you don’t have to
worry about concurrency and parallelism, because if everything follows the actor
convention, there is no problem with parallelism.
The following are the Lightbend recommendations for the actor model:
Think of actors as employees. Think of each actor system as a company.
Think of the actor’s siblings as people in the same hierarchical level.
Think of the actor’s children as the employee's subordinates.
An actor has one (and only one) supervisor—the actor who created it.
Actor model success is the delegation to subordinates.
The Akka implementation of the actor model has these peculiarities:
When you create an actor, Akka provides an ActorRef.
Actors run in real Java threads. Some actors could share the same thread.
There are three mailbox types: Unbounded, Bounded, and Priority.
Actors can scan their mailboxes to look for specific messages.
There is a “dead letter” mailbox that collects the messages sent to terminated actors.
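As a sketch, the mailbox type is usually selected through configuration. The following application.conf fragment (the mailbox class name comes from the Akka documentation; verify it against your Akka version) declares a bounded mailbox that an actor can opt into, for example with Props[MyActor].withMailbox("bounded-mailbox"):

```hocon
# application.conf -- a sketch of declaring a bounded mailbox
bounded-mailbox {
  mailbox-type = "akka.dispatch.BoundedMailbox"
  mailbox-capacity = 1000          # at most 1000 queued messages
  mailbox-push-timeout-time = 10s  # senders wait up to 10s when the queue is full
}
```

A bounded mailbox protects the application from unbounded memory growth when producers are faster than the actor can consume.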
The Lightbend Reactive Platform (www.lightbend.com) is a family of five members, described as follows:
Scala : The programming language. The Reactive Platform fully supports both Java
and Scala, so you can choose what is best for you.
Akka : Message-driven runtime. At the center of the Reactive Platform is Akka, a
message-driven middleware or runtime with Scala and Java APIs.
Spark : Apache Spark, which is written in Scala and Akka, is a fast data engine to fuel
Reactive applications.
Lagom : Reactive microservices framework. An opinionated framework for building
web microservices.
Play : Just-hit-reload web framework. The Play Framework is the just-hit-reload web
development framework with Scala and Java APIs.
We have always said that Apache Spark is the Scala “killer app.”
In this book, we only cover Scala, Akka, and Spark, but if you are an enthusiastic web service developer
or web developer, don’t pass on the opportunity to explore Lagom and Play, respectively, more deeply.
Installing Akka
Well, enough theory; let's get our feet wet.
The first thing you have to do is go to http://akka.io/downloads/, as shown in Figure 4-2.
Figure 4-2. The Akka download page
Then download the Lightbend Activator according to your platform and operating system. Lightbend is
the company behind Akka; it builds and maintains the Akka message-driven runtime. Follow the installation
instructions from the web page.
After downloading and extracting the package, go to the directory (in this example, we use version 1.3.10):
%> cd activator-dist-1.3.10/bin
Then, launch the Activator web UI:
%> activator ui
Now go to http://127.0.0.1:8888 . You'll see a web page like the one shown in Figure 4-3 .
Figure 4-3. Lightbend Activator main page
Now select the Hello Akka! application and click the “Create app” button, as shown in Figure 4-4 .
Figure 4-4. Creating an Akka application from a template
Figure 4-5. IntelliJ IDEA Community Edition
Now open your IDE. In this case, we used the IntelliJ IDEA Community Edition, as shown in Figure 4-5 .
Select Open. Enter the directory in which you created the project (see Figure 4-6 ).
Figure 4-6. IntelliJ IDEA open existing project dialog
Figure 4-7. IntelliJ IDEA import SBT project dialog
We select both modules inside our project (see Figure 4-7).
Now you have a fully functional Hello World! Akka project (see Figure 4-8 ).
Figure 4-8. Hello World! Akka project on IntelliJ IDEA
Figure 4-9. Lightbend Activator, the Typesafe web IDE
As you can see in Figure 4-9, the Lightbend Activator is a full web IDE. You can build, code, run, and test your Akka applications from your browser.
As you can see in Figure 4-10, there are a lot of project templates to play with. The Akka world is vast, and it's beyond the scope of this book to cover the entire Reactive universe.
Akka Actors
It is essential to recall that before Scala version 2.10, there was a scala.actors package. The Scala actors
library was deprecated in March 2013 and replaced by Akka. Scala actor models prior to version 2.10 should
no longer be used.
Actors
For our first example, we use a multilanguage greeter; that is, we enter a particular language and the
program responds by replying “Good day,” in the language specified.
import akka.actor.Actor
import akka.actor.ActorSystem
import akka.actor.Props
class GreeterActor extends Actor {
def receive = {
case "en" => println("Good day")
case "es" => println("Buen dia")
case "fr" => println("Bonjour")
case "de" => println("Guten Tag")
case "pt" => println("Bom dia")
case _ => println(":(")
}
}
Figure 4-10. Lightbend Reactive Platform
object Main extends App {
// build the ActorSystem
val actorSystem = ActorSystem("MultilangSystem")
// instantiate the actor
val greeter = actorSystem.actorOf(Props[GreeterActor], name = "GreeterActor")
// send the actor some messages
greeter ! "en"
greeter ! "es"
greeter ! "fr"
greeter ! "de"
greeter ! "pt"
greeter ! "zh-CN"
// shut down the actor system
actorSystem.shutdown
}
When we run this program, the output is as follows:
$ sbt run
[info] Running Main
Good day
Buen dia
Bonjour
Guten Tag
Bom dia
:(
Process finished with exit code 0
Here is the step-by-step explanation:
When using Akka actors, you need to import the akka.actor._ package. You at least
need these classes: Actor , ActorSystem , and Props .
You must define a class of type Actor . In this case, we called it GreeterActor . You can
use any name that you want.
The actor's main performance must be defined in the actor's receive() method.
The structure of the receive() method is typical in functional programming; it is
called a match expression . The lines should always go from the most specific case to
the most general case. The most specific case is at the top and the more general case
is at the bottom.
The last line in the match expression should be (as good practice; not required) the
default case; it says what to do if the pattern didn’t find a match.
Recall that the underscore operator ( _ ) in Scala means "whatever." If the message
doesn't find a match, an UnhandledMessage exception is thrown, which reflects
poor programming practice.
You create a Main object that extends the Scala App trait to give your actors a stage
on which to display their histrionic abilities.
You first need an actor system. Any name with alphanumeric characters is good;
hyphens are allowed but not as the first character.
To liven up your actor, invoke the actorOf method in the actor system. The actor will
start asynchronously.
If you want to interact, send messages with the ! operator.
To finish the play, you must call the shutdown method in the actor system.
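The receive block above is an ordinary Scala match expression, so the ordering rule (most specific cases at the top, the default underscore case at the bottom) can be tried in plain Scala, without Akka. The greet function here is ours, for illustration:

```scala
// Match expressions are evaluated top to bottom: put specific cases first
// and the catch-all underscore case last.
def greet(lang: String): String = lang match {
  case "en" => "Good day"
  case "es" => "Buen dia"
  case _    => ":("   // default case: anything not matched above
}

println(greet("es"))    // Buen dia
println(greet("zh-CN")) // :(
```

The same evaluation order applies inside an actor's receive method; a wildcard case placed first would shadow every case below it.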
Actor System
Imagine the actor system as a theater company, an actors union, or a circus:
There is a hierarchy; each actor always has a supervisor and an actor can have
siblings and children.
Actors belonging to the same system of actors share dispatchers, deployments, and
addresses.
The actor system is the meeting point where the actors are created and searched.
Internally, the actor system is a thread controller; the actor system decides when to
create threads for an application.
If the system does not turn off the actors (with the shutdown method), the application
will not end. As long as the actor system is running, the application will continue
running.
Actor Reference
Imagine the actor reference as the actor agent; that is, someone who represents them and receives letters
from his fans:
The actor system's actorOf method has two main tasks: to start the actor
asynchronously and to return the requested ActorRef.
The ActorRef is a handler, so you cannot directly access the actor and break the actor
system encapsulation rules.
The ActorRef follows the facade pattern over the actor; that is, it serves as a way to
communicate with the actor without directly accessing the actor. Thus, you never
access actor variables and methods directly, as dictated by the encapsulation
principles.
The ActorRef is immutable; you cannot change it because it is only a reference.
An actor has one (and only one) ActorRef. An ActorRef refers to one (and only one)
actor. It is a one-to-one relationship.
To comply with the Akka actor model, the ActorRef is serializable and server
independent, so you can distribute, copy, and pass references (to the actors’ fans)
across the network.
Actor Communication
Actor communication is always more easily explained by example. Just remember that to send a message to
an actor, you use the ! operator.
The ! operator always works with ActorRefs; never between Actor class instances.
In Akka, when you send a message to an actor with the ! operator, the actor that receives the message
also receives a reference to the actor that sent it; this reference is accessed through the sender variable,
which makes it easy to send response messages back to the invoker. Recall that sender is predefined by Akka; use it wisely.
import akka.actor._
case object SendANewCat
case object LiveALife
case object BackToHeaven
case object LifeSpended {
var remaining = 0 // a default value
}
class God(indulged: ActorRef) extends Actor {
def receive = {
case SendANewCat =>
println("GOD: Go!, you have seven lives")
indulged ! LiveALife
case LifeSpended =>
if ( LifeSpended.remaining == 0){
println("GOD: Time to Return!")
indulged ! BackToHeaven
context.stop(self)
}
else {
println("GOD: one life spent, " + LifeSpended.remaining + " remaining.")
indulged ! LiveALife
}
case _ => println("GOD: Sorry, I don't understand")
}
}
class Cat extends Actor {
var lives = 7 // All the cats born with 7 lives
def receive = {
case LiveALife =>
println("CAT: Thanks God, I still have " + lives + " lives")
lives -= 1
LifeSpended.remaining = lives
sender ! LifeSpended
case BackToHeaven =>
println("CAT: No more lives, going to Heaven")
context.stop(self)
case _ => println("CAT: Sorry, I don't understand")
}
}
object CatLife extends App {
val system = ActorSystem("CatLifeSystem")
val sylvester = system.actorOf(Props[Cat], name = "Sylvester")
val catsGod = system.actorOf(Props(new God(sylvester)), name = "CatsGod")
// God sends a Cat
catsGod ! SendANewCat
system.terminate()
}
Running our example, we get this output:
GOD: Go!, you have seven lives
CAT: Thanks God, I still have 7 lives
GOD: one life spent, 6 remaining.
CAT: Thanks God, I still have 6 lives
GOD: one life spent, 5 remaining.
CAT: Thanks God, I still have 5 lives
GOD: one life spent, 4 remaining.
CAT: Thanks God, I still have 4 lives
GOD: one life spent, 3 remaining.
CAT: Thanks God, I still have 3 lives
GOD: one life spent, 2 remaining.
CAT: Thanks God, I still have 2 lives
GOD: one life spent, 1 remaining.
CAT: Thanks God, I still have 1 lives
GOD: Time to Return!
CAT: No more lives, going to Heaven
Process finished with exit code 0
Here is an actor communication example analysis:
It is always advisable to model messages as classes within your application; in our
example, we have four case objects that we use as messages:
SendANewCat case object
LiveALife case object
BackToHeaven case object
LifeSpended case object
CatLife is the application in which we have the main application. The first line
creates the actor system and calls it CatLifeSystem.
On the next line, we create an ActorRef for the Cat class. An ActorRef to a cat actor is
loaded in the sylvester variable.
We then create the god actor. Note that the constructor receives a reference to his
indulged cat. This was used to show the relationship between the actors; we could
have declared a constructor with no arguments, but this was only for demonstration
purposes.
Then, we send a message to god requesting a new cat.
When the god actor receives the message, it starts the cat’s life, until we reach the life
limit, then we stop this actor context.
The context object is available inside every actor in the actor system. Here, it is
used to stop the actor itself.
It is important to recall that cat, god, and indulged are ActorRefs and are not
Actor class instances. An actor should never be accessed directly; always
through messages.
If we accessed the actors directly, the environment would become unreliable under high
concurrency and parallelism. The message system and encapsulation ensure a
parallelizable, concurrent environment because there are no
shared variables or locks; each actor processes one message at a time.
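Note, by the way, that the LifeSpended object above carries its counter in a shared var, which goes against the immutable-message guideline presented later in this chapter. A cleaner protocol (a sketch; the name LifeSpent is ours) carries the remaining count inside the message itself:

```scala
// An immutable message that carries its own data; no shared mutable state.
case class LifeSpent(remaining: Int)

val first = LifeSpent(6)
val next  = first.copy(remaining = first.remaining - 1) // derive a new message
println(next.remaining) // 5; `first` is untouched
```

With this shape, the cat would reply `sender ! LifeSpent(lives)` and the god actor would pattern match on `LifeSpent(remaining)`, with no variable shared between the two actors.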
Actor Lifecycle
In addition to the constructor, an actor has the following lifecycle methods, which are all described in Table 4-1:
receive
preStart
postStop
preRestart
postRestart
Table 4-1. Actor Lifecycle Methods
Method Description
constructor Called when the class is instantiated, as in Java.
preStart Called immediately after the actor is started.
postStop Called immediately after the actor is stopped. Typically used for cleanup work.
preRestart Called on the old instance just before a restart. A restart is usually caused by an
exception. The preRestart method receives the Throwable and the message being
processed as parameters.
postRestart Called on the new instance immediately after a restart. A restart is usually caused
by an exception. The postRestart method receives the Throwable as a parameter.
In the following example code, The Hulk (the green superhero) is used to show the lifecycle methods:
import akka.actor._
case object GetAngry
class Hulk extends Actor {
println("in the Hulk constructor")
override def preStart {
println("in the Hulk preStart")
}
override def postStop {
println("in the Hulk postStop")
}
override def preRestart(reason: Throwable, message: Option[Any]) {
println("in the Hulk preRestart")
println(s" preRestart message: ${message.getOrElse("")}")
println(s" preRestart reason: ${reason.getMessage}")
super.preRestart(reason, message)
}
override def postRestart(reason: Throwable) {
println("in the Hulk postRestart")
println(s" postRestart reason: ${reason.getMessage}")
super.postRestart(reason)
}
def receive = {
case GetAngry => throw new Exception("ROAR!")
case _ => println("Hulk received a message...")
}
}
object LifecycleTest extends App {
val system = ActorSystem("LifeCycleSystem")
val hulk = system.actorOf(Props[Hulk], name = "TheHulk")
println("sending Hulk a message")
hulk ! "hello Hulk"
Thread.sleep(5000)
println("making Hulk get angry")
hulk ! GetAngry
Thread.sleep(5000)
println("stopping Hulk")
system.stop(hulk)
println("shutting down Hulk system")
system.terminate()
}
The following is the output when the program is run:
[info] Running LifecycleTest
sending Hulk a message
in the Hulk constructor
in the Hulk preStart
Hulk received a message...
making Hulk get angry
in the Hulk preRestart
[ERROR] [01/01/2015 01:01:01.964] [LifeCycleSystem-akka.actor.default-dispatcher-6]
[akka://LifeCycleSystem/user/TheHulk] ROAR!
java.lang.Exception: ROAR!
at Hulk$$anonfun$receive$1.applyOrElse(chapter04_03.scala:31)
at akka.actor.Actor$class.aroundReceive(Actor.scala:480)
at Hulk.aroundReceive(chapter04_03.scala:6)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:525)
at akka.actor.ActorCell.invoke(ActorCell.scala:494)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
preRestart message: GetAngry
preRestart reason: ROAR!
in the Hulk postStop
in the Hulk constructor
in the Hulk postRestart
postRestart reason: ROAR!
in the Hulk preStart
As an exercise, trace the source code against the program output.
Starting Actors
You have already seen how to create actors from the actor system . To create actors from another actor, you
must use the following context:
class GodWanabe extends Actor {
  val creature = context.actorOf(Props[Creature], name = "Creature")
  // Add the code for its creation ...
}
Let’s look at the actor lifecycle control between actors with an example based on characters from The
Simpsons . Mr. Burns is the boss and has a nuclear power plant. He hires two employees, Homer Simpson
and Frank Grimes, but then only fires Frank Grimes.
import akka.actor._
case class Hire(name: String)
case class Name(name: String)
class Boss extends Actor {
def receive = {
case Hire(name) =>
// here the boss hire personnel
println(s"$name is about to be hired")
val employee = context.actorOf(Props[Employee], name = s"$name")
employee ! Name(name)
case _ => println(s"The Boss can't handle this message.")
}
}
class Employee extends Actor {
var name = "Employee name"
override def postStop {
println(s"I'm ($name) and Mr. Burns fired me: ${self.path}")
}
def receive = {
case Name(name) => this.name = name
case _ => println(s"The Employee $name can't handle this message.")
}
}
object StartingActorsDemo extends App {
val actorSystem = ActorSystem("StartingActorsSystem")
val mrBurns = actorSystem.actorOf(Props[Boss], name = "MrBurns")
// here the boss hires people
mrBurns ! Hire("HomerSimpson")
mrBurns ! Hire("FrankGrimes")
// we wait some office cycles
Thread.sleep(4000)
// we look for Frank and we fire him
println("Firing Frank Grimes ...")
val grimes = actorSystem.actorSelection("../user/MrBurns/FrankGrimes")
// PoisonPill, an Akka special message
grimes ! PoisonPill
println("now Frank Grimes is fired")
}
The following is the output when we run this program:
[info] Running StartingActorsDemo
HomerSimpson is about to be hired
FrankGrimes is about to be hired
Firing Frank Grimes ...
now Frank Grimes is fired
I'm (FrankGrimes) and Mr. Burns fired me: akka://StartingActorsSystem/user/MrBurns/FrankGrimes
Process finished with exit code -1
Let’s analyze the starting actors example code:
1. Create and use the Name and Hire utility classes to send messages between
actors.
2. When the employee actor receives the Name message, it assigns the name to its
name variable.
3. When the boss receives a Hire message, it uses the context.actorOf method to
hire new employees.
4. As usual, the main program creates the actor system.
5. The main program then creates the boss actor using the actor system reference.
6. The main program sends the boss two Hire messages, with HomerSimpson and
FrankGrimes as names.
7. After a pause (4 seconds), look for Frank Grimes in the actor system, then send
him the PoisonPill message, which is an Akka actor system special message that
asynchronously sends the stop signal to an actor. Use the postStop method to
print a message after PoisonPill.
Stopping Actors
As you saw previously, there are four ways to stop an actor:
Calling system.stop(ActorRef) from the ActorSystem level
Calling context.stop(ActorRef) from inside an actor
Sending an actor the PoisonPill message
Programming a gracefulStop
Table 4-2 summarizes the ways to stop an actor.
Table 4-2. Ways to Stop an Actor
Message Characteristics
stop When the stop method is received, the actor processes only the current message (if any).
All other messages are discarded: both the queued messages in the actor's mailbox and the
newly arriving ones.
PoisonPill Once the PoisonPill message is received, it is queued in the actor’s mailbox as any
normal message. Once the PoisonPill message is processed, the actor stops.
gracefulStop This method allows actors to end gracefully, waiting for the timeout signal. If you need
a specific set of instructions before stopping the actor, this is the right way.
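The gracefulStop row can be sketched as follows. This is a sketch assuming Akka 2.4-style APIs (the Worker class and names are ours, for illustration); verify the imports against your Akka version:

```scala
import akka.actor._
import akka.pattern.gracefulStop
import scala.concurrent.Await
import scala.concurrent.duration._

class Worker extends Actor {
  def receive = { case msg => println(s"Worker got: $msg") }
}

object GracefulStopDemo extends App {
  val system = ActorSystem("GracefulStopDemo")
  val worker = system.actorOf(Props[Worker], name = "Worker")
  worker ! "last job"
  // gracefulStop returns a Future[Boolean]: true if the actor terminated
  // within the timeout; otherwise the future fails with an exception.
  val stopped = gracefulStop(worker, 5.seconds)
  Await.result(stopped, 6.seconds)
  system.terminate()
}
```

Because gracefulStop sends PoisonPill by default, the worker finishes the messages already in its mailbox before stopping.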
Some aspects to consider when stopping actors:
The stop method is asynchronous. It could return before the actor is
actually stopped; use it wisely.
The shutdown process has two subprocesses. First, it suspends the actor's mailbox.
Second, it sends the stop message to all the actor's children; the parent actor has to
wait for all its children to stop.
Messages that can no longer be processed are sent to the dead
letters mailbox. You can access them through the deadLetters reference in the actor
system.
When an actor is stopped, the postStop lifecycle method is invoked. Normally it is
used to clean up resources.
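The dead letters mailbox mentioned above can also be observed: Akka publishes undeliverable messages as DeadLetter events on the system event stream. This is a sketch (the listener and demo names are ours; DeadLetter and eventStream are Akka's):

```scala
import akka.actor._

// A listener that prints every message that ends up in dead letters.
class DeadLetterListener extends Actor {
  def receive = {
    case d: DeadLetter => println(s"Dead letter: ${d.message} to ${d.recipient}")
  }
}

object DeadLetterDemo extends App {
  val system = ActorSystem("DeadLetterDemo")
  val listener = system.actorOf(Props[DeadLetterListener], name = "Listener")
  system.eventStream.subscribe(listener, classOf[DeadLetter])

  val victim = system.actorOf(Props(new Actor { def receive = { case _ => } }), name = "Victim")
  system.stop(victim)
  victim ! "too late"  // the victim is stopping; this message becomes a dead letter
  Thread.sleep(1000)   // demo only: give the listener time to print
  system.terminate()
}
```

Watching dead letters is a cheap way to detect messages sent to stopped or nonexistent actors during development.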
Here is an example of code using system.stop :
import akka.actor._
class Scapegoat extends Actor {
def receive = {
case s:String => println("Message received: " + s)
case _ => println("What?")
}
}
object StopExample extends App {
val system = ActorSystem("StopExample")
val sg = system.actorOf(Props[Scapegoat], name = "ScapeGoat")
sg ! "ready?"
// stop our crash dummy
system.stop(sg)
system.terminate()
}
Killing Actors
The following code shows how to kill actors . It is a very violent way; discretion is advised. Normally, if you
want to stop an actor gracefully, you use the methods described earlier.
import akka.actor._
class ScapeGoat extends Actor {
def receive = {
case s:String => println("Message received: " + s)
case _ => println("Uh?")
}
override def preStart {
println("In preStart method")
}
override def postStop {
println("In postStop method")
}
override def preRestart(reason: Throwable, message: Option[Any]) {
println("In preRestart method")
}
override def postRestart(reason: Throwable) {
println("In postRestart method")
}
}
object Abbatoir extends App {
val system = ActorSystem("Abbatoir")
val sg = system.actorOf(Props[ScapeGoat], name = "ScapeGoat")
sg ! "say goodbye"
// finish him!
sg ! Kill
system.terminate()
}
This is the code output:
In preStart method
Message received: say goodbye
In postStop method
Process finished with exit code 0
Shutting down the Actor System
As you have already seen in the examples, this is the method to shut down the actor system:
system.terminate()
Because of its importance, we dedicate this section to this method. Remember: if you don't terminate the
actor system in your application, the program will run indefinitely.
Actor Monitoring
This code shows how an actor asks to be notified when a child actor dies:
import akka.actor._
class Child extends Actor {
def receive = {
case _ => println("Child received a message")
}
}
class Dad extends Actor {
// Dad actor create a child actor
val child = context.actorOf(Props[Child], name = "Son")
context.watch(child)
def receive = {
case Terminated(child) => println("This will not end here -_-")
case _ => println("Dad received a message")
}
}
object ChildMonitoring extends App {
val system = ActorSystem("ChildMonitoring")
// we create a Dad (and it will create the Child)
val dad = system.actorOf(Props[Dad], name = "Dad")
// look for child, then we kill it
val child = system.actorSelection("/user/Dad/Son")
child ! PoisonPill
Thread.sleep(3000)
println("Revenge!")
system.terminate()
}
Running this code produces the following result:
This will not end here
Revenge!
Process finished with exit code 0
Through the watch() method, an actor knows when a subordinate stops. This is very useful because it
lets the supervisor handle the situation.
Note that when an exception occurs within an actor, the actor does not kill itself. In Akka, an
exception makes an actor restart automatically.
Looking up Actors
In the previous example, you saw how to find a specific actor:
val child = system.actorSelection("/user/Dad/Son")
The actorSelection method is available under the actor system and within each actor instance
through the context variable.
You can also look for actors with a relative path; for example, from siblings:
// From a sibling actor
val bro = context.actorSelection("../myBrother")
The actorSelection method in the actor system can be used to find actors by their full or absolute path:
val child = system.actorSelection("akka://MonitoringTest/user/Dad/Son")
val child = system.actorSelection("/user/Dad/Son")
With the actorSelection method, you can also look for a sibling from inside an actor:
val sibling = context.actorSelection("../Son")
Actor Code of Conduct
At this point, you have seen everything that you need to write actors. To achieve concurrent programming,
it is important to maintain a performance style; that is, a code of ethics among actors. If you keep this code of
ethics, the source code will be easy to debug and won’t have typical multithreaded programming problems,
such as deadlocks and race conditions. In this section, we present the fundamental principles for a good
performance.
Actors Do Not Block Each Other
A well-written actor does not block others while processing a message. When an actor blocks another actor,
the blocked actor cannot attend to a request; if an actor is locked while working on a first request, it cannot
attend to a second one. The worst-case scenario is when actors block each other; this is known as
deadlock: the first actor is waiting for the second one to do something and the second one is waiting for the
first one to do something.
Rather than blocking on messages, the code of an actor must prioritize messages as they arrive so that a lock is
never generated. Normally, when you do not know how to avoid a lock, good practice indicates that it is the
right time to delegate; an actor should never block waiting on itself.
Another good practice is to never use Thread.sleep, and to avoid the Thread class in your
programs; actor programming replaces direct thread operations. If you need an actor to wait before performing a
task, ideally the waiting should be delegated to another, lighter actor. The use of the Thread
class causes more problems than it remedies.
When you need an actor to perform an operation that waits for an answer, the original actor, let's call it Actor A,
must keep attending requests; that is its primary function. So if a standby condition is required, you must create
an Actor B that stands by for the answer and does nothing more. This way, Actor A is free to meet requests, which
is its primary function.
Communication is only via Messages
The key to understanding how the actor model addresses the difficulties of the shared data and lock model is to
provide a space where operations are safe. This sequential space is within each option in the receive method.
In other words, the actors allow you to program multithreaded programs through single-threaded programs
that communicate with each other through asynchronous messages. This multithread abstraction model works
as long as the only form of communication among the stakeholders is through sending messages.
For example, let’s say that there are two actors: GoodPerformer and BadPerformer . Suppose that
GoodPerformer sends a good and nice message to BadPerformer, and as a courtesy, GoodPerformer sends
a reference to himself in the message. Well, suppose that BadPerformer misuses this reference and invokes
methods on GoodPerformer instead of sending messages to GoodPerformer through the ! operator. This
is where the drama begins, because the invoked methods may read an instance of GoodPerformer being
used by another thread. Or worse, the method invoked can modify GoodPerformer’s own variables and
decompose its state.
If this lack of privacy continued, BadPerformer would write synchronization methods on
GoodPerformer's variables, which would become “shared data,” not only between them, but among all who
could invoke them. This shared data and locks model has brought ruin to many systems.
On the other hand, if for practical purposes you need to share state (for example, to maintain code
clarity with non-functional programmers), you can achieve this in Scala. The difference between
Erlang and Scala is that Erlang never lets you communicate in any way other than sending messages
between actors. Scala's designers did this to preserve the language's hybrid nature. We could spend endless
hours discussing whether this is correct or not.
Note We are not saying that you, the reader, should be involved in one of these discussions (of course
not; programmers never enter these discussions). By now you may be vowing never to share state
or provoke lock conditions, but we share this example in case you are involved in an argument with purist
programmers.
Imagine that you need a shared mutable map data structure. It is mutable because you
need to insert a (key, value) pair into the map, obtain the value for a given key, get the set of keys having a specific
value, and so forth; these are common operations on a mutable map. The actor model states that you must build a
wrapper around the map; that is, an actor contains the map and manages all requests. Only that actor can access
the map, no one else, and it only receives and responds to messages, nothing more.
Everything is going well so far, but practical programmers will tell you that for this type of challenge,
there is the ConcurrentHashMap class on Java Concurrency utilities. One of its benefits is that it allows you
to send status change messages to multiple actors (a broadcast), which greatly simplifies life and makes
the code more understandable; however, it does not meet the actor model. Another difference is that the
responses of actors are an asynchronous model; the ConcurrentHashMap response model is synchronous,
simple, and immediate as most understand them.
Messages must be Immutable
Because the Akka actor model provides a single-threaded model, you never need to worry about whether
the objects used in the implementation of these methods are thread-safe . In the actor model, this is called
the share nothing model ; data is confined in a thread instead of being shared by many threads.
But there is one exception to “share nothing,” which is when the message that you send is shared data
among several actors; as a result, you have to worry whether messages are thread-safe. In general, they
should always be thread-safe.
In all good practices you want to avoid unnecessary complexity. The best and simplest way to ensure
that objects in messages are thread-safe is to ensure the use of immutable objects within messages.
Instances of any class having only val fields, which themselves only refer to immutable objects, are
immutable. Besides val, you can use all the immutable classes offered by Scala, such as tuples, strings, lists,
immutable sets, immutable maps, and so on.
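Scala's default collections are immutable, which is what makes them safe message payloads; "updating" one produces a new value and leaves the original untouched:

```scala
// "Updating" an immutable map produces a new map; the original never changes,
// so it can be shared in messages without synchronization.
val stock   = Map("bolts" -> 40, "nuts" -> 25)
val updated = stock + ("nuts" -> 30)

println(stock("nuts"))   // 25: the original is untouched
println(updated("nuts")) // 30
```

An actor can therefore send `stock` to many peers and keep working with it, with no risk of a receiver mutating it underneath.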
Suppose an actor sends a mutable and unsynchronized object as a message (at this point, you could
say that it’s like cussing). And after that, this object is never read nor written again. It might work, but you
are invoking misfortune, because in the future, some code maintainer could debug and see that this object
is shared, and may try to improve scalability, or worse, try to reuse and modify the values for reuse, which
could lead to a bug, which can lead to concurrency disaster.
In general, the best way to arrange your data is to keep all unsynchronized, mutable objects fully
contained within the actors, and therefore accessed only by the owner actor. Whenever objects are
transferred between actors (not messages), you must 100% guarantee what those objects are doing at any
point in time and anywhere in the system.
In the actor model, whenever you want to modify a variable that is not your own, you must at least
send a message to the variable owner to warn that you are making changes. Moreover, you must wait for
confirmation that the values can be modified.
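To make the ownership idea concrete, here is a toy, single-threaded sketch in plain Scala (not real Akka code; all names are made up): the owner confines its mutable variable, and other parties can only request changes or reads by enqueuing messages, never by touching the state directly:

```scala
import scala.collection.mutable

// Messages that foreign parties may send to the owner of the state.
sealed trait Msg
final case class SetName(newName: String) extends Msg
final case class GetName(reply: String => Unit) extends Msg

// Toy "actor": the mutable variable is confined; the mailbox is the only door in.
final class Owner {
  private var name = "unset"
  private val mailbox = mutable.Queue.empty[Msg]

  def send(m: Msg): Unit = mailbox.enqueue(m)

  def processAll(): Unit =
    while (mailbox.nonEmpty) mailbox.dequeue() match {
      case SetName(n)     => name = n      // only the owner ever mutates name
      case GetName(reply) => reply(name)
    }
}

val owner = new Owner
val seen = mutable.Buffer.empty[String]
owner.send(SetName("cassandra"))
owner.send(GetName(n => { seen += n; () }))
owner.processAll()
println(seen.head) // cassandra
```

Real Akka additionally gives you scheduling, supervision, and thread safety for free; the point here is only the message-passing discipline around the owner's state.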
If you still want to send objects between actors outside of messages, a good alternative
is to send a copy of the object. This at least guarantees that the original object will not be modified by a
foreign entity. A very good example is when you have arrays sent indiscriminately among objects; two
array methods are really useful: arr.clone (to send a copy) and arr.toList (to send a copy as a list, which
is also immutable).
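A quick illustration of both copying techniques (the array contents are arbitrary):

```scala
val arr = Array(1, 2, 3)

val copy   = arr.clone()  // independent copy, safe to hand to another actor
val asList = arr.toList   // immutable List snapshot of the same data

arr(0) = 99               // the sender keeps mutating its own array...

println(copy(0))          // ...but the copy still holds 1
println(asList.head)      // ...and so does the immutable list: 1
```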
Messages Must Be Self-Contained
When you return the value of a method, the caller has the advantage of knowing what it was doing before
invoking this method, and can take the return value and continue what it was doing.
With actors, things are not so simple. When an actor makes a request, the answer may not be
immediate; it may take a long time. Because the conditions are non-blocking, the actor can continue doing other
work while it waits for the response. The problem is that when the answer arrives, how do you know what
the actor was doing when it made the invocation?
One commonly used method to simplify the logic of the actors in a program is to send
redundant information in messages. If the request is an immutable object, you can cheaply include a
reference to the request in the response. This makes the message larger, but simplifies the actor logic.
Another way to increase redundancy in messages is by building a case class for each type of message.
While such a wrapper is not strictly necessary in many cases, it makes actor programs easier to understand.
Code with case classes is always easier to understand than code using tuples, for example.
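For example, a request/response pair might be sketched like this (the message names are hypothetical): because the request is immutable, echoing a reference to it inside the response is cheap and lets the receiving actor recover its context from the message alone:

```scala
// The response carries the original request, so the actor that receives
// UserFound does not need to remember what it asked for.
final case class LookupUser(userId: Int)
final case class UserFound(request: LookupUser, fname: String, lname: String)

val req  = LookupUser(1745)
val resp = UserFound(req, "john", "smith")

// The handler recovers the full context from the response itself.
println(s"user ${resp.request.userId} -> ${resp.fname} ${resp.lname}")
```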
Summary
You learned how to build scalable, robust, concurrent programs using the Akka actor model, avoiding the
problems of traditional approaches based on synchronized access and shared and mutable states.
You reviewed Akka’s main concepts:
Actor model
Actor communication
Actor lifecycle
You also explored the actor Code of Conduct.
In the following chapters, you will need Scala/Akka power to code SMACK pipeline applications.
© Raul Estrada and Isaac Ruiz 2016
R. Estrada and I. Ruiz, Big Data SMACK, DOI 10.1007/978-1-4842-2175-4_5
CHAPTER 5
Storage: Apache Cassandra
Congratulations! You are almost halfway through this journey. You are at the point where it is necessary to
meet the component responsible for information persistence; the sometimes neglected “data layer” will
take on a new dimension when you have finished this chapter. It’s time to meet Apache Cassandra, a NoSQL
database that provides high availability and scalability without compromising performance.
Note We suggest that you have your favorite terminal ready to follow the exercises. This will help you
become familiar with the tools faster.
Once Upon a Time...
Before you start, let’s do a little time traveling to ancient Greece to meet the other Cassandra. In Greek
mythology, there was a priestess who was chastised for her treason to the god Apollo. She asked for the
gift of prophecy in exchange for a carnal encounter; however, she failed to fulfill her part of the deal. For
this, she received this punishment: she would have the gift of prophecy, but no one would ever believe her
prophecies. A real tragedy. This priestess’s name was Cassandra.
Perhaps the modern Cassandra, the Apache project, has come to vindicate the ancient Cassandra. With
modern Cassandra, it is probably best to believe what she tells you, and do not be afraid to ask.
Modern Cassandra
Modern Cassandra represents the persistence layer in our reference implementation.
First, let’s have a short overview of NoSQL, and then continue to the installation and learn how
Cassandra fits into the picture.
NoSQL Everywhere
Fifteen years ago, nobody imagined the amount of information that a modern application would have
to manage; the Web was only beginning to take the shape it has today. Computer systems were becoming more
powerful, defying Moore’s law,1 not only in large data centers but also in desktop computers, warning us
that the free lunch is over.2
1 https://en.wikipedia.org/wiki/Moore%27s_law .
2 http://www.gotw.ca/publications/concurrency-ddj.htm .
In this scenario, those who drove the change had to be innovative in the way that they looked for
alternatives to a relational database management system (RDBMS). Google, Facebook, and Twitter had to
experiment with creating their own data models—each with different architectures—gradually building
what is known today as NoSQL.
The diversity of NoSQL tools is so broad that it is difficult to make a classification. But there was one
audacious guy who did, and he proposed that a NoSQL tool must meet the following characteristics:
Non-relational
Open source
Cluster-friendly
Twenty-first-century web
Schemaless
That guy was Martin Fowler, and he lays this out in the book he wrote with Pramod J. Sadalage, NoSQL Distilled
(Addison-Wesley Professional, 2012).3
At the GOTO conference in 2013,4 Fowler presented “Introduction to NoSQL,” a very educational
presentation well worth checking out.
But how is it that NoSQL improves data access performance over a traditional RDBMS? It has much to do
with the way NoSQL handles and abstracts data; that is, how it has defined its data model.
Following Martin Fowler’s comments, if you use this criterion, you can classify NoSQL into distinct
types according to the data model used (as shown in Figure 5-1): document, column-family, graph, and key-value.
Figure 5-1. NoSQL classification according to the data model used
3 http://martinfowler.com/books/nosql.html
4 https://www.youtube.com/watch?v=qI_g07C_Q5I
Another NoSQL-specific feature is that the data model does not require a data schema, which allows a
greater degree of freedom and faster data access.
As seen in Figure 5-2, the data models can also be grouped as aggregate-oriented and schemaless.
Figure 5-2. Another data model classification
Figure 5-3. A simple way to determine when to use NoSQL
The amount of data to be handled and the need for a mechanism to ease development are indicators
of when to use NoSQL. Martin Fowler recommends these two major criteria for deciding when to start
using NoSQL, as shown in Figure 5-3.
Finally, you must remember that there is no silver bullet; despite the rise of NoSQL, you
must be cautious in choosing when to use it.
The Memory Value
Many of the advantages of NoSQL are based on the fact that a lot of data management is performed in
memory, which gives excellent data access performance.
Note The processing performance of main memory is 800 times faster than an HDD, 40 times faster than a
common SSD, and seven times faster than the fastest SSD.5
Surely, you already know that memory access is faster than disk access, and with this speed, you
want to do everything in memory. Fortunately, all of these advantages are abstracted by Cassandra, and you
just have to worry about what and how to store.
Key-Value and Column
There are two particular data models to discuss: key-value and column-family. It is common practice
for a NoSQL database to use several data models to increase its performance. Cassandra makes use of the key-value and
column-family data models.
Key-Value
The simplest data model is key-value. You have probably already used this paradigm within a programming
language. In a nutshell, it is a hash table.
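In Scala terms, the whole model fits in a Map (the keys and values here are made up):

```scala
// A key-value store in miniature: given a key, fetch the content.
val store = Map(
  "user:1744" -> "john doe",
  "user:1745" -> "john smith"
)

println(store("user:1745"))     // direct access by key: john smith
println(store.get("user:9999")) // a missing key yields None
```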
Given a key, you can access the content (value), as demonstrated in Figure 5-4.
Figure 5-4. You can imagine this data model as a big hash. The key allows access to certain <content>. This
<content> can be different types of data, which makes it a much more flexible structure.
5 Ki Sun Song, “Introduction to In-Memory Data Grid; Main Features.” http://www.cubrid.org/blog/dev-platform/
introduction-to-in-memory-data-grid-main-features/
Column-Family
An important part of this data model is that the storage and fetch processes are made from columns and not
from rows. Also, a lot of the data model is done in memory, and you already know how important that is.
What’s a column? It is a tuple containing key-value pairs. In the case of several NoSQL, this tuple is
formed by three pairs: name/key, value, and one timestamp.
In this model, several columns (the family) are grouped by a key called a row-key . Figure 5-5 shows this
relationship.
Figure 5-5. A key (row-key) can access the column family. This group exemplifies the data model column-family
Figure 5-6. When all nodes have the same role, data redundancy and replication are much easier to
maintain, which always helps maintain the availability of data
The main advantage of this model is that it substantially improves write operations, which improves
performance in distributed environments.6
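As a rough mental model (this is a didactic sketch, not Cassandra’s internal representation), a column-family can be pictured as a map from row-keys to columns, where each column is a name, a value, and a timestamp:

```scala
// A column: name -> (value, timestamp). A row-key groups a family of columns.
type Columns = Map[String, (String, Long)]

val columnFamily: Map[Int, Columns] = Map(
  1745 -> Map("fname" -> ("john", 1461110400L), "lname" -> ("smith", 1461110400L)),
  1744 -> Map("fname" -> ("john", 1461110401L), "lname" -> ("doe",   1461110401L))
)

// Fetching goes through the row-key first, then the column name.
val (value, ts) = columnFamily(1745)("lname")
println(s"$value written at $ts") // smith written at 1461110400
```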
Why Cassandra?
Cassandra implements “no single point of failure,” which is achieved with redundant nodes and data.
Unlike legacy systems based on master-slave architectures, Cassandra implements a masterless “ring”
architecture (see Figure 5-6).
6 Comparative study of NoSQL document, column store databases, and evaluation of Cassandra. http://airccse.org/
journal/ijdms/papers/6414ijdms02.pdf
With this architecture, all nodes have an identical role: there is no master node. All nodes communicate
with each other using a scalable and distributed protocol called gossip.7
This architecture, together with the protocol, cannot have a single point of failure. It offers
true continuous availability.
The Data Model
At this point, you can say that the Cassandra data model is based primarily on managing columns. As
mentioned earlier, some NoSQL databases combine multiple data models, as is the case with Cassandra (see Figure 5-7).
Figure 5-7. Cassandra uses a combined data model; the key-value model is used to store and retrieve the columns
Table 5-1. Cassandra Data Model and RDBMS Equivalences

Term                  Definition                        RDBMS Equivalent
Schema/Keyspace       A collection of column families.  Schema/database
Table/Column-Family   A set of rows.                    Table
Row                   An ordered set of columns.        Row
Column                A key/value pair and timestamp.   Column (name, value)
Cassandra has some similarities to an RDBMS; these similarities facilitate use and adoption, although
you must remember that the two do not work the same way.
Table 5-1 provides some comparisons that help us better understand Cassandra’s concepts.
7 https://docs.datastax.com/en/cassandra/2.1/cassandra/architecture/architectureGossipAbout_c.html
Figure 5-8 illustrates the relationships among these concepts.
Figure 5-8. Relationships among column, row, column-family, and keyspace
Cassandra 101
Installation
This section explains how to install Apache Cassandra on a local machine. The following steps were
performed on a Linux machine. At the time of this writing, the stable version is 3.4, released on March 8, 2016.
Prerequisites
Apache Cassandra requires Java version 7 or 8, preferably the Oracle/Sun distribution. The documentation
indicates that it is also compatible with OpenJDK, Zing, and IBM distributions.
File Download
The first step is to download the binary distribution from http://cassandra.apache.org/download/.
On the Apache Cassandra project home page (see Figure 5-9, at http://cassandra.apache.org), the
project logo reminds us of the seeing ability of the mythological character.
Locate the following file:
apache-cassandra-3.4-bin.tar.gz
It’s important to validate file integrity. You do not want it to fail while it’s running. This particular file has
the following values to validate its integrity:
[MD5] e9f490211812b7db782fed09f20c5bb0
[SHA1] 7d010b8cc92d5354f384b646b302407ab90be1f0
It’s easy to make this validation. Any flavor of Linux provides the md5sum and sha1sum commands, as shown in
the following:
%> md5sum apache-cassandra-3.4-bin.tar.gz
e9f490211812b7db782fed09f20c5bb0 apache-cassandra-3.4-bin.tar.gz
%> sha1sum apache-cassandra-3.4-bin.tar.gz
7d010b8cc92d5354f384b646b302407ab90be1f0 apache-cassandra-3.4-bin.tar.gz
%> ls -lrt apache-cassandra-3.4-bin.tar.gz
-rw-r--r--. 1 rugi rugi 34083682 Mar 7 22:06 apache-cassandra-3.4-bin.tar.gz
Once you have validated the file’s integrity, you can extract it and continue.
Figure 5-9. Cassandra project home page
Start
Starting Apache Cassandra is easy; you only execute the following:
./cassandra -f
INFO 18:06:58 Starting listening for CQL clients on localhost/127.0.0.1:9042 (unencrypted)...
INFO 18:06:58 Not starting RPC server as requested. Use JMX (StorageService->startRPCServer())
or nodetool (enablethrift) to start it
INFO 18:07:07 Scheduling approximate time-check task with a precision of 10 milliseconds
INFO 18:07:07 Created default superuser role 'cassandra'
With this command, your Apache Cassandra server is ready to receive requests. You must always keep
in mind that Apache Cassandra runs with a client-server approach. You have launched the server; the server
is responsible for receiving requests from clients and then giving them answers. So now you need to validate
that clients can send requests.
The next step is to use the CLI validation tool, an Apache Cassandra client.
Validation
Now, let’s execute the CLI tool.
%> ./cqlsh
Connected to Test Cluster at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 3.4 | CQL spec 3.4.0 | Native protocol v4]
Use HELP for help.
cqlsh>
The first step is to create a keyspace, the analog of defining the database per se in a relational
database.
%>CREATE KEYSPACE mykeyspace WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };
Once the keyspace is defined, indicate that you will use it.
USE mykeyspace;
This is a familiar sentence, isn’t it?
Apache Cassandra, like other NoSQL frameworks, tries to use analogies with the SQL statements that
you already know.
And if you have already defined the database, do you remember what is next? Now you create a test table.
%> CREATE TABLE users ( user_id int PRIMARY KEY, fname text, lname text);
And, having the table, inserting records is simple, as you can see in the following:
%>INSERT INTO users (user_id, fname, lname) VALUES (1745, 'john', 'smith');
%>INSERT INTO users (user_id, fname, lname) VALUES (1744, 'john', 'doe');
%>INSERT INTO users (user_id, fname, lname) VALUES (1746, 'john', 'smith');
You make a simple query, like this:
%>SELECT * FROM users;
And, you should have the following results (or similar, if you already modified the data with the inserts):
user_id | fname | lname
---------+-------+-------
1745 | john | smith
1744 | john | doe
1746 | john | smith
With Apache Cassandra, you can create indexes on the fly to facilitate searches on specific fields:
CREATE INDEX ON users (lname);
Then you can filter by the indexed field:
SELECT * FROM users WHERE lname = 'smith';
This is the result:
user_id | fname | lname
---------+-------+-------
1745 | john | smith
1746 | john | smith
And that’s it. This is enough to validate that communication between your CLI and your Cassandra
server is working properly.
Here is the complete output from the previous commands:
{16-03-21 14:05}localhost:~/ opt/apache/cassandra/apache-cassandra-3.4/bin rugi% cd /home/
rugi/opt/apache/cassandra/apache-cassandra-3.4/bin
{16-03-21 14:05}localhost:~/opt/apache/cassandra/apache-cassandra-3.4/bin rugi% ./cqlsh
Connected to Test Cluster at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 3.4 | CQL spec 3.4.0 | Native protocol v4]
Use HELP for help.
cqlsh> CREATE KEYSPACE mykeyspace WITH REPLICATION = { 'class' : 'SimpleStrategy',
'replication_factor' : 1 };
cqlsh> USE mykeyspace;
cqlsh:mykeyspace> CREATE TABLE users ( user_id int PRIMARY KEY, fname text, lname text);
cqlsh:mykeyspace> INSERT INTO users (user_id, fname, lname) VALUES (1745, 'john', 'smith');
cqlsh:mykeyspace> INSERT INTO users (user_id, fname, lname) VALUES (1744, 'john', 'doe');
cqlsh:mykeyspace> INSERT INTO users (user_id, fname, lname) VALUES (1746, 'john', 'smith');
cqlsh:mykeyspace> SELECT * FROM users;
user_id | fname | lname
---------+-------+-------
1745 | john | smith
1744 | john | doe
1746 | john | smith
(3 rows)
cqlsh:mykeyspace> CREATE INDEX ON users (lname);
cqlsh:mykeyspace> SELECT * FROM users WHERE lname = 'smith';
user_id | fname | lname
---------+-------+-------
1745 | john | smith
1746 | john | smith
(2 rows)
cqlsh:mykeyspace> exit
{16-03-21 15:24}localhost:~/opt/apache/cassandra/apache-cassandra-3.4/bin rugi%
You should have two terminals open: one with the server running and the other one with the CLI
running. If you check the first one, you will see how the CLI requests are being processed. Figure 5-10 shows the
server running.
Figure 5-10. Cassandra server running
CQL
CQL (Cassandra Query Language) is a language similar to SQL. The queries on a keyspace are
made in CQL.
CQL Shell
There are several ways to interact with a keyspace; in the previous section, you saw how to do it using a shell
called the CQL shell (CQLs). Later you will see other ways to interact with the keyspace.
The CQL shell is the primary way to interact with Cassandra; Table 5-2 lists the main commands.
Figure 5-11. CQL running on CQLs
Figure 5-11 is a screenshot of running the test commands described earlier.
Table 5-2. Shell Command Summary
Command Description
cqlsh Starts the CQL interactive terminal.
CAPTURE Captures the command output and appends it to a file.
CONSISTENCY Shows the current consistency level; or given a level, sets it.
COPY Imports and exports CSV (comma-separated values) data to and from Cassandra.
DESCRIBE Provides information about the connected Cassandra cluster or about the data
objects stored in the cluster.
EXPAND Formats the output of a query vertically.
EXIT Terminates cqlsh.
PAGING Enables or disables query paging.
SHOW Shows the Cassandra version, host, or tracing information for the current cqlsh
client session.
SOURCE Executes a file containing CQL statements.
TRACING Enables or disables request tracing.
For more detailed information on shell commands, you should visit the following web page:
http://docs.datastax.com/en/cql/3.1/cql/cql_reference/cqlshCommandsTOC.html
Let’s try some of these commands. First, activate the shell, as follows:
{16-04-15 23:54}localhost:~/opt/apache/cassandra/apache-cassandra-3.4/bin rugi% ./cqlsh
Connected to Test Cluster at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 3.4 | CQL spec 3.4.0 | Native protocol v4]
Use HELP for help.
The describe command can work on specific tables, on all the keyspaces, or on one specific keyspace:
cqlsh> describe keyspaces
system_schema system system_distributed
system_auth mykeyspace system_traces
cqlsh> describe mykeyspace
CREATE KEYSPACE mykeyspace WITH replication = {'class': 'SimpleStrategy', 'replication_
factor': '1'} AND durable_writes = true;
CREATE TABLE mykeyspace.users (
user_id int PRIMARY KEY,
fname text,
lname text
) WITH bloom_filter_fp_chance = 0.01
AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
AND comment = ''
AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy',
'max_threshold': '32', 'min_threshold': '4'}
AND compression = {'chunk_length_in_kb': '64',
'class': 'org.apache.cassandra.io.compress. LZ4Compressor'}
AND crc_check_chance = 1.0
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 0
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99PERCENTILE';
CREATE INDEX users_lname_idx ON mykeyspace.users (lname);
The show command is also simple; test it to see the version number:
cqlsh> show version
[cqlsh 5.0.1 | Cassandra 3.4 | CQL spec 3.4.0 | Native protocol v4]
cqlsh>
As you can see, these commands are very easy to use.
CQL Commands
CQL is very similar to SQL, as you have already seen in the first part of this chapter. You have created a
keyspace, made inserts, and created an index to filter.
CQL, like SQL, is based on sentences/statements. These sentences are for data manipulation and work
with their logical container, the keyspace. As in SQL, statements must end with a semicolon (;).
Table 5-3 lists all the language commands.
Table 5-3. CQL Command Summary
Command Description
ALTER KEYSPACE Changes the property values of a keyspace.
ALTER TABLE Modifies the column metadata of a table.
ALTER TYPE Modifies a user-defined type. Cassandra 2.1 and later.
ALTER USER Alters existing user options.
BATCH Writes multiple DML statements.
CREATE INDEX Defines a new index on a single column of a table.
CREATE KEYSPACE Defines a new keyspace and its replica placement strategy.
CREATE TABLE Defines a new table.
CREATE TRIGGER Registers a trigger on a table.
CREATE TYPE Creates a user-defined type. Cassandra 2.1 and later.
CREATE USER Creates a new user.
DELETE Removes entire rows or one or more columns from one or more rows.
DESCRIBE Provides information about the connected Cassandra cluster or about the data
objects stored in the cluster.
DROP INDEX Drops the named index.
DROP KEYSPACE Removes the keyspace.
DROP TABLE Removes the named table.
DROP TRIGGER Removes registration of a trigger.
DROP TYPE Drops a user-defined type. Cassandra 2.1 and later.
DROP USER Removes a user.
GRANT Provides access to database objects.
INSERT Adds or updates columns.
LIST PERMISSIONS Lists permissions granted to a user.
LIST USERS Lists existing users and their superuser status.
REVOKE Revokes user permissions.
SELECT Retrieves data from a Cassandra table.
TRUNCATE Removes all data from a table.
UPDATE Updates columns in a row.
USE Connects the client session to a keyspace.
For more detailed information of CQL commands, you can visit the following web page:
http://docs.datastax.com/en/cql/3.1/cql/cql_reference/cqlCommandsTOC.html
Let’s play with some of these commands.
{16-04-16 6:19}localhost:~/opt/apache/cassandra/apache-cassandra-3.4/bin rugi% ./cqlsh
Connected to Test Cluster at 127.0.0.1:9042.
[cqlsh 5.0.1 | Cassandra 3.4 | CQL spec 3.4.0 | Native protocol v4]
Use HELP for help.
Use the keyspace created at the beginning, as follows:
cqlsh> use mykeyspace;
The DESCRIBE command can be applied to almost any object; for example, use it to discover the keyspace’s tables:
cqlsh:mykeyspace> describe tables
users
Or apply it to a specific table:
cqlsh:mykeyspace> describe users
CREATE TABLE mykeyspace.users (
user_id int PRIMARY KEY,
fname text,
lname text
) WITH bloom_filter_fp_chance = 0.01
AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
AND comment = ''
AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy',
'max_threshold': '32', 'min_threshold': '4'}
AND compression = { 'chunk_length_in_kb': '64',
'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
AND crc_check_chance = 1.0
AND dclocal_read_repair_chance = 0.1
AND default_time_to_live = 0
AND gc_grace_seconds = 864000
AND max_index_interval = 2048
AND memtable_flush_period_in_ms = 0
AND min_index_interval = 128
AND read_repair_chance = 0.0
AND speculative_retry = '99PERCENTILE';
CREATE INDEX users_lname_idx ON mykeyspace.users (lname);
cqlsh:mykeyspace> exit
Beyond the Basics
You already know that Apache Cassandra runs on a client-server architecture. The client-server architecture
is used by nearly everyone every day; it is the base of what you know as the Internet.
Client-Server
By definition, the client-server architecture allows distributed applications, since the tasks are divided into
two main parts:
The service providers: the servers
The service petitioners: the clients
In this architecture, several clients are allowed to access the server. The server is responsible for
meeting requests and it handles each one according to its own rules. So far, you have only used one client,
managed from the same machine, that is, from the same data network.
Figure 5-12 shows our current client-server architecture in Cassandra.
Figure 5-12. The native way to connect to a Cassandra server is via CQLs.
Figure 5-13. To access a Cassandra server, a driver is required
CQL shell allows you to connect to Cassandra, access a keyspace, and send CQL statements to the
Cassandra server. This is the most immediate method, but in daily practice, it is common to access the
keyspaces from different execution contexts (other systems and other programming languages).
Other Clients
You require other clients, different from CQLs, to do this in the Apache Cassandra context. You require
connection drivers.
Drivers
A driver is a software component that allows access to a keyspace to run CQL statements.
Figure 5-13 illustrates accessing clients through the use of a driver. A driver can access a keyspace and
also allows the execution of CQL sentences.
Fortunately, there are a lot of these drivers for Cassandra in almost any modern programming language.
You can see an extensive list at
http://wiki.apache.org/cassandra/ClientOptions .
Currently, there are drivers to access a keyspace in almost all modern programming
languages. Typically, in a client-server architecture, clients access the server from different
machines, which are distributed across different networks; therefore, Figure 5-13 may now look like what’s shown
in Figure 5-14.
The Figure 5-14 illustrates that given the distributed characteristics that modern systems require, the
clients actually are in different points and access the Cassandra server through public and private networks.
Your implementation needs will dictate the required clients.
All languages offer a similar API through the driver. Consider the following code snippets in Java, Ruby,
and Node.
Java
The following snippet was tested with JDK 1.8.x.
Get the Dependency
With Java, the easiest way is to use Maven. You can get the driver using the following Maven artifact:
<dependency>
<groupId>com.datastax.cassandra</groupId>
<artifactId>cassandra-driver-core</artifactId>
<version>3.0.2</version>
</dependency>
Figure 5-14. Different clients connecting to a Cassandra server through the cloud
Snippet
The following is the Java snippet:
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.ResultSet;
import com.datastax.driver.core.Row;
import com.datastax.driver.core.Session;
import java.util.Iterator;
...
public static void main(String[] args) {
    Cluster cluster = Cluster.builder().addContactPoint("127.0.0.1").build();
    Session session = cluster.connect("mykeyspace");
    ResultSet results = session.execute("SELECT * FROM users");
    StringBuilder line = new StringBuilder();
    for (Iterator<Row> iterator = results.iterator(); iterator.hasNext();) {
        Row row = iterator.next();
        line.delete(0, line.length());
        line.append("FirstName = ")
            .append(row.getString("fname"))
            .append(", ")
            .append("LastName = ")
            .append(row.getString("lname"));
        System.out.println(line.toString());
    }
    session.close(); // release the client resources when done
    cluster.close();
}
Ruby
The snippet was tested with Ruby 2.0.x.
Get the Dependency
In Ruby, obtaining the driver is as simple as installing a gem:
%>gem install cassandra-driver
Snippet
The following is the Ruby snippet:
require 'cassandra'
node = '127.0.0.1'
cluster = Cassandra.cluster(hosts: node)
keyspace = 'mykeyspace'
session = cluster.connect(keyspace)
session.execute("SELECT fname, lname FROM users").each do |row|
p "FirstName = #{row['fname']}, LastName = #{row['lname']}"
end
Node
The snippet was tested with Node v5.0.0.
Get the Dependency
With Node, it could not be otherwise: the driver is obtained with npm.
%>npm install cassandra-driver
%>npm install async
Snippet
The following is the Node snippet:
var cassandra = require('cassandra-driver');
var async = require('async');
var client = new cassandra.Client({contactPoints: ['127.0.0.1'], keyspace: 'mykeyspace'});
client.stream('SELECT fname, lname FROM users', [])
.on('readable', function () {
var row;
while (row = this.read()) {
console.log('FirstName = %s , LastName= %s', row.fname,
row.lname);
}
})
.on('end', function () {
//todo
})
.on('error', function (err) {
// todo
});
These three snippets did the same thing: made a connection to the Cassandra server, got a reference
to the keyspace, made a single query, and displayed the results. In conclusion, the three snippets generated
the same result:
"FirstName = john, LastName = smith"
"FirstName = john, LastName = doe"
"FirstName = john, LastName = smith"
You can see more examples that use other languages on the following web page:
http://www.planetcassandra.org/apache-cassandra-client-drivers/
Figure 5-15. The Spark-Cassandra Connector is a special type of client that allows access to keyspaces from a
Spark context
Apache Spark-Cassandra Connector
Now that you have a clear understanding of how connecting to a Cassandra server is done, let’s talk about
a very special client. Everything that you have seen previously has been done to get to this point. You can
now see what Spark can do, since you know Cassandra and you know that you can use it as a storage layer to
improve Spark’s performance.
What do you need to achieve this connection? A client. This client is special because it is designed
specifically for Spark, not for a specific language. This special client is called the Spark-Cassandra Connector
(see Figure 5-15).
Installing the Connector
The Spark-Cassandra Connector has its own GitHub repository. The latest stable version lives on the master branch, but
you can access a specific version through a particular branch.
Figure 5-16 shows the Spark-Cassandra Connector project home page, which is located at
https://github.com/datastax/spark-cassandra-connector.
At the time of this writing, the most stable connector version is 1.6.0. The connector is basically a .jar
file loaded when Spark starts. If you prefer to directly access the .jar file and avoid the build process,
you can download it from the official Maven repository. A widely used repository is located at
http://mvnrepository.com/artifact/com.datastax.spark/spark-cassandra-connector_2.10/1.6.0-M2.
Generating the .jar file directly from the Git repository has one main advantage: all the necessary
dependencies of the connector are generated. If you choose to download the .jar from the official repository,
you must also download all of these dependencies.
Fortunately, there is a third way to run the connector, which is by telling the spark-shell that you require
certain packages for the session to start. This is done by adding the following flag:
./spark-shell --packages datastax:spark-cassandra-connector:1.6.0-M2-s_2.10
The nomenclature of the package is the same as that used with Gradle, Buildr, or SBT:
GroupID: datastax
ArtifactID: spark-cassandra-connector
Version: 1.6.0-M2-s_2.10
In the preceding line of code, you are telling the shell that you require that artifact, and the shell will
handle all the dependencies. Now let’s see how it works.
Figure 5-16. The Spark-Cassandra Connector on GitHub
Establishing the Connection
The connector version used in this section is 1.6.0, because it matches the latest stable version of Apache Spark as of
this writing.
First, validate that the versions are compatible. Access the Spark shell to check that you have the correct
version.
{16-04-18 1:10}localhost:~/opt/apache/spark/spark-1.6.0-bin-hadoop2.6/bin rugi% ./spark-shell
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.
MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark’s repl log4j profile: org/apache/spark/log4j-defaults-repl.properties
To adjust logging level use sc.setLogLevel("INFO")
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.0
      /_/
Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_60)
Type in expressions to have them evaluated.
Type :help for more information.
....
scala>
Next, try a simple task:
scala> sc.parallelize( 1 to 50 ).sum()
res0: Double = 1275.0
scala>
Stop the shell (exit command). Now, at start time, indicate the package that you require (the
connector). The first time, the shell downloads the dependencies:
{16-06-08 23:18}localhost:~/opt/apache/spark/spark-1.6.0-bin-hadoop2.6/bin rugi% ./spark-shell --packages datastax:spark-cassandra-connector:1.6.0-M2-s_2.10
Ivy Default Cache set to: /home/rugi/.ivy2/cache
The jars for the packages stored in: /home/rugi/.ivy2/jars
:: loading settings :: url = jar:file:/home/rugi/opt/apache/spark/spark-1.6.0-bin-hadoop2.6/
lib/spark-assembly-1.6.0-hadoop2.6.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
datastax#spark-cassandra-connector added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent;1.0
confs: [default]
found datastax#spark-cassandra-connector;1.6.0-M2-s_2.10 in spark-packages
found joda-time#joda-time;2.3 in local-m2-cache
found com.twitter#jsr166e;1.1.0 in central
found org.scala-lang#scala-reflect;2.10.5 in central
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 16 | 2 | 2 | 0 || 16 | 2 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent
confs: [default]
2 artifacts copied, 14 already retrieved (5621kB/32ms)
log4j:WARN No appenders could be found for logger (org.apache.hadoop.metrics2.lib.
MutableMetricsFactory).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Using Spark’s repl log4j profile: org/apache/spark/log4j-defaults-repl.properties
To adjust logging level use sc.setLogLevel("INFO")
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.6.0
      /_/
Using Scala version 2.10.5 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_60)
Type in expressions to have them evaluated.
Type :help for more information.
16/06/08 23:18:59 WARN Utils: Your hostname, localhost.localdomain resolves to a loopback
address: 127.0.0.1; using 192.168.1.6 instead (on interface wlp7s0)
16/06/08 23:18:59 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Spark context available as sc.
16/06/08 23:19:07 WARN ObjectStore: Version information not found in metastore. hive.
metastore.schema.verification is not enabled so recording the schema version 1.2.0
16/06/08 23:19:07 WARN ObjectStore: Failed to get database default, returning
NoSuchObjectException
SQL context available as sqlContext.
scala>
The connector is loaded and ready for use.
First, stop the current Spark context from the shell:
sc.stop
Next, import the required classes for communication:
import com.datastax.spark.connector._, org.apache.spark.SparkContext, org.apache.spark.SparkContext._, org.apache.spark.SparkConf
Then, set a variable with the required configuration to connect:
val conf = new SparkConf(true).set("spark.cassandra.connection.host", "localhost")
Finally, connect to the well-known keyspace and table that were created at the beginning of this chapter:
val sc = new SparkContext(conf)
val test_spark_rdd = sc.cassandraTable("mykeyspace", "users")
Given the context and keyspace, it is possible to consult the values with the following statement:
test_spark_rdd.foreach(println)
Here is the complete sequence of the five lines of code:
scala> sc.stop
scala> import com.datastax.spark.connector._, org.apache.spark.SparkContext, org.apache.spark.SparkContext._, org.apache.spark.SparkConf
import com.datastax.spark.connector._
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
scala> val conf = new SparkConf(true).set("spark.cassandra.connection.host", "localhost")
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@68b5a37d
scala> val sc = new SparkContext(conf)
sc: org.apache.spark.SparkContext = org.apache.spark.SparkContext@3d872a12
scala> val test_spark_rdd = sc.cassandraTable("mykeyspace", "users")
The connection is established, and the table in our keyspace is now accessible through test_spark_rdd, so
we can run operations on it; for example, to show its values.
scala> test_spark_rdd.foreach(println)
CassandraRow{user_id: 1745, fname: john, lname: smith}
CassandraRow{user_id: 1744, fname: john, lname: doe}
CassandraRow{user_id: 1746, fname: john, lname: smith}
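Writing follows the same pattern as reading. Here is a hedged sketch; saveToCassandra and SomeColumns come from the connector import above, and the row values are made up for illustration:

```scala
// Sketch: writing an RDD back to the users table with the connector.
// The tuple fields must line up with the columns listed in SomeColumns.
val newUsers = sc.parallelize(Seq((1747, "jane", "roe")))
newUsers.saveToCassandra("mykeyspace", "users",
  SomeColumns("user_id", "fname", "lname"))
```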
More Than One Is Better
Up to this moment, unknowingly, you have been working with a Cassandra cluster: a cluster with a single
node, but a cluster nonetheless. Let's check it. ;)
To check, use the nodetool utility, a command-line tool for administering a Cassandra cluster.
You can run the following to see the full list of commands:
CASSANDRA_HOME/bin>./nodetool
Among the list, you see the status command.
status Print cluster information (state, load, IDs, ...)
8 http://docs.datastax.com/en/cassandra/3.0/cassandra/configuration/configCassandra_yaml.html
You can run nodetool with the status command.
CASSANDRA_HOME/bin>./nodetool status
Datacenter: datacenter1
=======================
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address    Load        Tokens  Owns (effective)  Host ID                               Rack
UN  127.0.0.1  122.62 KiB  256     100.0%            3e7ccbd4-8ffb-4b77-bd06-110d27536cb2  rack1
You can see that you are running a cluster with a single node.
cassandra.yaml
When you have a cluster of more than one node, you modify the cassandra.yaml file. This file holds the
necessary settings for each node within a cluster. When you have only one node, there's nothing to
change. The file is located in the CASSANDRA_HOME/conf folder.
The file has several options; you can see each option in detail in the documentation.8 For a basic
configuration, however, only a few options are required.
Table 5-4 describes the fields to create our cluster. The descriptions were taken from the afore
mentioned documentation.
Table 5-4. Minimum Configuration Options for Each Node in the Cluster
Option: Description
cluster_name: The name of the cluster.
seed_provider: The addresses of the hosts deemed contact points. Cassandra nodes use the -seeds list to find each other and learn the topology of the ring.
seed_provider - class_name: The class within Cassandra that handles the seed logic. It can be customized, but this is typically not required.
seed_provider - parameters - seeds: A comma-delimited list of IP addresses used by gossip for bootstrapping new nodes joining a cluster.
listen_address: The IP address or hostname that Cassandra binds to in order to connect to other Cassandra nodes.
rpc_address: The listen address for client connections (Thrift RPC service and native transport).
broadcast_rpc_address: The RPC address to broadcast to drivers and other Cassandra nodes.
endpoint_snitch: Set to a class that implements the IEndpointSnitch interface.
9 http://docs.datastax.com/en/cassandra/3.0/cassandra/configuration/secureFireWall.html
Setting the Cluster
In this example, assume that Cassandra is installed on the following machines:
107.170.38.238 (seed)
107.170.112.81
107.170.115.161
The documentation recommends having more than one seed, but because you have only three nodes in
this exercise, use a single seed. All machines have Ubuntu 14.04 and JDK 1.8 (HotSpot) ready.
The following steps assume that you are starting from a clean installation on each machine; so, if a
machine is running Cassandra, you must stop it and delete all its data. We recommend that you start with
clean installations. If there is a firewall between the machines, it is important to open specific ports.9
Machine01
Our first machine has the address 107.170.38.238 and is the seed. It starts first, once you finish setting up
the three machines.
Locate the CASSANDRA_HOME/conf/cassandra.yaml file and make the following modifications. All nodes
in the cluster must have the same cluster_name .
cluster_name: 'BedxheCluster'
num_tokens: 256
seed_provider:
- class_name: org.apache.cassandra.locator.SimpleSeedProvider
parameters:
- seeds: "107.170.38.238"
listen_address: 107.170.38.238
rpc_address: 0.0.0.0
broadcast_rpc_address: 1.2.3.4
endpoint_snitch: RackInferringSnitch
Machine02
Our second machine has the address 107.170.112.81. Its setting only changes the value of listen_address .
cluster_name: 'BedxheCluster'
num_tokens: 256
seed_provider:
- class_name: org.apache.cassandra.locator.SimpleSeedProvider
parameters:
- seeds: "107.170.38.238"
listen_address: 107.170.112.81
rpc_address: 0.0.0.0
broadcast_rpc_address: 1.2.3.4
endpoint_snitch: RackInferringSnitch
Machine03
Our third machine has the address 107.170.115.161. Its setting also only changes the value of listen_address .
cluster_name: 'BedxheCluster'
num_tokens: 256
seed_provider:
- class_name: org.apache.cassandra.locator.SimpleSeedProvider
parameters:
- seeds: "107.170.38.238"
listen_address: 107.170.115.161
rpc_address: 0.0.0.0
broadcast_rpc_address: 1.2.3.4
endpoint_snitch: RackInferringSnitch
You have now finished the configuration of the nodes.
Note This configuration was kept simple for illustrative purposes. Configuring a cluster for a
production environment requires studying several factors and a lot of experimentation; we recommend
this setting only as a simple exercise to begin learning the options.
Booting the Cluster
First, start Cassandra on the seed node (without the -f flag, Cassandra runs the process in the
background).
MACHINE01/CASSANDRA_HOME/bin%>./cassandra
Then start Cassandra on the other two nodes.
MACHINE02/CASSANDRA_HOME/bin%>./cassandra
MACHINE03/CASSANDRA_HOME/bin%>./cassandra
Now, if you execute nodetool status on any of the machines, you see something like the following.
CASSANDRA_HOME/bin>./nodetool status
Datacenter: 170
===============
Status=Up/Down
|/ State=Normal/Leaving/Joining/Moving
--  Address          Load        Tokens  Owns (effective)  Host ID                               Rack
UN  107.170.38.238   107.95 KiB  256     68.4%             23e16126-8c7f-4eb8-9ea0-40ae488127e8  38
UN  107.170.115.161  15.3 KiB    256     63.7%             b3a9970a-ff77-43b2-ad4e-594deb04e7f7  115
UN  107.170.112.81   102.49 KiB  256     67.9%             ece8b83f-d51d-43ce-b9f2-89b79a0a2097  112
Now, if you repeat the creation of the example keyspace on the seed node, you will see how the keyspace
becomes available on the other nodes. And conversely, if you apply a change to the keyspace on any node,
it is immediately reflected in the others.
10 How SoundCloud Uses Cassandra. https://www.infoq.com/presentations/soundcloud-cassandra
11 Spotify: How to Use Apache Cassandra. https://www.youtube.com/watch?v=JWaECFyhvxI
12 Netflix: A State of Xen Chaos Monkey & Cassandra. https://www.youtube.com/watch?v=Mu01DmxQjWA
Execute the following in machine01 (the seed machine):
MACHINE01_CASSANDRA_HOME/bin%> ./cqlsh
cqlsh>CREATE KEYSPACE mykeyspace WITH REPLICATION = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };
cqlsh>USE mykeyspace;
cqlsh>CREATE TABLE users (user_id int PRIMARY KEY, fname text, lname text);
cqlsh>INSERT INTO users (user_id, fname, lname) VALUES (1745, 'john', 'smith');
cqlsh>INSERT INTO users (user_id, fname, lname) VALUES (1744, 'john', 'doe');
cqlsh>INSERT INTO users (user_id, fname, lname) VALUES (1746, 'john', 'smith');
Execute the following in machine02 or machine03:
CASSANDRA_HOME/bin%>./cqlsh
cqlsh> use mykeyspace;
cqlsh:mykeyspace> select * from users;
user_id | fname | lname
---------+-------+-------
1745 | john | smith
1744 | john | doe
1746 | john | smith
(3 rows)
That’s it. You have a cluster of three nodes working properly.
Putting It All Together
The best way to assimilate all of these concepts is through examples, so in later chapters, we show concrete
examples of the use of this architecture.
As you can see, getting started with Cassandra is very simple; the similarity of its query language to SQL
helps you manipulate data from the start. Perhaps now that you know the advantages of Cassandra, you
want to know who is using it. Three companies in particular have helped increase the popularity of
Cassandra: SoundCloud,10 Spotify,11 and Netflix.12
A lot of the material that exists online about Cassandra references these companies, but they are
not the only ones. The following two web pages offer more extensive lists of companies that are committed
to Cassandra and using it for some part of their data management in interesting use cases.
http://www.planetcassandra.org/companies/
http://www.planetcassandra.org/apache-cassandra-use-cases/
Beyond Cassandra's features, such as the ring model and distributed data management within
the cluster, its main advantage is its level of integration with Spark and, in general, with the rest of the
technologies in this book.
Surely, you’ll use Cassandra in an upcoming project.
© Raul Estrada and Isaac Ruiz 2016
R. Estrada and I. Ruiz, Big Data SMACK, DOI 10.1007/978-1-4842-2175-4_6
CHAPTER 6
The Engine: Apache Spark
If our stack were a vehicle, we have now reached the engine. And as with an engine, we will take it apart,
analyze it, master it, improve it, and run it to the limit.
In this chapter, we walk hand in hand with you. First, we look at the Spark download and installation,
and then we test it in Standalone mode. Next, we discuss the theory around Apache Spark to understand
the fundamental concepts. Then, we go over selected topics, such as running in high availability (cluster).
Finally, we discuss Spark Streaming as the entrance to the data science pipeline.
This chapter is written for people who have never touched Apache Spark before. As you can
imagine, due to space constraints, we will not delve into many specific issues.
The following topics are covered in this chapter:
Introducing Spark
Spark concepts
Working with RDD
Running in cluster
Spark Streaming
Introducing Spark
Apache Spark is perhaps the most important technology in the stack. It is divided into five modules: Core,
SQL, MLlib, Streaming, and GraphX. Simply put, each module deserves a book the same size as the one that
you are now reading. Spark has captured the imagination of developers and analysts, simply because it takes
data manipulation from large laboratories to laptops, from large interdisciplinary teams to lone enthusiasts
who want to do data analysis, and from large corporate clusters to a cheap infrastructure accessible to all.
Spark is both infrastructure software and a data science laboratory. As an infrastructure engine, it can
be attached to powerful tools like Apache Kafka to produce data science pipelines. Simultaneously, it is a data
science laboratory because it provides an engine for machine learning on both a laptop and a production
cluster, from a few kilobytes of data up to what the hardware capacity allows. Likewise, you can build models
based on sample data and then apply them to larger datasets.
In times not so distant, installing the infrastructure for data analysis was an interdisciplinary task among
database specialists, operating system and network analysts, and application engineers and architects.
What makes Apache Spark so attractive is the ability to download it and run it on a small and inexpensive
laptop.
Apache Spark (like all the technologies covered in this book) is an open source tool. It only requires Java
version 6 or higher. All the Scala and Akka dependencies are packaged within the distribution.
Apache Spark Download
Regardless of whether you use the development or production version, you must download the latest build
from https://spark.apache.org/downloads.html (version 1.6.1 as of this writing).
As shown in Figure 6-1, select Pre-built for Hadoop and later.
Figure 6-1. Apache Spark download page
Spark has a new release every 90 days. Hard-core coders who like to work with the latest builds can
clone the repository at https://github.com/apache/spark . The instructions for generating the build are
available at https://spark.apache.org/docs/latest/building-spark.html . Both the source code and the
binary prebuilds are available at this link.
To compile the Spark sources, we need the appropriate versions of Scala and the corresponding SDK;
the Spark source tar includes the required Scala components.
The Spark development group has done a good job keeping the dependencies current. At
https://spark.apache.org/docs/latest/building-spark.html , you can see the latest information about
them. According to the site, building Spark with Maven requires Java version 6 or higher and Maven 3.0.4.
To uncompress the package, execute the following command:
tar xvf spark-1.6.1-bin-hadoop2.4.tgz
Let's Kick the Tires
To test the installation, run the following command:
/opt/spark-1.6.1-bin-hadoop2.6/bin/run-example SparkPi 10
You should see an output like the one shown in Figure 6-2, with a line reading Pi is roughly ...
Figure 6-2. Testing Apache Spark
To open a Spark interactive shell, go to the bin directory and run the spark-shell:
$> ./spark-shell
You should see output similar to Figure 6-3 (which shows Windows 64-bit so that no one feels left out of
this party):
Figure 6-3. The Apache Spark shell
Like all modern shells, the Spark shell includes history. You can access it with the up and down arrows.
There are also autocomplete options that you can access by pressing the Tab key.
As you can see, Spark runs on Scala; the Spark shell is a Scala terminal with more features. This chapter's
Scala examples run without problems. You can test it, as follows:
scala> val num = 1 to 400000
num: scala.collection.immutable.Range.Inclusive = Range (...
To convert our Range to an RDD (we will see shortly what that is), do the following:
scala> val myRDD = sc.parallelize(num)
myRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>
In this case, we have a numeric RDD. Then, as you may guess, you can do all the math operations available
for Scala data types. Let's keep only the odd numbers:
scala> myRDD.filter(_ % 2 != 0).collect()
res1: Array[Int] = Array(1, 3, 5, 7, 9 ...)
Spark returns an Int array with the odd numbers from 1 to 400,000. With this array, you can perform all
the math operations used with Scala Int arrays.
Now you are inside Spark, where things can be achieved as in a big corporate cluster.
Basically, Spark is a framework for processing large volumes of data: in gigabytes, terabytes, or even
petabytes. When you work with small data volumes, however, there are many solutions that are more
appropriate than Spark.
The two main concepts are computation and scale. The effectiveness of the Spark solution lies in
making complex calculations over large amounts of data in an expeditious manner.
Loading a Data File
Upload a text file into Spark from within the Spark shell:
scala> val bigfile = sc.textFile("./big/tooBigFile.txt")
This magically loads the tooBigFile.txt file into Spark, with each line being a different entry of the RDD
(explained shortly). RDDs are very versatile in terms of scaling.
If you connect to the Spark master node, you may try to load the file on any of the different machines
in the cluster, so you have to ensure that it can be accessed from all worker nodes in the cluster. In general,
you always put your files in file systems like HDFS or S3. In local mode, you can load the file directly
(e.g., sc.textFile(path_to_file)). You can use the addFile() SparkContext function to make a file available to all
machines in this way:
machines in this way:
scala> import org.apache.spark.SparkFiles
scala> sc.addFile("/opt/big/data/path/bigFile.txt")
scala> val txtFile = sc.textFile(SparkFiles.get("bigFile.txt"))
For example, if you load a (big) input file where each line has a lot of numbers, the first RDD, whose
elements are strings (text lines), is not very helpful. To transform the string elements into arrays of doubles,
use your knowledge of the Scala language:
scala> val myArrays = txtFile.map(line => line.split(' ').map(_.toDouble))
To verify that this is what you wanted, you can use the first() operator on both txtFile and myArrays
to see that the first element of txtFile is a string and that of myArrays is an Array[Double].
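The check might look like this in the shell (the returned values depend on your file):

```scala
scala> txtFile.first()   // a String: the first text line of the file
scala> myArrays.first()  // an Array[Double] parsed from that line
```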
Loading Data from S3
As part of Amazon support, you have access to a file system called Amazon S3 . To access it, you need the
AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY variables (to configure them, see the “Running
Spark on EC2” section in this chapter).
For instance, you can use the Amazon examples on a data file from Wikipedia:
scala> val myWiki = sc.textFile("s3n://bigdatademo/sample/wiki/")
We don't need to set our AWS credentials as parameters for the Spark shell; this is the general path form
for accessing the S3 file system:
s3n://<AWS ACCESS ID>:<AWS SECRET>@bucket/path
As another example, you can get Wikipedia's traffic statistics from the last 16 months at
https://aws.amazon.com/datasets/wikipedia-traffic-statistics-v2/ .
Spark Architecture
Now is a good time to discuss the Spark mechanism. Let's first talk about the architecture and then about the
programming.
Parallelism is a computational term used when we talk about performing operations in parallel;
that is, if we have a process that works on a portion of data, we can "make copies" of that process to act
simultaneously on different portions of the data. Not all processes are parallelizable. Spark's power lies in its
ability to do parallel computing in a simple way; this is just one of its main advantages.
When you program on your machine or laptop, the Spark shell runs locally and the work is performed
on a single node. When you are working on a cluster, all you have to do is connect the same shell to the
cluster to run it in parallel. Figure 6-4 explains how Spark runs on a cluster.
Figure 6-4. Spark cluster with three executor nodes
The two main concepts of Spark are the resilient distributed dataset (RDD) and the cluster manager. In a
nutshell, the RDD is a parallelized computational abstraction of a collection. The cluster manager distributes
the code and manages the data represented in the RDDs. The cluster manager has three responsibilities:
it controls the distribution of and interaction with RDDs, it distributes code, and it manages fault-tolerant
execution.
Spark can work over several types of cluster managers; in this chapter, we talk about the standalone
manager, and in a subsequent chapter, we talk about Apache Mesos. Hadoop YARN is not covered in this book
because we focus only on pipeline architectures, not Hadoop.
If you have Hadoop 2.0 installed, we recommend that you install Spark on Hadoop YARN. If you have
Hadoop 1.0 installed, we recommend that you use Spark Standalone. It is not advisable to install Apache
Mesos and Hadoop YARN at the same time.
The Spark driver program distributes the program classes in the cluster. The cluster manager starts the
executors, one on each node, and assigns them a set of tasks. When you run a program, all of this machinery
runs transparently on your machines. For example, when you run on a cluster, the entire administration is
transparent to you. That is Spark's power.
SparkContext
Now that you have Spark running on your laptop, let's start programming in more detail. Driver
programs access the Spark core through the SparkContext object, which represents the connection between
the cluster and the nodes. In the shell, the Spark context is always accessed through the sc variable; if you
want to learn more about the sc variable, type this:
scala> sc
res0: org.apache.spark.SparkContext = org.apache.spark.SparkContext@4152bd0f
Creating a SparkContext
In a program, you can create a SparkContext object with the following code:
val sparkContext = new SparkContext(masterPath, "applicationName", ["SparkPath (optional)"], ["JarsList (optional)"])
It is always possible to hard-code the values of these parameters; however, it is best to read them
from the environment with suitable defaults. This allows you to run the code on a different host
without recompiling it. Using local as the default for the master makes it easy to launch the application
in a local testing environment. You must be careful when selecting the defaults. Here's an example
snippet:
import org.apache.spark.SparkContext
import scala.util.Properties

val masterPath = Properties.envOrElse("MASTER", "local")
val sparkHome = Properties.envOrElse("SPARK_HOME", ".")
val jarFiles = Seq(Properties.envOrElse("JARS", ""))
val sparkContext = new SparkContext(masterPath, "MyAppName", sparkHome, jarFiles)
SparkContext Metadata
The SparkContext object has useful metadata (see Table 6-1); for example, the version number, the
application name, and the available memory. If you recall, information about the version is displayed when
you start the Spark shell.
Table 6-1. Some Useful SparkContext Metadata
Value (Type): Use
appName (String): The application name. If you followed the convention, this value is useful at runtime.
getConf (SparkConf): Returns a copy of this SparkContext's configuration.
getExecutorMemoryStatus (Map[String, (Long, Long)]): Returns a map from each slave to the maximum memory available for caching and the remaining memory available for caching. As it is distributed, it does not prevent OOM exceptions.
isLocal (Boolean): Are we running in local mode?
isStopped (Boolean): Has the context been stopped?
master (String): The master node name.
sparkUser (String): The Spark OS username.
startTime (Long): The node start time.
version (String): Useful when testing several Spark versions.
Here are some examples of SparkContext metadata printing the Spark version, the application name, the
master node's name, and the memory:
$ bin/spark-shell
scala> sc.version
res0: String = 1.6.1
scala> sc.appName
res1: String = Spark shell
scala> sc.master
res2: String = local[*]
scala> sc.getExecutorMemoryStatus
res3: scala.collection.Map[String,(Long, Long)] = Map(localhost:52962 ->
(535953408,535953408))
SparkContext Methods
The SparkContext object is the main entry point for your application and your cluster. It is also used for
loading and saving data. You can use it to launch additional Spark jobs and remove dependencies.
Table 6-2 shows some SparkContext methods, but you can see all the SparkContext attributes and methods
at https://spark.apache.org/docs/latest/api/scala/#org.apache.spark.SparkContext .
Working with RDDs
The resilient distributed dataset is Apache Spark's core concept. Spark has four design goals:
In-memory data storage. This is where Apache Hadoop is defeated, because
Hadoop relies primarily on disk storage.
Fault tolerance. Achieved with two features: cluster operations and the application of
linear operations on small data chunks.
Efficiency. Achieved by parallelizing operations between cluster parts.
Speed. Achieved by minimizing data replication between cluster members.
The main idea is that with an RDD, you can perform only two types of operations:
Transformations. When a transformation is applied to an RDD, a new RDD is
created. Examples are the set operations (union, intersection, and join) or, as you
learned in Chapter 3, mapping, filtering, sorting, and coalescing.
Actions. When we apply an action to an RDD, the original RDD does not change.
For example: count, collect, and first.
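The two kinds of operations can be sketched in the shell; the values in the comments assume this exact input range:

```scala
// Transformations are lazy and return new RDDs; actions trigger work and return values.
val nums = sc.parallelize(1 to 10)   // RDD[Int]
val evens = nums.filter(_ % 2 == 0)  // transformation: nothing executes yet
val doubled = evens.map(_ * 2)       // another transformation, still nothing executes
doubled.count()                      // action: runs the chain, returns 5
doubled.collect()                    // action: returns Array(4, 8, 12, 16, 20)
```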
Computer science has a solid foundation in mathematics; all computational models have a solid
mathematical model behind them. In functional programming, functions are first-class citizens; that is,
functions are not modeled as objects, but are simply functions. When you compose a function with another
function, the result is another function. In algebra this is known as function composition. If function f is
applied to the result of function g, the operation is denoted f ∘ g, which is equivalent to f(g(x)).
In linear algebra, there are operations between vectors. There are vector operations whose input is
various vectors and whose result is a new vector (for example, vector addition). In Spark, the vectors would be
RDDs, and the operations whose return value is an RDD are the equivalent of transformations.
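The function composition analogy can be sketched in plain Scala:

```scala
// f ∘ g: composing two functions yields another function.
val f = (x: Int) => x + 1
val g = (x: Int) => x * 2
val fog = f compose g   // fog(x) == f(g(x))
fog(3)                  // f(g(3)) = f(6) = 7
```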
Table 6-2. Some Useful SparkContext Methods
Method (Parameters): Return. Use
addJar(path: String): Unit. Adds JAR files for all tasks to be executed on the SparkContext in the future.
addFile(path: String): Unit. Distributes a file to all nodes on a cluster.
accumulator(value: T, name: String): Accumulator. Creates an accumulator (a variable distributed among the cluster).
cancelAllJobs(): Unit. Cancels all jobs (scheduled and running).
clearJobGroup(): Unit. Clears the current thread's job group.
killExecutor(id: String): Boolean. Requests that the cluster manager kill the specified executor.
setJobDescription(value: String): Unit. Sets a human-readable description of the current job.
textFile(path: String, minPartitions: Int): RDD[String]. Reads a text file and returns it as an RDD of strings.
stop(): Unit. Shuts down the SparkContext.
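As one hedged example combining two methods from Table 6-2 (events.log is a hypothetical file):

```scala
// Sketch: counting ERROR lines with an accumulator (Spark 1.x API).
val errorCount = sc.accumulator(0, "errors")
sc.textFile("events.log").foreach { line =>
  if (line.contains("ERROR")) errorCount += 1 // incremented on the executors
}
println(errorCount.value) // read back on the driver
```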
On the other hand, there are functions whose input is several vectors and whose output is a scalar value;
for example, the inner product. In Spark, actions are the equivalent of these operations.
As with functional programming, there are also rules for RDDs:
Immutability. In both actions and transformations, the original RDD is not
modified. (Yes, the concept of a mutable "variable" in functional programming is an
aberration: it does not exist; all things, whether functions, values, or objects, must be
immutable.)
Resilience. In Spark, the chain of transformations from the first RDD to the last
is always logged; that is, if a failure occurs (the power goes out or someone trips over
the power cord), the process can be reproduced again from the beginning or from
the point of failure.
Lazy evaluation. Since we live in a functional context, the transformations on
RDDs are always lazy. They are not executed until (and only until) the end result is
required. As you saw in Chapter 3, this exists to improve performance, because it
avoids unnecessary data processing and the waste of resources (usually caused by
the developer).
Process awareness. As you saw in Chapter 4, lazy evaluation prevents deadlocks and
bottlenecks, because it prevents processes from waiting indefinitely for other processes'
output. Recall that lazy evaluation records all the operations to be performed and
uses a "result avatar" to estimate the final result.
Memory storage. By default, RDDs are born, live, and die in memory. RDDs
are stored on disk only if explicitly requested. This increases performance
tremendously, because you don't have to fetch them from the file system or a database.
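Explicit persistence can be sketched as follows; the file path reuses the earlier example, and the storage level shown is one choice among several:

```scala
// Sketch: asking Spark to keep an RDD around after the first materialization.
import org.apache.spark.storage.StorageLevel

val lines = sc.textFile("./big/tooBigFile.txt")
lines.persist(StorageLevel.MEMORY_AND_DISK) // keep in memory, spill to disk if needed
lines.count() // first action: reads the file and caches the RDD
lines.count() // second action: served from the cache, not the file system
```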
In addition, since 2015 we also have the DataFrames API. This API offers the following:
Scalability. You can test kilobyte-sized data samples on your laptop, and then run
the same programs on a production cluster with several terabytes of data.
Optimization. The powerful Spark SQL Catalyst optimizer offers two advantages:
SQL beautification and SQL optimization. It also generates source code
from actual SQL.
Integration. Smooth integration with the other members of the Spark family (Core,
SQL, Streaming, MLlib, GraphX).
Multiformat. Support for several data formats and storage systems.
Before continuing, we must take the time to learn what RDDs are and what they are not.
It is crucial to understand that when an RDD is defined, it actually contains no data; you only create
a container for it. RDDs follow the lazy evaluation principle: an expression is not evaluated until it is
necessary (i.e., when an action is requested). This means that when you try to access the data in an RDD,
the access could fail. The operation that builds an RDD's data is performed only when the data is referenced,
that is, when you store or cache the RDD.
This also means that when you concatenate a large number of operations, you don't have to worry
about excessive operations locking a thread. It is important to keep this in mind during application
development: when you write and compile code, and even when you run the job.
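The same idea can be sketched without Spark. In the following plain-Scala sketch (the file name is made up), describing a computation over a missing file costs nothing, and the failure only appears when the result is forced, just as an RDD over a bad path fails at action time, not at definition time:

```scala
import scala.io.Source

// "Defining the RDD": no I/O happens here, so this cannot fail yet.
val pipeline = () =>
  Source.fromFile("no-such-file.txt").getLines().map(_.toLowerCase).toList

// "Running the action": only now is the file opened, and only now do we fail.
val failedLazily =
  try { pipeline(); false }
  catch { case _: java.io.FileNotFoundException => true }

assert(failedLazily)
```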
CHAPTER 6 THE ENGINE: APACHE SPARK
106
Standalone Apps
You can run Spark applications in two ways: from the pretty Spark shell or from a program written in Java,
Scala, Python, or R. The difference between the two modes is that when you run standalone applications,
you must initialize the SparkContext yourself, as you saw in previous sections.
To run a program in Scala or Java, it is best to use Maven (or Gradle or SBT, whichever you prefer). You
must add the dependency to your project. At the time of this writing, the Spark version is 1.6.1 (version 2
exists, but it's very new).
The following are the Maven coordinates for version 1.6.1:
groupId = org.apache.spark
artifactId = spark-core_2.10
version = 1.6.1
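If you prefer SBT over Maven, the same coordinates translate to a single line in build.sbt. This is a sketch (the %% operator appends the Scala binary version, 2.10 here, to the artifact name):

```scala
// build.sbt (fragment)
scalaVersion := "2.10.6"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.1"
```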
Initializing the SparkContext
Once you have Spark dependencies installed in your project, the first thing that you have to do is create a
SparkContext.
As you saw earlier, you must first create an object of type SparkConf to configure your application, and
then build a SparkContext object from it.
// All the necessary imports
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
// Create the SparkConf object
val conf = new SparkConf().setMaster("local").setAppName("mySparkApp")
// Create the SparkContext from SparkConf
val sc = new SparkContext(conf)
The SparkConf object receives two parameters:
Cluster URL. This is "local" when you want to run the program on one thread on
the local machine (i.e., without a cluster).
Application name. This name is used to identify your application in the cluster
manager UI (when running in cluster mode). Here we called it mySparkApp.
Standalone Programs
We already have the necessary imports: the SparkConf object and the SparkContext object. Now let’s give
our program a body. It is important to note that everything that runs in the Spark shell should also run in
standalone Spark programs.
In all the "big data" books there is a word-count example; this book could not be the exception. Our
program input is a file (yes, we already know how to load files) with the English text of the Franz Kafka
novel The Trial (Der Process).
The exercise objective is to count how many times each word occurs and to find the most frequent words.
// We create an RDD with the the-process.txt file contents
val myFile = sc.textFile("the-process.txt")
// Then, we convert each line of text to lowercase
val lowerCase = myFile.map( line => line.toLowerCase )
// We split every line into words (strings separated by spaces)
// As we already know, flatMap flattens the resulting arrays
val words = lowerCase.flatMap( line => line.split("\\s+") )
// Create the tuple (word, frequency), the initial frequency is 1
val counts = words.map( word => (word, 1) )
// Let's sum the frequencies by word (easy, isn't it?)
val frequency = counts.reduceByKey( _ + _ )
// Reverse the tuple to (frequency, word)
val invFrequency = frequency.map( _.swap )
// Take the 20 most frequent and print them
invFrequency.top(20).foreach(println)
It is fundamental to note that nothing runs until the last line. Yes, all the
previous operations are transformations, and top() in the last line is the action. We will clear this up later.
Hold on: the most frequent words (in all human languages) are conjunctions and prepositions,
so before separating each sentence into words in the third step, we filter the "stop words" in English
(obviously there are better lists on the Internet; this is just an example).
val tinyStopWords = Set("what", "of", "and", "the", "to", "it", "in", "or", "no", "that",
"is", "with", "by", "those", "which", "its", "his", "her", "me", "him", "on", "an", "if",
"more", "I", "you", "my", "your", "for" )
val words = lowerCase
.flatMap(line => line.split("\\s+"))
.filter(! tinyStopWords.contains(_))
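Before running the pipeline on Spark, you can reason about it with plain Scala collections, which share the map/flatMap/filter vocabulary of RDDs. This is an analogy with made-up sample lines, not Spark code:

```scala
val lines = List("The verdict is the verdict", "No trial without a verdict")
val tinyStopWords = Set("the", "is", "no", "a")

val frequency = lines
  .map(_.toLowerCase)                         // lowercase every line
  .flatMap(_.split("\\s+"))                   // split lines into words
  .filter(w => !tinyStopWords.contains(w))    // drop stop words
  .groupBy(identity)                          // group equal words...
  .map { case (word, ws) => (word, ws.size) } // ...and count them

assert(frequency("verdict") == 3)  // "verdict" survives three times
assert(!frequency.contains("the")) // stop words are gone
```

The only conceptual difference is that the Spark version is lazy and distributed, while plain collections evaluate eagerly on one machine.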
Run the Program
When your program is complete, use the script located at bin/spark-submit to run it. Modern Java/Scala
IDEs embed Spark integration to run it smoothly.
But this book is mostly read by command-line fellas and old-school soldiers. Here is how to run it
from a command line with SBT and with Maven:
// To run it with sbt
sbt clean package
$SPARK_HOME/bin/spark-submit \
--class com.apress.smack.WordFreq \
./target/...(as above) \
./README.md ./wordfreq
// To run it with Maven
mvn clean && mvn compile && mvn package
$SPARK_HOME/bin/spark-submit \
--class com.apress.smack.WordFreq \
./target/WordFreq-0.0.1.jar \
./README.md ./wordfreq
If nothing works, you can always refer to the official Spark guide at
http://spark.apache.org/docs/latest/quick-start.html .
RDD Operations
RDDs have two types of operations: transformations and actions. Transformations are operations that
receive one or more RDDs as input and return a new RDD. Actions return a result to the driver program and/
or store it, and/or trigger a new operation.
If you still get confused and don't know how to distinguish them, this is the rule: transformations return
RDDs; actions don't.
Transformations
Transformations are operations with these characteristics:
Lazy evaluation. Transformations are lazy operations; they aren't calculated until
you perform an action or explicitly invoke the collect method. This behavior is
inherited from the actor model and functional programming.
Element-wise. Transformations work on each individual element of a collection,
one at a time.
Immutable. RDDs are immutable, thus transformations are immutable too (i.e., they can't
modify the value of the RDD received as a parameter; there are no global variables).
Lineage graph. Suppose you have a sequence of transformations: each RDD is the
result of transformations on other RDDs. Spark keeps track of each operation and of
the dependencies among all the RDDs. This record, known as a lineage graph, is kept
to recover the system from a failure. Spark always builds a lineage graph when running
distributed applications on a cluster.
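The lineage graph idea can be modeled in a few lines of plain Scala. This is a toy model (not Spark's implementation): each step records its name and a recipe to recompute its result from its parent, so the whole chain can be replayed after a failure:

```scala
// A tiny hand-rolled lineage: a list of operation names plus a
// recipe to recompute the data from the original source.
case class Lineage[A](ops: List[String], compute: () => A) {
  def next[B](name: String)(f: A => B): Lineage[B] =
    Lineage(ops :+ name, () => f(compute()))
}

val base  = Lineage(List("parallelize"), () => List(1, 2, 3))
val graph = base.next("map(*2)")(_.map(_ * 2)).next("filter(>2)")(_.filter(_ > 2))

// The recorded chain of transformations:
assert(graph.ops == List("parallelize", "map(*2)", "filter(>2)"))
// Replaying the chain recomputes the data from the source:
assert(graph.compute() == List(4, 6))
```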
Table 6-3 enumerates the main transformations.
Table 6-3. Spark Main Transformations
Transformation Purpose Example
filter( function) Builds a new RDD by
selecting the elements
on which the function
returns true.
> val rdd = sc.parallelize(List(“Spark”, “Mesos,
Akka, “Cassandra, “Kafka”))
> val k = rdd.filter(_.contains(“k”))
> k.collect()
Result:
Array[String] = Array(Spark, Akka, Kafka)
map( function) Builds a new RDD by
applying the function on
each element.
> val rdd = sc.parallelize(List(1, 2, 3, 4))
> val t = rdd.map(_*5)
> t.collect()
Result:
Array[Int] = Array(5, 10, 15, 20)
flatMap( function ) The same as map() but
it returns a sequence
instead of a value.
> val rdd = sc.parallelize(List(“Big Data are
Buzzwords”, “Make Fast Data”))
> val fm = rdd.flatMap( s => s.split(“ ”) )
> fm.collect()
Result:
Array[String] = Array(Big, Data, are, Buzzwords,
Make, Fast, Data)
(continued)
Table 6-4 lists the main transformations on sets.
Transformation Purpose Example
reduceByKey( function,
[number] )
Aggregates the values of a
key using the function.
> val words = fm.map( w => (w, 1) )
> val wordCount = words.reduceByKey( _+_ )
> wordCount.collect()
Result:
Array[(String, Int)] = Array((are,1), (Big,1), (Fast,1),
(Make,1), (Buzzwords,1), (Data,2))
groupByKey([numTasks]) Converts (K, V) to (K,
Iterable<V>).
> val wc = wordCount.map{case(w,c) => (c,w)}
> wc.groupByKey().collect()
Result:
Array[(Int, Iterable[String])] =
Array((1,CompactBuffer(are, Big, Fast, Make,
Buzzwords)), (2,CompactBuffer(Data)))
distinct([numTasks]) Eliminates duplicates. > fm.distinct().collect()
Result:
Array[String] = Array(are, Big, Fast, Make,
Buzzwords, Data)
Table 6-3. (continued)
Table 6-4. Main Transformations on Sets

union(): Builds a new RDD containing all elements from the source and the argument.
    > val foo = sc.parallelize(List("Big", "Data"))
    > val bar = sc.parallelize(List("Fast", "Data"))
    > foo.union(bar).collect()
    Result: Array[String] = Array(Big, Data, Fast, Data)

intersection(): Builds a new RDD containing only the common elements between the source and the argument.
    > foo.intersection(bar).collect()
    Result: Array[String] = Array(Data)

cartesian(): Builds an RDD with the cross product of all elements from the source and the argument.
    > foo.cartesian(bar).collect()
    Result: Array[(String, String)] = Array((Big,Fast), (Big,Data), (Data,Fast), (Data,Data))

subtract(): Builds a new RDD by removing the common data elements between the source and the argument.
    > foo.subtract(bar).collect()
    Result: Array[String] = Array(Big)
Table 6-4. Main Transformations on Sets (continued)

join(RDD, [number]): When invoked on (K, V) and (K, W) pairs, creates a new RDD with (K, (V, W)) pairs.
    > val foo = sc.parallelize( Seq((1, "S"), (2, "M"), (3, "A"), (1, "C"), (4, "K")) )
    > val bar = sc.parallelize( Seq((1, "W"), (2, "X"), (3, "Y"), (2, "Z")) )
    > foo.join(bar).collect()
    Result: Array[(Int, (String, String))] = Array((1,(S,W)), (1,(C,W)), (2,(M,X)), (2,(M,Z)), (3,(A,Y)))

cogroup(RDD, [number]): Groups the values of both RDDs by key, converting (K, V) and (K, W) to (K, (Iterable<V>, Iterable<W>)).
    > foo.cogroup(bar).collect()
    Result: Array[(Int, (Iterable[String], Iterable[String]))] = Array((4,(CompactBuffer(K),CompactBuffer())), (1,(CompactBuffer(S, C),CompactBuffer(W))), (2,(CompactBuffer(M),CompactBuffer(X, Z))), (3,(CompactBuffer(A),CompactBuffer(Y))))

Actions
Although actions return scalar (simple) values, you must never underestimate them, since the internal
process can become really complex. Actions return the result to the driver program and/or write and store
the result.
A pipeline of operations advances sequentially, operation by operation; however, remember that
everything is lazily evaluated. The flow can advance, and when it finds an action, everything is evaluated up
to that point. Actions trigger the evaluation of all previous transformations.
Actions always trigger an evaluation because they must always return a value or store something;
otherwise, the program can't continue. Table 6-5 enumerates the main Spark actions.

Table 6-5. Main Spark Actions

count(): Obtains the number of RDD elements.
    > val smack = sc.parallelize( List('S', 'M', 'A', 'C', 'K') )
    > smack.count()
    Result: Long = 5

collect(): Returns all the RDD elements as an array.
    > val smack = sc.parallelize( List("S", "M", "A", "C", "K") )
    > smack.collect()
    Result: Array[String] = Array(S, M, A, C, K)

reduce(function): Aggregates the RDD elements using the function.
    > val smack = sc.parallelize( List(1, 5, 2, 4, 3) )
    > smack.reduce( _ + _ ) // the sum of all elements
    Result: Int = 15
Table 6-5. Main Spark Actions (continued)

take(n): Fetches the first n elements of the RDD.
    > val smack = sc.parallelize( List('S', 'M', 'A', 'C', 'K') )
    > smack.take(4)
    Result: Array[Char] = Array(S, M, A, C)

foreach(function): Executes the function on each RDD element.
    > val s = sc.parallelize( List(1, 4, 2, 3) )
    > s.foreach( n => print( "%s*7=%s ".format(n, n*7) ) )
    Result: 1*7=7 4*7=28 2*7=14 3*7=21

first(): Fetches the first RDD element; the same as take(1).
    > val rdd = sc.parallelize( List(4, 3, 2, 1) )
    > rdd.first()
    Result: Int = 4

saveAsTextFile(path): Writes the RDD content to a text file on the local file system or HDFS.
    > val myLogs = sc.textFile("/users/smack/evidence.log")
    > myLogs.filter(_.contains("Fatal")).saveAsTextFile("/users/smack/fact.txt")
    Result:
    smack@localhost:~/smack$ ls fact.txt
    _SUCCESS  part-00000  part-00001

RDD Persistence (Caching)
Now you know that RDDs support lazy evaluation. But what if you want to use the same RDD several
times? Unless you do this work conscientiously, by default Spark recalculates the RDD and all of its
dependencies each time you apply an action to it. If not done carefully, this can be very expensive.
You can tell Spark to persist RDDs to avoid recalculating them all the time. When you persist an RDD,
the nodes working with it store the RDD partitions assigned to them. If a node fails, Spark recalculates the lost
partitions as needed (yes, it's powerful).
You can also replicate your RDD among several nodes if you want to handle a node failure without
performance implications. As shown in Table 6-6, Spark offers several levels of persistence to suit all
scenarios and needs. Note that when data is written to disk, it is always serialized.
Table 6-6. RDD Persistence Levels

Persistence Level         CPU Used   Space Used   On Disk   In Memory
MEMORY_ONLY               Low        High         No        Yes
MEMORY_AND_DISK(*)        Medium     High         Some      Some
MEMORY_ONLY_SER           High       Low          No        Yes
MEMORY_AND_DISK_SER(*)    High       Low          Some      Some
DISK_ONLY                 High       Low          Yes       No
OFF_HEAP (experimental)   Low        Low          Some      Some

(*) Spills to disk if there is too much data to hold in memory. (Note that SER means serialized.)
An important caching scheme is off-heap, a mixed scheme. It was previously called Tachyon, but now
it's called Alluxio ( http://alluxio.org/ ). Note that off-heap caching doesn't guarantee recovery after
failure.
This is a code example:
import org.apache.spark.storage.StorageLevel
val rdd = input.map( foo )
rdd.persist( StorageLevel.DISK_ONLY )
rdd.reduce( bar )
rdd.collect()
Here are some points to consider:
You must call the persist() method in the code before the first action.
The persist() function doesn’t force the evaluation.
Spark automatically evicts old partitions using an LRU (least recently used) cache
policy.
The persist() method counterpart is the unpersist() method to manually remove
RDDs from the cache.
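What persist() buys you can be sketched with a plain-Scala memoization analogy. This shows only the idea of computing once and reusing, not Spark's actual caching mechanism:

```scala
// Without caching, every "action" re-runs the whole chain.
var computations = 0
def expensiveChain(): List[Int] = { computations += 1; (1 to 3).map(_ * 10).toList }

val firstAction  = expensiveChain() // recomputed
val secondAction = expensiveChain() // recomputed again
assert(computations == 2)

// With caching (persist-like): the chain runs once, later uses are free.
computations = 0
lazy val cached = expensiveChain()
val a = cached
val b = cached
assert(computations == 1)
assert(a == List(10, 20, 30) && a == b)
```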
Spark in Cluster Mode
In this chapter, we have focused on running Spark in local mode. As we mentioned, horizontal scaling is
what makes Spark so powerful. To run Apache Spark on a cluster, you do not need specialized software-
hardware integration engineers. To scale, you don't need to make great efforts or stop the entire
production environment to add more machines to your cluster.
The good news is that the same scripts that you are building on your laptop with examples of only
a few kilobytes can run on business clusters handling terabytes of data. There is no need to change your code
or invoke another API. All you have to do is test your model several times to know that it runs correctly, and
then you can deploy it.
In this section, you analyze the runtime architecture of a distributed Spark application. Then you see
the options to run Spark on a cluster.
Apache Spark has its own built-in standalone cluster manager. But you can run it on multiple cluster
managers, including Hadoop YARN, Amazon EC2, and Apache Mesos. This topic is so large that it has its
own chapter in this book.
Runtime Architecture
Before running Spark on a cluster, it's important to understand the distributed Spark architecture.
As shown in Figure 6-5, Spark uses a master/slave architecture. The master is called the driver and the
slaves are called executors. Even when running on a single machine, there is a distributed architecture: a driver
with several executors. The driver runs in its own Java process, and each executor runs in a separate Java
process. This architecture is based on the actor model.
The set formed by the driver and the executors is known as a Spark application. If you have more than one machine, the
Spark application must be launched using the cluster manager service. The Spark application architecture is
always the same; it does not matter whether it's clustered or not.
In a typical clustered Spark application architecture, each physical machine has its own executor. Later you
will see several strategies to know when an executor dies or goes offline.
Driver
The driver is the process where the SparkContext runs. It is in charge of creating and executing
transformations and actions on RDDs. When you run the Spark shell command on your laptop, you are
actually creating a driver program. Its first task is to create the SparkContext, called sc. When the driver
program dies, the entire application dies.
The following sections explain the two responsibilities in the life of a driver program: dividing a
program into tasks and scheduling tasks on executors.
Divide a Program into Tasks
The Spark driver is responsible for splitting the user program (which could be written in an inefficient
way) into execution units called tasks.
A user program basically applies transformations and actions to one or more RDDs to generate new
RDDs and to calculate and/or store data.
Another task of the Spark driver is to generate the operations' directed acyclic graph (DAG). With this
graph, the driver knows which tasks are assigned to which node; so if a node is lost, the driver knows at
which point it was and how to assign the lost node's tasks to the remaining nodes.
The driver also does pipeline optimizations; it splits the DAG into stages, and each stage has multiple tasks.
In Spark, the task is the smallest unit of work; a normal program can launch thousands of tasks.
Figure 6-5. Distributed Spark application
CHAPTER 6 THE ENGINE: APACHE SPARK
114
Scheduling Tasks on Executors
Given a physical execution plan, the Spark driver coordinates which tasks are performed by each executor
node. When an executor starts operating, it registers itself with the driver, so the driver always has an entire
view of all the executor nodes. Each executor is a standalone Java process that can run tasks and store RDDs.
When a program runs, the driver subdivides the program into tasks, sees all the available executor
nodes, and tries to balance the workload among them. The driver also knows which part of the data
each node has, in order to rebuild everything at the end.
The driver displays its information on the Web, so that the user can always see what is happening; by
default, it runs on port 4040. When you run locally, it can be accessed at http://localhost:4040 , as you can
see in Figure 6-6 (let's run the Spark shell and browse it).
Figure 6-6. Spark shell application web UI
Executor
Executors are responsible for running the individual tasks of a given Spark job. Executors are launched when
you start the Spark application; they live while the Spark application lives.
Executors have two roles:
Run the assigned tasks and deliver results to the driver.
Provide in-memory storage for RDDs. A program called Block Manager runs on each
executor and manages memory and RDDs.
When running Spark in local mode, the driver and executors run in the same process. This is for
development purposes; it is not recommended in a production environment.
Cluster Manager
Spark depends on a cluster manager to coordinate and launch the executors. The cluster manager that
ships with the Spark distribution is called the standalone manager , but it is a pluggable component. You can
change it and use a custom cluster manager like Hadoop YARN, Amazon EC2, or Apache Mesos.
It is important to note that the terms driver and executor are used when talking about the Spark
application. When we talk about the cluster manager, we use the terms master and worker. It is important
not to confuse or exchange these terms, because they are different concepts.
Regardless of the cluster manager that you use, Spark provides a single script, called spark-submit, to
launch the program. The spark-submit script can connect to different managers through various options and
manage the cluster resources that the application needs.
Program Execution
When you run a Spark application on a cluster, these are the steps followed by the program:
1 . The user runs the spark-submit shell.
2 . The spark-submit shell launches the driver program and calls the user program's
main() method.
3 . The driver program establishes the connection to the cluster manager, which has
the slave machines list. Then, the necessary resources are requested to launch
the executors.
4 . The cluster manager launches executors in each slave node.
5 . The driver program analyzes, divides, and distributes the user application,
sending each executor its tasks.
6 . The tasks run in the executors, calculating and storing the results.
7 . The user program ends when the exit() method in the main() method is
invoked, or when the SparkContext stop() method is called.
8 . The driver program ends the executors and frees the cluster manager’s resources.
Application Deployment
Spark uses the spark-submit tool to send jobs to the cluster manager.
When you run a program in local mode, you only invoke spark-submit passing your script name or jar
file as a parameter.
When you run a program in cluster mode, you have to pass additional parameters—for example, the
size of each executor process.
The --master flag specifies the cluster URL to which you want to connect. In this case, spark:// means
that we are using Spark in stand-alone mode.
For example:
bin/spark-submit --master spark://skynet:7077 --executor-memory 10g Terminator.jar
Here we indicate that we will run our Terminator program in stand-alone cluster mode in the master
node called SkyNet and each executor node will have 10 gigabytes of memory.
In addition to the cluster URL, spark-submit has several options to specify how you want to run your
application. These options fall into two categories:
Scheduling data. The amount of resources that each job will have.
Dependencies. The files and libraries available on the slaves.
Table 6-7 lists and describes some of the spark-submit flags.
Table 6-7. spark-submit Flags

--master: The cluster manager to connect to (sample values are explained in Table 6-8).
--deploy-mode: Indicates whether the program is launched in local mode (client) or cluster mode. In local mode, the driver is launched on the machine where spark-submit is invoked. The default value is client mode.
--class: The application's main class (Java/Scala).
--name: Human-readable name for your application, as displayed in the Spark web UI.
--jars: List of jar files to upload and place on the application classpath.
--files: List of files to upload to the application's working directory on each node.
--executor-memory: Memory for executors: k for kilobytes, m for megabytes, g for gigabytes.
--driver-memory: Memory for the driver process: k for kilobytes, m for megabytes, g for gigabytes.
--conf prop=value: A single configuration property value in key-value form.
--properties-file: A configuration file with properties in key-value form.
Table 6-8 lists a few example values that the --master flag can take.

Table 6-8. Master Flag Sample Values

spark://host:port: Connect to a cluster in stand-alone mode at the specified host and port. 7077 is the default port for a stand-alone master.
mesos://host:port: Connect to a Mesos cluster at the specified host and port. 5050 is the default port for the Mesos master.
local: Local mode master with a single core.
local[N]: Local mode master with N cores.
local[*]: Local mode master using as many cores as the machine has.
Now you are able to read this:
$ ./bin/spark-submit \
--master spark://skynet:7077 \
--deploy-mode cluster \
--class com.cyberdyne.Terminator \
--name "T1000 model" \
--jars neuralNetwork.jar,geneticAlgorithm.jar \
--total-executor-cor