Spark: The Definitive Guide
Big Data Processing Made Simple
Bill Chambers and Matei Zaharia
Spark: The Definitive Guide
by Bill Chambers and Matei Zaharia
Copyright © 2018 Databricks. All rights reserved.
Printed in the United States of America.
Published by O’Reilly Media, Inc., 1005 Gravenstein Highway North,
Sebastopol, CA 95472.
O’Reilly books may be purchased for educational, business, or sales
promotional use. Online editions are also available for most titles
(http://oreilly.com/safari). For more information, contact our
corporate/institutional sales department: 800-998-9938 or
corporate@oreilly.com.
Editor: Nicole Tache
Production Editor: Justin Billing
Copyeditor: Octal Publishing, Inc., Chris Edwards, and Amanda
Kersey
Proofreader: Jasmine Kwityn
Indexer: Judith McConville
Interior Designer: David Futato
Cover Designer: Karen Montgomery
Illustrator: Rebecca Demarest
February 2018: First Edition
Revision History for the First Edition
2018-02-08: First Release
See http://oreilly.com/catalog/errata.csp?isbn=9781491912218 for release
details.
The O’Reilly logo is a registered trademark of O’Reilly Media, Inc. Spark:
The Definitive Guide, the cover image, and related trade dress are trademarks
of O’Reilly Media, Inc. Apache, Spark and Apache Spark are trademarks of
the Apache Software Foundation.
While the publisher and the authors have used good faith efforts to ensure
that the information and instructions contained in this work are accurate, the
publisher and the authors disclaim all responsibility for errors or omissions,
including without limitation responsibility for damages resulting from the use
of or reliance on this work. Use of the information and instructions contained
in this work is at your own risk. If any code samples or other technology this
work contains or describes is subject to open source licenses or the
intellectual property rights of others, it is your responsibility to ensure that
your use thereof complies with such licenses and/or rights.
978-1-491-91221-8
[M]
Preface
Welcome to this first edition of Spark: The Definitive Guide! We are excited
to bring you the most complete resource on Apache Spark today, focusing
especially on the new generation of Spark APIs introduced in Spark 2.0.
Apache Spark is currently one of the most popular systems for large-scale
data processing, with APIs in multiple programming languages and a wealth
of built-in and third-party libraries. Although the project has existed for
multiple years—first as a research project started at UC Berkeley in 2009,
then at the Apache Software Foundation since 2013—the open source
community is continuing to build more powerful APIs and high-level
libraries over Spark, so there is still a lot to write about the project. We
decided to write this book for two reasons. First, we wanted to present the
most comprehensive book on Apache Spark, covering all of the fundamental
use cases with easy-to-run examples. Second, we especially wanted to
explore the higher-level “structured” APIs that were finalized in Apache
Spark 2.0—namely DataFrames, Datasets, Spark SQL, and Structured
Streaming—which older books on Spark don’t always include. We hope this
book gives you a solid foundation to write modern Apache Spark applications
using all the available tools in the project.
In this preface, we’ll tell you a little bit about our background, and explain
who this book is for and how we have organized the material. We also want
to thank the numerous people who helped edit and review this book, without
whom it would not have been possible.
About the Authors
Both of the book’s authors have been involved in Apache Spark for a long
time, so we are very excited to be able to bring you this book.
Bill Chambers started using Spark in 2014 on several research projects.
Currently, Bill is a Product Manager at Databricks where he focuses on
enabling users to write various types of Apache Spark applications. Bill also
regularly blogs about Spark and presents at conferences and meetups on the
topic. Bill holds a Master’s in Information Management and Systems from
the UC Berkeley School of Information.
Matei Zaharia started the Spark project in 2009, during his time as a PhD
student at UC Berkeley. Matei worked with other Berkeley researchers and
external collaborators to design the core Spark APIs and grow the Spark
community, and has continued to be involved in new initiatives such as the
structured APIs and Structured Streaming. In 2013, Matei and other members
of the Berkeley Spark team co-founded Databricks to further grow the open
source project and provide commercial offerings around it. Today, Matei
continues to work as Chief Technologist at Databricks, and also holds a
position as an Assistant Professor of Computer Science at Stanford
University, where he does research on large-scale systems and AI. Matei
received his PhD in Computer Science from UC Berkeley in 2013.
Who This Book Is For
We designed this book mainly for data scientists and data engineers looking
to use Apache Spark. The two roles have slightly different needs, but in
reality, most application development covers a bit of both, so we think the
material will be useful in both cases. Specifically, in our minds, the data
scientist workload focuses more on interactively querying data to answer
questions and build statistical models, while the data engineer job focuses on
writing maintainable, repeatable production applications—either to use the
data scientist’s models in practice, or just to prepare data for further analysis
(e.g., building a data ingest pipeline). However, we often see with Spark that
these roles blur. For instance, data scientists are able to package production
applications without too much hassle and data engineers use interactive
analysis to understand and inspect their data to build and maintain pipelines.
While we tried to provide everything data scientists and engineers need to get
started, there are some things we didn’t have space to focus on in this book.
First, this book does not include in-depth introductions to some of the
analytics techniques you can use in Apache Spark, such as machine learning.
Instead, we show you how to invoke these techniques using libraries in
Spark, assuming you already have a basic background in machine learning.
Many full, standalone books exist to cover these techniques in formal detail,
so we recommend starting with those if you want to learn about these areas.
Second, this book focuses more on application development than on
operations and administration (e.g., how to manage an Apache Spark cluster
with dozens of users). Nonetheless, we have tried to include comprehensive
material on monitoring, debugging, and configuration in Parts V and VI of
the book to help engineers get their application running efficiently and tackle
day-to-day maintenance. Finally, this book places less emphasis on the older,
lower-level APIs in Spark—specifically RDDs and DStreams—to introduce
most of the concepts using the newer, higher-level structured APIs. Thus, the
book may not be the best fit if you need to maintain an old RDD or DStream
application, but should be a great introduction to writing new applications.
Conventions Used in This Book
The following typographical conventions are used in this book:
Italic
Indicates new terms, URLs, email addresses, filenames, and file
extensions.
Constant width
Used for program listings, as well as within paragraphs to refer to
program elements such as variable or function names, databases, data
types, environment variables, statements, and keywords.
Constant width bold
Shows commands or other text that should be typed literally by the user.
Constant width italic
Shows text that should be replaced with user-supplied values or by values
determined by context.
TIP
This element signifies a tip or suggestion.
NOTE
This element signifies a general note.
WARNING
This element indicates a warning or caution.
Using Code Examples
We’re very excited to have designed this book so that all of the code content
is runnable on real data. We wrote the whole book using Databricks
notebooks and have posted the data and related material on GitHub. This
means that you can run and edit all the code as you follow along, or copy it
into working code in your own applications.
We tried to use real data wherever possible to illustrate the challenges you’ll
run into while building large-scale data applications. Finally, we also include
several larger standalone applications in the book’s GitHub repository for
examples that it does not make sense to show inline in the text.
The GitHub repository will remain a living document as we update based on
Spark’s progress. Be sure to follow updates there.
This book is here to help you get your job done. In general, if example code
is offered with this book, you may use it in your programs and
documentation. You do not need to contact us for permission unless you’re
reproducing a significant portion of the code. For example, writing a program
that uses several chunks of code from this book does not require permission.
Selling or distributing a CD-ROM of examples from O’Reilly books does
require permission. Answering a question by citing this book and quoting
example code does not require permission. Incorporating a significant
amount of example code from this book into your product’s documentation
does require permission.
We appreciate, but do not require, attribution. An attribution usually includes
the title, author, publisher, and ISBN. For example: “Spark: The Definitive
Guide by Bill Chambers and Matei Zaharia (O’Reilly). Copyright 2018
Databricks, Inc., 978-1-491-91221-8.”
If you feel your use of code examples falls outside fair use or the permission
given above, feel free to contact us at permissions@oreilly.com.
O’Reilly Safari
Safari (formerly Safari Books Online) is a membership-based training and
reference platform for enterprise, government, educators, and individuals.
Members have access to thousands of books, training videos, Learning Paths,
interactive tutorials, and curated playlists from over 250 publishers, including
O’Reilly Media, Harvard Business Review, Prentice Hall Professional,
Addison-Wesley Professional, Microsoft Press, Sams, Que, Peachpit Press,
Adobe, Focal Press, Cisco Press, John Wiley & Sons, Syngress, Morgan
Kaufmann, IBM Redbooks, Packt, Adobe Press, FT Press, Apress, Manning,
New Riders, McGraw-Hill, Jones & Bartlett, and Course Technology, among
others.
For more information, please visit http://oreilly.com/safari.
How to Contact Us
Please address comments and questions concerning this book to the
publisher:
O’Reilly Media, Inc.
1005 Gravenstein Highway North
Sebastopol, CA 95472
800-998-9938 (in the United States or Canada)
707-829-0515 (international or local)
707-829-0104 (fax)
To comment or ask technical questions about this book, send email to
bookquestions@oreilly.com.
For more information about our books, courses, conferences, and news, see
our website at http://www.oreilly.com.
Find us on Facebook: http://facebook.com/oreilly
Follow us on Twitter: http://twitter.com/oreillymedia
Watch us on YouTube: http://www.youtube.com/oreillymedia
Acknowledgments
There were a huge number of people that made this book possible.
First, we would like to thank our employer, Databricks, for allocating time
for us to work on this book. Without the support of the company, this book
would not have been possible. In particular, we would like to thank Ali
Ghodsi, Ion Stoica, and Patrick Wendell for their support.
Additionally, there are numerous people that read drafts of the book and
individual chapters. Our reviewers were best-in-class, and provided
invaluable feedback.
These reviewers, in alphabetical order by last name, are:
Lynn Armstrong
Mikio Braun
Jules Damji
Denny Lee
Alex Thomas
In addition to the formal book reviewers, there were numerous other Spark
users, contributors, and committers who read over specific chapters or helped
formulate how topics should be discussed. In alphabetical order by last name,
the people who helped are:
Sameer Agarwal
Bagrat Amirbekian
Michael Armbrust
Joseph Bradley
Tathagata Das
Hossein Falaki
Wenchen Fan
Sue Ann Hong
Yin Huai
Tim Hunter
Xiao Li
Cheng Lian
Xiangrui Meng
Kris Mok
Josh Rosen
Srinath Shankar
Takuya Ueshin
Herman van Hövell
Reynold Xin
Philip Yang
Burak Yavuz
Shixiong Zhu
Lastly, we would like to thank friends, family, and loved ones. Without their
support, patience, and encouragement, we would not have been able to write
the definitive guide to Spark.
Part I. Gentle Overview of Big Data and Spark
Chapter 1. What Is Apache Spark?
Apache Spark is a unified computing engine and a set of libraries for parallel
data processing on computer clusters. As of this writing, Spark is the most
actively developed open source engine for this task, making it a standard tool
for any developer or data scientist interested in big data. Spark supports
multiple widely used programming languages (Python, Java, Scala, and R),
includes libraries for diverse tasks ranging from SQL to streaming and
machine learning, and runs anywhere from a laptop to a cluster of thousands
of servers. This makes it an easy system to start with and to scale up to big
data processing at incredibly large scale.
Figure 1-1 illustrates all the components and libraries Spark offers to end
users.
Figure 1-1. Spark’s toolkit
You’ll notice the categories roughly correspond to the different parts of this
book. That should really come as no surprise; our goal here is to educate you
on all aspects of Spark, and Spark is composed of a number of different
components.
Given that you’re reading this book, you might already know a little bit about
Apache Spark and what it can do. Nonetheless, in this chapter, we want to
briefly cover the overriding philosophy behind Spark as well as the context it
was developed in (why is everyone suddenly excited about parallel data
processing?) and its history. We will also outline the first few steps to
running Spark.
Apache Spark’s Philosophy
Let’s break down our description of Apache Spark—a unified computing
engine and set of libraries for big data—into its key components:
Unified
Spark’s key driving goal is to offer a unified platform for writing big data
applications. What do we mean by unified? Spark is designed to support a
wide range of data analytics tasks, ranging from simple data loading and
SQL queries to machine learning and streaming computation, over the
same computing engine and with a consistent set of APIs. The main
insight behind this goal is that real-world data analytics tasks—whether
they are interactive analytics in a tool such as a Jupyter notebook, or
traditional software development for production applications—tend to
combine many different processing types and libraries.
Spark’s unified nature makes these tasks both easier and more efficient to
write. First, Spark provides consistent, composable APIs that you can use
to build an application out of smaller pieces or out of existing libraries. It
also makes it easy for you to write your own analytics libraries on top.
However, composable APIs are not enough: Spark’s APIs are also
designed to enable high performance by optimizing across the different
libraries and functions composed together in a user program. For
example, if you load data using a SQL query and then evaluate a machine
learning model over it using Spark’s ML library, the engine can combine
these steps into one scan over the data. The combination of general APIs
and high-performance execution, no matter how you combine them,
makes Spark a powerful platform for interactive and production
applications.
Spark’s focus on defining a unified platform is the same idea behind
unified platforms in other areas of software. For example, data scientists
benefit from a unified set of libraries (e.g., Python or R) when doing
modeling, and web developers benefit from unified frameworks such as
Node.js or Django. Before Spark, no open source systems tried to provide
this type of unified engine for parallel data processing, meaning that users
had to stitch together an application out of multiple APIs and systems.
Thus, Spark quickly became the standard for this type of development.
Over time, Spark has continued to expand its built-in APIs to cover more
workloads. At the same time, the project’s developers have continued to
refine its theme of a unified engine. In particular, one major focus of this
book will be the “structured APIs” (DataFrames, Datasets, and SQL) that
were finalized in Spark 2.0 to enable more powerful optimization under
user applications.
Computing engine
At the same time that Spark strives for unification, it carefully limits its
scope to a computing engine. By this, we mean that Spark handles
loading data from storage systems and performing computation on it, not
permanent storage as the end itself. You can use Spark with a wide
variety of persistent storage systems, including cloud storage systems
such as Azure Storage and Amazon S3, distributed file systems such as
Apache Hadoop, key-value stores such as Apache Cassandra, and
message buses such as Apache Kafka. However, Spark neither stores data
long term itself, nor favors one over another. The key motivation here is
that most data already resides in a mix of storage systems. Data is
expensive to move so Spark focuses on performing computations over the
data, no matter where it resides. In user-facing APIs, Spark works hard to
make these storage systems look largely similar so that applications do
not need to worry about where their data is.
Spark’s focus on computation makes it different from earlier big data
software platforms such as Apache Hadoop. Hadoop included both a
storage system (the Hadoop file system, designed for low-cost storage
over clusters of commodity servers) and a computing system
(MapReduce), which were closely integrated together. However, this
choice makes it difficult to run one of the systems without the other.
More important, this choice also makes it a challenge to write
applications that access data stored anywhere else. Although Spark runs
well on Hadoop storage, today it is also used broadly in environments for
which the Hadoop architecture does not make sense, such as the public
cloud (where storage can be purchased separately from computing) or
streaming applications.
Libraries
Spark’s final component is its libraries, which build on its design as a
unified engine to provide a unified API for common data analysis tasks.
Spark supports both standard libraries that ship with the engine as well as
a wide array of external libraries published as third-party packages by the
open source communities. Today, Spark’s standard libraries are actually
the bulk of the open source project: the Spark core engine itself has
changed little since it was first released, but the libraries have grown to
provide more and more types of functionality. Spark includes libraries for
SQL and structured data (Spark SQL), machine learning (MLlib), stream
processing (Spark Streaming and the newer Structured Streaming), and
graph analytics (GraphX). Beyond these libraries, there are hundreds of
open source external libraries ranging from connectors for various storage
systems to machine learning algorithms. One index of external libraries is
available at spark-packages.org.
Context: The Big Data Problem
Why do we need a new engine and programming model for data analytics in
the first place? As with many trends in computing, this is due to changes in
the economic factors that underlie computer applications and hardware.
For most of their history, computers became faster every year through
processor speed increases: the new processors each year could run more
instructions per second than the previous year’s. As a result, applications also
automatically became faster every year, without any changes needed to their
code. This trend led to a large and established ecosystem of applications
building up over time, most of which were designed to run only on a single
processor. These applications rode the trend of improved processor speeds to
scale up to larger computations and larger volumes of data over time.
Unfortunately, this trend in hardware stopped around 2005: due to hard limits
in heat dissipation, hardware developers stopped making individual
processors faster, and switched toward adding more parallel CPU cores all
running at the same speed. This change meant that suddenly applications
needed to be modified to add parallelism in order to run faster, which set the
stage for new programming models such as Apache Spark.
On top of that, the technologies for storing and collecting data did not slow
down appreciably in 2005, when processor speeds did. The cost to store 1 TB
of data continues to drop by roughly two times every 14 months, meaning
that it is very inexpensive for organizations of all sizes to store large amounts
of data. Moreover, many of the technologies for collecting data (sensors,
cameras, public datasets, etc.) continue to drop in cost and improve in
resolution. For example, camera technology continues to improve in
resolution and drop in cost per pixel every year, to the point where a 12-
megapixel webcam costs only $3 to $4; this has made it inexpensive to
collect a wide range of visual data, whether from people filming video or
automated sensors in an industrial setting. Moreover, cameras are themselves
the key sensors in other data collection devices, such as telescopes and even
gene-sequencing machines, driving the cost of these technologies down as
well.
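To make the halving rate quoted above concrete, here is a minimal Python sketch of the arithmetic. It assumes the cost falls smoothly by a factor of two every 14 months; the function name and the time horizons are illustrative choices, not figures from the text.

```python
def relative_storage_cost(months, halving_period_months=14.0):
    """Return the cost relative to today after `months`, assuming it halves
    every `halving_period_months` (the rough rate quoted in the text)."""
    return 0.5 ** (months / halving_period_months)


if __name__ == "__main__":
    # Illustrative horizons only; the text does not give these numbers.
    for years in (1, 5, 10):
        fraction = relative_storage_cost(12 * years)
        print(f"after {years:>2} years: {fraction:.4%} of today's cost "
              f"(about {1 / fraction:,.0f}x cheaper)")
```

Under that assumption, ten years of halving would make storage roughly 380 times cheaper, which is why keeping large volumes of data has become affordable for organizations of all sizes.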
The end result is a world in which collecting data is extremely inexpensive—
many organizations today even consider it negligent not to log data of
possible relevance to the business—but processing it requires large, parallel
computations, often on clusters of machines. Moreover, in this new world,
the software developed in the past 50 years cannot automatically scale up,
and neither can the traditional programming models for data processing
applications, creating the need for new programming models. It is this world
that Apache Spark was built for.
History of Spark
Apache Spark began at UC Berkeley in 2009 as the Spark research project,
which was first published the following year in a paper entitled “Spark:
Cluster Computing with Working Sets” by Matei Zaharia, Mosharaf
Chowdhury, Michael Franklin, Scott Shenker, and Ion Stoica of the UC
Berkeley AMPlab. At the time, Hadoop MapReduce was the dominant
parallel programming engine for clusters, being the first open source system
to tackle data-parallel processing on clusters of thousands of nodes. The
AMPlab had worked with multiple early MapReduce users to understand the
benefits and drawbacks of this new programming model, and was therefore
able to synthesize a list of problems across several use cases and begin
designing more general computing platforms. In addition, Zaharia had also
worked with Hadoop users at UC Berkeley to understand their needs for the
platform—specifically, teams that were doing large-scale machine learning
using iterative algorithms that need to make multiple passes over the data.
Across these conversations, two things were clear. First, cluster computing
held tremendous potential: at every organization that used MapReduce, brand
new applications could be built using the existing data, and many new groups
began using the system after its initial use cases. Second, however, the
MapReduce engine made it both challenging and inefficient to build large
applications. For example, the typical machine learning algorithm might need
to make 10 or 20 passes over the data, and in MapReduce, each pass had to
be written as a separate MapReduce job, which had to be launched separately
on the cluster and load the data from scratch.
To address this problem, the Spark team first designed an API based on
functional programming that could succinctly express multistep applications.
The team then implemented this API over a new engine that could perform
efficient, in-memory data sharing across computation steps. The team also
began testing this system with both Berkeley and external users.
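The cost difference described above can be sketched with a toy model in plain Python. This is neither Spark nor MapReduce code: CountingSource, both run functions, and the simple averaging step are hypothetical stand-ins, chosen only to count how often the data is read when each pass is a separate job versus when the data stays in memory across passes.

```python
class CountingSource:
    """Stand-in for cluster storage that counts how often it is read."""

    def __init__(self, records):
        self._records = list(records)
        self.loads = 0

    def load(self):
        self.loads += 1
        return list(self._records)


def one_pass(data, estimate, step=0.5):
    """One iteration of a toy algorithm: move the estimate toward the mean."""
    return estimate + step * (sum(data) / len(data) - estimate)


def run_as_separate_jobs(source, passes):
    """Each pass is its own job, so it reloads the data from scratch."""
    estimate = 0.0
    for _ in range(passes):
        data = source.load()
        estimate = one_pass(data, estimate)
    return estimate


def run_with_in_memory_sharing(source, passes):
    """Load the data once and reuse it in memory for every pass."""
    data = source.load()
    estimate = 0.0
    for _ in range(passes):
        estimate = one_pass(data, estimate)
    return estimate


if __name__ == "__main__":
    records = [1.0, 2.0, 3.0, 4.0]
    for runner in (run_as_separate_jobs, run_with_in_memory_sharing):
        source = CountingSource(records)
        result = runner(source, passes=10)
        print(f"{runner.__name__}: result={result:.4f}, loads={source.loads}")
```

Both functions compute the same answer; only the number of reads differs (one per pass versus one in total), and that difference is the inefficiency the Spark team set out to remove.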
The first version of Spark supported only batch applications, but soon enough
another compelling use case became clear: interactive data science and ad
hoc queries. By simply plugging the Scala interpreter into Spark, the project
could provide a highly usable interactive system for running queries on
hundreds of machines. The AMPlab also quickly built on this idea to develop
Shark, an engine that could run SQL queries over Spark and enable
interactive use by analysts as well as data scientists. Shark was first released
in 2011.
After these initial releases, it quickly became clear that the most powerful
additions to Spark would be new libraries, and so the project began to follow
the “standard library” approach it has today. In particular, different AMPlab
groups started MLlib, Spark Streaming, and GraphX. They also ensured that
these APIs would be highly interoperable, enabling writing end-to-end big
data applications in the same engine for the first time.
In 2013, the project had grown to widespread use, with more than 100
contributors from more than 30 organizations outside UC Berkeley. The
AMPlab contributed Spark to the Apache Software Foundation as a long-
term, vendor-independent home for the project. The early AMPlab team also
launched a company, Databricks, to harden the project, joining the
community of other companies and organizations contributing to Spark.
Since that time, the Apache Spark community released Spark 1.0 in 2014 and
Spark 2.0 in 2016, and continues to make regular releases, bringing new
features into the project.
Finally, Spark’s core idea of composable APIs has also been refined over
time. Early versions of Spark (before 1.0) largely defined this API in terms of
functional operations—parallel operations such as maps and reduces over
collections of Java objects. Beginning with 1.0, the project added Spark SQL,
a new API for working with structured data—tables with a fixed data format
that is not tied to Java’s in-memory representation. Spark SQL enabled
powerful new optimizations across libraries and APIs by understanding both
the data format and the user code that runs on it in more detail. Over time, the
project added a plethora of new APIs that build on this more powerful
structured foundation, including DataFrames, machine learning pipelines, and
Structured Streaming, a high-level, automatically optimized streaming API.
In this book, we will spend a significant amount of time explaining these
next-generation APIs, most of which are marked as production-ready.
The Present and Future of Spark
Spark has been around for a number of years but continues to gain in
popularity and use cases. Many new projects within the Spark ecosystem
continue to push the boundaries of what’s possible with the system. For
example, a new high-level streaming engine, Structured Streaming, was
introduced in 2016. This technology is a huge part of companies solving
massive-scale data challenges, from technology companies like Uber and
Netflix using Spark’s streaming and machine learning tools, to institutions
like NASA, CERN, and the Broad Institute of MIT and Harvard applying
Spark to scientific data analysis.
Spark will continue to be a cornerstone of companies doing big data analysis
for the foreseeable future, especially given that the project is still developing
quickly. Any data scientist or engineer who needs to solve big data problems
probably needs a copy of Spark on their machine—and hopefully, a copy of
this book on their bookshelf!
Running Spark
This book contains an abundance of Spark-related code, and it’s essential that
you’re prepared to run it as you learn. For the most part, you’ll want to run
the code interactively so that you can experiment with it. Let’s go over some
of your options before we begin working with the coding parts of the book.
You can use Spark from Python, Java, Scala, R, or SQL. Spark itself is
written in Scala and runs on the Java Virtual Machine (JVM), so to run Spark
on either your laptop or a cluster, all you need is an installation of
Java. If you want to use the Python API, you will also need a Python
interpreter (version 2.7 or later). If you want to use R, you will need a version
of R on your machine.
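As a quick way to check these prerequisites, the following Python sketch reports whether each tool is visible on your machine. It is only an illustration: the function name is our own, and looking for java and Rscript on the PATH is an assumption about how Java and R are installed on your system.

```python
import shutil
import sys


def check_prerequisites():
    """Report whether the tools the text mentions are visible on this machine."""
    return {
        # Spark runs on the JVM, so a `java` executable should be on the PATH.
        "java": shutil.which("java") is not None,
        # The text asks for Python 2.7 or later for the Python API.
        "python": sys.version_info >= (2, 7),
        # R is needed only for the R API; `Rscript` is installed with R.
        "r": shutil.which("Rscript") is not None,
    }


if __name__ == "__main__":
    for tool, found in check_prerequisites().items():
        print(f"{tool}: {'found' if found else 'not found'}")
```

A result of "not found" for java means Spark will not start until Java is installed or added to the PATH; the Python and R entries matter only if you plan to use those APIs.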
There are two options we recommend for getting started with Spark:
downloading and installing Apache Spark on your laptop, or running a web-
based version in Databricks Community Edition, a free cloud environment
for learning Spark that includes the code in this book. We explain both of
those options next.
Downloading Spark Locally
If you want to download and run Spark locally, the first step is to make sure
that you have Java installed on your machine (available as java), as well as a
Python version if you would like to use Python. Next, visit the project’s
official download page, select the package type of “Pre-built for Hadoop 2.7
and later,” and click “Direct Download.” This downloads a compressed TAR
file, or tarball, that you will then need to extract. The majority of this book
was written using Spark 2.2, so downloading version 2.2 or later should be a
good starting point.
Downloading Spark for a Hadoop cluster
Spark can run locally without any distributed storage system, such as Apache
Hadoop. However, if you would like to connect the Spark version on your
laptop to a Hadoop cluster, make sure you download the right Spark version
for that Hadoop version, which can be chosen at
http://spark.apache.org/downloads.html by selecting a different package
type. We discuss how Spark runs on clusters and the Hadoop file system in
later chapters, but at this point we recommend just running Spark on your
laptop to start out.
NOTE
In Spark 2.2, the developers also added the ability to install Spark for Python via pip
install pyspark. This functionality came out as this book was being written, so we
weren’t able to include all of the relevant instructions.
Building Spark from source
We won’t cover this in the book, but you can also build and configure Spark
from source. You can select a source package on the Apache download page
to get just the source and follow the instructions in the README file for
building.
After you’ve downloaded Spark, you’ll want to open a command-line prompt
and extract the package. In our case, we’re installing Spark 2.2. The
following is a code snippet that you can run on any Unix-style command line
to unzip the file you downloaded from Spark and move into the directory:
cd ~/Downloads
tar -xf spark-2.2.0-bin-hadoop2.7.tgz
cd spark-2.2.0-bin-hadoop2.7
Note that Spark has a large number of directories and files within the project.
Don’t be intimidated! Most of these directories are relevant only if you’re
reading source code. The next section will cover the most important
directories—the ones that let us launch Spark’s different consoles for
interactive use.
Launching Spark’s Interactive Consoles
You can start an interactive shell in Spark for several different programming
languages. The majority of this book is written with Python, Scala, and SQL
in mind; thus, those are our recommended starting points.
Launching the Python console
You’ll need Python 2 or 3 installed in order to launch the Python console.
From Spark’s home directory, run the following code:
./bin/pyspark
After you’ve done that, type “spark” and press Enter. You’ll see the
SparkSession object printed, which we cover in Chapter 2.
Launching the Scala console
To launch the Scala console, you will need to run the following command:
./bin/spark-shell
After you’ve done that, type “spark” and press Enter. As in Python, you’ll see
the SparkSession object, which we cover in Chapter 2.
Launching the SQL console
Parts of this book will cover a large amount of Spark SQL. For those, you
might want to start the SQL console. We’ll revisit some of the more relevant
details after we actually cover these topics in the book.
./bin/spark-sql
Running Spark in the Cloud
If you would like to have a simple, interactive notebook experience for
learning Spark, you might prefer using Databricks Community Edition.
Databricks, as we mentioned earlier, is a company founded by the Berkeley
team that started Spark, and offers a free community edition of its cloud
service as a learning environment. The Databricks Community Edition
includes a copy of all the data and code examples for this book, making it
easy to quickly run any of them. To use the Databricks Community Edition,
follow the instructions at
https://github.com/databricks/Spark-The-Definitive-Guide. You will be able
to use Scala, Python, SQL, or R from a web browser–based interface to run and
visualize results.
Data Used in This Book
We’ll use a number of data sources in this book for our examples. If you
want to run the code locally, you can download them from the official code
repository for this book, as described at
https://github.com/databricks/Spark-The-Definitive-Guide. In short, you will
download the data, put it in a folder, and then run the code snippets in this
book!
Chapter 2. A Gentle Introduction to Spark
Now that our history lesson on Apache Spark is completed, it’s time to begin
using and applying it! This chapter presents a gentle introduction to Spark, in
which we will walk through the core architecture of a cluster, Spark
Application, and Spark’s structured APIs using DataFrames and SQL. Along
the way we will touch on Spark’s core terminology and concepts so that you
can begin using Spark right away. Let’s get started with some basic
background information.
Spark’s Basic Architecture
Typically, when you think of a “computer,” you think about one machine
sitting on your desk at home or at work. This machine works perfectly well
for watching movies or working with spreadsheet software. However, as
many users likely experience at some point, there are some things that your
computer is not powerful enough to perform. One particularly challenging
area is data processing. Single machines do not have enough power and
resources to perform computations on huge amounts of information (or the
user probably does not have the time to wait for the computation to finish). A
cluster, or group, of computers, pools the resources of many machines
together, giving us the ability to use all the cumulative resources as if they
were a single computer. Now, a group of machines alone is not powerful; you
need a framework to coordinate work across them. Spark does just that,
managing and coordinating the execution of tasks on data across a cluster of
computers.
The cluster of machines that Spark will use to execute tasks is managed by a
cluster manager like Spark’s standalone cluster manager, YARN, or Mesos.
We then submit Spark Applications to these cluster managers, which will
grant resources to our application so that we can complete our work.
Spark Applications
Spark Applications consist of a driver process and a set of executor
processes. The driver process runs your main() function, sits on a node in the
cluster, and is responsible for three things: maintaining information about the
Spark Application; responding to a user’s program or input; and analyzing,
distributing, and scheduling work across the executors (discussed
momentarily). The driver process is absolutely essential—it’s the heart of a
Spark Application and maintains all relevant information during the lifetime
of the application.
The executors are responsible for actually carrying out the work that the
driver assigns them. This means that each executor is responsible for only
two things: executing code assigned to it by the driver, and reporting the state
of the computation on that executor back to the driver node.
Figure 2-1 demonstrates how the cluster manager controls physical machines
and allocates resources to Spark Applications. The cluster manager can be one
of three core options: Spark’s standalone cluster manager, YARN, or Mesos.
Because the cluster manager allocates resources per application, there can be
multiple Spark Applications running on a cluster at the same time. We will
discuss cluster managers more in Part IV.
Figure 2-1. The architecture of a Spark Application
In Figure 2-1, we can see the driver on the left and four executors on the
right. In this diagram, we removed the concept of cluster nodes. The user can
specify how many executors should fall on each node through configurations.
NOTE
Spark, in addition to its cluster mode, also has a local mode. The driver and executors are
simply processes, which means that they can live on the same machine or different
machines. In local mode, the driver and executors run (as threads) on your individual
computer instead of a cluster. We wrote this book with local mode in mind, so you should
be able to run everything on a single machine.
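The division of labor between the driver and the executors can be mimicked with plain Python threads, which is loosely how local mode behaves. The following is only a toy sketch: run_task, driver, the chunking scheme, and the pool size are illustrative names and choices of ours, not Spark APIs.

```python
from concurrent.futures import ThreadPoolExecutor


def run_task(chunk):
    # "Executor" side: run the assigned work and report the result back.
    return sum(chunk)


def driver(data, num_executors=4):
    # "Driver" side: split the job into tasks, hand them to the executors,
    # and combine the results that they report.
    chunks = [data[i::num_executors] for i in range(num_executors)]
    with ThreadPoolExecutor(max_workers=num_executors) as pool:
        partial_results = list(pool.map(run_task, chunks))
    return sum(partial_results)


total = driver(list(range(1000)))
print(total)  # 499500
```

The driver never touches individual rows here; it only plans the work and merges what comes back, which mirrors the responsibilities described above.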
Here are the key points to understand about Spark Applications at this point:
Spark employs a cluster manager that keeps track of the resources
available.
The driver process is responsible for executing the driver program’s
commands across the executors to complete a given task.
The executors, for the most part, will always be running Spark code.
However, the driver can be “driven” from a number of different languages
through Spark’s language APIs. Let’s take a look at those in the next section.
Spark’s Language APIs
Spark’s language APIs make it possible for you to run Spark code using
various programming languages. For the most part, Spark presents some core
“concepts” in every language; these concepts are then translated into Spark
code that runs on the cluster of machines. If you use just the Structured APIs,
you can expect all languages to have similar performance characteristics.
Here’s a brief rundown:
Scala
Spark is primarily written in Scala, making it Spark’s “default” language.
This book will include Scala code examples wherever relevant.
Java
Even though Spark is written in Scala, Spark’s authors have been careful
to ensure that you can write Spark code in Java. This book will focus
primarily on Scala but will provide Java examples where relevant.
Python
Python supports nearly all constructs that Scala supports. This book will
include Python code examples whenever we include Scala code examples
and a Python API exists.
SQL
Spark supports a subset of the ANSI SQL 2003 standard. This makes it
easy for analysts and non-programmers to take advantage of the big data
powers of Spark. This book includes SQL code examples wherever
relevant.
R
Spark has two commonly used R libraries: one as a part of Spark core
(SparkR) and another as an R community-driven package (sparklyr). We
cover both of these integrations in Chapter 32.
Figure 2-2 presents a simple illustration of this relationship.
Figure 2-2. The relationship between the SparkSession and Spark’s Language API
Each language API maintains the same core concepts that we described
earlier. There is a SparkSession object available to the user, which is the
entrance point to running Spark code. When using Spark from Python or R,
you don’t write explicit JVM instructions; instead, you write Python and R
code that Spark translates into code that it then can run on the executor
JVMs.
Spark’s APIs
Although you can drive Spark from a variety of languages, what it makes
available in those languages is worth mentioning. Spark has two fundamental
sets of APIs: the low-level “unstructured” APIs, and the higher-level
structured APIs. We discuss both in this book, but these introductory chapters
will focus primarily on the higher-level structured APIs.
Starting Spark
Thus far, we covered the basic concepts of Spark Applications. This has all
been conceptual in nature. When we actually go about writing our Spark
Application, we are going to need a way to send user commands and data to
it. We do that by first creating a SparkSession.
NOTE
To do this, we will start Spark’s local mode, just like we did in Chapter 1. This means
running ./bin/spark-shell to access the Scala console to start an interactive session.
You can also start the Python console by using ./bin/pyspark. This starts an interactive
Spark Application. There is also a process for submitting standalone applications to Spark
called spark-submit, whereby you can submit a precompiled application to Spark. We’ll
show you how to do that in Chapter 3.
When you start Spark in this interactive mode, you implicitly create a
SparkSession that manages the Spark Application. When you start it through
a standalone application, you must create the SparkSession object yourself in
your application code.
The SparkSession
As discussed in the beginning of this chapter, you control your Spark
Application through a driver process called the SparkSession. The
SparkSession instance is the way Spark executes user-defined manipulations
across the cluster. There is a one-to-one correspondence between a
SparkSession and a Spark Application. In Scala and Python, the variable is
available as spark when you start the console. Let’s go ahead and look at the
SparkSession in Scala or Python:
spark
In Scala, you should see something like the following:
res0: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@...
In Python you’ll see something like this:
<pyspark.sql.session.SparkSession at 0x7efda4c1ccd0>
Let’s now perform the simple task of creating a range of numbers. This range
of numbers is just like a named column in a spreadsheet:
// in Scala
val myRange = spark.range(1000).toDF("number")
# in Python
myRange = spark.range(1000).toDF("number")
You just ran your first Spark code! We created a DataFrame with one
column containing 1,000 rows with values from 0 to 999. This range of
numbers represents a distributed collection. When run on a cluster, each part
of this range of numbers exists on a different executor. This is a Spark
DataFrame.
DataFrames
A DataFrame is the most common Structured API and simply represents a
table of data with rows and columns. The list that defines the columns and the
types within those columns is called the schema. You can think of a
DataFrame as a spreadsheet with named columns. Figure 2-3 illustrates the
fundamental difference: a spreadsheet sits on one computer in one specific
location, whereas a Spark DataFrame can span thousands of computers. The
reason for putting the data on more than one computer should be intuitive:
either the data is too large to fit on one machine or it would simply take too
long to perform that computation on one machine.
Figure 2-3. Distributed versus single-machine analysis
The DataFrame concept is not unique to Spark. R and Python both have
similar concepts. However, Python/R DataFrames (with some exceptions)
exist on one machine rather than multiple machines. This limits what you can
do with a given DataFrame to the resources that exist on that specific
machine. However, because Spark has language interfaces for both Python
and R, it’s quite easy to convert Pandas (Python) DataFrames to Spark
DataFrames, and R DataFrames to Spark DataFrames.
NOTE
Spark has several core abstractions: Datasets, DataFrames, SQL Tables, and Resilient
Distributed Datasets (RDDs). These different abstractions all represent distributed
collections of data. The easiest and most efficient are DataFrames, which are available in
all languages. We cover Datasets at the end of Part II, and RDDs in Part III.
Partitions
To allow every executor to perform work in parallel, Spark breaks up the data
into chunks called partitions. A partition is a collection of rows that sit on
one physical machine in your cluster. A DataFrame’s partitions represent
how the data is physically distributed across the cluster of machines during
execution. If you have one partition, Spark will have a parallelism of only
one, even if you have thousands of executors. If you have many partitions but
only one executor, Spark will still have a parallelism of only one because
there is only one computation resource.
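The relationship between partitions and parallelism can be written down as a one-line rule of thumb. This is a simplified model, not a Spark function: effective_parallelism is a hypothetical helper, and the cores_per_executor parameter is our assumption (the text above implicitly treats each executor as a single computation resource).

```python
def effective_parallelism(num_partitions, num_executors, cores_per_executor=1):
    # A task processes one partition, and each available core runs one task
    # at a time, so the smaller of the two numbers is the limit.
    slots = num_executors * cores_per_executor
    return min(num_partitions, slots)


print(effective_parallelism(1, 1000))   # 1: one partition, many executors
print(effective_parallelism(1000, 1))   # 1: many partitions, one executor
print(effective_parallelism(8, 4))      # 4
```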
An important thing to note is that with DataFrames you do not (for the most
part) manipulate partitions manually or individually. You simply specify
high-level transformations of data in the physical partitions, and Spark
determines how this work will actually execute on the cluster. Lower-level
APIs do exist (via the RDD interface), and we cover those in Part III.
Transformations
In Spark, the core data structures are immutable, meaning they cannot be
changed after they’re created. This might seem like a strange concept at first:
if you cannot change it, how are you supposed to use it? To “change” a
DataFrame, you need to instruct Spark how you would like to modify it to do
what you want. These instructions are called transformations. Let’s perform a
simple transformation to find all even numbers in our current DataFrame:
// in Scala
val divisBy2 = myRange.where("number % 2 = 0")
# in Python
divisBy2 = myRange.where("number % 2 = 0")
Notice that these return no output. This is because we specified only an
abstract transformation, and Spark will not act on transformations until we
call an action (we discuss this shortly). Transformations are the core of how
you express your business logic using Spark. There are two types of
transformations: those that specify narrow dependencies, and those that
specify wide dependencies.
Transformations consisting of narrow dependencies (we’ll call them narrow
transformations) are those for which each input partition will contribute to
only one output partition. In the preceding code snippet, the where statement
specifies a narrow dependency, where each input partition contributes to at
most one output partition, as you can see in Figure 2-4.
Figure 2-4. A narrow dependency
A transformation with a wide dependency (a wide transformation) will have
input partitions contributing to many output partitions. You will often hear
this referred to as a shuffle whereby Spark will exchange partitions across the
cluster. With narrow transformations, Spark will automatically perform an
operation called pipelining, meaning that if we specify multiple filters on
DataFrames, they’ll all be performed in-memory. The same cannot be said
for shuffles. When we perform a shuffle, Spark writes the results to disk.
Wide transformations are illustrated in Figure 2-5.
Figure 2-5. A wide dependency
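To make the difference between the two kinds of dependencies concrete, here is a toy model that represents partitions as plain Python lists. The function names and the modulo-based routing are assumptions chosen for illustration; Spark's actual shuffle is considerably more involved.

```python
partitions = [[0, 1, 2, 3], [4, 5, 6, 7], [8, 9, 10, 11]]


def narrow_filter(parts, predicate):
    # Narrow: every output partition is computed from exactly one input
    # partition, so no rows move between partitions.
    return [[row for row in part if predicate(row)] for part in parts]


def wide_repartition(parts, key, num_output):
    # Wide: rows from one input partition may land in many output
    # partitions, which is what a shuffle does.
    out = [[] for _ in range(num_output)]
    for part in parts:
        for row in part:
            out[key(row) % num_output].append(row)
    return out


evens = narrow_filter(partitions, lambda n: n % 2 == 0)
by_mod3 = wide_repartition(partitions, key=lambda n: n, num_output=3)
print(evens)    # [[0, 2], [4, 6], [8, 10]]
print(by_mod3)  # [[0, 3, 6, 9], [1, 4, 7, 10], [2, 5, 8, 11]]
```

In the filtered result each list still corresponds to one original partition, whereas every list in the repartitioned result contains rows from all three inputs.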
You’ll see a lot of discussion about shuffle optimization across the web
because it’s an important topic, but for now, all you need to understand is that
there are two kinds of transformations. You now can see how transformations
are simply ways of specifying different series of data manipulation. This
leads us to a topic called lazy evaluation.
Lazy Evaluation
Lazy evaluation means that Spark will wait until the very last moment to
execute the graph of computation instructions. In Spark, instead of modifying
the data immediately when you express some operation, you build up a plan
of transformations that you would like to apply to your source data. By
waiting until the last minute to execute the code, Spark compiles this plan
from your raw DataFrame transformations to a streamlined physical plan that
will run as efficiently as possible across the cluster. This provides immense
benefits because Spark can optimize the entire data flow from end to end. An
example of this is something called predicate pushdown on DataFrames. If
we build a large Spark job but specify a filter at the end that only requires us
to fetch one row from our source data, the most efficient way to execute this
is to access the single record that we need. Spark will actually optimize this
for us by pushing the filter down automatically.
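The idea of recording a plan and running it only when a result is requested can be sketched in a few lines of plain Python. LazyFrame below is a hypothetical stand-in that we made up for illustration; it is not how Spark represents or optimizes plans, but it shows that transformations touch no data until an action runs.

```python
class LazyFrame:
    """Toy stand-in for a DataFrame: records filters, runs them on an action."""

    def __init__(self, source, predicates=()):
        self._source = source              # callable that returns the rows
        self._predicates = tuple(predicates)

    def where(self, predicate):
        # Transformation: return a new object with a longer plan.
        # The source is not read here.
        return LazyFrame(self._source, self._predicates + (predicate,))

    def count(self):
        # Action: only now is the source read and the recorded plan applied.
        return sum(
            1 for row in self._source()
            if all(p(row) for p in self._predicates)
        )


reads = []


def source():
    reads.append("scan")                   # remember every time data is read
    return range(1000)


my_range = LazyFrame(source)
divis_by_2 = my_range.where(lambda n: n % 2 == 0)
print(len(reads))          # 0: nothing has been read yet
print(divis_by_2.count())  # 500
print(len(reads))          # 1: the action triggered a single scan
```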
Actions
Transformations allow us to build up our logical transformation plan. To
trigger the computation, we run an action. An action instructs Spark to
compute a result from a series of transformations. The simplest action is
count, which gives us the total number of records in the DataFrame:
divisBy2.count()
The output of the preceding code should be 500. Of course, count is not the
only action. There are three kinds of actions:
Actions to view data in the console
Actions to collect data to native objects in the respective language
Actions to write to output data sources
In specifying this action, we started a Spark job that runs our filter
transformation (a narrow transformation), then an aggregation (a wide
transformation) that performs the counts on a per partition basis, and then a
collect, which brings our result to a native object in the respective language.
You can see all of this by inspecting the Spark UI, a tool included in Spark
with which you can monitor the Spark jobs running on a cluster.
Spark UI
You can monitor the progress of a job through the Spark web UI. The Spark
UI is available on port 4040 of the driver node. If you are running in local
mode, this will be http://localhost:4040. The Spark UI displays information
on the state of your Spark jobs, its environment, and cluster state. It’s very
useful, especially for tuning and debugging. Figure 2-6 shows an example UI
for a Spark job where two stages containing nine tasks were executed.
Figure 2-6. The Spark UI
This chapter will not go into detail about Spark job execution and the Spark
UI. We will cover that in Chapter 18. At this point, all you need to
understand is that a Spark job represents a set of transformations triggered by
an individual action, and you can monitor that job from the Spark UI.
An End-to-End Example
In the previous example, we created a DataFrame of a range of numbers; not
exactly groundbreaking big data. In this section, we will reinforce everything
we learned previously in this chapter with a more realistic example, and
explain step by step what is happening under the hood. We’ll use Spark to
analyze some flight data from the United States Bureau of Transportation
Statistics.
Inside of the CSV folder, you’ll see that we have a number of files. There’s
also a number of other folders with different file formats, which we discuss in
Chapter 9. For now, let’s focus on the CSV files.
Each file has a number of rows within it. These files are CSV files, meaning
that they’re a semi-structured data format, with each row in the file
representing a row in our future DataFrame:
$ head /data/flight-data/csv/2015-summary.csv
DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,15
United States,Croatia,1
United States,Ireland,344
Spark includes the ability to read and write from a large number of data
sources. To read this data, we will use a DataFrameReader that is associated
with our SparkSession. In doing so, we will specify the file format as well as
any options we want to specify. In our case, we want to do something called
schema inference, which means that we want Spark to take a best guess at
what the schema of our DataFrame should be. We also want to specify that
the first row is the header in the file, so we’ll specify that as an option, too.
To get the schema information, Spark reads in a little bit of the data and then
attempts to parse the types in those rows according to the types available in
Spark. You also have the option of strictly specifying a schema when you
read in data (which we recommend in production scenarios):
// in Scala
val flightData2015 = spark
  .read
  .option("inferSchema", "true")
  .option("header", "true")
  .csv("/data/flight-data/csv/2015-summary.csv")
# in Python
flightData2015 = spark\
  .read\
  .option("inferSchema", "true")\
  .option("header", "true")\
  .csv("/data/flight-data/csv/2015-summary.csv")
Each of these DataFrames (in Scala and Python) has a set of columns with
an unspecified number of rows. The number of rows is unspecified because
reading data is a transformation, and is therefore a lazy operation.
Spark peeked at only a couple of rows of data to try to guess what types each
column should be. Figure 2-7 provides an illustration of the CSV file being
read into a DataFrame and then being converted into a local array or list of
rows.
Figure 2-7. Reading a CSV file into a DataFrame and converting it to a local array or list of rows
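The schema inference described above, in which a few rows are sampled and a type is guessed for each column, can be approximated with Python's standard csv module. This is a rough sketch under our own assumptions: guess_type, infer_schema, the sample_rows parameter, and the type names are hypothetical, and Spark's real inference rules are richer than this.

```python
import csv
import io

SAMPLE = """DEST_COUNTRY_NAME,ORIGIN_COUNTRY_NAME,count
United States,Romania,15
United States,Croatia,1
United States,Ireland,344
"""


def guess_type(values):
    # Try the narrowest type first and fall back to string.
    for caster, name in ((int, "int"), (float, "double")):
        try:
            for value in values:
                caster(value)
            return name
        except ValueError:
            continue
    return "string"


def infer_schema(text, sample_rows=2):
    reader = csv.reader(io.StringIO(text))
    header = next(reader)                  # the first row holds column names
    sample = [row for _, row in zip(range(sample_rows), reader)]
    columns = list(zip(*sample))           # turn sampled rows into columns
    return [(name, guess_type(col)) for name, col in zip(header, columns)]


schema = infer_schema(SAMPLE)
print(schema)
# [('DEST_COUNTRY_NAME', 'string'), ('ORIGIN_COUNTRY_NAME', 'string'), ('count', 'int')]
```

Because only a sample is inspected, a value further down the file could contradict the guess, which is one reason to specify the schema explicitly in production.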
If we perform the take action on the DataFrame, we will be able to see the
same results that we saw before when we used the command line:
flightData2015.take(3)
Array([United States,Romania,15], [United States,Croatia...
Let’s specify some more transformations! Now, let’s sort our data according
to the count column, which is an integer type. Figure 2-8 illustrates this
process.
NOTE
Remember, sort does not modify the DataFrame. We use sort as a transformation that
returns a new DataFrame by transforming the previous DataFrame. Let’s illustrate what’s
happening when we call take on that resulting DataFrame (Figure 2-8).
Figure 2-8. Reading, sorting, and collecting a DataFrame
Nothing happens to the data when we call sort because it’s just a
transformation. However, we can see that Spark is building up a plan for how
it will execute this across the cluster by looking at the explain plan. We can
call explain on any DataFrame object to see the DataFrame’s lineage (or
how Spark will execute this query):
flightData2015.sort("count").explain()
== Physical Plan ==
*Sort [count#195 ASC NULLS FIRST], true, 0
+- Exchange rangepartitioning(count#195 ASC NULLS FIRST, 200)
   +- *FileScan csv [DEST_COUNTRY_NAME#193,ORIGIN_COUNTRY_NAME#194,count#195]
   ...
Congratulations, you’ve just read your first explain plan! Explain plans are a
bit arcane, but with a bit of practice reading them becomes second nature. You
can read explain plans from top to bottom, the top being the end result, and
the bottom being the source(s) of data. In this case, take a look at the first
keywords. You will see sort, exchange, and FileScan. The exchange appears
because the sort of our data is actually a wide transformation: rows need to be
compared with one another. Don’t worry too much about understanding everything
about explain plans at this point; they are simply helpful tools for debugging
and improving your knowledge as you progress with Spark.
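The Exchange rangepartitioning step followed by Sort in the plan above can be pictured with a small toy model: rows are first routed to output partitions by value range, and each output partition is then sorted on its own. Everything here is an illustrative assumption, including the function name, the hand-picked boundaries, and the made-up count values; it is not Spark's implementation.

```python
import bisect


def range_partition_sort(partitions, boundaries):
    # Exchange: route each value to the output partition whose range holds it.
    shuffled = [[] for _ in range(len(boundaries) + 1)]
    for part in partitions:
        for value in part:
            shuffled[bisect.bisect_right(boundaries, value)].append(value)
    # Sort: each output partition is sorted independently of the others.
    return [sorted(part) for part in shuffled]


# Made-up count values spread over three input partitions.
counts = [[15, 1, 344], [62, 1, 588], [40, 325, 39]]
sorted_parts = range_partition_sort(counts, boundaries=[30, 100])
flat = [value for part in sorted_parts for value in part]
print(sorted_parts)  # [[1, 1, 15], [39, 40, 62], [325, 344, 588]]
print(flat)          # [1, 1, 15, 39, 40, 62, 325, 344, 588]
```

Every output partition receives rows from several input partitions, which is why a global sort cannot be expressed as a narrow transformation.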
Now, just like we did before, we can specify an action to kick off this plan.
However, before doing that, we’re going to set a configuration. By default,
when we perform a shuffle, Spark outputs 200 shuffle partitions. Let’s set
this value to 5 to reduce the number of the output partitions from the shuffle:
spark.conf.set("spark.sql.shuffle.partitions", "5")
flightData2015.sort("count").take(2)
... Array([United States,Singapore,1], [Moldova,United States,1])
Figure 2-9 illustrates this operation. Notice that in addition to the logical
transformations, we include the physical partition count, as well.
Figure 2-9. The process of logical and physical DataFrame manipulation
The logical plan of transformations that we build up defines a lineage for the
DataFrame so that at any given point in time, Spark knows how to recompute
any partition by performing all of the operations it had before on the same
input data. This sits at the heart of Spark’s programming model—functional
programming where the same inputs always result in the same outputs when
the transformations on that data stay constant.
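The notion of lineage can be illustrated with a short sketch in which a partition is rebuilt by replaying the recorded steps against the same input. The names recompute_partition and lineage, and the sample values, are hypothetical and exist only for this example.

```python
def recompute_partition(source_rows, lineage):
    # Replay every recorded transformation, in order, on the source rows.
    rows = list(source_rows)
    for step in lineage:
        rows = step(rows)
    return rows


lineage = [
    lambda rows: [r for r in rows if r % 2 == 0],   # keep even values
    lambda rows: sorted(rows, reverse=True),        # order them descending
]
source_partition = [3, 8, 1, 6, 4]

first = recompute_partition(source_partition, lineage)
again = recompute_partition(source_partition, lineage)
print(first)           # [8, 6, 4]
print(first == again)  # True: same input and same steps give the same output
```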
We do not manipulate the physical data; instead, we configure physical
execution characteristics through things like the shuffle partitions parameter
that we set a few moments ago. We ended up with five output partitions
because that’s the value we specified in the shuffle partition. You can change
this to help control the physical execution characteristics of your Spark jobs.
Go ahead and experiment with different values and see the number of
partitions yourself. In experimenting with different values, you should see
drastically different runtimes. Remember that you can monitor the job
progress by navigating to the Spark UI on port 4040 to see the physical and
logical execution characteristics of your jobs.
DataFrames and SQL
We worked through a simple transformation in the previous example; let’s
now work through a more complex one and follow along in both DataFrames
and SQL. Spark can run the same transformations, regardless of the language,
in the exact same way. You can express your business logic in SQL or
DataFrames (in R, Python, Scala, or Java) and Spark will compile that
logic down to an underlying plan (that you can see in the explain plan) before
actually executing your code. With Spark SQL, you can register any
DataFrame as a table or view (a temporary table) and query it using pure
SQL. There is no performance difference between writing SQL queries and
writing DataFrame code; they both “compile” to the same underlying plan
that we specify in DataFrame code.
You can make any DataFrame into a table or view with one simple method
call:
flightData2015.createOrReplaceTempView("flight_data_2015")
Now we can query our data in SQL. To do so, we’ll use the spark.sql
function (remember, spark is our SparkSession variable) that conveniently
returns a new DataFrame. Although this might seem a bit circular in logic—
that a SQL query against a DataFrame returns another DataFrame—it’s
actually quite powerful. This makes it possible for you to specify
transformations in the manner most convenient to you at any given point in
time and not sacrifice any efficiency to do so! To understand that this is
happening, let’s take a look at two explain plans:
// in Scala
val sqlWay = spark.sql("""
  SELECT DEST_COUNTRY_NAME, count(1)
  FROM flight_data_2015
  GROUP BY DEST_COUNTRY_NAME
""")
val dataFrameWay = flightData2015
  .groupBy('DEST_COUNTRY_NAME)
  .count()
sqlWay.explain
dataFrameWay.explain
# in Python
sqlWay = spark.sql("""
  SELECT DEST_COUNTRY_NAME, count(1)
  FROM flight_data_2015
  GROUP BY DEST_COUNTRY_NAME
""")
dataFrameWay = flightData2015\
  .groupBy("DEST_COUNTRY_NAME")\
  .count()
sqlWay.explain()
dataFrameWay.explain()
== Physical Plan ==
*HashAggregate(keys=[DEST_COUNTRY_NAME#182], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#182, 5)
   +- *HashAggregate(keys=[DEST_COUNTRY_NAME#182], functions=[partial_count(1)])
      +- *FileScan csv [DEST_COUNTRY_NAME#182] ...
== Physical Plan ==
*HashAggregate(keys=[DEST_COUNTRY_NAME#182], functions=[count(1)])
+- Exchange hashpartitioning(DEST_COUNTRY_NAME#182, 5)
   +- *HashAggregate(keys=[DEST_COUNTRY_NAME#182], functions=[partial_count(1)])
      +- *FileScan csv [DEST_COUNTRY_NAME#182] ...
Notice that these plans compile to the exact same underlying plan!
Let’s pull out some interesting statistics from our data. One thing to
understand is that DataFrames (and SQL) in Spark already have a huge
number of manipulations available. There are hundreds of functions that you
can use and import to help you resolve your big data problems faster. We will
use the max function to establish the maximum number of flights to and from
any given location. This just scans each value in the relevant column in the
DataFrame and checks whether it’s greater than the previous values that have
been seen. This is a transformation, because we are effectively filtering down
to one row. Let’s see what that looks like:
spark.sql("SELECT max(count) from flight_data_2015").take(1)
// in Scala
import org.apache.spark.sql.functions.max
flightData2015.select(max("count")).take(1)
# in Python
from pyspark.sql.functions import max
flightData2015.select(max("count")).take(1)
Great, that’s a simple example that gives a result of 370,002. Let’s perform
something a bit more complicated and find the top five destination countries
in the data. This is our first multi-transformation query, so we’ll take it step
by step. Let’s begin with a fairly straightforward SQL aggregation:
// in Scala
val maxSql = spark.sql("""
  SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
  FROM flight_data_2015
  GROUP BY DEST_COUNTRY_NAME
  ORDER BY sum(count) DESC
  LIMIT 5
""")
maxSql.show()
# in Python
maxSql = spark.sql("""
  SELECT DEST_COUNTRY_NAME, sum(count) as destination_total
  FROM flight_data_2015
  GROUP BY DEST_COUNTRY_NAME
  ORDER BY sum(count) DESC
  LIMIT 5
""")
maxSql.show()
+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+
Now, let’s move to the DataFrame syntax that is semantically similar but
slightly different in implementation and ordering. But, as we mentioned, the
underlying plans for both of them are the same. Let’s run the queries and see
their results as a sanity check:
// in Scala
import org.apache.spark.sql.functions.desc
flightData2015
  .groupBy("DEST_COUNTRY_NAME")
  .sum("count")
  .withColumnRenamed("sum(count)", "destination_total")
  .sort(desc("destination_total"))
  .limit(5)
  .show()
# in Python
from pyspark.sql.functions import desc
flightData2015\
  .groupBy("DEST_COUNTRY_NAME")\
  .sum("count")\
  .withColumnRenamed("sum(count)", "destination_total")\
  .sort(desc("destination_total"))\
  .limit(5)\
  .show()
+-----------------+-----------------+
|DEST_COUNTRY_NAME|destination_total|
+-----------------+-----------------+
|    United States|           411352|
|           Canada|             8399|
|           Mexico|             7140|
|   United Kingdom|             2025|
|            Japan|             1548|
+-----------------+-----------------+
Now there are seven steps that take us all the way back to the source data.
You can see this in the explain plan on those DataFrames. Figure 2-10 shows
the set of steps that we perform in “code.” The true execution plan (the one
visible in explain) will differ from that shown in Figure 2-10 because of
optimizations in the physical execution; however, the illustration is as good
a starting point as any. This execution plan is a directed acyclic graph (DAG)
of transformations, each resulting in a new immutable DataFrame, on which
we call an action to generate a result.
Figure 2-10. The entire DataFrame transformation flow
The first step is to read in the data. We defined the DataFrame previously but,
as a reminder, Spark does not actually read it in until an action is called on
that DataFrame or one derived from the original DataFrame.
The second step is our grouping; technically when we call groupBy, we end
up with a RelationalGroupedDataset, which is a fancy name for a
DataFrame that has a grouping specified but needs the user to specify an
aggregation before it can be queried further. We basically specified that we’re
going to be grouping by a key (or set of keys) and that now we’re going to
perform an aggregation over each one of those keys.
Therefore, the third step is to specify the aggregation. Let’s use the sum
aggregation method. This takes as input a column expression or, simply, a
column name. The result of the sum method call is a new DataFrame. You’ll
see that it has a new schema but that it does know the type of each column.
It’s important to reinforce (again!) that no computation has been performed.
This is simply another transformation that we’ve expressed, and Spark is
simply able to trace our type information through it.
The fourth step is a simple renaming. We use the withColumnRenamed
method that takes two arguments, the original column name and the new
column name. Of course, this doesn’t perform computation: this is just
another transformation!
The fifth step sorts the data such that if we were to take results off of the top
of the DataFrame, they would have the largest values in the
destination_total column.
You likely noticed that we had to import a function to do this, the desc
function. You might also have noticed that desc does not return a string but a
Column. In general, many DataFrame methods will accept strings (as column
names) or Column types or expressions. Columns and expressions are actually
the exact same thing.
Penultimately, we’ll specify a limit. This just specifies that we only want to
return the first five values in our final DataFrame instead of all the data.
The last step is our action! Now we actually begin the process of collecting
the results of our DataFrame, and Spark will give us back a list or array in the
language that we’re executing. To reinforce all of this, let’s look at the
explain plan for the previous query:
// in Scala
import org.apache.spark.sql.functions.desc
flightData2015
  .groupBy("DEST_COUNTRY_NAME")
  .sum("count")
  .withColumnRenamed("sum(count)", "destination_total")
  .sort(desc("destination_total"))
  .limit(5)
  .explain()
# in Python
from pyspark.sql.functions import desc
flightData2015\
  .groupBy("DEST_COUNTRY_NAME")\
  .sum("count")\
  .withColumnRenamed("sum(count)", "destination_total")\
  .sort(desc("destination_total"))\
  .limit(5)\
  .explain()
== Physical Plan ==
TakeOrderedAndProject(limit=5, orderBy=[destination_total#16194L DESC], outpu...
+- *HashAggregate(keys=[DEST_COUNTRY_NAME#7323], functions=[sum(count#7325L)])
   +- Exchange hashpartitioning(DEST_COUNTRY_NAME#7323, 5)
      +- *HashAggregate(keys=[DEST_COUNTRY_NAME#7323], functions=[partial_sum...
         +- InMemoryTableScan [DEST_COUNTRY_NAME#7323, count#7325L]
            +- InMemoryRelation [DEST_COUNTRY_NAME#7323, ORIGIN_COUNTRY_NA...
               +- *Scan csv [DEST_COUNTRY_NAME#7578,ORIGIN_COUNTRY_NAME...
Although this explain plan doesn’t match our exact “conceptual plan,” all of
the pieces are there. You can see the limit statement as well as the orderBy
(in the first line). You can also see how our aggregation happens in two
phases, in the partial_sum calls. This is because addition is associative and
commutative, so Spark can sum each partition independently and then combine
the partial results. Of course, we can see how we read in the DataFrame, as
well.
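The two aggregation phases in the plan can be imitated in plain Python. The following is only a sketch under our own assumptions: the function names and the sample partitions are made up, and a real Spark job shuffles the partial results by key between the two phases (the Exchange step in the plan) instead of merging them in one place.

```python
from collections import defaultdict
import heapq

def partial_sums(partition):
    """Phase 1: sum the counts per key inside a single partition."""
    sums = defaultdict(int)
    for key, count in partition:
        sums[key] += count
    return dict(sums)

def merge_sums(partials):
    """Phase 2: combine the per-partition results into final totals."""
    totals = defaultdict(int)
    for partial in partials:
        for key, value in partial.items():
            totals[key] += value
    return dict(totals)

# Hypothetical data, already split into three partitions.
partitions = [
    [("United States", 10), ("Canada", 4), ("United States", 5)],
    [("Canada", 8), ("Mexico", 7)],
    [("United States", 1), ("Mexico", 3)],
]

totals = merge_sums(partial_sums(p) for p in partitions)

# Sorting descending and then limiting is a "top N" selection.
top_two = heapq.nlargest(2, totals.items(), key=lambda kv: kv[1])
```

Because each partition is reduced to one small dictionary before anything is combined, far less data has to move between machines than if every raw row were sent to a single place.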
Naturally, we don’t always need to collect the data. We can also write it out
to any data source that Spark supports. For instance, suppose we want to store
the information in a database like PostgreSQL or write it out to another
file.
Conclusion
This chapter introduced the basics of Apache Spark. We talked about
transformations and actions, and how Spark lazily executes a DAG of
transformations in order to optimize the execution plan on DataFrames. We
also discussed how data is organized into partitions and set the stage for
working with more complex transformations. In Chapter 3 we take you on a
tour of the vast Spark ecosystem and look at some more advanced concepts
and tools that are available in Spark, from streaming to machine learning.
Chapter 3. A Tour of Spark’s Toolset
In Chapter 2, we introduced Spark’s core concepts, like transformations and
actions, in the context of Spark’s Structured APIs. These simple conceptual
building blocks are the foundation of Apache Spark’s vast ecosystem of tools
and libraries (Figure 3-1). Spark is composed of these primitives—the lower-
level APIs and the Structured APIs—and then a series of standard libraries
for additional functionality.
Figure 3-1. Spark’s toolset
Spark’s libraries support a variety of different tasks, from graph analysis and
machine learning to streaming and integrations with a host of computing and
storage systems. This chapter presents a whirlwind tour of much of what
Spark has to offer, including some of the APIs we have not yet covered and a
few of the main libraries. For each section, you will find more detailed
information in other parts of this book; our purpose here is to provide you
with an overview of what’s possible.
This chapter covers the following:
Running production applications with spark-submit
Datasets: type-safe APIs for structured data
Structured Streaming
Machine learning and advanced analytics
Resilient Distributed Datasets (RDD): Spark’s low-level APIs
SparkR
The third-party package ecosystem
After you’ve taken the tour, you’ll be able to jump to the corresponding parts
of the book to find answers to your questions about particular topics.
Running Production Applications
Spark makes it easy to develop and create big data programs. Spark also
makes it easy to turn your interactive exploration into production applications
with spark-submit, a built-in command-line tool. spark-submit does one
thing: it lets you send your application code to a cluster and launch it to
execute there. Upon submission, the application will run until it exits
(completes the task) or encounters an error. You can do this with all of
Spark’s supported cluster managers, including Standalone, Mesos, and YARN.
spark-submit offers several controls with which you can specify the
resources your application needs as well as how it should be run and its
command-line arguments.
You can write applications in any of Spark’s supported languages and then
submit them for execution. The simplest example is running an application
on your local machine. We’ll show this by running a sample Scala
application that comes with Spark, using the following command in the
directory where you downloaded Spark:
./bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master local \
./examples/jars/spark-examples_2.11-2.2.0.jar 10
This sample application estimates the value of pi to a certain level of
accuracy. Here, we’ve told spark-submit that we want to run on our local
machine, which class and which JAR we would like to run, and some
command-line arguments for that class.
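If you are curious what such a job computes, the bundled example (as far as we can tell) approximates pi by random sampling: it throws random points at a square and counts how many land inside the inscribed circle. Here is a single-machine sketch of that idea in plain Python. The function name estimate_pi, the sample count, and the seed are our own choices; the Spark version spreads the sampling over partitions, and we believe the trailing argument 10 controls how many.

```python
import random

def estimate_pi(num_samples, seed=None):
    """Monte Carlo estimate of pi from uniformly random points."""
    rng = random.Random(seed)
    inside = 0
    for _ in range(num_samples):
        # Pick a point in the square [-1, 1] x [-1, 1].
        x = rng.uniform(-1.0, 1.0)
        y = rng.uniform(-1.0, 1.0)
        # Count it if it falls inside the unit circle.
        if x * x + y * y <= 1.0:
            inside += 1
    # circle area / square area = pi / 4
    return 4.0 * inside / num_samples

estimate = estimate_pi(200_000, seed=42)
print(estimate)
```

More samples give a tighter estimate, which is why the work is worth distributing: every sample is independent, so partitions can count their own hits and only the totals need to be combined.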
We can also run a Python version of the application using the following
command:
./bin/spark-submit \
--master local \
./examples/src/main/python/pi.py 10
By changing the master argument of spark-submit, we can also submit the
same application to a cluster running Spark’s standalone cluster manager,
Mesos or YARN.
spark-submit will come in handy to run many of the examples we’ve
packaged with this book. In the rest of this chapter, we’ll go through
examples of some APIs that we haven’t yet seen in our introduction to Spark.
Datasets: Type-Safe Structured APIs
The first API we’ll describe is a type-safe version of Spark’s structured API
called Datasets, for writing statically typed code in Java and Scala. The
Dataset API is not available in Python and R, because those languages are
dynamically typed.
Recall that DataFrames, which we saw in the previous chapter, are a
distributed collection of objects of type Row that can hold various types of
tabular data. The Dataset API gives users the ability to assign a Java/Scala
class to the records within a DataFrame and manipulate it as a collection of
typed objects, similar to a Java ArrayList or Scala Seq. The APIs available
on Datasets are type-safe, meaning that you cannot accidentally view the
objects in a Dataset as being of another class than the class you put in
initially. This makes Datasets especially attractive for writing large
applications, with which multiple software engineers must interact through
well-defined interfaces.
The Dataset class is parameterized with the type of object contained inside:
Dataset<T> in Java and Dataset[T] in Scala. For example, a
Dataset[Person] will be guaranteed to contain objects of class Person. As
of Spark 2.0, the supported types are classes following the JavaBean pattern
in Java and case classes in Scala. These types are restricted because Spark
needs to be able to automatically analyze the type T and create an appropriate
schema for the tabular data within your Dataset.
One great thing about Datasets is that you can use them only when you need
or want to. For instance, in the following example, we’ll define our own data
type and manipulate it via arbitrary map and filter functions. After we’ve
performed our manipulations, Spark can automatically turn it back into a
DataFrame, and we can manipulate it further by using the hundreds of
functions that Spark includes. This makes it easy to drop down to a lower
level, perform type-safe coding when necessary, and move higher up to SQL for
more rapid analysis. Here is a small example showing how you can use both
type-safe functions and DataFrame-like SQL expressions to quickly write
business logic:
// in Scala
case class Flight(DEST_COUNTRY_NAME: String,
ORIGIN_COUNTRY_NAME: String,
count: BigInt)
val flightsDF = spark.read
.parquet("/data/flight-data/parquet/2010-summary.parquet/")
val flights = flightsDF.as[Flight]
One final advantage is that when you call collect or take on a Dataset, it
will collect objects of the proper type in your Dataset, not DataFrame Rows.
This makes it easy to get type safety and securely perform manipulation in a
distributed and a local manner without code changes:
// in Scala
flights
.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
.map(flight_row => flight_row)
.take(5)
flights
.take(5)
.filter(flight_row => flight_row.ORIGIN_COUNTRY_NAME != "Canada")
.map(fr => Flight(fr.DEST_COUNTRY_NAME, fr.ORIGIN_COUNTRY_NAME, fr.count + 5))
We cover Datasets in depth in Chapter 11.
Structured Streaming
Structured Streaming is a high-level API for stream processing that became
production-ready in Spark 2.2. With Structured Streaming, you can take the
same operations that you perform in batch mode using Spark’s structured
APIs and run them in a streaming fashion. This can reduce latency and allow
for incremental processing. The best thing about Structured Streaming is that
it allows you to quickly extract value out of streaming systems with
virtually no code changes. It is also easy to reason about, because you can
write your batch job as a prototype and then convert it to a streaming job.
All of this works by incrementally processing the data as it arrives.
Let’s walk through a simple example of how easy it is to get started with
Structured Streaming. For this, we will use a retail dataset, one that has
specific dates and times for us to be able to use. We will use the “by-day” set
of files, in which one file represents one day of data.
We put it in this format to simulate data being produced in a consistent and
regular manner by a different process. This is retail data so imagine that these
are being produced by retail stores and sent to a location where they will be
read by our Structured Streaming job.
It’s also worth sharing a sample of the data so you can reference what the
data looks like:
InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,2010-12-01 08:26:00,2.55,17...
536365,71053,WHITE METAL LANTERN,6,2010-12-01 08:26:00,3.39,17850.0,United Kin...
536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,2010-12-01 08:26:00,2.75,17850...
To ground this, let’s first analyze the data as a static dataset and create a
DataFrame to do so. We’ll also create a schema from this static dataset (there
are ways of using schema inference with streaming that we will touch on in
Part V):
// in Scala
val staticDataFrame = spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("/data/retail-data/by-day/*.csv")
staticDataFrame.createOrReplaceTempView("retail_data")
val staticSchema = staticDataFrame.schema
# in Python
staticDataFrame = spark.read.format("csv")\
.option("header", "true")\
.option("inferSchema", "true")\
.load("/data/retail-data/by-day/*.csv")
staticDataFrame.createOrReplaceTempView("retail_data")
staticSchema = staticDataFrame.schema
Because we’re working with time-series data, it’s worth mentioning how we
might go about grouping and aggregating our data. In this example we’ll take
a look at the days on which a given customer (identified by CustomerId) makes
a large purchase. To do so, let’s add a total cost column and see on what
days a customer spent the most.
The window function will include all data from each day in the aggregation.
It’s simply a window over the time-series column in our data. This is a
helpful tool for manipulating dates and timestamps because we can specify our
requirements in a more human form (via intervals), and Spark will group all
of them together for us:
// in Scala
import org.apache.spark.sql.functions.{window, column, desc, col}
staticDataFrame
.selectExpr(
"CustomerId",
"(UnitPrice * Quantity) as total_cost",
"InvoiceDate")
.groupBy(
col("CustomerId"), window(col("InvoiceDate"), "1 day"))
.sum("total_cost")
.show(5)
# in Python
from pyspark.sql.functions import window, column, desc, col
staticDataFrame\
.selectExpr(
"CustomerId",
"(UnitPrice * Quantity) as total_cost",
"InvoiceDate")\
.groupBy(
col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
.sum("total_cost")\
.show(5)
It’s worth mentioning that you can also run this as SQL code, just as we saw
in the previous chapter.
Here’s a sample of the output that you’ll see:
+----------+--------------------+------------------+
|CustomerId| window| sum(total_cost)|
+----------+--------------------+------------------+
| 17450.0|[2011-09-20 00:00...| 71601.44|
...
| null|[2011-12-08 00:00...|31975.590000000007|
+----------+--------------------+------------------+
The null values represent the fact that we don’t have a CustomerId for some
transactions.
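To see what grouping by a one-day window amounts to, here is a small plain-Python sketch of the same aggregation. The helper names and the sample rows are hypothetical, and Spark’s own choice of window boundaries may differ (for example, because of time zones), so treat this as an illustration of the idea only.

```python
from collections import defaultdict
from datetime import datetime, timedelta

def day_window(ts):
    """Return the (start, end) of the one-day window containing ts."""
    start = datetime(ts.year, ts.month, ts.day)
    return (start, start + timedelta(days=1))

def total_cost_by_customer_and_day(rows):
    """Sum UnitPrice * Quantity per (customer, day window)."""
    totals = defaultdict(float)
    for customer_id, invoice_date, unit_price, quantity in rows:
        key = (customer_id, day_window(invoice_date))
        totals[key] += unit_price * quantity
    return dict(totals)

# Hypothetical rows: (CustomerId, InvoiceDate, UnitPrice, Quantity)
rows = [
    (17850.0, datetime(2010, 12, 1, 8, 26), 2.5, 6),
    (17850.0, datetime(2010, 12, 1, 17, 5), 1.0, 8),
    (17850.0, datetime(2010, 12, 2, 9, 0), 4.0, 2),
    (None, datetime(2010, 12, 1, 10, 0), 3.0, 1),
]

totals = total_cost_by_customer_and_day(rows)
for (customer, (start, end)), cost in totals.items():
    print(customer, start, end, cost)
```

Two purchases by the same customer on the same day land in one group, a purchase on the next day opens a new group, and rows without a customer ID form groups of their own, just like the null row in the output above.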
That’s the static DataFrame version; there shouldn’t be any big surprises in
there if you’re familiar with the syntax.
Because you’re likely running this in local mode, it’s a good practice to set
the number of shuffle partitions to something that’s going to be a better fit for
local mode. This configuration specifies the number of partitions that should
be created after a shuffle. By default, the value is 200, but because there
aren’t many executors on this machine, it’s worth reducing this to 5. We did
this same operation in Chapter 2, so if you don’t remember why this is
important, feel free to flip back to review.
spark.conf.set("spark.sql.shuffle.partitions", "5")
Now that we’ve seen how that works, let’s take a look at the streaming code!
You’ll notice that very little actually changes about the code. The biggest
change is that we use readStream instead of read. Additionally, you’ll notice
the maxFilesPerTrigger option, which simply specifies the number of files we
should read in at once. This is to make our demonstration more “streaming,”
and in a production scenario this would probably be omitted.
// in Scala
val streamingDataFrame = spark.readStream
  .schema(staticSchema)
  .option("maxFilesPerTrigger", 1)
  .format("csv")
  .option("header", "true")
  .load("/data/retail-data/by-day/*.csv")
# in Python
streamingDataFrame = spark.readStream\
.schema(staticSchema)\
.option("maxFilesPerTrigger", 1)\
.format("csv")\
.option("header", "true")\
.load("/data/retail-data/by-day/*.csv")
Now we can see whether our DataFrame is streaming:
streamingDataFrame.isStreaming // returns true
Let’s set up the same business logic as the previous DataFrame manipulation.
We’ll perform a summation in the process:
// in Scala
val purchaseByCustomerPerHour = streamingDataFrame
.selectExpr(
"CustomerId",
"(UnitPrice * Quantity) as total_cost",
"InvoiceDate")
.groupBy(
$"CustomerId", window($"InvoiceDate", "1 day"))
.sum("total_cost")
# in Python
purchaseByCustomerPerHour = streamingDataFrame\
.selectExpr(
"CustomerId",
"(UnitPrice * Quantity) as total_cost",
"InvoiceDate")\
.groupBy(
col("CustomerId"), window(col("InvoiceDate"), "1 day"))\
.sum("total_cost")
This is still a lazy operation, so we will need to call a streaming action to
start the execution of this data flow.
Streaming actions are a bit different from our conventional static actions
because we’re going to be populating data somewhere instead of just calling
something like count (which doesn’t make any sense on a stream anyway).
The action we will use will output to an in-memory table that we will update
after each trigger. In this case, each trigger is based on an individual file
(the read option that we set). Spark will mutate the data in the in-memory
table such that it always holds the most up-to-date totals from our previous
aggregation:
// in Scala
purchaseByCustomerPerHour.writeStream
.format("memory") // memory = store in-memory table
.queryName("customer_purchases") // the name of the in-memory table
.outputMode("complete") // complete = all the counts should be in the table
.start()
# in Python
purchaseByCustomerPerHour.writeStream\
.format("memory")\
.queryName("customer_purchases")\
.outputMode("complete")\
.start()
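What “complete” output mode means can be sketched without Spark at all. In the plain-Python illustration below, each inner list stands for one file picked up by one trigger; after every trigger the whole result table is produced again, not only the keys that changed. The RunningTotals class and the sample data are hypothetical, and Spark’s real state management is far more involved.

```python
from collections import defaultdict

class RunningTotals:
    """Keeps aggregate state across triggers, like a streaming sum."""

    def __init__(self):
        self._totals = defaultdict(float)

    def process_trigger(self, batch):
        """Fold one batch (one 'file') into the state; return the full table."""
        for customer_id, cost in batch:
            self._totals[customer_id] += cost
        # "complete" mode: emit every key, not just those that changed.
        return dict(self._totals)

# Hypothetical input: three files, processed one per trigger.
files = [
    [("A", 10.0), ("B", 5.0)],
    [("A", 2.5)],
    [("C", 40.0), ("B", 1.0)],
]

state = RunningTotals()
tables = [state.process_trigger(batch) for batch in files]
for table in tables:
    print(table)
```

Querying the in-memory table between triggers is like looking at one of these snapshots: the answer depends on how many files have been processed so far.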
After we start the stream, we can run queries against it to debug what our
result would look like if we were to write this out to a production sink:
// in Scala
spark.sql("""
SELECT *
FROM customer_purchases
ORDER BY `sum(total_cost)` DESC
""")
.show(5)
# in Python
spark.sql("""
SELECT *
FROM customer_purchases
ORDER BY `sum(total_cost)` DESC
""")\
.show(5)
You’ll notice that the composition of our table changes as we read in more
data! With each file, the results might or might not be changing based on the
data. Naturally, because we’re grouping customers, we hope to see an
increase in the top customer purchase amounts over time (and do for a period
of time!). Another option you can use is to write the results out to the
console:
purchaseByCustomerPerHour.writeStream
.format("console")
.queryName("customer_purchases_2")
.outputMode("complete")
.start()
You shouldn’t use either of these streaming methods in production, but they
do make for convenient demonstration of Structured Streaming’s power.
Notice how this window is built on event time, as well, not the time at which
Spark processes the data. This was one of the shortcomings of Spark
Streaming that Structured Streaming has resolved. We cover Structured
Streaming in depth in Part V.
Machine Learning and Advanced Analytics
Another popular aspect of Spark is its ability to perform large-scale machine
learning with a built-in library of machine learning algorithms called MLlib.
MLlib allows for preprocessing, munging, training of models, and making
predictions at scale on data. You can even use models trained in MLlib to
make predictions in Structured Streaming. Spark provides a sophisticated
machine learning API for performing a variety of machine learning tasks,
from classification to regression, and clustering to deep learning. To
demonstrate this functionality, we will perform some basic clustering on our
data using a standard algorithm called k-means.
WHAT IS K-MEANS?
k-means is a clustering algorithm in which “k” centers are randomly assigned within the
data. The points closest to each center are then “assigned” to that center’s class, and the
center of the assigned points is computed. This center point is called the centroid. We then
label the points closest to that centroid with the centroid’s class, and shift the centroid
to the new center of that cluster of points. We repeat this process for a finite number of
iterations or until convergence (our center points stop changing).
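The loop described above fits in a few lines of plain Python. This is a teaching sketch only, not MLlib’s implementation (which runs distributed and chooses its starting centers more carefully); the function names, the six sample points, and the seed are our own assumptions.

```python
import random

def squared_distance(p, q):
    return sum((a - b) ** 2 for a, b in zip(p, q))

def closest(point, centroids):
    """Index of the centroid nearest to point."""
    return min(range(len(centroids)),
               key=lambda i: squared_distance(point, centroids[i]))

def mean(points):
    n = len(points)
    return tuple(sum(coords) / n for coords in zip(*points))

def kmeans(points, k, max_iterations=20, seed=0):
    rng = random.Random(seed)
    centroids = rng.sample(points, k)      # start from k random data points
    for _ in range(max_iterations):
        # Assignment step: attach each point to its nearest centroid.
        clusters = [[] for _ in range(k)]
        for p in points:
            clusters[closest(p, centroids)].append(p)
        # Update step: move each centroid to the mean of its points
        # (an empty cluster keeps its previous centroid).
        new_centroids = [mean(c) if c else centroids[i]
                         for i, c in enumerate(clusters)]
        if new_centroids == centroids:     # converged: nothing moved
            break
        centroids = new_centroids
    return centroids

# Hypothetical data: two well-separated groups of three points.
points = [(1.0, 1.0), (1.5, 2.0), (1.0, 1.5),
          (8.0, 8.0), (9.0, 8.5), (8.5, 9.0)]
centroids = kmeans(points, k=2)
print(sorted(centroids))
```

On data this cleanly separated the centroids settle on the middle of each group after a few passes; on real data the outcome can depend on the starting centers, which is one reason a seed is worth fixing.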
Spark includes a number of preprocessing methods out of the box. To
demonstrate these methods, we will begin with some raw data, build up
transformations before getting the data into the right format, at which point
we can actually train our model and then serve predictions:
staticDataFrame.printSchema()
root
|-- InvoiceNo: string (nullable = true)
|-- StockCode: string (nullable = true)
|-- Description: string (nullable = true)
|-- Quantity: integer (nullable = true)
|-- InvoiceDate: timestamp (nullable = true)
|-- UnitPrice: double (nullable = true)
|-- CustomerID: double (nullable = true)
|-- Country: string (nullable = true)
Machine learning algorithms in MLlib require that data is represented as
numerical values. Our current data is represented by a variety of different
types, including timestamps, integers, and strings. Therefore we need to
transform this data into some numerical representation. In this instance, we’ll
use several DataFrame transformations to manipulate our date data:
// in Scala
import org.apache.spark.sql.functions.date_format
val preppedDataFrame = staticDataFrame
.na.fill(0)
.withColumn("day_of_week", date_format($"InvoiceDate", "EEEE"))
.coalesce(5)
# in Python
from pyspark.sql.functions import date_format, col
preppedDataFrame = staticDataFrame\
.na.fill(0)\
.withColumn("day_of_week", date_format(col("InvoiceDate"), "EEEE"))\
.coalesce(5)
We are also going to need to split the data into training and test sets. In this
instance, we are going to do this manually by the date on which a certain
purchase occurred; however, we could also use MLlib’s transformation APIs
to create a training and test set via train validation splits or cross validation
(these topics are covered at length in Part VI):
// in Scala
val trainDataFrame = preppedDataFrame
.where("InvoiceDate < '2011-07-01'")
val testDataFrame = preppedDataFrame
.where("InvoiceDate >= '2011-07-01'")
# in Python
trainDataFrame = preppedDataFrame\
.where("InvoiceDate < '2011-07-01'")
testDataFrame = preppedDataFrame\
.where("InvoiceDate >= '2011-07-01'")
Now that we’ve prepared the data, we have a training set and a test set.
Because this is a time-series set of data, we split on an arbitrary date in
the dataset. Although this might not be the optimal split for our training
and test sets, for the intents and purposes of this example it will work just
fine. We’ll see that this splits our dataset roughly in half:
trainDataFrame.count()
testDataFrame.count()
Note that these transformations are DataFrame transformations, which we
cover extensively in Part II. Spark’s MLlib also provides a number of
transformations with which we can automate some of our general
transformations. One such transformer is a StringIndexer:
// in Scala
import org.apache.spark.ml.feature.StringIndexer
val indexer = new StringIndexer()
.setInputCol("day_of_week")
.setOutputCol("day_of_week_index")
# in Python
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer()\
.setInputCol("day_of_week")\
.setOutputCol("day_of_week_index")
This will turn our days of the week into corresponding numerical values. For
example, Spark might represent Saturday as 6, and Monday as 1. However,
with this numbering scheme, we are implicitly stating that Saturday is greater
than Monday (by pure numerical values). This is obviously incorrect. To fix
this, we therefore need to use a OneHotEncoder to encode each of these
values as its own column. These Boolean flags state whether that day of the
week is the relevant day of the week:
// in Scala
import org.apache.spark.ml.feature.OneHotEncoder
val encoder = new OneHotEncoder()
.setInputCol("day_of_week_index")
.setOutputCol("day_of_week_encoded")
# in Python
from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder()\
.setInputCol("day_of_week_index")\
.setOutputCol("day_of_week_encoded")
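Here is what these two steps do to the data, sketched in plain Python. The function names are hypothetical and the sketch simplifies in several ways: Spark’s StringIndexer orders labels by frequency by default (the alphabetical tie-break below is our own assumption), and Spark’s encoder produces sparse vectors and, by default, drops the last category.

```python
from collections import Counter

def fit_string_index(values):
    """Map each distinct label to an index, most frequent label first."""
    counts = Counter(values)
    # Ties are broken alphabetically here; that detail is our assumption.
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: i for i, label in enumerate(ordered)}

def one_hot(index, size):
    """A vector of zeros with a single 1.0 at position index."""
    vec = [0.0] * size
    vec[index] = 1.0
    return vec

days = ["Monday", "Saturday", "Monday", "Sunday", "Saturday", "Monday"]

index = fit_string_index(days)
encoded = [one_hot(index[d], len(index)) for d in days]

for day, vec in zip(days, encoded):
    print(day, index[day], vec)
```

After encoding, no day is numerically “greater” than another: every day is the same distance from every other day, which is what a distance-based algorithm needs.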
Each of these will result in a set of columns that we will “assemble” into a
vector. All machine learning algorithms in Spark take as input a Vector type,
which must be a set of numerical values:
// in Scala
import org.apache.spark.ml.feature.VectorAssembler
val vectorAssembler = new VectorAssembler()
.setInputCols(Array("UnitPrice", "Quantity", "day_of_week_encoded"))
.setOutputCol("features")
# in Python
from pyspark.ml.feature import VectorAssembler
vectorAssembler = VectorAssembler()\
.setInputCols(["UnitPrice", "Quantity", "day_of_week_encoded"])\
.setOutputCol("features")
Here, we have three key features: the price, the quantity, and the day of week.
Next, we’ll set this up into a pipeline so that any future data we need to
transform can go through the exact same process:
// in Scala
import org.apache.spark.ml.Pipeline
val transformationPipeline = new Pipeline()
.setStages(Array(indexer, encoder, vectorAssembler))
# in Python
from pyspark.ml import Pipeline
transformationPipeline = Pipeline()\
.setStages([indexer, encoder, vectorAssembler])
Preparing for training is a two-step process. We first need to fit our
transformers to this dataset. We cover this in depth in Part VI, but basically
our StringIndexer needs to know how many unique values there are to be
indexed. After those exist, encoding is easy but Spark must look at all the
distinct values in the column to be indexed in order to store those values later
on:
// in Scala
val fittedPipeline = transformationPipeline.fit(trainDataFrame)
# in Python
fittedPipeline = transformationPipeline.fit(trainDataFrame)
After we fit the training data, we are ready to take that fitted pipeline and use
it to transform all of our data in a consistent and repeatable way:
// in Scala
val transformedTraining = fittedPipeline.transform(trainDataFrame)
# in Python
transformedTraining = fittedPipeline.transform(trainDataFrame)
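The fit-then-transform pattern can be mimicked with a toy pipeline in plain Python. Everything here is hypothetical (the Centerer and Scaler stages, the class names, the numbers); the point is only that fit learns values from the training data, and that the fitted pipeline then applies those same learned values to any dataset.

```python
class Centerer:
    """Stage 1 (unfitted): learns the mean of the training data."""
    def fit(self, values):
        return CentererModel(sum(values) / len(values))

class CentererModel:
    def __init__(self, mean):
        self.mean = mean
    def transform(self, values):
        return [v - self.mean for v in values]

class Scaler:
    """Stage 2 (unfitted): learns the largest absolute value."""
    def fit(self, values):
        largest = max(abs(v) for v in values) or 1.0  # avoid dividing by 0
        return ScalerModel(largest)

class ScalerModel:
    def __init__(self, factor):
        self.factor = factor
    def transform(self, values):
        return [v / self.factor for v in values]

class ToyPipeline:
    def __init__(self, stages):
        self.stages = stages
    def fit(self, values):
        fitted = []
        for stage in self.stages:
            model = stage.fit(values)         # learn from the current data
            values = model.transform(values)  # feed the result to the next stage
            fitted.append(model)
        return ToyPipelineModel(fitted)

class ToyPipelineModel:
    def __init__(self, models):
        self.models = models
    def transform(self, values):
        for model in self.models:
            values = model.transform(values)
        return values

train = [2.0, 4.0, 6.0]
test = [4.0, 10.0]
fitted = ToyPipeline([Centerer(), Scaler()]).fit(train)
transformed_train = fitted.transform(train)
transformed_test = fitted.transform(test)
```

Notice that the test values are shifted and scaled using numbers learned from the training data only, which is why the transformation stays consistent and repeatable.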
At this point, it’s worth mentioning that we could have included our model
training in our pipeline. We chose not to in order to demonstrate a use case
for caching the data. We’re going to perform some hyperparameter tuning on
the model, and we do not want to repeat the exact same transformations over
and over again; instead, we’ll use caching, an optimization that we discuss
in more detail in Part IV. This will put a copy of the intermediately
transformed dataset into memory, allowing us to repeatedly access it at much
lower cost than running the entire pipeline again. If you’re curious to see
how much of a difference this makes, skip this line and run the training
without caching the data. Then try it after caching; you’ll see the results
are significant:
transformedTraining.cache()
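The benefit of caching is easy to see in a toy setting. The plain-Python sketch below uses hypothetical names (CachedResult, run_pipeline) and a made-up “training run”; it only illustrates that the expensive work is done the first time the result is needed and reused afterward, which is roughly how a cached DataFrame behaves once an action has materialized it.

```python
class CachedResult:
    """Computes an expensive result once, then reuses it."""

    def __init__(self, compute):
        self._compute = compute
        self._value = None
        self._is_cached = False

    def get(self):
        if not self._is_cached:
            # First access: do the real work and remember the result.
            self._value = self._compute()
            self._is_cached = True
        return self._value

counter = {"pipeline_runs": 0}   # how often the expensive step really ran

def run_pipeline():
    counter["pipeline_runs"] += 1
    return [x * 2 for x in range(5)]   # stands in for the transformed data

cached = CachedResult(run_pipeline)

# Three "training runs" with different settings share one transformed dataset.
results = [sum(cached.get()) + k for k in (2, 3, 4)]
print(results, counter["pipeline_runs"])
```

Without the cache, every training run would repeat the whole transformation; with it, the pipeline runs once no matter how many settings we try.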
We now have a training set; it’s time to train the model. First we’ll import the
relevant model that we’d like to use and instantiate it:
// in Scala
import org.apache.spark.ml.clustering.KMeans
val kmeans = new KMeans()
.setK(20)
.setSeed(1L)
# in Python
from pyspark.ml.clustering import KMeans
kmeans = KMeans()\
  .setK(20)\
  .setSeed(1)
In Spark, training machine learning models is a two-phase process. First, we
initialize an untrained model, and then we train it. There are always two types
for every algorithm in MLlib’s DataFrame API. They follow the naming
pattern of Algorithm, for the untrained version, and AlgorithmModel for the
trained version. In our example, this is KMeans and then KMeansModel.
Estimators in MLlib’s DataFrame API share roughly the same interface that
we saw earlier with our preprocessing transformers like the StringIndexer.
This should come as no surprise because it makes training an entire pipeline
(which includes the model) simple. For our purposes here, we want to do
things a bit more step by step, so we chose to not do this in this example:
// in Scala
val kmModel = kmeans.fit(transformedTraining)
# in Python
kmModel = kmeans.fit(transformedTraining)
After we train this model, we can compute the cost according to some
success metrics on our training set. The resulting cost on this dataset is
actually quite high, which is likely due to the fact that we did not properly
preprocess and scale our input data, which we cover in depth in Chapter 25:
kmModel.computeCost(transformedTraining)
// in Scala
val transformedTest = fittedPipeline.transform(testDataFrame)
# in Python
transformedTest = fittedPipeline.transform(testDataFrame)
kmModel.computeCost(transformedTest)
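As we understand it, the cost reported here is the sum of squared distances from each point to its nearest cluster center. The following plain-Python sketch computes that quantity for a handful of made-up points; the function names and data are hypothetical, and it is not MLlib’s code.

```python
def squared_distance(p, q):
    return sum((a - b) ** 2 for a, b in zip(p, q))

def compute_cost(points, centroids):
    """Sum of squared distances from each point to its nearest centroid."""
    return sum(min(squared_distance(p, c) for c in centroids)
               for p in points)

# Hypothetical centers and points.
centroids = [(0.0, 0.0), (10.0, 10.0)]
points = [(1.0, 0.0), (0.0, 2.0), (10.0, 11.0), (13.0, 14.0)]

cost = compute_cost(points, centroids)
print(cost)
```

Because the distances are squared, a feature measured on a large scale (such as a quantity in the thousands) can dominate the total, which is one reason unscaled input tends to produce a high cost.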
Naturally, we could continue to improve this model, layering more
preprocessing as well as performing hyperparameter tuning to ensure that
we’re getting a good model. We leave that discussion for Part VI.
Lower-Level APIs
Spark includes a number of lower-level primitives to allow for arbitrary Java
and Python object manipulation via Resilient Distributed Datasets (RDDs).
Virtually everything in Spark is built on top of RDDs. As we will discuss in
Chapter 4, DataFrame operations are built on top of RDDs and compile down
to these lower-level tools for convenient and extremely efficient distributed
execution. There are some things that you might use RDDs for, especially
when you’re reading or manipulating raw data, but for the most part you
should stick to the Structured APIs. RDDs are lower level than DataFrames
because they reveal physical execution characteristics (like partitions) to end
users.
One thing that you might use RDDs for is to parallelize raw data that you
have stored in memory on the driver machine. For instance, let’s parallelize
some simple numbers and then convert the resulting RDD to a DataFrame so that
we can use it with other DataFrames:
// in Scala
spark.sparkContext.parallelize(Seq(1, 2, 3)).toDF()
# in Python
from pyspark.sql import Row
spark.sparkContext.parallelize([Row(1), Row(2), Row(3)]).toDF()
RDDs are available in Scala as well as Python. However, they’re not
equivalent. This differs from the DataFrame API (where the execution
characteristics are the same) due to some underlying implementation details.
We cover lower-level APIs, including RDDs, in Part IV. As end users, you
shouldn’t need to use RDDs much in order to perform many tasks unless
you’re maintaining older Spark code. There are basically no instances in
modern Spark for which you should be using RDDs instead of the structured
APIs beyond manipulating some very raw unprocessed and unstructured data.
SparkR
SparkR is a tool for running R on Spark. It follows the same principles as all
of Spark’s other language bindings. To use SparkR, you simply import it into
your environment and run your code. It’s all very similar to the Python API
except that it follows R’s syntax instead of Python’s. For the most part,
almost everything available in Python is available in SparkR:
# in R
library(SparkR)
sparkDF <- read.df("/data/flight-data/csv/2015-summary.csv",
source = "csv", header="true", inferSchema = "true")
take(sparkDF, 5)
# in R
collect(orderBy(sparkDF, "count"), 20)
R users can also use other R libraries like the pipe operator in magrittr to
make Spark transformations a bit more R-like. This can make it easy to use
with other libraries like ggplot for more sophisticated plotting:
# in R
library(magrittr)
sparkDF %>%
orderBy(desc(sparkDF$count)) %>%
groupBy("ORIGIN_COUNTRY_NAME") %>%
count() %>%
limit(10) %>%
collect()
We will not include R code samples as we do in Python, because almost
every concept throughout this book that applies to Python also applies to
SparkR. The only difference will be syntax. We cover SparkR and sparklyr in
Part VII.
Spark’s Ecosystem and Packages
One of the best parts about Spark is the ecosystem of packages and tools that
the community has created. Some of these tools even move into the core
Spark project as they mature and become widely used. As of this writing, the
list of packages is rather long, numbering over 300—and more are added
frequently. You can find the largest index of Spark Packages at spark-
packages.org, where any user can publish to this package repository. There
are also various other projects and packages that you can find on the web; for
example, on GitHub.
Conclusion
We hope this chapter showed you the sheer variety of ways in which you can
apply Spark to your own business and technical challenges. Spark’s simple,
robust programming model makes it easy to apply to a large number of
problems, and the vast array of packages that have cropped up around it,
created by hundreds of different people, is a true testament to Spark’s
ability to robustly tackle a number of business problems and challenges. As
the ecosystem and community grow, it’s likely that more and more packages
will continue to crop up. We look forward to seeing what the community has
in store!
The rest of this book will provide deeper dives into the product areas in
Figure 3-1.
You may read the rest of the book any way that you prefer; we find that most
people hop from area to area as they hear terminology or want to apply Spark
to certain problems they’re facing.
Part II. Structured APIs—
DataFrames, SQL, and Datasets
Chapter 4. Structured API Overview
This part of the book will be a deep dive into Spark’s Structured APIs. The
Structured APIs are a tool for manipulating all sorts of data, from
unstructured log files to semi-structured CSV files and highly structured
Parquet files. These APIs refer to three core types of distributed collection
APIs:
Datasets
DataFrames
SQL tables and views
Although they are distinct parts of the book, the majority of the Structured
APIs apply to both batch and streaming computation. This means that when
you work with the Structured APIs, it should be simple to migrate from batch
to streaming (or vice versa) with little to no effort. We’ll cover streaming in
detail in Part V.
The Structured APIs are the fundamental abstraction that you will use to
write the majority of your data flows. Thus far in this book, we have taken a
tutorial-based approach, meandering our way through much of what Spark
has to offer. This part offers a more in-depth exploration. In this chapter,
we’ll introduce the fundamental concepts that you should understand: the
typed and untyped APIs (and their differences); what the core terminology is;
and, finally, how Spark actually takes your Structured API data flows and
executes them on the cluster. We will then provide more specific task-based
information for working with certain types of data or data sources.
NOTE
Before proceeding, let’s review the fundamental concepts and definitions that we covered
in Part I. Spark is a distributed programming model in which the user specifies
transformations. Multiple transformations build up a directed acyclic graph of
instructions. An action begins the process of executing that graph of instructions, as a
single job, by breaking it down into stages and tasks to execute across the cluster. The
logical structures that we manipulate with transformations and actions are DataFrames and
Datasets. To create a new DataFrame or Dataset, you call a transformation. To start
computation or convert to native language types, you call an action.
DataFrames and Datasets
Part I discussed DataFrames. Spark has two notions of structured collections:
DataFrames and Datasets. We will touch on the (nuanced) differences
shortly, but let’s define what they both represent first.
DataFrames and Datasets are (distributed) table-like collections with well-
defined rows and columns. Each column must have the same number of rows
as all the other columns (although you can use null to specify the absence of
a value) and each column has type information that must be consistent for
every row in the collection. To Spark, DataFrames and Datasets represent
immutable, lazily evaluated plans that specify what operations to apply to
data residing at a location to generate some output. When we perform an
action on a DataFrame, we instruct Spark to perform the actual
transformations and return the result. In other words, a DataFrame is a plan for how to
manipulate rows and columns to compute the user’s desired result.
NOTE
Tables and views are basically the same thing as DataFrames. We just execute SQL
against them instead of DataFrame code. We cover all of this in Chapter 10, which focuses
specifically on Spark SQL.
To add a bit more specificity to these definitions, we need to talk about
schemas, which are the way you define the types of data you’re storing in this
distributed collection.
Schemas
A schema defines the column names and types of a DataFrame. You can
define schemas manually or read a schema from a data source (often called
schema on read). Schemas consist of types, meaning that you need a way of
specifying what lies where.
Overview of Structured Spark Types
Spark is effectively a programming language of its own. Internally, Spark
uses an engine called Catalyst that maintains its own type information
through the planning and processing of work. Doing so opens up a
wide variety of execution optimizations that make significant differences.
Spark types map directly to the different language APIs that Spark maintains
and there exists a lookup table for each of these in Scala, Java, Python, SQL,
and R. Even if we use Spark’s Structured APIs from Python or R, the
majority of our manipulations will operate strictly on Spark types, not Python
types. For example, the following code does not perform addition in Scala or
Python; it actually performs addition purely in Spark:
// in Scala
val df = spark.range(500).toDF("number")
df.select(df.col("number") + 10)
# in Python
df = spark.range(500).toDF("number")
df.select(df["number"] + 10)
This addition operation happens because Spark will convert an expression
written in an input language to Spark’s internal Catalyst representation of that
same type information. It then will operate on that internal representation. We
touch on why this is the case momentarily, but before we can, we need to
discuss Datasets.
DataFrames Versus Datasets
In essence, within the Structured APIs, there are two more APIs, the
“untyped” DataFrames and the “typed” Datasets. To say that DataFrames are
untyped is slightly inaccurate; they have types, but Spark maintains them
completely and only checks whether those types line up to those specified in
the schema at runtime. Datasets, on the other hand, check whether types
conform to the specification at compile time. Datasets are only available to
Java Virtual Machine (JVM)–based languages (Scala and Java) and we
specify types with case classes or Java beans.
For the most part, you’re likely to work with DataFrames. To Spark (in
Scala), DataFrames are simply Datasets of type Row. The “Row” type is
Spark’s internal representation of its optimized in-memory format for
computation. This format makes for highly specialized and efficient
computation because rather than using JVM types, which can cause high
garbage-collection and object instantiation costs, Spark can operate on its
own internal format without incurring any of those costs. To Spark (in Python
or R), there is no such thing as a Dataset: everything is a DataFrame and
therefore we always operate on that optimized format.
NOTE
The internal Catalyst format is well covered in numerous Spark presentations. Given that
this book is intended for a more general audience, we’ll refrain from going into the
implementation. If you’re curious, there are some excellent talks by Josh Rosen and
Herman van Hovell, both of Databricks, about their work in the development of Spark’s
Catalyst engine.
Understanding DataFrames, Spark Types, and Schemas takes some time to
digest. What you need to know is that when you’re using DataFrames, you’re
taking advantage of Spark’s optimized internal format. This format applies
the same efficiency gains to all of Spark’s language APIs. If you need strict
compile-time checking, read Chapter 11 to learn more about it.
Let’s move on to some friendlier and more approachable concepts: columns
and rows.
Columns
Columns represent a simple type like an integer or string, a complex type like
an array or map, or a null value. Spark tracks all of this type information for
you and offers a variety of ways to transform columns.
Columns are discussed extensively in Chapter 5, but for the most part you
can think about Spark Column types as columns in a table.
Rows
A row is nothing more than a record of data. Each record in a DataFrame
must be of type Row, as we can see when we collect the following
DataFrames. We can create these rows from SQL, from Resilient
Distributed Datasets (RDDs), from data sources, or manually from scratch.
Here, we create one by using a range:
// in Scala
spark.range(2).toDF().collect()
# in Python
spark.range(2).collect()
These both result in an array of Row objects.
Spark Types
We mentioned earlier that Spark has a large number of internal type
representations. We include a handy reference table on the next several pages
so that you can most easily reference what type, in your specific language,
lines up with the type in Spark.
Before getting to those tables, let’s talk about how we instantiate, or declare,
a column to be of a certain type.
To work with the correct Scala types, use the following:
import org.apache.spark.sql.types._
val b = ByteType
To work with the correct Java types, you should use the factory methods in
the following package:
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
DataType x = DataTypes.ByteType;
Python types at times have certain requirements, which you can see listed in
Table 4-1, as do Scala and Java, which you can see listed in Tables 4-2 and
4-3, respectively. To work with the correct Python types, use the following:
from pyspark.sql.types import *
b = ByteType()
The following tables provide the detailed type information for each of
Spark’s language bindings.
Table 4-1. Python type reference
Data type | Value type in Python | API to access or create a data type
ByteType | int or long. Note: Numbers will be converted to 1-byte signed integer numbers at runtime. Ensure that numbers are within the range of –128 to 127. | ByteType()
ShortType | int or long. Note: Numbers will be converted to 2-byte signed integer numbers at runtime. Ensure that numbers are within the range of –32768 to 32767. | ShortType()
IntegerType | int or long. Note: Python has a lenient definition of “integer.” Numbers that are too large will be rejected by Spark SQL if you use the IntegerType(). It’s best practice to use LongType. | IntegerType()
LongType | long. Note: Numbers will be converted to 8-byte signed integer numbers at runtime. Ensure that numbers are within the range of –9223372036854775808 to 9223372036854775807. Otherwise, convert data to decimal.Decimal and use DecimalType. | LongType()
FloatType | float. Note: Numbers will be converted to 4-byte single-precision floating-point numbers at runtime. | FloatType()
DoubleType | float | DoubleType()
DecimalType | decimal.Decimal | DecimalType()
StringType | string | StringType()
BinaryType | bytearray | BinaryType()
BooleanType | bool | BooleanType()
TimestampType | datetime.datetime | TimestampType()
DateType | datetime.date | DateType()
ArrayType | list, tuple, or array | ArrayType(elementType, [containsNull]). Note: The default value of containsNull is True.
MapType | dict | MapType(keyType, valueType, [valueContainsNull]). Note: The default value of valueContainsNull is True.
StructType | list or tuple | StructType(fields). Note: fields is a list of StructFields. Also, fields with the same name are not allowed.
StructField | The value type in Python of the data type of this field (for example, Int for a StructField with the data type IntegerType) | StructField(name, dataType, [nullable]). Note: The default value of nullable is True.
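The range notes in Table 4-1 follow directly from the byte width of each integral type. The following plain-Python sketch does not call Spark at all. The helper names signed_range and smallest_fitting_type are hypothetical, and the 4-byte width of IntegerType is an assumption on our part because the table leaves it implicit. It computes those ranges and picks the narrowest type that can hold a value, falling back to DecimalType as the table suggests.

```python
# in Python (no Spark required)

# Byte widths of Spark's integral types, narrowest first.
# Assumption: IntegerType is 4 bytes; the other widths are stated in Table 4-1.
INTEGRAL_BYTE_WIDTHS = {"ByteType": 1, "ShortType": 2, "IntegerType": 4, "LongType": 8}


def signed_range(num_bytes):
    """Return the (low, high) bounds of a signed integer with this many bytes."""
    bits = 8 * num_bytes
    return -(2 ** (bits - 1)), 2 ** (bits - 1) - 1


def smallest_fitting_type(value):
    """Name the narrowest integral type that can hold value, else DecimalType."""
    for type_name, width in INTEGRAL_BYTE_WIDTHS.items():
        low, high = signed_range(width)
        if low <= value <= high:
            return type_name
    return "DecimalType"


print(signed_range(1))
print(smallest_fitting_type(128))
print(smallest_fitting_type(2 ** 63))
```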
Table 4-2. Scala type reference
Data type | Value type in Scala | API to access or create a data type
ByteType | Byte | ByteType
ShortType | Short | ShortType
IntegerType | Int | IntegerType
LongType | Long | LongType
FloatType | Float | FloatType
DoubleType | Double | DoubleType
DecimalType | java.math.BigDecimal | DecimalType
StringType | String | StringType
BinaryType | Array[Byte] | BinaryType
BooleanType | Boolean | BooleanType
TimestampType | java.sql.Timestamp | TimestampType
DateType | java.sql.Date | DateType
ArrayType | scala.collection.Seq | ArrayType(elementType, [containsNull]). Note: The default value of containsNull is true.
MapType | scala.collection.Map | MapType(keyType, valueType, [valueContainsNull]). Note: The default value of valueContainsNull is true.
StructType | org.apache.spark.sql.Row | StructType(fields). Note: fields is an Array of StructFields. Also, fields with the same name are not allowed.
StructField | The value type in Scala of the data type of this field (for example, Int for a StructField with the data type IntegerType) | StructField(name, dataType, [nullable]). Note: The default value of nullable is true.
Table 4-3. Java type reference
Data type | Value type in Java | API to access or create a data type
ByteType | byte or Byte | DataTypes.ByteType
ShortType | short or Short | DataTypes.ShortType
IntegerType | int or Integer | DataTypes.IntegerType
LongType | long or Long | DataTypes.LongType
FloatType | float or Float | DataTypes.FloatType
DoubleType | double or Double | DataTypes.DoubleType
DecimalType | java.math.BigDecimal | DataTypes.createDecimalType() or DataTypes.createDecimalType(precision, scale).
StringType | String | DataTypes.StringType
BinaryType | byte[] | DataTypes.BinaryType
BooleanType | boolean or Boolean | DataTypes.BooleanType
TimestampType | java.sql.Timestamp | DataTypes.TimestampType
DateType | java.sql.Date | DataTypes.DateType
ArrayType | java.util.List | DataTypes.createArrayType(elementType) (Note: The value of containsNull will be true) or DataTypes.createArrayType(elementType, containsNull).
MapType | java.util.Map | DataTypes.createMapType(keyType, valueType) (Note: The value of valueContainsNull will be true) or DataTypes.createMapType(keyType, valueType, valueContainsNull).
StructType | org.apache.spark.sql.Row | DataTypes.createStructType(fields). Note: fields is a List or an array of StructFields. Also, two fields with the same name are not allowed.
StructField | The value type in Java of the data type of this field (for example, int for a StructField with the data type IntegerType) | DataTypes.createStructField(name, dataType, nullable)
It’s worth keeping in mind that the types might change over time as Spark
SQL continues to grow so you may want to reference Spark’s documentation
for future updates. Of course, all of these types are great, but you almost
never work with purely static DataFrames. You will always manipulate and
transform them. Therefore it’s important that we give you an overview of the
execution process in the Structured APIs.
Overview of Structured API Execution
This section will demonstrate how this code is actually executed across a
cluster. This will help you understand (and potentially debug) the process of
writing and executing code on clusters, so let’s walk through the execution of
a single structured API query from user code to executed code. Here’s an
overview of the steps:
1. Write DataFrame/Dataset/SQL Code.
2. If valid code, Spark converts this to a Logical Plan.
3. Spark transforms this Logical Plan to a Physical Plan, checking for
optimizations along the way.
4. Spark then executes this Physical Plan (RDD manipulations) on the
cluster.
To execute code, we must write code. This code is then submitted to Spark
either through the console or via a submitted job. This code then passes
through the Catalyst Optimizer, which decides how the code should be
executed and lays out a plan for doing so before, finally, the code is run and
the result is returned to the user. Figure 4-1 shows the process.
Figure 4-1. The Catalyst Optimizer
Logical Planning
The first phase of execution is meant to take user code and convert it into a
logical plan. Figure 4-2 illustrates this process.
Figure 4-2. The structured API logical planning process
This logical plan only represents a set of abstract transformations that do not
refer to executors or drivers; its purpose is purely to convert the user’s set of
expressions into the most optimized version. It does this by converting user
code into an unresolved logical plan. This plan is unresolved because
although your code might be valid, the tables or columns that it refers to
might or might not exist. Spark uses the catalog, a repository of all table and
DataFrame information, to resolve columns and tables in the analyzer. The
analyzer might reject the unresolved logical plan if the required table or
column name does not exist in the catalog. If the analyzer can resolve it, the
result is passed through the Catalyst Optimizer, a collection of rules that
attempt to optimize the logical plan by pushing down predicates or selections.
Packages can extend the Catalyst to include their own rules for domain-
specific optimizations.
Physical Planning
After successfully creating an optimized logical plan, Spark then begins the
physical planning process. The physical plan, often called a Spark plan,
specifies how the logical plan will execute on the cluster by generating
different physical execution strategies and comparing them through a cost
model, as depicted in Figure 4-3. An example of the cost comparison might
be choosing how to perform a given join by looking at the physical attributes
of a given table (how big the table is or how big its partitions are).
Figure 4-3. The physical planning process
Physical planning results in a series of RDDs and transformations. This result
is why you might have heard Spark referred to as a compiler—it takes queries
in DataFrames, Datasets, and SQL and compiles them into RDD
transformations for you.
Execution
Upon selecting a physical plan, Spark runs all of this code over RDDs, the
lower-level programming interface of Spark (which we cover in Part III).
Spark performs further optimizations at runtime, generating native Java
bytecode that can remove entire tasks or stages during execution. Finally the
result is returned to the user.
Conclusion
In this chapter, we covered Spark Structured APIs and how Spark transforms
your code into what will physically execute on the cluster. In the chapters
that follow, we cover core concepts and how to use the key functionality of
the Structured APIs.
Chapter 5. Basic Structured Operations
In Chapter 4, we introduced the core abstractions of the Structured API. This
chapter moves away from the architectural concepts and toward the tactical
tools you will use to manipulate DataFrames and the data within them. This
chapter focuses exclusively on fundamental DataFrame operations and avoids
aggregations, window functions, and joins. These are discussed in subsequent
chapters.
By definition, a DataFrame consists of a series of records (like rows in a
table) that are of type Row, and a number of columns (like columns in a
spreadsheet) that represent a computation expression that can be performed
on each individual record in the Dataset. Schemas define the name as well as
the type of data in each column. Partitioning of the DataFrame defines the
layout of the DataFrame or Dataset’s physical distribution across the cluster.
The partitioning scheme defines how that is allocated. You can set this to be
based on values in a certain column or nondeterministically.
Let’s create a DataFrame with which we can work:
// in Scala
val df = spark.read.format("json")
.load("/data/flight-data/json/2015-summary.json")
# in Python
df = spark.read.format("json").load("/data/flight-data/json/2015-summary.json")
We discussed that a DataFrame will have columns, and we use a schema to
define them. Let’s take a look at the schema on our current DataFrame:
df.printSchema()
Schemas tie everything together, so they’re worth belaboring.
Schemas
A schema defines the column names and types of a DataFrame. We can either
let a data source define the schema (called schema-on-read) or we can define
it explicitly ourselves.
WARNING
Deciding whether you need to define a schema prior to reading in your data depends on
your use case. For ad hoc analysis, schema-on-read usually works just fine (although at
times it can be a bit slow with plain-text file formats like CSV or JSON). However, this
can also lead to precision issues like a long type incorrectly set as an integer when reading
in a file. When using Spark for production Extract, Transform, and Load (ETL), it is often
a good idea to define your schemas manually, especially when working with untyped data
sources like CSV and JSON because schema inference can vary depending on the type of
data that you read in.
Let’s begin with a simple file, which we saw in Chapter 4, and let the semi-
structured nature of line-delimited JSON define the structure. This is flight
data from the United States Bureau of Transportation Statistics:
// in Scala
spark.read.format("json").load("/data/flight-data/json/2015-summary.json").schema
Scala returns the following:
org.apache.spark.sql.types.StructType = ...
StructType(StructField(DEST_COUNTRY_NAME,StringType,true),
StructField(ORIGIN_COUNTRY_NAME,StringType,true),
StructField(count,LongType,true))
# in Python
spark.read.format("json").load("/data/flight-data/json/2015-summary.json").schema
Python returns the following:
StructType(List(StructField(DEST_COUNTRY_NAME,StringType,true),
StructField(ORIGIN_COUNTRY_NAME,StringType,true),
StructField(count,LongType,true)))
A schema is a StructType made up of a number of fields, StructFields,
each of which has a name, a type, and a Boolean flag that specifies whether that column
can contain missing or null values. Users can also optionally specify
metadata associated with that column. The metadata is a way of storing
information about this column (Spark uses this in its machine learning
library).
Schemas can contain other StructTypes (Spark’s complex types). We will
see this in Chapter 6 when we discuss working with complex types. If the
types in the data (at runtime) do not match the schema, Spark will throw an
error. The example that follows shows how to create and enforce a specific
schema on a DataFrame.
// in Scala
import org.apache.spark.sql.types.{StructField, StructType, StringType,
LongType}
import org.apache.spark.sql.types.Metadata
val myManualSchema = StructType(Array(
StructField("DEST_COUNTRY_NAME", StringType, true),
StructField("ORIGIN_COUNTRY_NAME", StringType, true),
StructField("count", LongType, false,
Metadata.fromJson("{\"hello\":\"world\"}"))
))
val df = spark.read.format("json").schema(myManualSchema)
.load("/data/flight-data/json/2015-summary.json")
Here’s how to do the same in Python:
# in Python
from pyspark.sql.types import StructField, StructType, StringType, LongType
myManualSchema = StructType([
StructField("DEST_COUNTRY_NAME", StringType(), True),
StructField("ORIGIN_COUNTRY_NAME", StringType(), True),
StructField("count", LongType(), False, metadata={"hello":"world"})
])
df = spark.read.format("json").schema(myManualSchema)\
.load("/data/flight-data/json/2015-summary.json")
As discussed in Chapter 4, we cannot simply set types via the per-language
types because Spark maintains its own type information. Let’s now discuss
what schemas define: columns.
Columns and Expressions
Columns in Spark are similar to columns in a spreadsheet, R dataframe, or
pandas DataFrame. You can select, manipulate, and remove columns from
DataFrames and these operations are represented as expressions.
To Spark, columns are logical constructions that simply represent a value
computed on a per-record basis by means of an expression. This means that
to have a real value for a column, we need to have a row; and to have a row,
we need to have a DataFrame. You cannot manipulate an individual column
outside the context of a DataFrame; you must use Spark transformations
within a DataFrame to modify the contents of a column.
Columns
There are a lot of different ways to construct and refer to columns but the two
simplest ways are by using the col or column functions. To use either of
these functions, you pass in a column name:
// in Scala
import org.apache.spark.sql.functions.{col, column}
col("someColumnName")
column("someColumnName")
# in Python
from pyspark.sql.functions import col, column
col("someColumnName")
column("someColumnName")
We will stick to using col throughout this book. As mentioned, this column
might or might not exist in our DataFrames. Columns are not resolved until
we compare the column names with those we are maintaining in the catalog.
Column and table resolution happens in the analyzer phase, as discussed in
Chapter 4.
NOTE
We just mentioned two different ways of referring to columns. Scala has some unique
language features that allow for more shorthand ways of referring to columns. The
following bits of syntactic sugar perform the exact same thing, namely creating a column,
but provide no performance improvement:
// in Scala
$"myColumn"
'myColumn
The $ allows us to designate a string as a special string that should refer to an expression.
The tick mark (') is a special thing called a symbol; this is a Scala-specific construct of
referring to some identifier. They both perform the same thing and are shorthand ways of
referring to columns by name. You’ll likely see all of the aforementioned references when
you read different people’s Spark code. We leave it to you to use whatever is most
comfortable and maintainable for you and those with whom you work.
Explicit column references
If you need to refer to a specific DataFrame’s column, you can use the col
method on the specific DataFrame. This can be useful when you are
performing a join and need to refer to a specific column in one DataFrame
that might share a name with another column in the joined DataFrame. We
will see this in Chapter 8. As an added benefit, Spark does not need to resolve
this column itself (during the analyzer phase) because we did that for Spark:
df.col("count")
Expressions
We mentioned earlier that columns are expressions, but what is an
expression? An expression is a set of transformations on one or more values
in a record in a DataFrame. Think of it like a function that takes as input one
or more column names, resolves them, and then potentially applies more
expressions to create a single value for each record in the dataset.
Importantly, this “single value” can actually be a complex type like a Map or
Array. We’ll see more of the complex types in Chapter 6.
In the simplest case, an expression, created via the expr function, is just a
DataFrame column reference: expr("someCol") is
equivalent to col("someCol").
Columns as expressions
Columns provide a subset of expression functionality. If you use col() and
want to perform transformations on that column, you must perform those on
that column reference. When using an expression, the expr function can
actually parse transformations and column references from a string and can
subsequently be passed into further transformations. Let’s look at some
examples.
expr("someCol - 5") is the same transformation as performing
col("someCol") - 5, or even expr("someCol") - 5. That’s because
Spark compiles these to a logical tree specifying the order of operations. This
might be a bit confusing at first, but remember a couple of key points:
Columns are just expressions.
Columns and transformations of those columns compile to the same
logical plan as parsed expressions.
Let’s ground this with an example:
(((col("someCol") + 5) * 200) - 6) < col("otherCol")
Figure 5-1 shows an overview of that logical tree.
Figure 5-1. A logical tree
This might look familiar because it’s a directed acyclic graph. This graph is
represented equivalently by the following code:
// in Scala
import org.apache.spark.sql.functions.expr
expr("(((someCol + 5) * 200) - 6) < otherCol")
# in Python
from pyspark.sql.functions import expr
expr("(((someCol + 5) * 200) - 6) < otherCol")
This is an extremely important point to reinforce. Notice how the previous
expression is actually valid SQL code, as well, just like you might put in a
SELECT statement? That’s because this SQL expression and the previous
DataFrame code compile to the same underlying logical tree prior to
execution. This means that you can write your expressions as DataFrame
code or as SQL expressions and get the exact same performance
characteristics. This is discussed in Chapter 4.
Accessing a DataFrame’s columns
Sometimes, you’ll need to see a DataFrame’s columns, which you can do by
using something like printSchema; however, if you want to
programmatically access columns, you can use the columns property to see
all columns on a DataFrame:
spark.read.format("json").load("/data/flight-data/json/2015-summary.json")
.columns
Records and Rows
In Spark, each row in a DataFrame is a single record. Spark represents this
record as an object of type Row. Spark manipulates Row objects using column
expressions in order to produce usable values. Row objects internally
represent arrays of bytes. The byte array interface is never shown to users
because we only use column expressions to manipulate them.
You’ll notice commands that return individual rows to the driver will always
return one or more Row types when we are working with DataFrames.
NOTE
We use lowercase “row” and “record” interchangeably in this chapter, with a focus on the
latter. A capitalized Row refers to the Row object.
Let’s see a row by calling first on our DataFrame:
df.first()
Creating Rows
You can create rows by manually instantiating a Row object with the values
that belong in each column. It’s important to note that only DataFrames have
schemas. Rows themselves do not have schemas. This means that if you
create a Row manually, you must specify the values in the same order as the
schema of the DataFrame to which they might be appended (we will see this
when we discuss creating DataFrames):
// in Scala
import org.apache.spark.sql.Row
val myRow = Row("Hello", null, 1, false)
# in Python
from pyspark.sql import Row
myRow = Row("Hello", None, 1, False)
Accessing data in rows is equally easy: you just specify the position that
you would like. In Scala or Java, you must either use the helper methods or
explicitly coerce the values. However, in Python or R, the value will
automatically be coerced into the correct type:
// in Scala
myRow(0) // type Any
myRow(0).asInstanceOf[String] // String
myRow.getString(0) // String
myRow.getInt(2) // Int
# in Python
myRow[0]
myRow[2]
You can also explicitly return a set of data in the corresponding Java Virtual
Machine (JVM) objects by using the Dataset APIs. This is covered in
Chapter 11.
DataFrame Transformations
Now that we have briefly defined the core parts of a DataFrame, we will move
on to manipulating DataFrames. When working with individual DataFrames
there are some fundamental objectives. These break down into several core
operations, as depicted in Figure 5-2:
We can add rows or columns
We can remove rows or columns
We can transform a row into a column (or vice versa)
We can change the order of rows based on the values in columns
Figure 5-2. Different kinds of transformations
Luckily, we can translate all of these into simple transformations, the most
common being those that take one column, change it row by row, and then
return our results.
Creating DataFrames
As we saw previously, we can create DataFrames from raw data sources. This
is covered extensively in Chapter 9; however, we will use them now to create
an example DataFrame (for illustration purposes later in this chapter, we will
also register this as a temporary view so that we can query it with SQL and
show off basic transformations in SQL, as well):
// in Scala
val df = spark.read.format("json")
.load("/data/flight-data/json/2015-summary.json")
df.createOrReplaceTempView("dfTable")
# in Python
df = spark.read.format("json").load("/data/flight-data/json/2015-summary.json")
df.createOrReplaceTempView("dfTable")
We can also create DataFrames on the fly by taking a set of rows and
converting them to a DataFrame.
// in Scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructField, StructType, StringType,
LongType}
val myManualSchema = new StructType(Array(
new StructField("some", StringType, true),
new StructField("col", StringType, true),
new StructField("names", LongType, false)))
val myRows = Seq(Row("Hello", null, 1L))
val myRDD = spark.sparkContext.parallelize(myRows)
val myDf = spark.createDataFrame(myRDD, myManualSchema)
myDf.show()
NOTE
In Scala, we can also take advantage of Spark’s implicits in the console (and if you import
them in your JAR code) by running toDF on a Seq type. This does not play well with null
types, so it’s not necessarily recommended for production use cases.
// in Scala
val myDF = Seq(("Hello", 2, 1L)).toDF("col1", "col2", "col3")
# in Python
from pyspark.sql import Row
from pyspark.sql.types import StructField, StructType, StringType, LongType
myManualSchema = StructType([
StructField("some", StringType(), True),
StructField("col", StringType(), True),
StructField("names", LongType(), False)
])
myRow = Row("Hello", None, 1)
myDf = spark.createDataFrame([myRow], myManualSchema)
myDf.show()
Giving an output of:
+-----+----+-----+
| some| col|names|
+-----+----+-----+
|Hello|null|    1|
+-----+----+-----+
Now that you know how to create DataFrames, let’s take a look at the methods
you will use most often: the select method when you’re working with columns
or expressions, and the selectExpr method when you’re working with
expressions in strings. Naturally, some transformations are not specified as
methods on columns; therefore, there exists a group of functions found in the
org.apache.spark.sql.functions package.
With these three tools, you should be able to solve the vast majority of
transformation challenges that you might encounter in DataFrames.
select and selectExpr
select and selectExpr allow you to do the DataFrame equivalent of SQL
queries on a table of data:
-- in SQL
SELECT * FROM dataFrameTable
SELECT columnName FROM dataFrameTable
SELECT columnName * 10, otherColumn, someOtherCol as c FROM dataFrameTable
In the simplest possible terms, you can use them to manipulate columns in
your DataFrames. Let’s walk through some examples on DataFrames to talk
about some of the different ways of approaching this problem. The easiest
way is just to use the select method and pass in the column names as strings
with which you would like to work:
// in Scala
df.select("DEST_COUNTRY_NAME").show(2)
# in Python
df.select("DEST_COUNTRY_NAME").show(2)
-- in SQL
SELECT DEST_COUNTRY_NAME FROM dfTable LIMIT 2
Giving an output of:
+-----------------+
|DEST_COUNTRY_NAME|
+-----------------+
|    United States|
|    United States|
+-----------------+
You can select multiple columns by using the same style of query; just add
more column name strings to your select method call:
// in Scala
df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)
# in Python
df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)
-- in SQL
SELECT DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME FROM dfTable LIMIT 2
Giving an output of:
+-----------------+-------------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|
+-----------------+-------------------+
|    United States|            Romania|
|    United States|            Croatia|
+-----------------+-------------------+
As discussed in “Columns and Expressions”, you can refer to columns in a
number of different ways; all you need to keep in mind is that you can use
them interchangeably:
// in Scala
import org.apache.spark.sql.functions.{expr, col, column}
df.select(
df.col("DEST_COUNTRY_NAME"),
col("DEST_COUNTRY_NAME"),
column("DEST_COUNTRY_NAME"),
'DEST_COUNTRY_NAME,
$"DEST_COUNTRY_NAME",
expr("DEST_COUNTRY_NAME"))
.show(2)
# in Python
from pyspark.sql.functions import expr, col, column
df.select(
expr("DEST_COUNTRY_NAME"),
col("DEST_COUNTRY_NAME"),
column("DEST_COUNTRY_NAME"))\
.show(2)
One common error is attempting to mix Column objects and strings. For
example, in Scala the following code will result in a compiler error:
df.select(col("DEST_COUNTRY_NAME"), "DEST_COUNTRY_NAME")
As we’ve seen thus far, expr is the most flexible reference that we can use. It
can refer to a plain column or a string manipulation of a column. To illustrate,
let’s change the column name, and then change it back by using the AS
keyword and then the alias method on the column:
// in Scala
df.select(expr("DEST_COUNTRY_NAME AS destination")).show(2)
# in Python
df.select(expr("DEST_COUNTRY_NAME AS destination")).show(2)
-- in SQL
SELECT DEST_COUNTRY_NAME as destination FROM dfTable LIMIT 2
This changes the column name to “destination.” You can further manipulate
the result of your expression as another expression:
// in Scala
df.select(expr("DEST_COUNTRY_NAME as destination").alias("DEST_COUNTRY_NAME"))
.show(2)
# in Python
df.select(expr("DEST_COUNTRY_NAME as destination").alias("DEST_COUNTRY_NAME"))\
.show(2)
The preceding operation changes the column name back to its original name.
Because select followed by a series of expr is such a common pattern,
Spark has a shorthand for doing this efficiently: selectExpr. This is
probably the most convenient interface for everyday use:
// in Scala
df.selectExpr("DEST_COUNTRY_NAME as newColumnName", "DEST_COUNTRY_NAME").show(2)
# in Python
df.selectExpr("DEST_COUNTRY_NAME as newColumnName", "DEST_COUNTRY_NAME").show(2)
This opens up the true power of Spark. We can treat selectExpr as a simple
way to build up complex expressions that create new DataFrames. In fact, we
can add any valid non-aggregating SQL statement, and as long as the
columns resolve, it will be valid! Here’s a simple example that adds a new
column withinCountry to our DataFrame that specifies whether the
destination and origin are the same:
// in Scala
df.selectExpr(
"*", // include all original columns
"(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry")
.show(2)
# in Python
df.selectExpr(
"*", # all original columns
"(DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry")\
.show(2)
-- in SQL
SELECT *, (DEST_COUNTRY_NAME = ORIGIN_COUNTRY_NAME) as withinCountry
FROM dfTable
LIMIT 2
Giving an output of:
+-----------------+-------------------+-----+-------------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|withinCountry|
+-----------------+-------------------+-----+-------------+
|    United States|            Romania|   15|        false|
|    United States|            Croatia|    1|        false|
+-----------------+-------------------+-----+-------------+
With selectExpr, we can also specify aggregations over the entire
DataFrame by taking advantage of the functions that we have. These look just
like what we have been showing so far:
// in Scala
df.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))").show(2)
# in Python
df.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))").show(2)
-- in SQL
SELECT avg(count), count(distinct(DEST_COUNTRY_NAME)) FROM dfTable LIMIT 2
Giving an output of:
+-----------+---------------------------------+
| avg(count)|count(DISTINCT DEST_COUNTRY_NAME)|
+-----------+---------------------------------+
|1770.765625|                              132|
+-----------+---------------------------------+
Converting to Spark Types (Literals)
Sometimes, we need to pass explicit values into Spark that are just a value
(rather than a new column). This might be a constant value or something
we’ll need to compare to later on. The way we do this is through literals. This
is basically a translation from a given programming language’s literal value
to one that Spark understands. Literals are expressions and you can use them
in the same way:
// in Scala
import org.apache.spark.sql.functions.lit
df.select(expr("*"), lit(1).as("One")).show(2)
# in Python
from pyspark.sql.functions import lit
df.select(expr("*"), lit(1).alias("One")).show(2)
In SQL, literals are just the specific value:
-- in SQL
SELECT *, 1 as One FROM dfTable LIMIT 2
Giving an output of:
+-----------------+-------------------+-----+---+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|One|
+-----------------+-------------------+-----+---+
|    United States|            Romania|   15|  1|
|    United States|            Croatia|    1|  1|
+-----------------+-------------------+-----+---+
This will come up when you might need to check whether a value is greater
than some constant or other programmatically created variable.
Adding Columns
There’s also a more formal way of adding a new column to a DataFrame, and
that’s by using the withColumn method on our DataFrame. For example, let’s
add a column named numberOne that holds the literal value 1 in every row:
// in Scala
df.withColumn("numberOne", lit(1)).show(2)
# in Python
df.withColumn("numberOne", lit(1)).show(2)
-- in SQL
SELECT *, 1 as numberOne FROM dfTable LIMIT 2
Giving an output of:
+-----------------+-------------------+-----+---------+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|numberOne|
+-----------------+-------------------+-----+---------+
|    United States|            Romania|   15|        1|
|    United States|            Croatia|    1|        1|
+-----------------+-------------------+-----+---------+
Let’s do something a bit more interesting and make it an actual expression. In
the next example, we’ll set a Boolean flag for when the origin country is the
same as the destination country:
// in Scala
df.withColumn("withinCountry", expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME"))
.show(2)
# in Python
df.withColumn("withinCountry", expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME"))\
.show(2)
Notice that the withColumn function takes two arguments: the column name
and the expression that will create the value for that given row in the
DataFrame. Interestingly, we can also use this to copy a column under a new
name; note that the original column remains in the result. The SQL
syntax is the same as we had previously, so we can omit it in this example:
df.withColumn("Destination", expr("DEST_COUNTRY_NAME")).columns
Resulting in:
... DEST_COUNTRY_NAME, ORIGIN_COUNTRY_NAME, count, Destination
Renaming Columns
Although we can rename a column in the manner that we just described,
another alternative is to use the withColumnRenamed method. This will
rename the column with the name of the string in the first argument to the
string in the second argument:
// in Scala
df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns
# in Python
df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns
... dest, ORIGIN_COUNTRY_NAME, count
Reserved Characters and Keywords
One thing that you might come across is reserved characters like spaces or
dashes in column names. Handling these means escaping column names
appropriately. In Spark, we do this by using backtick (`) characters. Let’s use
withColumn, which you just learned about, to create a column with reserved
characters. We’ll show two examples: in the one shown here, we don’t need
escape characters, but in the next one, we do:
// in Scala
import org.apache.spark.sql.functions.expr
val dfWithLongColName = df.withColumn(
"This Long Column-Name",
expr("ORIGIN_COUNTRY_NAME"))
# in Python
dfWithLongColName = df.withColumn(
"This Long Column-Name",
expr("ORIGIN_COUNTRY_NAME"))
We don’t need escape characters here because the first argument to
withColumn is just a string for the new column name. In this example,
however, we need to use backticks because we’re referencing a column in an
expression:
// in Scala
dfWithLongColName.selectExpr(
"`This Long Column-Name`",
"`This Long Column-Name` as `new col`")
.show(2)
# in Python
dfWithLongColName.selectExpr(
"`This Long Column-Name`",
"`This Long Column-Name` as `new col`")\
.show(2)
dfWithLongColName.createOrReplaceTempView("dfTableLong")
-- in SQL
SELECT `This Long Column-Name`, `This Long Column-Name` as `new col`
FROM dfTableLong LIMIT 2
We can refer to columns with reserved characters (and not escape them) if
we’re doing an explicit string-to-column reference, which is interpreted as a
literal instead of an expression. We only need to escape expressions that use
reserved characters or keywords. The following two examples both result in
the same DataFrame:
// in Scala
dfWithLongColName.select(col("This Long Column-Name")).columns
# in Python
dfWithLongColName.select(expr("`This Long Column-Name`")).columns
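When expression strings are built programmatically, the escaping can be done by a small helper. The following is a minimal plain-Python sketch; the names quote_column and aliased are hypothetical, and the rule that a backtick inside a name is escaped by doubling it is an assumption to check against your Spark version:

```python
def quote_column(name):
    """Wrap a column name in backticks for use inside an expression string."""
    # Assumption: a backtick inside the name is escaped by doubling it.
    return "`" + name.replace("`", "``") + "`"


def aliased(name, alias):
    """Build an 'old as new' expression string with both names escaped."""
    return f"{quote_column(name)} as {quote_column(alias)}"


print(quote_column("This Long Column-Name"))
# `This Long Column-Name`
print(aliased("This Long Column-Name", "new col"))
# `This Long Column-Name` as `new col`
```

The resulting strings have the same shape as the arguments passed to selectExpr in the example above.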
Case Sensitivity
By default Spark is case insensitive; however, you can make Spark case
sensitive by setting the configuration:
-- in SQL
set spark.sql.caseSensitive true
Removing Columns
Now that we’ve created this column, let’s take a look at how we can remove
columns from DataFrames. You likely already noticed that we can do this by
using select. However, there is also a dedicated method called drop:
df.drop("ORIGIN_COUNTRY_NAME").columns
We can drop multiple columns by passing in multiple columns as arguments:
dfWithLongColName.drop("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME")
Changing a Column’s Type (cast)
Sometimes, we might need to convert from one type to another; for example,
if we have a set of StringType that should be integers. We can convert
columns from one type to another by casting the column from one type to
another. For instance, let’s convert our count column from an integer to a
type Long:
df.withColumn("count2", col("count").cast("long"))
-- in SQL
SELECT *, cast(count as long) AS count2 FROM dfTable
Filtering Rows
To filter rows, we create an expression that evaluates to true or false. You
then filter out the rows with an expression that is equal to false. The most
common way to do this with DataFrames is to create either an expression as a
String or build an expression by using a set of column manipulations. There
are two methods to perform this operation: you can use where or filter and
they both will perform the same operation and accept the same argument
types when used with DataFrames. We will stick to where because of its
familiarity to SQL; however, filter is valid as well.
NOTE
When using the Dataset API from either Scala or Java, filter also accepts an arbitrary
function that Spark will apply to each record in the Dataset. See Chapter 11 for more
information.
The following filters are equivalent, and the results are the same in Scala and
Python:
df.filter(col("count") < 2).show(2)
df.where("count < 2").show(2)
-- in SQL
SELECT * FROM dfTable WHERE count < 2 LIMIT 2
Giving an output of:
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
|    United States|          Singapore|    1|
+-----------------+-------------------+-----+
Instinctually, you might want to put multiple filters into the same expression.
Although this is possible, it is not always useful, because Spark automatically
performs all filtering operations at the same time regardless of the filter
ordering. This means that if you want to specify multiple AND filters, just
chain them sequentially and let Spark handle the rest:
// in Scala
df.where(col("count") < 2).where(col("ORIGIN_COUNTRY_NAME") =!= "Croatia")
.show(2)
# in Python
df.where(col("count") < 2).where(col("ORIGIN_COUNTRY_NAME") != "Croatia")\
.show(2)
-- in SQL
SELECT * FROM dfTable WHERE count < 2 AND ORIGIN_COUNTRY_NAME != "Croatia"
LIMIT 2
Giving an output of:
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|          Singapore|    1|
|          Moldova|      United States|    1|
+-----------------+-------------------+-----+
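The equivalence between chained filters and a single AND filter can be illustrated without Spark. The following plain-Python sketch models rows as dictionaries; the where helper and the sample rows are hypothetical and only mimic the semantics described above, not Spark's optimizer:

```python
# Plain-Python model of chained filters; this is not Spark code.
rows = [
    {"DEST_COUNTRY_NAME": "United States", "ORIGIN_COUNTRY_NAME": "Croatia", "count": 1},
    {"DEST_COUNTRY_NAME": "United States", "ORIGIN_COUNTRY_NAME": "Singapore", "count": 1},
    {"DEST_COUNTRY_NAME": "United States", "ORIGIN_COUNTRY_NAME": "Romania", "count": 15},
]


def where(data, predicate):
    """Keep only the rows for which the predicate is true."""
    return [row for row in data if predicate(row)]


def low_count(row):
    return row["count"] < 2


def not_croatia(row):
    return row["ORIGIN_COUNTRY_NAME"] != "Croatia"


chained = where(where(rows, low_count), not_croatia)
chained_reversed = where(where(rows, not_croatia), low_count)
combined = where(rows, lambda row: low_count(row) and not_croatia(row))

print(chained == combined == chained_reversed)  # True
print([row["ORIGIN_COUNTRY_NAME"] for row in chained])  # ['Singapore']
```

Because the order of the chained filters does not change the result, an engine is free to evaluate them together in a single pass.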
Getting Unique Rows
A very common use case is to extract the unique or distinct values in a
DataFrame. These values can be in one or more columns. The way we do this
is by using the distinct method on a DataFrame, which allows us to
deduplicate any rows that are in that DataFrame. For instance, let’s get the
unique origins in our dataset. This, of course, is a transformation that will
return a new DataFrame with only unique rows:
// in Scala
df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count()
# in Python
df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count()
-- in SQL
SELECT COUNT(DISTINCT(ORIGIN_COUNTRY_NAME, DEST_COUNTRY_NAME)) FROM dfTable
Results in 256.
// in Scala
df.select("ORIGIN_COUNTRY_NAME").distinct().count()
# in Python
df.select("ORIGIN_COUNTRY_NAME").distinct().count()
-- in SQL
SELECT COUNT(DISTINCT ORIGIN_COUNTRY_NAME) FROM dfTable
Results in 125.
Random Samples
Sometimes, you might just want to sample some random records from your
DataFrame. You can do this by using the sample method on a DataFrame,
which makes it possible for you to specify a fraction of rows to extract from a
DataFrame and whether you’d like to sample with or without replacement:
// in Scala
val seed = 5
val withReplacement = false
val fraction = 0.5
df.sample(withReplacement, fraction, seed).count()
# in Python
seed = 5
withReplacement = False
fraction = 0.5
df.sample(withReplacement, fraction, seed).count()
Giving an output of 126.
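The fraction is best read as a per-row probability rather than an exact share, which is why the count above is close to, but not exactly, half of the rows. The following plain-Python sketch illustrates that idea, assuming that sampling without replacement keeps each row independently with probability fraction; bernoulli_sample is a hypothetical helper, not Spark's sampler, and it will not reproduce Spark's exact counts:

```python
import random


def bernoulli_sample(rows, fraction, seed):
    """Keep each row independently with probability `fraction`."""
    rng = random.Random(seed)
    return [row for row in rows if rng.random() < fraction]


rows = list(range(256))
sample = bernoulli_sample(rows, fraction=0.5, seed=5)

print(len(sample))  # close to 128, but usually not exactly 128
print(sample == bernoulli_sample(rows, 0.5, 5))  # True: same seed, same sample
```

Fixing the seed makes the sample reproducible, which is useful when you need to rerun an analysis on the same subset.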
Random Splits
Random splits can be helpful when you need to break up your DataFrame
into random “splits” of the original DataFrame. This is often used with
machine learning algorithms to create training, validation, and test sets. In
this next example, we’ll split our DataFrame into two different DataFrames
by setting the weights by which we will split the DataFrame (these are the
arguments to the function). Because this method is designed to be
randomized, we will also specify a seed (just replace seed with a number of
your choosing in the code block). It’s important to note that if you don’t
specify a proportion for each DataFrame that adds up to one, they will be
normalized so that they do:
// in Scala
val dataFrames = df.randomSplit(Array(0.25, 0.75), seed)
dataFrames(0).count() > dataFrames(1).count() // False
# in Python
dataFrames = df.randomSplit([0.25, 0.75], seed)
dataFrames[0].count() > dataFrames[1].count() # False
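The normalization of weights mentioned above is simple arithmetic: each weight is divided by the sum of all weights. The following plain-Python sketch shows this, along with one way the normalized weights could be turned into cumulative ranges; normalize_weights and split_boundaries are hypothetical helpers for illustration, not part of Spark's API:

```python
from itertools import accumulate


def normalize_weights(weights):
    """Scale non-negative weights so that they sum to one."""
    total = sum(weights)
    if total <= 0 or any(w < 0 for w in weights):
        raise ValueError("weights must be non-negative with a positive sum")
    return [w / total for w in weights]


def split_boundaries(weights):
    """Turn weights into (lower, upper) ranges that cover [0, 1)."""
    cumulative = [0.0] + list(accumulate(normalize_weights(weights)))
    return list(zip(cumulative[:-1], cumulative[1:]))


print(normalize_weights([1, 3]))  # [0.25, 0.75]
print(split_boundaries([1, 3]))   # [(0.0, 0.25), (0.25, 1.0)]
```

Under this model, weights of 1 and 3 produce the same proportions as the 0.25 and 0.75 used in the example above.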
Concatenating and Appending Rows (Union)
As you learned in the previous section, DataFrames are immutable. This
means users cannot append to a DataFrame because that would be changing it.
To append to a DataFrame, you must union the original DataFrame with
the new DataFrame. This just concatenates the two DataFrames. To
union two DataFrames, you must be sure that they have the same schema and
number of columns; otherwise, the union will fail.
WARNING
Unions are currently performed based on location, not on the schema. This means that
columns will not automatically line up the way you think they might.
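To make the warning concrete, the following plain-Python sketch models a position-based union with rows as tuples. The helpers union_by_position and reorder_to and the sample rows are hypothetical; this only mimics the behavior described in the warning and is not Spark code:

```python
def union_by_position(left_columns, left_rows, right_columns, right_rows):
    """Append right_rows to left_rows, matching values by position only."""
    if len(left_columns) != len(right_columns):
        raise ValueError("both sides must have the same number of columns")
    # The result keeps the left side's column names; the right names are ignored.
    return left_columns, left_rows + right_rows


def reorder_to(columns, source_columns, rows):
    """Rearrange each row from source_columns order into columns order."""
    indexes = [source_columns.index(name) for name in columns]
    return [tuple(row[i] for i in indexes) for row in rows]


left_columns = ["DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME", "count"]
left_rows = [("United States", "Croatia", 1)]

# Same column names, but origin and destination are in swapped positions.
right_columns = ["ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME", "count"]
right_rows = [("Other Country", "New Country", 5)]

_, misaligned = union_by_position(left_columns, left_rows, right_columns, right_rows)
print(misaligned[1])  # ('Other Country', 'New Country', 5): origin sits in the destination slot

aligned_rows = reorder_to(left_columns, right_columns, right_rows)
_, aligned = union_by_position(left_columns, left_rows, left_columns, aligned_rows)
print(aligned[1])  # ('New Country', 'Other Country', 5)
```

With DataFrames, one way to get the same alignment is to select the second DataFrame's columns in the first DataFrame's column order before calling union.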
// in Scala
import org.apache.spark.sql.Row
val schema = df.schema
val newRows = Seq(
Row("New Country", "Other Country", 5L),
Row("New Country 2", "Other Country 3", 1L)
)
val parallelizedRows = spark.sparkContext.parallelize(newRows)
val newDF = spark.createDataFrame(parallelizedRows, schema)
df.union(newDF)
.where("count = 1")
.where($"ORIGIN_COUNTRY_NAME" =!= "United States")
.show() // get all of them and we'll see our new rows at the end
In Scala, you must use the =!= operator so that you don’t just compare the
unevaluated column expression to a string but instead to the evaluated one.
Here is the Python equivalent:
# in Python
from pyspark.sql import Row
schema = df.schema
newRows = [
Row("New Country", "Other Country", 5),
Row("New Country 2", "Other Country 3", 1)
]
parallelizedRows = spark.sparkContext.parallelize(newRows)
newDF = spark.createDataFrame(parallelizedRows, schema)
df.union(newDF)\
.where("count = 1")\
.where(col("ORIGIN_COUNTRY_NAME") != "United States")\
.show()
Giving the output of:
+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Croatia|    1|
...
|    United States|            Namibia|    1|
|    New Country 2|    Other Country 3|    1|
+-----------------+-------------------+-----+
As expected, you’ll need to use this new DataFrame reference in order to
refer to the DataFrame with the newly appended rows. A common way to do
this is to make the DataFrame into a view or register it as a table so that you
can reference it more dynamically in your code.
Sorting Rows
When we sort the values in a DataFrame, we always want to sort with either
the largest or smallest values at the top of a DataFrame. There are two
equivalent operations to do this, sort and orderBy, that work the exact same
way. They accept both column expressions and strings as well as multiple
columns. The default is to sort in ascending order:
// in Scala
df.sort("count").show(5)
df.orderBy("count", "DEST_COUNTRY_NAME").show(5)
df.orderBy(col("count"), col("DEST_COUNTRY_NAME")).show(5)
# in Python
df.sort("count").show(5)
df.orderBy("count", "DEST_COUNTRY_NAME").show(5)
df.orderBy(col("count"), col("DEST_COUNTRY_NAME")).show(5)
To more explicitly specify sort direction, you need to use the asc and desc
functions if operating on a column. These allow you to specify the order in
which a given column should be sorted:
// in Scala
import org.apache.spark.sql.functions.{desc, asc}
df.orderBy(col("count").desc).show(2)
df.orderBy(desc("count"), asc("DEST_COUNTRY_NAME")).show(2)
# in Python
from pyspark.sql.functions import desc, asc
df.orderBy(desc("count")).show(2)
df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc()).show(2)
-- in SQL
SELECT * FROM dfTable ORDER BY count DESC, DEST_COUNTRY_NAME ASC LIMIT 2
An advanced tip is to use asc_nulls_first, desc_nulls_first,
asc_nulls_last, or desc_nulls_last to specify where you would like
your null values to appear in an ordered DataFrame.
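The four null-ordering options are the combinations of two independent choices: the sort direction and whether nulls come first or last. The following plain-Python sketch models that behavior with None standing in for null; sort_with_nulls is a hypothetical helper for illustration, not a Spark function:

```python
def sort_with_nulls(values, descending=False, nulls_first=True):
    """Sort values, placing None entries at the start or at the end."""
    nulls = [v for v in values if v is None]
    present = sorted((v for v in values if v is not None), reverse=descending)
    return nulls + present if nulls_first else present + nulls


counts = [15, None, 1, 344, None]

print(sort_with_nulls(counts))
# [None, None, 1, 15, 344]   (like asc_nulls_first)
print(sort_with_nulls(counts, nulls_first=False))
# [1, 15, 344, None, None]   (like asc_nulls_last)
print(sort_with_nulls(counts, descending=True, nulls_first=False))
# [344, 15, 1, None, None]   (like desc_nulls_last)
```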
For optimization purposes, it’s sometimes advisable to sort within each
partition before another set of transformations. You can use the
sortWithinPartitions method to do this:
// in Scala
spark.read.format("json").load("/data/flight-data/json/*-summary.json")
.sortWithinPartitions("count")
# in Python
spark.read.format("json").load("/data/flight-data/json/*-summary.json")\
.sortWithinPartitions("count")
We will discuss this more when we look at tuning and optimization in
Part III.
Limit
Oftentimes, you might want to restrict what you extract from a DataFrame;
for example, you might want just the top ten of some DataFrame. You can do
this by using the limit method:
// in Scala
df.limit(5).show()
# in Python
df.limit(5).show()
-- in SQL
SELECT * FROM dfTable LIMIT 5
// in Scala
df.orderBy(col("count").desc).limit(6).show()
# in Python
df.orderBy(col("count").desc()).limit(6).show()
-- in SQL
SELECT * FROM dfTable ORDER BY count desc LIMIT 6
Repartition and Coalesce
Another important optimization opportunity is to partition the data according
to some frequently filtered columns, which control the physical layout of data
across the cluster including the partitioning scheme and the number of
partitions.
Repartition will incur a full shuffle of the data, regardless of whether one is
necessary. This means that you should typically only repartition when the
future number of partitions is greater than your current number of partitions
or when you are looking to partition by a set of columns:
// in Scala
df.rdd.getNumPartitions // 1
# in Python
df.rdd.getNumPartitions() # 1
// in Scala
df.repartition(5)
# in Python
df.repartition(5)
If you know that you’re going to be filtering by a certain column often, it can
be worth repartitioning based on that column:
// in Scala
df.repartition(col("DEST_COUNTRY_NAME"))
# in Python
df.repartition(col("DEST_COUNTRY_NAME"))
You can optionally specify the number of partitions you would like, too:
// in Scala
df.repartition(5, col("DEST_COUNTRY_NAME"))
# in Python
df.repartition(5, col("DEST_COUNTRY_NAME"))
Coalesce, on the other hand, will not incur a full shuffle and will try to
combine partitions. This operation will shuffle your data into five partitions
based on the destination country name, and then coalesce them (without a full
shuffle):
// in Scala
df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2)
# in Python
df.repartition(5, col("DEST_COUNTRY_NAME")).coalesce(2)
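The difference between the two operations can be pictured with a plain-Python model in which a DataFrame is a list of partitions. This is only a sketch under simplifying assumptions: repartition_by_key and coalesce are hypothetical helpers, and neither the hash function nor the merge order is meant to match what Spark actually does:

```python
import zlib


def repartition_by_key(rows, num_partitions, key):
    """Full shuffle: every row is reassigned by hashing its key."""
    partitions = [[] for _ in range(num_partitions)]
    for row in rows:
        # crc32 is used only because it is stable across Python runs.
        index = zlib.crc32(str(key(row)).encode("utf-8")) % num_partitions
        partitions[index].append(row)
    return partitions


def coalesce(partitions, num_partitions):
    """Merge whole partitions into fewer partitions without rehashing rows."""
    if num_partitions >= len(partitions):
        return [list(part) for part in partitions]
    merged = [[] for _ in range(num_partitions)]
    for i, part in enumerate(partitions):
        merged[i * num_partitions // len(partitions)].extend(part)
    return merged


rows = [
    ("United States", 15), ("Croatia", 1), ("Romania", 15),
    ("United States", 1), ("Singapore", 1), ("Croatia", 2),
]
five = repartition_by_key(rows, 5, key=lambda row: row[0])
two = coalesce(five, 2)

print(len(five), len(two))  # 5 2
print(sum(len(part) for part in two) == len(rows))  # True
```

In this model, repartitioning touches every row individually, whereas coalescing only concatenates partitions that already exist, so rows sharing a key stay together.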
Collecting Rows to the Driver
As discussed in previous chapters, Spark maintains the state of the cluster in
the driver. There are times when you’ll want to collect some of your data to
the driver in order to manipulate it on your local machine.
Thus far, we did not explicitly define this operation. However, we used
several different methods for doing so that are effectively all the same.
collect gets all data from the entire DataFrame, take selects the first N
rows, and show prints out a number of rows nicely.
// in Scala
val collectDF = df.limit(10)
collectDF.take(5) // take works with an Integer count
collectDF.show() // this prints it out nicely
collectDF.show(5, false)
collectDF.collect()
# in Python
collectDF = df.limit(10)
collectDF.take(5) # take works with an Integer count
collectDF.show() # this prints it out nicely
collectDF.show(5, False)
collectDF.collect()
There’s an additional way of collecting rows to the driver in order to iterate
over the entire dataset. The method toLocalIterator collects partitions to
the driver as an iterator. This method allows you to iterate over the entire
dataset partition-by-partition in a serial manner:
collectDF.toLocalIterator()
WARNING
Any collection of data to the driver can be a very expensive operation! If you have a large
dataset and call collect, you can crash the driver. If you use toLocalIterator and have
very large partitions, you can easily crash the driver node and lose the state of your
application. This is also expensive because we can operate on a one-by-one basis, instead
of running computation in parallel.
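The trade-off between the two approaches can be sketched in plain Python by modeling a dataset as a list of partitions. The functions collect and to_local_iterator below are hypothetical stand-ins that only mirror the behavior described above; they are not Spark's implementation:

```python
def collect(partitions):
    """Materialize every row from every partition in one list."""
    return [row for part in partitions for row in part]


def to_local_iterator(partitions):
    """Yield rows lazily, touching one partition at a time."""
    for part in partitions:
        # Only the current partition needs to be held by the consumer.
        yield from part


partitions = [[1, 2], [3], [4, 5, 6]]

print(collect(partitions))  # [1, 2, 3, 4, 5, 6]

iterator = to_local_iterator(partitions)
print(next(iterator))  # 1
print(list(iterator))  # [2, 3, 4, 5, 6]
```

In this model, the peak memory needed by the iterator is bounded by the largest partition rather than by the whole dataset, at the cost of processing partitions serially.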
Conclusion
This chapter covered basic operations on DataFrames. You learned the
simple concepts and tools that you will need to be successful with Spark
DataFrames. Chapter 6 covers in much greater detail all of the different ways
in which you can manipulate the data in those DataFrames.
Chapter 6. Working with Different Types of Data
Chapter 5 presented basic DataFrame concepts and abstractions. This chapter
covers building expressions, which are the bread and butter of Spark’s
structured operations. We also review working with a variety of different
kinds of data, including the following:
Booleans
Numbers
Strings
Dates and timestamps
Handling null
Complex types
User-defined functions
Where to Look for APIs
Before we begin, it’s worth explaining where you as a user should look for
transformations. Spark is a growing project, and any book (including this
one) is a snapshot in time. One of our priorities in this book is to teach where,
as of this writing, you should look to find functions to transform your data.
Following are the key places to look:
DataFrame (Dataset) Methods
This is actually a bit of a trick because a DataFrame is just a Dataset of
Row types, so you’ll actually end up looking at the Dataset methods,
which are listed in the Dataset API reference.
Dataset submodules like DataFrameStatFunctions and
DataFrameNaFunctions have more methods that solve specific sets of
problems. DataFrameStatFunctions, for example, holds a variety of
statistically related functions, whereas DataFrameNaFunctions refers to
functions that are relevant when working with null data.
Column Methods
These were introduced for the most part in Chapter 5. They hold a variety
of general column-related methods like alias or contains. You can find
them in the API reference for Column.
org.apache.spark.sql.functions contains a variety of functions for a
range of different data types. Often, you’ll see the entire package imported
because they are used so frequently. You can find the SQL and DataFrame
functions in the API reference for this package.
Now this may feel a bit overwhelming, but have no fear: the majority of these
functions are ones that you will find in SQL and analytics systems. All of
these tools exist to achieve one purpose: to transform rows of data in one
format or structure to another. This might create more rows or reduce the
number of rows available. To begin, let’s read in the DataFrame that we’ll be
using for this analysis:
// in Scala
val df = spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("/data/retail-data/by-day/2010-12-01.csv")
df.printSchema()
df.createOrReplaceTempView("dfTable")
# in Python
df = spark.read.format("csv")\
.option("header", "true")\
.option("inferSchema", "true")\
.load("/data/retail-data/by-day/2010-12-01.csv")
df.printSchema()
df.createOrReplaceTempView("dfTable")
Here’s the result of the schema and a small sample of the data:
root
|-- InvoiceNo: string (nullable = true)
|-- StockCode: string (nullable = true)
|-- Description: string (nullable = true)
|-- Quantity: integer (nullable = true)
|-- InvoiceDate: timestamp (nullable = true)
|-- UnitPrice: double (nullable = true)
|-- CustomerID: double (nullable = true)
|-- Country: string (nullable = true)
+---------+---------+--------------------+--------+-------------------+----...
|InvoiceNo|StockCode|         Description|Quantity|        InvoiceDate|Unit...
+---------+---------+--------------------+--------+-------------------+----...
|   536365|   85123A|WHITE HANGING HEA...|       6|2010-12-01 08:26:00| ...
|   536365|    71053| WHITE METAL LANTERN|       6|2010-12-01 08:26:00| ...
...
|   536367|    21755|LOVE BUILDING BLO...|       3|2010-12-01 08:34:00| ...
|   536367|    21777|RECIPE BOX WITH M...|       4|2010-12-01 08:34:00| ...
+---------+---------+--------------------+--------+-------------------+----...
Converting to Spark Types
One thing you’ll see us do throughout this chapter is convert native types to
Spark types. We do this by using the first function that we introduce here, the
lit function. This function converts a type in another language to its
corresponding Spark representation. Here’s how we can convert a couple of
different kinds of Scala and Python values to their respective Spark types:
// in Scala
import org.apache.spark.sql.functions.lit
df.select(lit(5), lit("five"), lit(5.0))
# in Python
from pyspark.sql.functions import lit
df.select(lit(5), lit("five"), lit(5.0))
There’s no equivalent function necessary in SQL, so we can use the values
directly:
-- in SQL
SELECT 5, "five", 5.0
Working with Booleans
Booleans are essential when it comes to data analysis because they are the
foundation for all filtering. Boolean statements consist of four elements: and,
or, true, and false. We use these simple structures to build logical statements
that evaluate to either true or false. These statements are often used as
conditional requirements for when a row of data must either pass the test
(evaluate to true) or else it will be filtered out.
Let’s use our retail dataset to explore working with Booleans. We can specify
equality as well as less-than or greater-than:
// in Scala
import org.apache.spark.sql.functions.col
df.where(col("InvoiceNo").equalTo(536365))
.select("InvoiceNo", "Description")
.show(5, false)
WARNING
Scala has some particular semantics regarding the use of == and ===. In Spark, if you want
to filter by equality you should use === (equal) or =!= (not equal). You can also use the
not function and the equalTo method.
// in Scala
import org.apache.spark.sql.functions.col
df.where(col("InvoiceNo") === 536365)
.select("InvoiceNo", "Description")
.show(5, false)
Python keeps a more conventional notation:
# in Python
from pyspark.sql.functions import col
df.where(col("InvoiceNo") != 536365)\
.select("InvoiceNo", "Description")\
.show(5, False)
+---------+-----------------------------+
|InvoiceNo|Description                  |
+---------+-----------------------------+
|536366   |HAND WARMER UNION JACK       |
...
|536367   |POPPY'S PLAYHOUSE KITCHEN    |
+---------+-----------------------------+
Another option—and probably the cleanest—is to specify the predicate as an
expression in a string. This is valid for Python or Scala. Note that this also
gives you access to another way of expressing “does not equal”:
df.where("InvoiceNo = 536365")
.show(5, false)
df.where("InvoiceNo <> 536365")
.show(5, false)
We mentioned that you can specify Boolean expressions with multiple parts
when you use and or or. In Spark, you should always chain together and
filters as a sequential filter.
The reason for this is that even if Boolean statements are expressed serially
(one after the other), Spark will flatten all of these filters into one statement
and perform the filter at the same time, creating the and statement for us.
Although you can specify your statements explicitly by using and if you like,
they’re often easier to understand and to read if you specify them serially. or
statements need to be specified in the same statement:
// in Scala
val priceFilter = col("UnitPrice") > 600
val descripFilter = col("Description").contains("POSTAGE")
df.where(col("StockCode").isin("DOT")).where(priceFilter.or(descripFilter))
.show()
# in Python
from pyspark.sql.functions import instr
priceFilter = col("UnitPrice") > 600
descripFilter = instr(df.Description, "POSTAGE") >= 1
df.where(df.StockCode.isin("DOT")).where(priceFilter | descripFilter).show()
-- in SQL
SELECT * FROM dfTable WHERE StockCode in ("DOT") AND (UnitPrice > 600 OR
instr(Description, "POSTAGE") >= 1)
+---------+---------+--------------+--------+-------------------+---------+...
|InvoiceNo|StockCode|   Description|Quantity|        InvoiceDate|UnitPrice|...
+---------+---------+--------------+--------+-------------------+---------+...
|   536544|      DOT|DOTCOM POSTAGE|       1|2010-12-01 14:32:00|   569.77|...
|   536592|      DOT|DOTCOM POSTAGE|       1|2010-12-01 17:06:00|   607.49|...
+---------+---------+--------------+--------+-------------------+---------+...
Boolean expressions are not just reserved to filters. To filter a DataFrame,
you can also just specify a Boolean column:
// in Scala
val DOTCodeFilter = col("StockCode") === "DOT"
val priceFilter = col("UnitPrice") > 600
val descripFilter = col("Description").contains("POSTAGE")
df.withColumn("isExpensive", DOTCodeFilter.and(priceFilter.or(descripFilter)))
.where("isExpensive")
.select("unitPrice", "isExpensive").show(5)
# in Python
from pyspark.sql.functions import instr
DOTCodeFilter = col("StockCode") == "DOT"
priceFilter = col("UnitPrice") > 600
descripFilter = instr(col("Description"), "POSTAGE") >= 1
df.withColumn("isExpensive", DOTCodeFilter & (priceFilter | descripFilter))\
.where("isExpensive")\
.select("unitPrice", "isExpensive").show(5)
-- in SQL
SELECT UnitPrice, (StockCode = 'DOT' AND
(UnitPrice > 600 OR instr(Description, "POSTAGE") >= 1)) as isExpensive
FROM dfTable
WHERE (StockCode = 'DOT' AND
(UnitPrice > 600 OR instr(Description, "POSTAGE") >= 1))
Notice how we did not need to specify our filter as an expression and how we
could use a column name without any extra work.
If you’re coming from a SQL background, all of these statements should
seem quite familiar. Indeed, all of them can be expressed as a where clause.
In fact, it’s often easier to express filters as SQL statements than to use
the programmatic DataFrame interface, and Spark SQL allows us to do this
without paying any performance penalty. For example, the following two
statements are equivalent:
// in Scala
import org.apache.spark.sql.functions.{expr, not, col}
df.withColumn("isExpensive", not(col("UnitPrice").leq(250)))
.filter("isExpensive")
.select("Description", "UnitPrice").show(5)
df.withColumn("isExpensive", expr("NOT UnitPrice <= 250"))
.filter("isExpensive")
.select("Description", "UnitPrice").show(5)
Here’s the equivalent expression in Python:
# in Python
from pyspark.sql.functions import expr
df.withColumn("isExpensive", expr("NOT UnitPrice <= 250"))\
.where("isExpensive")\
.select("Description", "UnitPrice").show(5)
WARNING
One “gotcha” that can come up is if you’re working with null data when creating Boolean
expressions. If there is a null in your data, you’ll need to treat things a bit differently.
Here’s how you can ensure that you perform a null-safe equivalence test:
df.where(col("Description").eqNullSafe("hello")).show()
Although not currently available (Spark 2.2), IS [NOT] DISTINCT FROM will
be coming in Spark 2.3 to do the same thing in SQL.
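To make the difference concrete, here is a minimal plain-Python sketch of the two comparison semantics, with None standing in for a SQL null. The helper names sql_equals and eq_null_safe are hypothetical; they only model the behavior and are not Spark APIs.

```python
def sql_equals(a, b):
    """Model of ordinary SQL equality: comparing with null yields null (unknown)."""
    if a is None or b is None:
        return None
    return a == b


def eq_null_safe(a, b):
    """Model of a null-safe test: the answer is always True or False."""
    if a is None or b is None:
        return a is None and b is None
    return a == b


descriptions = ["hello", None, "world"]

# Ordinary equality leaves the null row as "unknown", so a filter drops it.
plain = [sql_equals(d, "hello") for d in descriptions]

# Null-safe equality gives a definite answer for every row.
safe = [eq_null_safe(d, "hello") for d in descriptions]
```

In this model, plain contains a None for the null description, whereas safe contains only Booleans, which is the property that makes the null-safe form predictable inside filters.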
Working with Numbers
When working with big data, the second most common task you will do after
filtering things is counting things. For the most part, we simply need to
express our computation, and that should be valid assuming that we’re
working with numerical data types.
To fabricate a contrived example, let’s imagine that we found out that we
mis-recorded the quantity in our retail dataset and the true quantity is equal to
(the current quantity * the unit price)^2 + 5. This introduces our first
numerical function, pow, which raises a column to the
expressed power:
// in Scala
import org.apache.spark.sql.functions.{expr, pow}
val fabricatedQuantity = pow(col("Quantity") * col("UnitPrice"), 2) + 5
df.select(expr("CustomerId"), fabricatedQuantity.alias("realQuantity")).show(2)
# in Python
from pyspark.sql.functions import expr, pow
fabricatedQuantity = pow(col("Quantity") * col("UnitPrice"), 2) + 5
df.select(expr("CustomerId"), fabricatedQuantity.alias("realQuantity")).show(2)
+----------+------------------+
|CustomerId| realQuantity|
+----------+------------------+
| 17850.0|239.08999999999997|
| 17850.0| 418.7156|
+----------+------------------+
Notice that we were able to multiply our columns together because they were
both numerical. Naturally, we can add and subtract as necessary, too. In
fact, we can do all of this as a SQL expression:
// in Scala
df.selectExpr(
"CustomerId",
"(POWER((Quantity * UnitPrice), 2.0) + 5) as realQuantity").show(2)
# in Python
df.selectExpr(
"CustomerId",
"(POWER((Quantity * UnitPrice), 2.0) + 5) as realQuantity").show(2)
-- in SQL
SELECT customerId, (POWER((Quantity * UnitPrice), 2.0) + 5) as realQuantity
FROM dfTable
Another common numerical task is rounding. If you’d like to just round to a
whole number, oftentimes you can cast the value to an integer and that will
work just fine. However, Spark also has more detailed functions for
performing this explicitly and to a certain level of precision. In the following
example, we round to one decimal place:
// in Scala
import org.apache.spark.sql.functions.{round, bround}
df.select(round(col("UnitPrice"), 1).alias("rounded"), col("UnitPrice")).show(5)
By default, the round function rounds up if you’re exactly in between two
numbers (HALF_UP rounding). The bround function instead rounds such ties to
the nearest even number (HALF_EVEN rounding), so 2.5 becomes 2.0:
// in Scala
import org.apache.spark.sql.functions.lit
df.select(round(lit("2.5")), bround(lit("2.5"))).show(2)
# in Python
from pyspark.sql.functions import lit, round, bround
df.select(round(lit("2.5")), bround(lit("2.5"))).show(2)
-- in SQL
SELECT round(2.5), bround(2.5)
+-------------+--------------+
|round(2.5, 0)|bround(2.5, 0)|
+-------------+--------------+
| 3.0| 2.0|
| 3.0| 2.0|
+-------------+--------------+
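The two tie-breaking rules are easy to confuse, so here is a small sketch of them using Python's decimal module. This only models the HALF_UP and HALF_EVEN modes; the function names round_half_up and round_half_even are hypothetical stand-ins, not Spark functions.

```python
from decimal import Decimal, ROUND_HALF_EVEN, ROUND_HALF_UP


def _quantum(places):
    # Decimal(1).scaleb(-1) is Decimal("0.1"), scaleb(0) is Decimal("1"), and so on.
    return Decimal(1).scaleb(-places)


def round_half_up(value, places=0):
    """Ties move away from zero, modeling the behavior described for round."""
    return Decimal(value).quantize(_quantum(places), rounding=ROUND_HALF_UP)


def round_half_even(value, places=0):
    """Ties move to the nearest even digit, modeling the behavior described for bround."""
    return Decimal(value).quantize(_quantum(places), rounding=ROUND_HALF_EVEN)


# Values are passed as strings so that they are represented exactly.
up = round_half_up("2.5")
even_low = round_half_even("2.5")
even_high = round_half_even("3.5")
```

Note that HALF_EVEN does not always round down: 2.5 goes to 2, but 3.5 goes to 4, because in both cases the result is the even neighbor.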
Another numerical task is to compute the correlation of two columns. For
example, we can see the Pearson correlation coefficient for two columns to
see if cheaper things are typically bought in greater quantities. We can do this
through a function as well as through the DataFrame statistic methods:
// in Scala
import org.apache.spark.sql.functions.{corr}
df.stat.corr("Quantity", "UnitPrice")
df.select(corr("Quantity", "UnitPrice")).show()
# in Python
from pyspark.sql.functions import corr
df.stat.corr("Quantity", "UnitPrice")
df.select(corr("Quantity", "UnitPrice")).show()
-- in SQL
SELECT corr(Quantity, UnitPrice) FROM dfTable
+-------------------------+
|corr(Quantity, UnitPrice)|
+-------------------------+
| -0.04112314436835551|
+-------------------------+
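For readers who want to see what the coefficient measures, the following is a minimal pure-Python sketch of the Pearson formula. The pearson helper and the tiny made-up lists are assumptions for illustration only; they are not taken from the retail dataset.

```python
from math import sqrt


def pearson(xs, ys):
    """Pearson correlation: covariance divided by the product of the spreads."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / sqrt(var_x * var_y)


# Made-up values in which price falls exactly linearly as quantity rises.
quantities = [1, 2, 3, 4]
unit_prices = [8.0, 6.0, 4.0, 2.0]
r = pearson(quantities, unit_prices)
```

A perfectly linear inverse relationship like this one gives a coefficient of -1, whereas the value near zero in the output above indicates almost no linear relationship between quantity and unit price.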
Another common task is to compute summary statistics for a column or set of
columns. We can use the describe method to achieve exactly this. This will
take all numeric columns and calculate the count, mean, standard deviation,
min, and max. You should use this primarily for viewing in the console
because the schema might change in the future:
// in Scala
df.describe().show()
# in Python
df.describe().show()
+-------+------------------+------------------+------------------+
|summary| Quantity| UnitPrice| CustomerID|
+-------+------------------+------------------+------------------+
| count| 3108| 3108| 1968|
| mean| 8.627413127413128| 4.151946589446603|15661.388719512195|
| stddev|26.371821677029203|15.638659854603892|1854.4496996893627|
| min| -24| 0.0| 12431.0|
| max| 600| 607.49| 18229.0|
+-------+------------------+------------------+------------------+
If you need these exact numbers, you can also perform this as an aggregation
yourself by importing the functions and applying them to the columns that
you need:
// in Scala
import org.apache.spark.sql.functions.{count, mean, stddev_pop, min, max}
# in Python
from pyspark.sql.functions import count, mean, stddev_pop, min, max
There are a number of statistical functions available in the StatFunctions
Package (accessible using stat as we see in the code block below). These are
DataFrame methods that you can use to calculate a variety of different things.
For instance, you can calculate either exact or approximate quantiles of your
data using the approxQuantile method:
// in Scala
val colName = "UnitPrice"
val quantileProbs = Array(0.5)
val relError = 0.05
df.stat.approxQuantile(colName, quantileProbs, relError) // 2.51
# in Python
colName = "UnitPrice"
quantileProbs = [0.5]
relError = 0.05
df.stat.approxQuantile(colName, quantileProbs, relError) # 2.51
You also can use this to see a cross-tabulation or frequent item pairs (be
careful, this output will be large and is omitted for this reason):
// in Scala
df.stat.crosstab("StockCode", "Quantity").show()
# in Python
df.stat.crosstab("StockCode", "Quantity").show()
// in Scala
df.stat.freqItems(Seq("StockCode", "Quantity")).show()
# in Python
df.stat.freqItems(["StockCode", "Quantity"]).show()
As a last note, we can also add a unique ID to each row by using the function
monotonically_increasing_id. This function generates a unique value for
each row, starting with 0. The values are guaranteed to be unique and
increasing, but not necessarily consecutive:
// in Scala
import org.apache.spark.sql.functions.monotonically_increasing_id
df.select(monotonically_increasing_id()).show(2)
# in Python
from pyspark.sql.functions import monotonically_increasing_id
df.select(monotonically_increasing_id()).show(2)
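The Spark API documentation describes the current implementation as placing the partition ID in the upper 31 bits of a 64-bit value and the record number within the partition in the lower 33 bits. The sketch below models that layout in plain Python; monotonic_ids and the partition sizes are hypothetical, and the layout is an implementation detail that you should not depend on.

```python
def monotonic_ids(partition_sizes):
    """Model the documented layout: (partition id << 33) + row number in partition."""
    ids = []
    for partition_id, size in enumerate(partition_sizes):
        for row_number in range(size):
            ids.append((partition_id << 33) + row_number)
    return ids


# A made-up DataFrame with two partitions holding 2 and 3 rows.
ids = monotonic_ids([2, 3])
```

In this model the first partition yields 0 and 1, and the second partition starts at 2 to the power of 33, which shows why the IDs increase but leave large gaps.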
There are functions added with every release, so check the documentation for
more methods. For instance, there are some random data generation tools
(e.g., rand(), randn()) with which you can randomly generate data;
however, there are potential determinism issues when doing so. (You can find
discussions about these challenges on the Spark mailing list.) There are also a
number of more advanced tasks like bloom filtering and sketching algorithms
available in the stat package that we mentioned (and linked to) at the
beginning of this chapter. Be sure to search the API documentation for more
information and functions.
Working with Strings
String manipulation shows up in nearly every data flow, and it’s worth
explaining what you can do with strings. You might be manipulating log files,
performing regular expression extraction or substitution, checking for
simple string existence, or making all strings uppercase or lowercase.
Let’s begin with the last task because it’s the most straightforward. The
initcap function will capitalize every word in a given string when that word
is separated from another by a space.
// in Scala
import org.apache.spark.sql.functions.{initcap}
df.select(initcap(col("Description"))).show(2, false)
# in Python
from pyspark.sql.functions import initcap
df.select(initcap(col("Description"))).show(2, False)
-- in SQL
SELECT initcap(Description) FROM dfTable
+----------------------------------+
|initcap(Description) |
+----------------------------------+
|White Hanging Heart T-light Holder|
|White Metal Lantern |
+----------------------------------+
As just mentioned, you can convert strings to uppercase and lowercase, as well:
// in Scala
import org.apache.spark.sql.functions.{lower, upper}
df.select(col("Description"),
lower(col("Description")),
upper(lower(col("Description")))).show(2)
# in Python
from pyspark.sql.functions import lower, upper
df.select(col("Description"),
lower(col("Description")),
upper(lower(col("Description")))).show(2)
-- in SQL
SELECT Description, lower(Description), Upper(lower(Description)) FROM dfTable
+--------------------+--------------------+-------------------------+
| Description| lower(Description)|upper(lower(Description))|
+--------------------+--------------------+-------------------------+
|WHITE HANGING HEA...|white hanging hea...| WHITE HANGING HEA...|
| WHITE METAL LANTERN| white metal lantern| WHITE METAL LANTERN|
+--------------------+--------------------+-------------------------+
Another trivial task is adding or removing spaces around a string. You can do
this by using lpad, ltrim, rpad, rtrim, and trim:
// in Scala
import org.apache.spark.sql.functions.{lit, ltrim, rtrim, rpad, lpad, trim}
df.select(
ltrim(lit(" HELLO ")).as("ltrim"),
rtrim(lit(" HELLO ")).as("rtrim"),
trim(lit(" HELLO ")).as("trim"),
lpad(lit("HELLO"), 3, " ").as("lp"),
rpad(lit("HELLO"), 10, " ").as("rp")).show(2)
# in Python
from pyspark.sql.functions import lit, ltrim, rtrim, rpad, lpad, trim
df.select(
ltrim(lit(" HELLO ")).alias("ltrim"),
rtrim(lit(" HELLO ")).alias("rtrim"),
trim(lit(" HELLO ")).alias("trim"),
lpad(lit("HELLO"), 3, " ").alias("lp"),
rpad(lit("HELLO"), 10, " ").alias("rp")).show(2)
-- in SQL
SELECT
ltrim(' HELLLOOOO '),
rtrim(' HELLLOOOO '),
trim(' HELLLOOOO '),
lpad('HELLOOOO ', 3, ' '),
rpad('HELLOOOO ', 10, ' ')
FROM dfTable
+---------+---------+-----+---+----------+
| ltrim| rtrim| trim| lp| rp|
+---------+---------+-----+---+----------+
|HELLO | HELLO|HELLO| HE|HELLO |
|HELLO | HELLO|HELLO| HE|HELLO |
+---------+---------+-----+---+----------+
Note that if lpad or rpad takes a number less than the length of the string, it
will always remove values from the right side of the string.
Regular Expressions
Probably one of the most frequently performed tasks is searching for the
existence of one string in another or replacing all mentions of a string with
another value. This is often done with a tool called regular expressions that
exists in many programming languages. Regular expressions give the user an
ability to specify a set of rules to use to either extract values from a string or
replace them with some other values.
Spark takes advantage of the complete power of Java regular expressions.
The Java regular expression syntax departs slightly from other programming
languages, so it is worth reviewing before putting anything into production.
There are two key functions in Spark that you’ll need in order to perform
regular expression tasks: regexp_extract and regexp_replace. These
functions extract values and replace values, respectively.
Let’s explore how to use the regexp_replace function to substitute
color names in our description column:
// in Scala
import org.apache.spark.sql.functions.regexp_replace
val simpleColors = Seq("black", "white", "red", "green", "blue")
val regexString = simpleColors.map(_.toUpperCase).mkString("|")
// the | signifies `OR` in regular expression syntax
df.select(
regexp_replace(col("Description"), regexString, "COLOR").alias("color_clean"),
col("Description")).show(2)
# in Python
from pyspark.sql.functions import regexp_replace
regex_string = "BLACK|WHITE|RED|GREEN|BLUE"
df.select(
regexp_replace(col("Description"), regex_string,
"COLOR").alias("color_clean"),
col("Description")).show(2)
-- in SQL
SELECT
regexp_replace(Description, 'BLACK|WHITE|RED|GREEN|BLUE', 'COLOR') as
color_clean, Description
FROM dfTable
+--------------------+--------------------+
| color_clean| Description|
+--------------------+--------------------+
|COLOR HANGING HEA...|WHITE HANGING HEA...|
| COLOR METAL LANTERN| WHITE METAL LANTERN|
+--------------------+--------------------+
Another task might be to replace given characters with other characters.
Building this as a regular expression could be tedious, so Spark also provides
the translate function to replace these values. This is done at the character
level and will replace all instances of a character with the indexed character
in the replacement string:
// in Scala
import org.apache.spark.sql.functions.translate
df.select(translate(col("Description"), "LEET", "1337"), col("Description"))
.show(2)
# in Python
from pyspark.sql.functions import translate
df.select(translate(col("Description"), "LEET", "1337"),col("Description"))\
.show(2)
-- in SQL
SELECT translate(Description, 'LEET', '1337'), Description FROM dfTable
+----------------------------------+--------------------+
|translate(Description, LEET, 1337)| Description|
+----------------------------------+--------------------+
| WHI73 HANGING H3A...|WHITE HANGING HEA...|
| WHI73 M37A1 1AN73RN| WHITE METAL LANTERN|
+----------------------------------+--------------------+
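The character-by-character mapping can be modeled in plain Python with str.maketrans, which pairs each character of the first string with the character at the same index of the second. The translate helper below is a hypothetical stand-in for illustration, not the Spark function itself.

```python
def translate(text, matching, replace):
    """Replace each character found in `matching` with the same-index character of `replace`."""
    # str.maketrans requires both strings to have the same length.
    table = str.maketrans(matching, replace)
    return text.translate(table)


# L -> 1, E -> 3, T -> 7; every other character is left untouched.
leet = translate("WHITE METAL LANTERN", "LEET", "1337")
```

This reproduces the second row of the output above and makes it clear that the mapping applies to individual characters, not to the substring LEET.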
We can also perform something similar, like pulling out the first mentioned
color:
// in Scala
import org.apache.spark.sql.functions.regexp_extract
val regexString = simpleColors.map(_.toUpperCase).mkString("(", "|", ")")
// the | signifies OR in regular expression syntax
df.select(
regexp_extract(col("Description"), regexString, 1).alias("color_clean"),
col("Description")).show(2)
# in Python
from pyspark.sql.functions import regexp_extract
extract_str = "(BLACK|WHITE|RED|GREEN|BLUE)"
df.select(
regexp_extract(col("Description"), extract_str, 1).alias("color_clean"),
col("Description")).show(2)
-- in SQL
SELECT regexp_extract(Description, '(BLACK|WHITE|RED|GREEN|BLUE)', 1),
Description
FROM dfTable
+-------------+--------------------+
| color_clean| Description|
+-------------+--------------------+
| WHITE|WHITE HANGING HEA...|
| WHITE| WHITE METAL LANTERN|
+-------------+--------------------+
Sometimes, rather than extracting values, we simply want to check for their
existence. We can do this with the contains method on each column. This
will return a Boolean declaring whether the value you specify is in the
column’s string:
// in Scala
val containsBlack = col("Description").contains("BLACK")
val containsWhite = col("Description").contains("WHITE")
df.withColumn("hasSimpleColor", containsBlack.or(containsWhite))
.where("hasSimpleColor")
.select("Description").show(3, false)
In Python and SQL, we can use the instr function:
# in Python
from pyspark.sql.functions import instr
containsBlack = instr(col("Description"), "BLACK") >= 1
containsWhite = instr(col("Description"), "WHITE") >= 1
df.withColumn("hasSimpleColor", containsBlack | containsWhite)\
.where("hasSimpleColor")\
.select("Description").show(3, False)
-- in SQL
SELECT Description FROM dfTable
WHERE instr(Description, 'BLACK') >= 1 OR instr(Description, 'WHITE') >= 1
+----------------------------------+
|Description |
+----------------------------------+
|WHITE HANGING HEART T-LIGHT HOLDER|
|WHITE METAL LANTERN |
|RED WOOLLY HOTTIE WHITE HEART. |
+----------------------------------+
This is trivial with just two values, but it becomes more complicated when
there are many values.
Let’s work through this in a more rigorous way and take advantage of
Spark’s ability to accept a dynamic number of arguments. When we convert a
list of values into a set of arguments and pass them into a function, we use a
language feature called varargs. Using this feature, we can effectively
unravel an array of arbitrary length and pass it as arguments to a function.
This, coupled with select makes it possible for us to create arbitrary
numbers of columns dynamically:
// in Scala
val simpleColors = Seq("black", "white", "red", "green", "blue")
val selectedColumns = simpleColors.map(color => {
col("Description").contains(color.toUpperCase).alias(s"is_$color")
}):+expr("*") // could also append this value
df.select(selectedColumns:_*).where(col("is_white").or(col("is_red")))
.select("Description").show(3, false)
+----------------------------------+
|Description |
+----------------------------------+
|WHITE HANGING HEART T-LIGHT HOLDER|
|WHITE METAL LANTERN |
|RED WOOLLY HOTTIE WHITE HEART. |
+----------------------------------+
We can also do this quite easily in Python. In this case, we’re going to use a
different function, locate, that returns the integer location of the match
(1-based, with 0 meaning no match). We then cast that to a Boolean before
using it in the same way:
# in Python
from pyspark.sql.functions import expr, locate
simpleColors = ["black", "white", "red", "green", "blue"]
def color_locator(column, color_string):
  return locate(color_string.upper(), column)\
          .cast("boolean")\
          .alias("is_" + color_string)
selectedColumns = [color_locator(df.Description, c) for c in simpleColors]
selectedColumns.append(expr("*")) # has to be a Column type
df.select(*selectedColumns).where(expr("is_white OR is_red"))\
.select("Description").show(3, False)
This simple feature can often help you programmatically generate columns or
Boolean filters in a way that is simple to understand and extend. We could
extend this to calculating the smallest common denominator for a given input
value, or whether a number is a prime.
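The same generate-then-unpack pattern works without Spark, which can help when reasoning about it in isolation. The sketch below is a toy model in plain Python: color_flag and select are hypothetical helpers, and select here is only a stand-in for the DataFrame method of the same name.

```python
simple_colors = ["black", "white", "red", "green", "blue"]


def color_flag(color):
    """Build one (name, predicate) pair, mirroring the is_<color> columns."""
    name = "is_" + color
    return name, (lambda description: color.upper() in description)


def select(description, *columns):
    """Toy stand-in for select: accept any number of columns via varargs."""
    return {name: predicate(description) for name, predicate in columns}


# Generate the "columns" programmatically, then unpack the list into arguments.
flags = [color_flag(c) for c in simple_colors]
row = select("RED WOOLLY HOTTIE WHITE HEART.", *flags)
```

Adding a sixth color only requires extending the list; neither the helper nor the call site changes.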
Working with Dates and Timestamps
Dates and times are a constant challenge in programming languages and
databases. It’s always necessary to keep track of timezones and ensure that
formats are correct and valid. Spark does its best to keep things simple by
focusing explicitly on two kinds of time-related information. There are dates,
which focus exclusively on calendar dates, and timestamps, which include
both date and time information. Spark, as we saw with our current dataset,
will make a best effort to correctly identify column types, including dates and
timestamps when we enable inferSchema. We can see that this worked quite
well with our current dataset because it was able to identify and read our date
format without us having to provide some specification for it.
As we hinted earlier, working with dates and timestamps closely relates to
working with strings because we often store our timestamps or dates as
strings and convert them into date types at runtime. This is less common
when working with databases and structured data but much more common
when we are working with text and CSV files. We will experiment with that
shortly.
WARNING
There are a lot of caveats, unfortunately, when working with dates and timestamps,
especially when it comes to timezone handling. In version 2.1 and before, Spark parsed
according to the machine’s timezone if timezones were not explicitly specified in the value
that you were parsing. You can set a session local timezone if necessary by setting
spark.sql.session.timeZone in the SQL configurations. This should be set
according to the Java TimeZone format.
df.printSchema()
root
|-- InvoiceNo: string (nullable = true)
|-- StockCode: string (nullable = true)
|-- Description: string (nullable = true)
|-- Quantity: integer (nullable = true)
|-- InvoiceDate: timestamp (nullable = true)
|-- UnitPrice: double (nullable = true)
|-- CustomerID: double (nullable = true)
|-- Country: string (nullable = true)
Spark will read dates or times on a best-effort basis. However,
sometimes there will be no getting around working with strangely formatted
dates and times. The key to understanding the transformations that you are
going to need to apply is to ensure that you know exactly what type and
format you have at each given step of the way. Another common “gotcha” is
that Spark’s TimestampType class supports only second-level precision,
which means that if you’re going to be working with milliseconds or
microseconds, you’ll need to work around this problem by potentially
operating on them as longs. Any more precision when coercing to a
TimestampType will be removed.
Spark can be a bit particular about what format you have at any given point in
time. It’s important to be explicit when parsing or converting to ensure that
there are no issues in doing so. At the end of the day, Spark is working with
Java dates and timestamps and therefore conforms to those standards. Let’s
begin with the basics and get the current date and the current timestamps:
// in Scala
import org.apache.spark.sql.functions.{current_date, current_timestamp}
val dateDF = spark.range(10)
.withColumn("today", current_date())
.withColumn("now", current_timestamp())
dateDF.createOrReplaceTempView("dateTable")
# in Python
from pyspark.sql.functions import current_date, current_timestamp
dateDF = spark.range(10)\
.withColumn("today", current_date())\
.withColumn("now", current_timestamp())
dateDF.createOrReplaceTempView("dateTable")
dateDF.printSchema()
root
|-- id: long (nullable = false)
|-- today: date (nullable = false)
|-- now: timestamp (nullable = false)
Now that we have a simple DataFrame to work with, let’s add and subtract
five days from today. These functions take a column and then the number of
days to either add or subtract as the arguments:
// in Scala
import org.apache.spark.sql.functions.{date_add, date_sub}
dateDF.select(date_sub(col("today"), 5), date_add(col("today"), 5)).show(1)
# in Python
from pyspark.sql.functions import date_add, date_sub
dateDF.select(date_sub(col("today"), 5), date_add(col("today"), 5)).show(1)
-- in SQL
SELECT date_sub(today, 5), date_add(today, 5) FROM dateTable
+------------------+------------------+
|date_sub(today, 5)|date_add(today, 5)|
+------------------+------------------+
| 2017-06-12| 2017-06-22|
+------------------+------------------+
Another common task is to take a look at the difference between two dates.
We can do this with the datediff function that will return the number of
days in between two dates. Most often we just care about the days, and
because the number of days varies from month to month, there also exists a
function, months_between, that gives you the number of months between
two dates:
// in Scala
import org.apache.spark.sql.functions.{datediff, months_between, to_date}
dateDF.withColumn("week_ago", date_sub(col("today"), 7))
.select(datediff(col("week_ago"), col("today"))).show(1)
dateDF.select(
to_date(lit("2016-01-01")).alias("start"),
to_date(lit("2017-05-22")).alias("end"))
.select(months_between(col("start"), col("end"))).show(1)
# in Python
from pyspark.sql.functions import datediff, months_between, to_date
dateDF.withColumn("week_ago", date_sub(col("today"), 7))\
.select(datediff(col("week_ago"), col("today"))).show(1)
dateDF.select(
to_date(lit("2016-01-01")).alias("start"),
to_date(lit("2017-05-22")).alias("end"))\
.select(months_between(col("start"), col("end"))).show(1)
-- in SQL
SELECT to_date('2016-01-01'), months_between('2016-01-01', '2017-01-01'),
datediff('2016-01-01', '2017-01-01')
FROM dateTable
+-------------------------+
|datediff(week_ago, today)|
+-------------------------+
| -7|
+-------------------------+
+--------------------------+
|months_between(start, end)|
+--------------------------+
| -16.67741935|
+--------------------------+
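To see where the fractional month comes from, here is a simplified plain-Python model of the two functions. It follows the rule described in the Spark function documentation (whole months when the days of the month match or both dates are month-ends, otherwise a fraction based on a 31-day month) and ignores any time-of-day component; the helper names are hypothetical.

```python
import calendar
from datetime import date


def datediff(end, start):
    """Number of days from start to end; negative when end is earlier."""
    return (end - start).days


def _is_last_day(d):
    return d.day == calendar.monthrange(d.year, d.month)[1]


def months_between(date1, date2):
    """Simplified model: whole months plus a day difference scaled by 31."""
    months = (date1.year - date2.year) * 12 + (date1.month - date2.month)
    if date1.day == date2.day or (_is_last_day(date1) and _is_last_day(date2)):
        return float(months)
    return round(months + (date1.day - date2.day) / 31.0, 8)


week_gap = datediff(date(2017, 6, 10), date(2017, 6, 17))
month_gap = months_between(date(2016, 1, 1), date(2017, 5, 22))
```

With these inputs the model gives -7 days and roughly -16.677 months, matching the two outputs shown above: 16 whole months plus 21/31 of a month, negated because the first date is the earlier one.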
Notice that we introduced a new function: the to_date function. The
to_date function allows you to convert a string to a date, optionally with a
specified format. We specify our format in the Java SimpleDateFormat which
will be important to reference if you use this function:
// in Scala
import org.apache.spark.sql.functions.{to_date, lit}
spark.range(5).withColumn("date", lit("2017-01-01"))
.select(to_date(col("date"))).show(1)
# in Python
from pyspark.sql.functions import to_date, lit
spark.range(5).withColumn("date", lit("2017-01-01"))\
.select(to_date(col("date"))).show(1)
Spark will not throw an error if it cannot parse the date; rather, it will just
return null. This can be a bit tricky in larger pipelines because you might be
expecting your data in one format and getting it in another. To illustrate, let’s
take a look at the date format that has switched from year-month-day to year-
day-month. Spark will fail to parse this date and silently return null instead:
dateDF.select(to_date(lit("2016-20-12")),to_date(lit("2017-12-11"))).show(1)
+-------------------+-------------------+
|to_date(2016-20-12)|to_date(2017-12-11)|
+-------------------+-------------------+
| null| 2017-12-11|
+-------------------+-------------------+
We find this to be an especially tricky situation for bugs because some dates
might match the correct format, whereas others do not. In the previous
example, notice how the second date appears as December 11th instead of
the correct day, November 12th. Spark doesn’t throw an error because it
cannot know whether the days are mixed up or that specific row is incorrect.
Let’s fix this pipeline, step by step, and come up with a robust way to avoid
these issues entirely. The first step is to remember that we need to specify our
date format according to the Java SimpleDateFormat standard.
We will use two functions to fix this: to_date and to_timestamp. The
former optionally expects a format, whereas the latter requires one:
// in Scala
import org.apache.spark.sql.functions.to_date
val dateFormat = "yyyy-dd-MM"
val cleanDateDF = spark.range(1).select(
to_date(lit("2017-12-11"), dateFormat).alias("date"),
to_date(lit("2017-20-12"), dateFormat).alias("date2"))
cleanDateDF.createOrReplaceTempView("dateTable2")
# in Python
from pyspark.sql.functions import to_date
dateFormat = "yyyy-dd-MM"
cleanDateDF = spark.range(1).select(
to_date(lit("2017-12-11"), dateFormat).alias("date"),
to_date(lit("2017-20-12"), dateFormat).alias("date2"))
cleanDateDF.createOrReplaceTempView("dateTable2")
-- in SQL
SELECT to_date(date, 'yyyy-dd-MM'), to_date(date2, 'yyyy-dd-MM'), to_date(date)
FROM dateTable2
+----------+----------+
| date| date2|
+----------+----------+
|2017-11-12|2017-12-20|
+----------+----------+
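The silent-null behavior and the effect of an explicit format can be modeled with Python's datetime module. In this sketch, to_date is a hypothetical helper that returns None where Spark would return null, and the Python patterns %Y-%m-%d and %Y-%d-%m are assumed to play the role of the Java patterns yyyy-MM-dd and yyyy-dd-MM.

```python
from datetime import date, datetime


def to_date(value, fmt="%Y-%m-%d"):
    """Parse a string into a date, returning None instead of raising on failure."""
    try:
        return datetime.strptime(value, fmt).date()
    except ValueError:
        return None


# Default year-month-day parsing: one value fails, the other is silently misread.
failed = to_date("2016-20-12")
misread = to_date("2017-12-11")

# An explicit year-day-month format parses both values as intended.
fixed_1 = to_date("2017-12-11", "%Y-%d-%m")
fixed_2 = to_date("2017-20-12", "%Y-%d-%m")
```

The misread value is the dangerous one: it parses without complaint but lands on the wrong day, which is why an explicit format is worth specifying whenever the source format is known.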
Now let’s use an example of to_timestamp, which always requires a format
to be specified:
// in Scala
import org.apache.spark.sql.functions.to_timestamp
cleanDateDF.select(to_timestamp(col("date"), dateFormat)).show()
# in Python
from pyspark.sql.functions import to_timestamp
cleanDateDF.select(to_timestamp(col("date"), dateFormat)).show()
-- in SQL
SELECT to_timestamp(date, 'yyyy-dd-MM'), to_timestamp(date2, 'yyyy-dd-MM')
FROM dateTable2
+----------------------------------+
|to_timestamp(`date`, 'yyyy-dd-MM')|
+----------------------------------+
| 2017-11-12 00:00:00|
+----------------------------------+
Casting between dates and timestamps is simple in all languages—in SQL,
we would do it in the following way:
-- in SQL
SELECT cast(to_date("2017-01-01", "yyyy-dd-MM") as timestamp)
After we have our date or timestamp in the correct format and type,
comparing between them is actually quite easy. We just need to be sure to
either use a date/timestamp type or specify our string according to the right
format of yyyy-MM-dd if we’re comparing a date:
cleanDateDF.filter(col("date2") > lit("2017-12-12")).show()
One minor point is that we can also set this as a string, which Spark parses to
a literal:
cleanDateDF.filter(col("date2") > "'2017-12-12'").show()
WARNING
Implicit type casting is an easy way to shoot yourself in the foot, especially when dealing
with null values or dates in different timezones or formats. We recommend that you parse
them explicitly instead of relying on implicit conversions.
Working with Nulls in Data
As a best practice, you should always use nulls to represent missing or empty
data in your DataFrames. Spark can optimize working with null values more
than it can if you use empty strings or other values. The primary way of
interacting with null values, at DataFrame scale, is to use the .na subpackage
on a DataFrame. There are also several functions for performing operations
and explicitly specifying how Spark should handle null values. For more
information, see Chapter 5 (where we discuss ordering), and also refer back
to “Working with Booleans”.
WARNING
Nulls are a challenging part of all programming, and Spark is no exception. In our opinion,
being explicit is always better than being implicit when handling null values. For instance,
in this part of the book, we saw how we can define columns as being nullable.
However, this comes with a catch. When we declare a column as not nullable,
that is not actually enforced. To reiterate, when you define a schema in which all columns
are declared to not have null values, Spark will not enforce that and will happily let null
values into that column. The nullable signal is simply to help Spark SQL optimize for
handling that column. If you have null values in columns that should not have null values,
you can get an incorrect result or see strange exceptions that can be difficult to debug.
There are two things you can do with null values: you can explicitly drop
nulls or you can fill them with a value (globally or on a per-column basis).
Let’s experiment with each of these now.
Coalesce
Spark includes a function to allow you to select the first non-null value from
a set of columns by using the coalesce function. In this case, there are no
null values, so it simply returns the first column:
// in Scala
import org.apache.spark.sql.functions.coalesce
df.select(coalesce(col("Description"), col("CustomerId"))).show()
# in Python
from pyspark.sql.functions import coalesce
df.select(coalesce(col("Description"), col("CustomerId"))).show()
ifnull, nullif, nvl, and nvl2
There are several other SQL functions that you can use to achieve similar
things. ifnull allows you to select the second value if the first is null, and
defaults to the first. Alternatively, you could use nullif, which returns null
if the two values are equal or else returns the first if they are not. nvl
returns the second value if the first is null, but defaults to the first. Finally,
nvl2 returns the second value if the first is not null; otherwise, it will return
the last specified value (else_value in the following example):
-- in SQL
SELECT
ifnull(null, 'return_value') AS a,
nullif('value', 'value') AS b,
nvl(null, 'return_value') AS c,
nvl2('not_null', 'return_value', 'else_value') AS d
FROM dfTable LIMIT 1
+------------+----+------------+------------+
| a| b| c| d|
+------------+----+------------+------------+
|return_value|null|return_value|return_value|
+------------+----+------------+------------+
Naturally, we can use these in select expressions on DataFrames, as well.
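The rules for these four functions can be sketched in plain Python. This is only a model of the semantics described above, not Spark code: the Python functions below are hypothetical stand-ins that share the SQL names, and None stands in for null.

```python
# in Python (plain Python model, no Spark session required)
def ifnull(first, second):
    # The second value if the first is null; otherwise the first.
    return second if first is None else first

def nullif(first, second):
    # Null if the two values are equal; otherwise the first.
    return None if first == second else first

def nvl(first, second):
    # Same rule as ifnull.
    return second if first is None else first

def nvl2(first, second, else_value):
    # The second value if the first is not null; otherwise else_value.
    return second if first is not None else else_value

result = (
    ifnull(None, "return_value"),
    nullif("value", "value"),
    nvl(None, "return_value"),
    nvl2("not_null", "return_value", "else_value"),
)
print(result)
```

The tuple lines up with the single row of the SQL output shown earlier.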
drop
The simplest function is drop, which removes rows that contain nulls. The
default is to drop any row in which any value is null:
df.na.drop()
df.na.drop("any")
In SQL, we have to do this column by column:
-- in SQL
SELECT * FROM dfTable WHERE Description IS NOT NULL
Specifying "any" as an argument drops a row if any of the values are null.
Using “all” drops the row only if all values are null or NaN for that row:
df.na.drop("all")
We can also apply this to certain sets of columns by passing in an array of
columns:
// in Scala
df.na.drop("all", Seq("StockCode", "InvoiceNo"))
# in Python
df.na.drop("all", subset=["StockCode", "InvoiceNo"])
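The difference between "any" and "all", with and without a subset of columns, can be sketched in plain Python. The na_drop helper and the rows below are hypothetical; the model treats only None as missing and ignores NaN for brevity.

```python
# in Python (plain Python model, no Spark session required)
def na_drop(rows, how="any", subset=None):
    """Drop rows (dicts) whose checked columns contain None."""
    kept = []
    for row in rows:
        columns = subset if subset is not None else list(row)
        null_flags = [row[column] is None for column in columns]
        should_drop = any(null_flags) if how == "any" else all(null_flags)
        if not should_drop:
            kept.append(row)
    return kept

# Hypothetical rows; None stands in for null.
rows = [
    {"InvoiceNo": "536365", "StockCode": "85123A", "Description": "WHITE METAL LANTERN"},
    {"InvoiceNo": "536366", "StockCode": None, "Description": None},
    {"InvoiceNo": None, "StockCode": None, "Description": "EXAMPLE ITEM"},
]
print(len(na_drop(rows)))                                        # "any" over all columns
print(len(na_drop(rows, "all")))                                 # "all" over all columns
print(len(na_drop(rows, "all", subset=["StockCode", "InvoiceNo"])))
```

With "any", only the fully populated row survives. With "all", every row survives because each has at least one value. Restricting "all" to StockCode and InvoiceNo drops the third row, where both of those columns are null.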
fill
Using the fill function, you can fill one or more columns with a set of
values. This can be done by specifying a map—that is a particular value and a
set of columns.
For example, to fill all null values in columns of type String, you might
specify the following:
df.na.fill("All Null values become this string")
We could do the same for columns of type Integer by using
df.na.fill(5:Integer), or for Doubles df.na.fill(5:Double). To
specify columns, we just pass in an array of column names like we did in the
previous example:
// in Scala
df.na.fill(5, Seq("StockCode", "InvoiceNo"))
# in Python
df.na.fill("all", subset=["StockCode", "InvoiceNo"])
We can also do this with a Scala Map, where the key is the column name
and the value is the value we would like to use to fill null values:
// in Scala
val fillColValues = Map("StockCode" -> 5, "Description" -> "No Value")
df.na.fill(fillColValues)
# in Python
fill_cols_vals = {"StockCode": 5, "Description" : "No Value"}
df.na.fill(fill_cols_vals)
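As a rough model of the per-column behavior, the following plain Python sketch applies the same dictionary to a hypothetical row. The na_fill helper is an illustration only; in particular, it does not model any type checking that Spark may apply between the fill value and the column type.

```python
# in Python (plain Python model, no Spark session required)
def na_fill(rows, fill_values):
    """Replace None with the per-column value from fill_values."""
    filled = []
    for row in rows:
        new_row = dict(row)
        for column, replacement in fill_values.items():
            if column in new_row and new_row[column] is None:
                new_row[column] = replacement
        filled.append(new_row)
    return filled

fill_cols_vals = {"StockCode": 5, "Description": "No Value"}
# Hypothetical row; Quantity is not in the dictionary, so it stays null.
rows = [{"StockCode": None, "Description": None, "Quantity": None}]
print(na_fill(rows, fill_cols_vals))
```

Columns that are not named in the dictionary are left untouched, which is the main reason to prefer this form over a single global fill value.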
replace
In addition to replacing null values like we did with drop and fill, there are
more flexible options that you can use with more than just null values.
Probably the most common use case is to replace all values in a certain
column according to their current value. The only requirement is that this
value be the same type as the original value:
// in Scala
df.na.replace("Description", Map("" -> "UNKNOWN"))
# in Python
df.na.replace([""], ["UNKNOWN"], "Description")
Ordering
As we discussed in Chapter 5, you can use asc_nulls_first,
desc_nulls_first, asc_nulls_last, or desc_nulls_last to specify
where you would like your null values to appear in an ordered DataFrame.
Working with Complex Types
Complex types can help you organize and structure your data in ways that
make more sense for the problem that you are hoping to solve. There are
three kinds of complex types: structs, arrays, and maps.
Structs
You can think of structs as DataFrames within DataFrames. A worked
example will illustrate this more clearly. We can create a struct by wrapping a
set of columns in parentheses in a query:
df.selectExpr("(Description, InvoiceNo) as complex", "*")
df.selectExpr("struct(Description, InvoiceNo) as complex", "*")
// in Scala
import org.apache.spark.sql.functions.struct
val complexDF = df.select(struct("Description", "InvoiceNo").alias("complex"))
complexDF.createOrReplaceTempView("complexDF")
# in Python
from pyspark.sql.functions import struct
complexDF = df.select(struct("Description", "InvoiceNo").alias("complex"))
complexDF.createOrReplaceTempView("complexDF")
We now have a DataFrame with a column complex. We can query it just as
we might another DataFrame; the only difference is that we use dot syntax
to do so, or the column method getField:
complexDF.select("complex.Description")
complexDF.select(col("complex").getField("Description"))
We can also query all values in the struct by using *. This brings up all the
columns to the top-level DataFrame:
complexDF.select("complex.*")
-- in SQL
SELECT complex.* FROM complexDF
Arrays
To define arrays, let’s work through a use case. With our current data, our
objective is to take every single word in our Description column and
convert that into a row in our DataFrame.
The first task is to turn our Description column into a complex type, an
array.
split
We do this by using the split function and specify the delimiter:
// in Scala
import org.apache.spark.sql.functions.split
df.select(split(col("Description"), " ")).show(2)
# in Python
from pyspark.sql.functions import split
df.select(split(col("Description"), " ")).show(2)
-- in SQL
SELECT split(Description, ' ') FROM dfTable
+---------------------+
|split(Description, )|
+---------------------+
| [WHITE, HANGING, ...|
| [WHITE, METAL, LA...|
+---------------------+
This is quite powerful because Spark allows us to manipulate this complex
type as another column. We can also query the values of the array using
Python-like syntax:
// in Scala
df.select(split(col("Description"), " ").alias("array_col"))
.selectExpr("array_col[0]").show(2)
# in Python
df.select(split(col("Description"), " ").alias("array_col"))\
.selectExpr("array_col[0]").show(2)
-- in SQL
SELECT split(Description, ' ')[0] FROM dfTable
This gives us the following result:
+------------+
|array_col[0]|
+------------+
| WHITE|
| WHITE|
+------------+
Array Length
We can determine the array’s length by querying for its size:
// in Scala
import org.apache.spark.sql.functions.size
df.select(size(split(col("Description"), " "))).show(2) // shows 5 and 3
# in Python
from pyspark.sql.functions import size
df.select(size(split(col("Description"), " "))).show(2) # shows 5 and 3
array_contains
We can also see whether this array contains a value:
// in Scala
import org.apache.spark.sql.functions.array_contains
df.select(array_contains(split(col("Description"), " "), "WHITE")).show(2)
# in Python
from pyspark.sql.functions import array_contains
df.select(array_contains(split(col("Description"), " "), "WHITE")).show(2)
-- in SQL
SELECT array_contains(split(Description, ' '), 'WHITE') FROM dfTable
This gives us the following result:
+--------------------------------------------+
|array_contains(split(Description, ), WHITE)|
+--------------------------------------------+
| true|
| true|
+--------------------------------------------+
However, this does not solve our current problem. To convert a complex type
into a set of rows (one per value in our array), we need to use the explode
function.
explode
The explode function takes a column that consists of arrays and creates one
row (with the rest of the values duplicated) per value in the array. Figure 6-1
illustrates the process.
Figure 6-1. Exploding a column of text
// in Scala
import org.apache.spark.sql.functions.{split, explode}
df.withColumn("splitted", split(col("Description"), " "))
.withColumn("exploded", explode(col("splitted")))
.select("Description", "InvoiceNo", "exploded").show(2)
# in Python
from pyspark.sql.functions import split, explode
df.withColumn("splitted", split(col("Description"), " "))\
.withColumn("exploded", explode(col("splitted")))\
.select("Description", "InvoiceNo", "exploded").show(2)
-- in SQL
SELECT Description, InvoiceNo, exploded
FROM (SELECT *, split(Description, " ") as splitted FROM dfTable)
LATERAL VIEW explode(splitted) as exploded
This gives us the following result:
+--------------------+---------+--------+
| Description|InvoiceNo|exploded|
+--------------------+---------+--------+
|WHITE HANGING HEA...| 536365| WHITE|
|WHITE HANGING HEA...| 536365| HANGING|
+--------------------+---------+--------+
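The row duplication that explode performs can be sketched in plain Python. The split_and_explode helper and the single input row are hypothetical, and the model assumes a non-null Description separated by single spaces.

```python
# in Python (plain Python model, no Spark session required)
def split_and_explode(rows, column, delimiter=" "):
    """Emit one output row per word, copying the other values of the row."""
    exploded = []
    for row in rows:
        for word in row[column].split(delimiter):
            exploded.append({**row, "exploded": word})
    return exploded

# Hypothetical input row.
rows = [{"Description": "WHITE METAL LANTERN", "InvoiceNo": "536365"}]
for out_row in split_and_explode(rows, "Description"):
    print(out_row)
```

One input row with three words becomes three output rows, each carrying the same Description and InvoiceNo alongside a different exploded value.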
Maps
Maps are created by using the map function and key-value pairs of columns.
You then can select them just like you might select from an array:
// in Scala
import org.apache.spark.sql.functions.map
df.select(map(col("Description"),
col("InvoiceNo")).alias("complex_map")).show(2)
# in Python
from pyspark.sql.functions import create_map
df.select(create_map(col("Description"),
col("InvoiceNo")).alias("complex_map"))\
.show(2)
-- in SQL
SELECT map(Description, InvoiceNo) as complex_map FROM dfTable
WHERE Description IS NOT NULL
This produces the following result:
+--------------------+
| complex_map|
+--------------------+
|Map(WHITE HANGING...|
|Map(WHITE METAL L...|
+--------------------+
You can query them by using the proper key. A missing key returns null:
// in Scala
df.select(map(col("Description"), col("InvoiceNo")).alias("complex_map"))
.selectExpr("complex_map['WHITE METAL LANTERN']").show(2)
# in Python
df.select(create_map(col("Description"), col("InvoiceNo")).alias("complex_map"))\
.selectExpr("complex_map['WHITE METAL LANTERN']").show(2)
This gives us the following result:
+--------------------------------+
|complex_map[WHITE METAL LANTERN]|
+--------------------------------+
| null|
| 536365|
+--------------------------------+
You can also explode map types, which will turn them into columns:
// in Scala
df.select(map(col("Description"), col("InvoiceNo")).alias("complex_map"))
.selectExpr("explode(complex_map)").show(2)
# in Python
df.select(create_map(col("Description"), col("InvoiceNo")).alias("complex_map"))\
.selectExpr("explode(complex_map)").show(2)
This gives us the following result:
+--------------------+------+
| key| value|
+--------------------+------+
|WHITE HANGING HEA...|536365|
| WHITE METAL LANTERN|536365|
+--------------------+------+
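Because each row holds its own one-entry map, both the key lookup and the explode are easy to model with Python dictionaries. This is a plain Python sketch with hypothetical rows, not Spark code.

```python
# in Python (plain Python model, no Spark session required)
# Hypothetical rows; each one becomes a single-entry map, as in complex_map.
rows = [
    {"Description": "EXAMPLE ITEM", "InvoiceNo": "536365"},
    {"Description": "WHITE METAL LANTERN", "InvoiceNo": "536365"},
]
complex_maps = [{row["Description"]: row["InvoiceNo"]} for row in rows]

# Looking up a key: rows whose map lacks the key yield None (null).
lookups = [m.get("WHITE METAL LANTERN") for m in complex_maps]
print(lookups)

# Exploding: one (key, value) pair per map entry.
exploded = [(key, value) for m in complex_maps for key, value in m.items()]
print(exploded)
```

The lookup is evaluated per row, which is why the first row of the earlier result is null: that row's map simply has a different key.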
Working with JSON
Spark has some unique support for working with JSON data. You can operate
directly on strings of JSON in Spark and parse from JSON or extract JSON
objects. Let’s begin by creating a JSON column:
// in Scala
val jsonDF = spark.range(1).selectExpr("""
'{"myJSONKey" : {"myJSONValue" : [1, 2, 3]}}' as jsonString""")
# in Python
jsonDF = spark.range(1).selectExpr("""
'{"myJSONKey" : {"myJSONValue" : [1, 2, 3]}}' as jsonString""")
You can use the get_json_object to inline query a JSON object, be it a
dictionary or array. You can use json_tuple if this object has only one level
of nesting:
// in Scala
import org.apache.spark.sql.functions.{get_json_object, json_tuple}
jsonDF.select(
get_json_object(col("jsonString"), "$.myJSONKey.myJSONValue[1]") as
"column",
json_tuple(col("jsonString"), "myJSONKey")).show(2)
# in Python
from pyspark.sql.functions import get_json_object, json_tuple
jsonDF.select(
get_json_object(col("jsonString"), "$.myJSONKey.myJSONValue[1]")
.alias("column"),
json_tuple(col("jsonString"), "myJSONKey")).show(2)
Here’s the equivalent in SQL:
jsonDF.selectExpr(
"get_json_object(jsonString, '$.myJSONKey.myJSONValue[1]') as column",
"json_tuple(jsonString, 'myJSONKey')").show(2)
This results in the following table:
+------+--------------------+
|column| c0|
+------+--------------------+
| 2|{"myJSONValue":[1...|
+------+--------------------+
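To see what the path and the top-level key select, we can walk the same JSON string with Python's standard json module. This is a plain Python sketch for intuition only; it returns native Python values, whereas the Spark functions above operate on and return columns.

```python
# in Python (plain Python model, no Spark session required)
import json

json_string = '{"myJSONKey" : {"myJSONValue" : [1, 2, 3]}}'
parsed = json.loads(json_string)

# Counterpart of the path $.myJSONKey.myJSONValue[1]: second array element.
path_value = parsed["myJSONKey"]["myJSONValue"][1]

# Counterpart of asking for the top-level key myJSONKey: the nested object,
# rendered back to a compact JSON string.
top_level = json.dumps(parsed["myJSONKey"], separators=(",", ":"))

print(path_value)
print(top_level)
```

The path indexes from zero, so [1] selects the value 2, and the top-level key yields the whole nested object, matching the two columns in the table above.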
You can also turn a StructType into a JSON string by using the to_json
function:
// in Scala
import org.apache.spark.sql.functions.to_json
df.selectExpr("(InvoiceNo, Description) as myStruct")
.select(to_json(col("myStruct")))
# in Python
from pyspark.sql.functions import to_json
df.selectExpr("(InvoiceNo, Description) as myStruct")\
.select(to_json(col("myStruct")))
This function also accepts a dictionary (map) of parameters that are the same
as the JSON data source. You can use the from_json function to parse this
(or other JSON data) back in. This naturally requires you to specify a schema,
and optionally you can specify a map of options, as well:
// in Scala
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types._
val parseSchema = new StructType(Array(
new StructField("InvoiceNo",StringType,true),
new StructField("Description",StringType,true)))
df.selectExpr("(InvoiceNo, Description) as myStruct")
.select(to_json(col("myStruct")).alias("newJSON"))
.select(from_json(col("newJSON"), parseSchema), col("newJSON")).show(2)
# in Python
from pyspark.sql.functions import from_json
from pyspark.sql.types import *
parseSchema = StructType((
StructField("InvoiceNo",StringType(),True),
StructField("Description",StringType(),True)))
df.selectExpr("(InvoiceNo, Description) as myStruct")\
.select(to_json(col("myStruct")).alias("newJSON"))\
.select(from_json(col("newJSON"), parseSchema), col("newJSON")).show(2)
This gives us the following result:
+----------------------+--------------------+
|jsontostructs(newJSON)| newJSON|
+----------------------+--------------------+
| [536365,WHITE HAN...|{"InvoiceNo":"536...|
| [536365,WHITE MET...|{"InvoiceNo":"536...|
+----------------------+--------------------+
User-Defined Functions
One of the most powerful things that you can do in Spark is define your own
functions. These user-defined functions (UDFs) make it possible for you to
write your own custom transformations using Python or Scala and even use
external libraries. UDFs can take one or more columns as input and return
one or more columns as output.
Spark UDFs are incredibly powerful because you can write them in several
different programming languages; you do not need to create them in an
esoteric format or domain-specific language. They’re just functions that
operate on the data, record by record. By default, these functions are
registered as temporary functions to be used in that specific SparkSession or
Context.
Although you can write UDFs in Scala, Python, or Java, there are
performance considerations that you should be aware of. To illustrate this,
we’re going to walk through exactly what happens when you create a UDF,
pass that into Spark, and then execute code using that UDF.
The first step is the actual function. We’ll create a simple one for this
example. Let’s write a power3 function that takes a number and raises it to a
power of three:
// in Scala
val udfExampleDF = spark.range(5).toDF("num")
def power3(number:Double):Double = number * number * number
power3(2.0)
# in Python
udfExampleDF = spark.range(5).toDF("num")
def power3(double_value):
return double_value ** 3
power3(2.0)
In this trivial example, we can see that our functions work as expected. We
are able to provide an individual input and produce the expected result (with
this simple test case). Thus far, our expectations for the input are high: it
must be a specific type and cannot be a null value (see “Working with Nulls
in Data”).
Now that we’ve created these functions and tested them, we need to register
them with Spark so that we can use them on all of our worker machines.
Spark will serialize the function on the driver and transfer it over the network
to all executor processes. This happens regardless of language.
When you use the function, there are essentially two different things that
occur. If the function is written in Scala or Java, you can use it within the
Java Virtual Machine (JVM). This means that there will be little performance
penalty aside from the fact that you can’t take advantage of code generation
capabilities that Spark has for built-in functions. There can be performance
issues if you create or use a lot of objects; we cover that in the section on
optimization in Chapter 19.
If the function is written in Python, something quite different happens. Spark
starts a Python process on the worker, serializes all of the data to a format
that Python can understand (remember, it was in the JVM earlier), executes
the function row by row on that data in the Python process, and then finally
returns the results of the row operations to the JVM and Spark. Figure 6-2
provides an overview of the process.
Figure 6-2. Figure caption
WARNING
Starting this Python process is expensive, but the real cost is in serializing the data to
Python. This is costly for two reasons: it is an expensive computation, but also, after the
data enters Python, Spark cannot manage the memory of the worker. This means that you
could potentially cause a worker to fail if it becomes resource constrained (because both
the JVM and Python are competing for memory on the same machine). We recommend
that you write your UDFs in Scala or Java—the small amount of time it should take you to
write the function in Scala will always yield significant speed ups, and on top of that, you
can still use the function from Python!
Now that you have an understanding of the process, let’s work through an
example. First, we need to register the function to make it available as a
DataFrame function:
// in Scala
import org.apache.spark.sql.functions.udf
val power3udf = udf(power3(_:Double):Double)
We can use that just like any other DataFrame function:
// in Scala
udfExampleDF.select(power3udf(col("num"))).show()
The same applies to Python—first, we register it:
# in Python
from pyspark.sql.functions import udf
power3udf = udf(power3)
Then, we can use it in our DataFrame code:
# in Python
from pyspark.sql.functions import col
udfExampleDF.select(power3udf(col("num"))).show(2)
+-----------+
|power3(num)|
+-----------+
| 0|
| 1|
+-----------+
At this juncture, we can use this only as a DataFrame function. That is to say,
we can’t use it within a string expression, only on an expression. However,
we can also register this UDF as a Spark SQL function. This is valuable
because it makes it simple to use this function within SQL as well as across
languages.
Let’s register the function in Scala:
// in Scala
spark.udf.register("power3", power3(_:Double):Double)
udfExampleDF.selectExpr("power3(num)").show(2)
Because this function is registered with Spark SQL—and we’ve learned that
any Spark SQL function or expression is valid to use as an expression when
working with DataFrames—we can turn around and use the UDF that we
wrote in Scala, in Python. However, rather than using it as a DataFrame
function, we use it as a SQL expression:
# in Python
udfExampleDF.selectExpr("power3(num)").show(2)
# registered in Scala
We can also register our Python function to be available as a SQL function
and use that in any language, as well.
One thing we can also do to ensure that our functions are working correctly is
specify a return type. As we saw in the beginning of this section, Spark
manages its own type information, which does not align exactly with
Python’s types. Therefore, it’s a best practice to define the return type for
your function when you define it, even though Spark does not require you to
do so.
If you specify a type that doesn’t align with the actual type returned by the
function, Spark will not throw an error but will just return null to designate a
failure. You can see this in the following example, which declares the return
type as DoubleType:
# in Python
from pyspark.sql.types import IntegerType, DoubleType
spark.udf.register("power3py", power3, DoubleType())
# in Python
udfExampleDF.selectExpr("power3py(num)").show(2)
# registered via Python
This is because the range creates integers. When integers are operated on in
Python, Python won’t convert them into floats (the corresponding type to
Spark’s double type), therefore we see null. We can remedy this by ensuring
that our Python function returns a float instead of an integer and the function
will behave correctly.
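The type mismatch is easy to confirm in plain Python, without Spark. In this sketch, power3_float is a hypothetical variant of power3 that converts its input first so that the result is always a float.

```python
# in Python (plain Python, no Spark session required)
def power3(value):
    return value ** 3

def power3_float(value):
    # Hypothetical variant: convert first so the result is always a float.
    return float(value) ** 3

print(type(power3(2)).__name__)        # integer in, integer out
print(type(power3_float(2)).__name__)  # integer in, float out
```

Registering a function like power3_float with a DoubleType return type is one way to make the declared and actual types agree.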
Naturally, we can use either of these from SQL, too, after we register them:
-- in SQL
SELECT power3(12), power3py(12) -- doesn't work because of return type
When you want to optionally return a value from a UDF, you should return
None in Python and an Option type in Scala.
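On the Python side, this can be as simple as checking for None before doing any work. The following is a minimal sketch of such a function body; safe_power3 is a hypothetical name, and the function would still need to be wrapped with udf or registered as shown earlier before Spark could use it.

```python
# in Python (plain Python, no Spark session required)
def safe_power3(value):
    """Return value cubed as a float, or None when there is no input."""
    if value is None:
        return None  # surfaces as null once the function runs as a UDF
    return float(value) ** 3

print(safe_power3(2))
print(safe_power3(None))
```

Handling None explicitly also avoids the TypeError that the original power3 would raise if it were handed a null value.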
Hive UDFs
As a last note, you can also use UDF/UDAF creation via a Hive syntax. To
allow for this, first you must enable Hive support when you create your
SparkSession (via SparkSession.builder().enableHiveSupport()).
Then you can register UDFs in SQL. This is only supported with precompiled
Scala and Java packages, so you’ll need to specify them as a dependency:
-- in SQL
CREATE TEMPORARY FUNCTION myFunc AS 'com.organization.hive.udf.FunctionName'
Additionally, you can register this as a permanent function in the Hive
Metastore by removing TEMPORARY.
Conclusion
This chapter demonstrated how easy it is to extend Spark SQL to your own
purposes and do so in a way that is not some esoteric, domain-specific
language but rather simple functions that are easy to test and maintain
without even using Spark! This is an amazingly powerful tool that you can
use to specify sophisticated business logic that can run on five rows on your
local machines or on terabytes of data on a 100-node cluster!
Chapter 7. Aggregations
Aggregating is the act of collecting something together and is a cornerstone
of big data analytics. In an aggregation, you will specify a key or grouping
and an aggregation function that specifies how you should transform one or
more columns. This function must produce one result for each group, given
multiple input values. Spark’s aggregation capabilities are sophisticated and
mature, with a variety of different use cases and possibilities. In general, you
use aggregations to summarize numerical data usually by means of some
grouping. This might be a summation, a product, or simple counting. Also,
with Spark you can aggregate any kind of value into an array, list, or map, as
we will see in “Aggregating to Complex Types”.
In addition to working with any type of values, Spark also allows us to create
the following grouping types:
The simplest grouping is to just summarize a complete DataFrame
by performing an aggregation in a select statement.
A “group by” allows you to specify one or more keys as well as one
or more aggregation functions to transform the value columns.
A “window” gives you the ability to specify one or more keys as
well as one or more aggregation functions to transform the value
columns. However, the rows input to the function are somehow
related to the current row.
A “grouping set,” which you can use to aggregate at multiple
different levels. Grouping sets are available as a primitive in SQL
and via rollups and cubes in DataFrames.
A “rollup” makes it possible for you to specify one or more keys as
well as one or more aggregation functions to transform the value
columns, which will be summarized hierarchically.
A “cube” allows you to specify one or more keys as well as one or
more aggregation functions to transform the value columns, which
will be summarized across all combinations of columns.
Each grouping returns a RelationalGroupedDataset on which we specify
our aggregations.
NOTE
An important thing to consider is how exact you need an answer to be. When performing
calculations over big data, it can be quite expensive to get an exact answer to a question,
and it’s often much cheaper to simply request an approximation to a reasonable degree of
accuracy. You’ll note that we mention some approximation functions throughout the book
and oftentimes this is a good opportunity to improve the speed and execution of your
Spark jobs, especially for interactive and ad hoc analysis.
Let’s begin by reading in our data on purchases, repartitioning the data to
have far fewer partitions (because we know it’s a small volume of data stored
in a lot of small files), and caching the results for rapid access:
// in Scala
val df = spark.read.format("csv")
.option("header", "true")
.option("inferSchema", "true")
.load("/data/retail-data/all/*.csv")
.coalesce(5)
df.cache()
df.createOrReplaceTempView("dfTable")
# in Python
df = spark.read.format("csv")\
.option("header", "true")\
.option("inferSchema", "true")\
.load("/data/retail-data/all/*.csv")\
.coalesce(5)
df.cache()
df.createOrReplaceTempView("dfTable")
Here’s a sample of the data so that you can reference the output of some of
the functions:
+---------+---------+--------------------+--------+--------------+---------+-----
|InvoiceNo|StockCode|         Description|Quantity|   InvoiceDate|UnitPrice|Cu...
+---------+---------+--------------------+--------+--------------+---------+-----
|   536365|   85123A|WHITE HANGING...    |       6|12/1/2010 8:26|     2.55|  ...
|   536365|    71053|WHITE METAL...      |       6|12/1/2010 8:26|     3.39|  ...
...
|   536367|    21755|LOVE BUILDING BLO...|       3|12/1/2010 8:34|     5.95|  ...
|   536367|    21777|RECIPE BOX WITH M...|       4|12/1/2010 8:34|     7.95|  ...
+---------+---------+--------------------+--------+--------------+---------+-----
As mentioned, basic aggregations apply to an entire DataFrame. The simplest
example is the count method:
df.count() == 541909
If you’ve been reading this book chapter by chapter, you know that count is
actually an action as opposed to a transformation, and so it is evaluated
immediately. You can use count to get an idea of the total size of your
dataset but another common pattern is to use it to cache an entire DataFrame
in memory, just like we did in this example.
Now, this method is a bit of an outlier because it exists as a method (in this
case) as opposed to a function and is eagerly evaluated instead of a lazy
transformation. In the next section, we will see count used as a lazy function,
as well.
Aggregation Functions
All aggregations are available as functions, in addition to the special cases
that can appear on DataFrames or via .stat, like we saw in Chapter 6. You
can find most aggregation functions in the
org.apache.spark.sql.functions package.
NOTE
There are some gaps between the available SQL functions and the functions that we can
import in Scala and Python. This changes every release, so it’s impossible to include a
definitive list. This section covers the most common functions.
count
The first function worth going over is count, except in this example it will
perform as a transformation instead of an action. In this case, we can do one
of two things: specify a specific column to count, or all the columns by using
count(*) or count(1) to represent that we want to count every row as the
literal one, as shown in this example:
// in Scala
import org.apache.spark.sql.functions.count
df.select(count("StockCode")).show() // 541909
# in Python
from pyspark.sql.functions import count
df.select(count("StockCode")).show() # 541909
-- in SQL
SELECT COUNT(*) FROM dfTable
WARNING
There are a number of gotchas when it comes to null values and counting. For instance,
when performing a count(*), Spark will count null values (including rows containing all
nulls). However, when counting an individual column, Spark will not count the null
values.
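The two counting rules can be modeled in a few lines of plain Python. The rows below are hypothetical, and None stands in for null.

```python
# in Python (plain Python model, no Spark session required)
# Hypothetical rows; None stands in for null.
rows = [
    {"StockCode": "85123A", "Description": "WHITE METAL LANTERN"},
    {"StockCode": None, "Description": None},
    {"StockCode": "71053", "Description": None},
]

# Model of count(*): every row counts, even one made up entirely of nulls.
count_star = len(rows)

# Model of count(StockCode): only rows with a non-null StockCode count.
count_stock_code = sum(1 for row in rows if row["StockCode"] is not None)

print(count_star, count_stock_code)
```

If the two numbers differ on your own data, the gap is the number of nulls in that column.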
countDistinct
Sometimes, the total number is not relevant; rather, it’s the number of unique
groups that you want. To get this number, you can use the countDistinct
function. This is a bit more relevant for individual columns:
// in Scala
import org.apache.spark.sql.functions.countDistinct
df.select(countDistinct("StockCode")).show() // 4070
# in Python
from pyspark.sql.functions import countDistinct
df.select(countDistinct("StockCode")).show() # 4070
-- in SQL
SELECT COUNT(DISTINCT StockCode) FROM dfTable
approx_count_distinct
Often, we find ourselves working with large datasets and the exact distinct
count is irrelevant. There are times when an approximation to a certain
degree of accuracy will work just fine, and for that, you can use the
approx_count_distinct function:
// in Scala
import org.apache.spark.sql.functions.approx_count_distinct
df.select(approx_count_distinct("StockCode", 0.1)).show() // 3364
# in Python
from pyspark.sql.functions import approx_count_distinct
df.select(approx_count_distinct("StockCode", 0.1)).show() # 3364
-- in SQL
SELECT approx_count_distinct(StockCode, 0.1) FROM DFTABLE
You will notice that approx_count_distinct took another parameter with
which you can specify the maximum estimation error allowed. In this case,
we specified a rather large error and thus receive an answer that is quite far
off but does complete more quickly than countDistinct. You will see much
greater performance gains with larger datasets.
first and last
You can get the first and last values from a DataFrame by using these two
obviously named functions. This will be based on the rows in the DataFrame,
not on the values in the DataFrame:
// in Scala
import org.apache.spark.sql.functions.{first, last}
df.select(first("StockCode"), last("StockCode")).show()
# in Python
from pyspark.sql.functions import first, last
df.select(first("StockCode"), last("StockCode")).show()
-- in SQL
SELECT first(StockCode), last(StockCode) FROM dfTable
+-----------------------+----------------------+
|first(StockCode, false)|last(StockCode, false)|
+-----------------------+----------------------+
| 85123A| 22138|
+-----------------------+----------------------+
min and max
To extract the minimum and maximum values from a DataFrame, use the min
and max functions:
// in Scala
import org.apache.spark.sql.functions.{min, max}
df.select(min("Quantity"), max("Quantity")).show()
# in Python
from pyspark.sql.functions import min, max
df.select(min("Quantity"), max("Quantity")).show()
-- in SQL
SELECT min(Quantity), max(Quantity) FROM dfTable
+-------------+-------------+
|min(Quantity)|max(Quantity)|
+-------------+-------------+
| -80995| 80995|
+-------------+-------------+
sum
Another simple task is to add all the values in a column using the sum function:
// in Scala
import org.apache.spark.sql.functions.sum
df.select(sum("Quantity")).show() // 5176450
# in Python
from pyspark.sql.functions import sum
df.select(sum("Quantity")).show() # 5176450
-- in SQL
SELECT sum(Quantity) FROM dfTable
sumDistinct
In addition to summing a total, you also can sum a distinct set of values by
using the sumDistinct function:
// in Scala
import org.apache.spark.sql.functions.sumDistinct
df.select(sumDistinct("Quantity")).show() // 29310
# in Python
from pyspark.sql.functions import sumDistinct
df.select(sumDistinct("Quantity")).show() # 29310
-- in SQL
SELECT SUM(DISTINCT Quantity) FROM dfTable -- 29310
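The difference between the two sums is simply whether duplicate values are collapsed before adding. Here is a plain Python sketch with hypothetical quantities.

```python
# in Python (plain Python model, no Spark session required)
# Hypothetical Quantity values with repeats.
quantities = [6, 6, 3, 4, 3]

total = sum(quantities)                # model of sum: every value is added
distinct_total = sum(set(quantities))  # model of sumDistinct: each value once

print(total, distinct_total)
```

Here the values 6 and 3 each appear twice, so the distinct sum adds 6 + 3 + 4 instead of all five numbers.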
avg
Although you can calculate average by dividing sum by count, Spark
provides an easier way to get that value via the avg or mean functions. In this
example, we use alias in order to more easily reuse these columns later:
// in Scala
import org.apache.spark.sql.functions.{sum, count, avg, expr}
df.select(
count("Quantity").alias("total_transactions"),
sum("Quantity").alias("total_purchases"),
avg("Quantity").alias("avg_purchases"),
expr("mean(Quantity)").alias("mean_purchases"))
.selectExpr(
"total_purchases/total_transactions",
"avg_purchases",
"mean_purchases").show()
# in Python
from pyspark.sql.functions import sum, count, avg, expr
df.select(
count("Quantity").alias("total_transactions"),
sum("Quantity").alias("total_purchases"),
avg("Quantity").alias("avg_purchases"),
expr("mean(Quantity)").alias("mean_purchases"))\
.selectExpr(
"total_purchases/total_transactions",
"avg_purchases",
"mean_purchases").show()
+--------------------------------------+----------------+----------------+
|(total_purchases / total_transactions)| avg_purchases| mean_purchases|
+--------------------------------------+----------------+----------------+
| 9.55224954743324|9.55224954743324|9.55224954743324|
+--------------------------------------+----------------+----------------+
NOTE
You can also average all the distinct values by specifying distinct. In fact, most aggregate
functions support doing so only on distinct values.
Variance and Standard Deviation
Calculating the mean naturally brings up questions about the variance and
standard deviation. These are both measures of the spread of the data around
the mean. The variance is the average of the squared differences from the
mean, and the standard deviation is the square root of the variance. You can
calculate these in Spark by using their respective functions. However,
something to note is that Spark has both the formula for the sample standard
deviation as well as the formula for the population standard deviation. These
are fundamentally different statistical formulae, and we need to differentiate
between them. By default, Spark performs the formula for the sample
standard deviation or variance if you use the variance or stddev functions.
You can also specify these explicitly or refer to the population standard
deviation or variance:
// in Scala
import org.apache.spark.sql.functions.{var_pop, stddev_pop}
import org.apache.spark.sql.functions.{var_samp, stddev_samp}
df.select(var_pop("Quantity"), var_samp("Quantity"),
stddev_pop("Quantity"), stddev_samp("Quantity")).show()
# in Python
from pyspark.sql.functions import var_pop, stddev_pop
from pyspark.sql.functions import var_samp, stddev_samp
df.select(var_pop("Quantity"), var_samp("Quantity"),
stddev_pop("Quantity"), stddev_samp("Quantity")).show()
-- in SQL
SELECT var_pop(Quantity), var_samp(Quantity),
stddev_pop(Quantity), stddev_samp(Quantity)
FROM dfTable
+------------------+------------------+--------------------+-------------------+
| var_pop(Quantity)|var_samp(Quantity)|stddev_pop(Quantity)|stddev_samp(Quan...|
+------------------+------------------+--------------------+-------------------+
|47559.303646609056|47559.391409298754| 218.08095663447796| 218.081157850...|
+------------------+------------------+--------------------+-------------------+
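To make the difference between the two formulas concrete, here is a minimal plain-Python sketch (not Spark code) that uses the standard library's statistics module. The input values are made up for illustration; the only point is that the population functions divide by n and the sample functions divide by n – 1, so the sample figures are always slightly larger.

```python
import statistics

# Hypothetical quantities, chosen only for illustration.
quantities = [2, 4, 4, 4, 5, 5, 7, 9]

var_pop = statistics.pvariance(quantities)    # divides by n
var_samp = statistics.variance(quantities)    # divides by n - 1
stddev_pop = statistics.pstdev(quantities)    # square root of var_pop
stddev_samp = statistics.stdev(quantities)    # square root of var_samp

print(var_pop, var_samp, stddev_pop, stddev_samp)
```

On a large dataset such as ours the two results are nearly identical, which is what the Spark output above shows.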
Skewness and Kurtosis
Skewness and kurtosis are both measurements of extreme points in your data.
Skewness measures the asymmetry of the values in your data around the
mean, whereas kurtosis is a measure of the tail of data. These are both
relevant specifically when modeling your data as a probability distribution of
a random variable. Although we won’t go into the math behind these
here, definitions are easy to find online. You can
calculate these by using the following functions:
// in Scala
import org.apache.spark.sql.functions.{skewness, kurtosis}
df.select(skewness("Quantity"), kurtosis("Quantity")).show()
# in Python
from pyspark.sql.functions import skewness, kurtosis
df.select(skewness("Quantity"), kurtosis("Quantity")).show()
-- in SQL
SELECT skewness(Quantity), kurtosis(Quantity) FROM dfTable
+-------------------+------------------+
| skewness(Quantity)|kurtosis(Quantity)|
+-------------------+------------------+
|-0.2640755761052562|119768.05495536952|
+-------------------+------------------+
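For readers who would like to see the arithmetic, the following plain-Python sketch computes both measures from central moments. It uses the common population definitions (third standardized moment for skewness, fourth standardized moment minus 3 for kurtosis); whether Spark's functions match these formulas in every detail is an assumption here, and the input lists are hypothetical.

```python
def central_moment(values, k):
    """k-th central moment: the mean of (x - mean) ** k."""
    mean = sum(values) / len(values)
    return sum((x - mean) ** k for x in values) / len(values)

def skewness(values):
    # Third standardized moment: 0 for perfectly symmetric data.
    return central_moment(values, 3) / central_moment(values, 2) ** 1.5

def kurtosis(values):
    # Excess kurtosis: the fourth standardized moment minus 3.
    return central_moment(values, 4) / central_moment(values, 2) ** 2 - 3.0

symmetric = [1, 2, 3, 4, 5]
right_tailed = [1, 2, 3, 4, 10]   # one large value stretches the right tail

print(skewness(symmetric), skewness(right_tailed))
print(kurtosis(symmetric), kurtosis(right_tailed))
```

The symmetric list has zero skewness, whereas the list with one large value is positively skewed.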
Covariance and Correlation
We discussed single-column aggregations, but some functions compare the
interactions of the values in two different columns. Two of these
functions are cov and corr, for covariance and correlation, respectively.
Correlation measures the Pearson correlation coefficient, which is scaled
between –1 and +1. The covariance is scaled according to the inputs in the
data.
Like variance, covariance can be calculated either as the sample
covariance or the population covariance. Therefore it can be important to
specify which formula you want to use. Correlation has no notion of this and
therefore does not have calculations for population or sample. Here’s how
they work:
// in Scala
import org.apache.spark.sql.functions.{corr, covar_pop, covar_samp}
df.select(corr("InvoiceNo", "Quantity"), covar_samp("InvoiceNo", "Quantity"),
covar_pop("InvoiceNo", "Quantity")).show()
# in Python
from pyspark.sql.functions import corr, covar_pop, covar_samp
df.select(corr("InvoiceNo", "Quantity"), covar_samp("InvoiceNo", "Quantity"),
covar_pop("InvoiceNo", "Quantity")).show()
-- in SQL
SELECT corr(InvoiceNo, Quantity), covar_samp(InvoiceNo, Quantity),
covar_pop(InvoiceNo, Quantity)
FROM dfTable
+-------------------------+-------------------------------+---------------------+
|corr(InvoiceNo, Quantity)|covar_samp(InvoiceNo, Quantity)|covar_pop(InvoiceN...|
+-------------------------+-------------------------------+---------------------+
|     4.912186085635685E-4|             1052.7280543902734|            1052.7...|
+-------------------------+-------------------------------+---------------------+
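The following plain-Python sketch shows why covariance needs a sample or population variant but correlation does not. The function names and input lists are hypothetical, and this illustrates the formulas rather than how Spark computes them.

```python
import math

def covariance(xs, ys, sample=True):
    """Sample covariance divides by n - 1; population covariance divides by n."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    total = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    return total / (n - 1 if sample else n)

def pearson(xs, ys):
    # The divisor appears in both numerator and denominator and cancels,
    # so the sample/population distinction disappears.
    cov = covariance(xs, ys, sample=False)
    std_x = math.sqrt(covariance(xs, xs, sample=False))
    std_y = math.sqrt(covariance(ys, ys, sample=False))
    return cov / (std_x * std_y)

xs = [1, 2, 3, 4]
ys = [2, 4, 6, 8]   # exactly 2 * xs, so perfectly correlated

print(covariance(xs, ys), covariance(xs, ys, sample=False), pearson(xs, ys))
```

The covariance grows with the scale of the inputs, whereas the correlation stays between –1 and +1.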
Aggregating to Complex Types
In Spark, you can perform aggregations not just on numerical values using
formulas; you can also aggregate to complex types. For example, we can
collect a list of the values present in a given column, or only the unique values by
collecting to a set.
You can use this to carry out some more programmatic access later on in the
pipeline or pass the entire collection in a user-defined function (UDF):
// in Scala
import org.apache.spark.sql.functions.{collect_set, collect_list}
df.agg(collect_set("Country"), collect_list("Country")).show()
# in Python
from pyspark.sql.functions import collect_set, collect_list
df.agg(collect_set("Country"), collect_list("Country")).show()
-- in SQL
SELECT collect_set(Country), collect_list(Country) FROM dfTable
+--------------------+---------------------+
|collect_set(Country)|collect_list(Country)|
+--------------------+---------------------+
|[Portugal, Italy,...| [United Kingdom, ...|
+--------------------+---------------------+
Grouping
Thus far, we have performed only DataFrame-level aggregations. A more
common task is to perform calculations based on groups in the data. This is
typically done on categorical data for which we group our data on one
column and perform some calculations on the other columns that end up in
that group.
The best way to explain this is to begin performing some groupings. The first
will be a count, just as we did before. We will group by each unique invoice
number and get the count of items on that invoice. Note that this returns
another DataFrame and is lazily performed.
We do this grouping in two phases. First we specify the column(s) on which
we would like to group, and then we specify the aggregation(s). The first step
returns a RelationalGroupedDataset, and the second step returns a
DataFrame.
As mentioned, we can specify any number of columns on which we want to
group:
df.groupBy("InvoiceNo", "CustomerId").count().show()
-- in SQL
SELECT InvoiceNo, CustomerId, count(*) FROM dfTable GROUP BY InvoiceNo, CustomerId
+---------+----------+-----+
|InvoiceNo|CustomerId|count|
+---------+----------+-----+
| 536846| 14573| 76|
...
| C544318| 12989| 1|
+---------+----------+-----+
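The two phases described above can be sketched in plain Python. The rows below are hypothetical, and this models only the semantics of a grouped count, not Spark's distributed execution.

```python
from collections import defaultdict

# Hypothetical (InvoiceNo, CustomerId, Quantity) rows.
rows = [
    ("536365", 17850, 6),
    ("536365", 17850, 8),
    ("536366", 17850, 6),
    ("536367", 13047, 32),
]

# Phase 1: place every row into exactly one group, keyed by the grouping columns.
groups = defaultdict(list)
for invoice_no, customer_id, quantity in rows:
    groups[(invoice_no, customer_id)].append(quantity)

# Phase 2: apply the aggregation (here, a count) to each group.
counts = {key: len(values) for key, values in groups.items()}

print(counts)
```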
Grouping with Expressions
As we saw earlier, counting is a bit of a special case because it exists as a
method. Usually, though, we prefer to use the count function. Rather than
passing that function as an expression into a select statement, we specify it
within agg. This makes it possible for you to pass in arbitrary expressions
that just need to have some aggregation specified. You can even do things
like alias a column after transforming it for later use in your data flow:
// in Scala
import org.apache.spark.sql.functions.{count, expr}
df.groupBy("InvoiceNo").agg(
count("Quantity").alias("quan"),
expr("count(Quantity)")).show()
# in Python
from pyspark.sql.functions import count, expr
df.groupBy("InvoiceNo").agg(
count("Quantity").alias("quan"),
expr("count(Quantity)")).show()
+---------+----+---------------+
|InvoiceNo|quan|count(Quantity)|
+---------+----+---------------+
| 536596| 6| 6|
...
| C542604| 8| 8|
+---------+----+---------------+
Grouping with Maps
Sometimes, it can be easier to specify your transformations as a series of
Maps for which the key is the column, and the value is the aggregation
function (as a string) that you would like to perform. You can reuse multiple
column names if you specify them inline, as well:
// in Scala
df.groupBy("InvoiceNo").agg("Quantity"->"avg", "Quantity"->"stddev_pop").show()
# in Python
df.groupBy("InvoiceNo").agg(expr("avg(Quantity)"),expr("stddev_pop(Quantity)"))\
.show()
-- in SQL
SELECT avg(Quantity), stddev_pop(Quantity), InvoiceNo FROM dfTable
GROUP BY InvoiceNo
+---------+------------------+--------------------+
|InvoiceNo| avg(Quantity)|stddev_pop(Quantity)|
+---------+------------------+--------------------+
| 536596| 1.5| 1.1180339887498947|
...
| C542604| -8.0| 15.173990905493518|
+---------+------------------+--------------------+
Window Functions
You can also use window functions to carry out some unique aggregations by
computing an aggregation over a specific “window” of data, which
you define relative to the current row. This window specification
determines which rows will be passed in to this function. This is a bit
abstract and probably sounds similar to a standard group-by, so let’s differentiate
them a bit more.
A group-by takes data, and every row can go only into one grouping. A
window function calculates a return value for every input row of a table
based on a group of rows, called a frame. Each row can fall into one or more
frames. A common use case is to take a look at a rolling average of some
value for which each row represents one day. If you were to do this, each row
would end up in seven different frames. We cover defining frames a little
later, but for your reference, Spark supports three kinds of window functions:
ranking functions, analytic functions, and aggregate functions.
Figure 7-1 illustrates how a given row can fall into multiple frames.
Figure 7-1. Visualizing window functions
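To see how one row can belong to several frames, consider a trailing seven-day average. The plain-Python sketch below uses hypothetical daily values and hypothetical helper names; it models only the frame logic, not Spark's window implementation.

```python
def rolling_average(values, window=7):
    """Trailing average: the frame for row i covers rows i-window+1 through i."""
    averages = []
    for i in range(len(values)):
        frame = values[max(0, i - window + 1): i + 1]
        averages.append(sum(frame) / len(frame))
    return averages

def frames_containing(index, n_rows, window=7):
    """Indices of the rows whose frame includes the row at `index`."""
    return [i for i in range(n_rows) if max(0, i - window + 1) <= index <= i]

daily_totals = list(range(1, 15))   # 14 hypothetical days, one row per day

print(rolling_average(daily_totals)[6])               # 4.0, the average of days 1-7
print(len(frames_containing(3, len(daily_totals))))   # 7
```

Rows near the end of the data fall into fewer frames, because fewer later rows exist to look back at them.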
To demonstrate, we will add a date column that will convert our invoice date
into a column that contains only date information (not time information, too):
// in Scala
import org.apache.spark.sql.functions.{col, to_date}
val dfWithDate = df.withColumn("date", to_date(col("InvoiceDate"),
"MM/d/yyyy H:mm"))
dfWithDate.createOrReplaceTempView("dfWithDate")
# in Python
from pyspark.sql.functions import col, to_date
dfWithDate = df.withColumn("date",
to_date(col("InvoiceDate"), "MM/d/yyyy H:mm"))
dfWithDate.createOrReplaceTempView("dfWithDate")
The first step to a window function is to create a window specification. Note
that the partition by is unrelated to the partitioning scheme concept that
we have covered thus far. It’s just a similar concept that describes how we
will be breaking up our group. The ordering determines the ordering within a
given partition, and, finally, the frame specification (the rowsBetween
statement) states which rows will be included in the frame based on its
reference to the current input row. In the following example, we look at all
previous rows up to the current row:
// in Scala
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col
val windowSpec = Window
.partitionBy("CustomerId", "date")
.orderBy(col("Quantity").desc)
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
# in Python
from pyspark.sql.window import Window
from pyspark.sql.functions import desc
windowSpec = Window\
.partitionBy("CustomerId", "date")\
.orderBy(desc("Quantity"))\
.rowsBetween(Window.unboundedPreceding, Window.currentRow)
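The three parts of this specification (partition, ordering, frame) can be traced in a plain-Python sketch. The rows are hypothetical, and taking the maximum over each frame is just one example of an aggregation you might apply; the sketch models the semantics only.

```python
from collections import defaultdict

# Hypothetical (CustomerId, date, Quantity) rows.
rows = [
    (12347, "2010-12-07", 12),
    (12347, "2010-12-07", 36),
    (12347, "2010-12-07", 30),
    (12346, "2011-01-18", -74215),
    (12346, "2011-01-18", 74215),
]

# partitionBy: split the rows by (CustomerId, date).
partitions = defaultdict(list)
for customer_id, date, quantity in rows:
    partitions[(customer_id, date)].append(quantity)

result = []
for key, quantities in partitions.items():
    ordered = sorted(quantities, reverse=True)   # orderBy Quantity descending
    for i, quantity in enumerate(ordered):
        frame = ordered[: i + 1]                 # unbounded preceding .. current row
        result.append((*key, quantity, max(frame)))

for row in result:
    print(row)
```

Because the rows are ordered by descending quantity, the maximum over each frame is always the first value in its partition.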
Now we want to use an aggregation function to learn more about each
specific customer. An example might be establishing the maximum purchase
quantity over all time. To answer this, we use the same aggregation functions
that we saw earlier by passing a column name or expression. In addition, we
indicate the window specification that defines to which frames of data this
function will apply:
// in Scala
import org.apache.spark.sql.functions.max
val maxPurchaseQuantity = max(col("Quantity")).over(windowSpec)
# in Python
from pyspark.sql.functions import max
maxPurchaseQuantity = max(col("Quantity")).over(windowSpec)
You will notice that this returns a column (or expression). We can now use
this in a DataFrame select statement. Before doing so, though, we will create
the purchase quantity rank. To do that we use the dense_rank function to
determine which date had the maximum purchase quantity for every
customer. We use dense_rank as opposed to rank to avoid gaps in the
ranking sequence when there are tied values (or in our case, duplicate rows):
// in Scala
import org.apache.spark.sql.functions.{dense_rank, rank}
val purchaseDenseRank = dense_rank().over(windowSpec)
val purchaseRank = rank().over(windowSpec)
# in Python
from pyspark.sql.functions import dense_rank, rank
purchaseDenseRank = dense_rank().over(windowSpec)
purchaseRank = rank().over(windowSpec)
This also returns a column that we can use in select statements. Now we can
perform a select to view the calculated window values:
// in Scala
import org.apache.spark.sql.functions.col
dfWithDate.where("CustomerId IS NOT NULL").orderBy("CustomerId")
.select(
col("CustomerId"),
col("date"),
col("Quantity"),
purchaseRank.alias("quantityRank"),
purchaseDenseRank.alias("quantityDenseRank"),
maxPurchaseQuantity.alias("maxPurchaseQuantity")).show()
# in Python
from pyspark.sql.functions import col
dfWithDate.where("CustomerId IS NOT NULL").orderBy("CustomerId")\
.select(
col("CustomerId"),
col("date"),
col("Quantity"),
purchaseRank.alias("quantityRank"),
purchaseDenseRank.alias("quantityDenseRank"),
maxPurchaseQuantity.alias("maxPurchaseQuantity")).show()
-- in SQL
SELECT CustomerId, date, Quantity,
rank(Quantity) OVER (PARTITION BY CustomerId, date
ORDER BY Quantity DESC NULLS LAST
ROWS BETWEEN
UNBOUNDED PRECEDING AND
CURRENT ROW) as rank,
dense_rank(Quantity) OVER (PARTITION BY CustomerId, date
ORDER BY Quantity DESC NULLS LAST
ROWS BETWEEN
UNBOUNDED PRECEDING AND
CURRENT ROW) as dRank,
max(Quantity) OVER (PARTITION BY CustomerId, date
ORDER BY Quantity DESC NULLS LAST
ROWS BETWEEN
UNBOUNDED PRECEDING AND
CURRENT ROW) as maxPurchase
FROM dfWithDate WHERE CustomerId IS NOT NULL ORDER BY CustomerId
+----------+----------+--------+------------+-----------------+---------------+
|CustomerId| date|Quantity|quantityRank|quantityDenseRank|maxP...Quantity|
+----------+----------+--------+------------+-----------------+---------------+
| 12346|2011-01-18| 74215| 1| 1| 74215|
| 12346|2011-01-18| -74215| 2| 2| 74215|
| 12347|2010-12-07| 36| 1| 1| 36|
| 12347|2010-12-07| 30| 2| 2| 36|
...
| 12347|2010-12-07| 12| 4| 4| 36|
| 12347|2010-12-07| 6| 17| 5| 36|
| 12347|2010-12-07| 6| 17| 5| 36|
+----------+----------+--------+------------+-----------------+---------------+
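The difference between the two ranking functions is easiest to see on a small list with ties. This is a plain-Python sketch with hypothetical helper names and values; it expects its input to be ordered already.

```python
def rank(sorted_values):
    """Ties share a rank; the next distinct value skips ahead by the number of ties."""
    ranks = []
    for i, value in enumerate(sorted_values):
        if i > 0 and value == sorted_values[i - 1]:
            ranks.append(ranks[-1])
        else:
            ranks.append(i + 1)
    return ranks

def dense_rank(sorted_values):
    """Ties share a rank; the next distinct value is always exactly one higher."""
    ranks = []
    for i, value in enumerate(sorted_values):
        if i > 0 and value == sorted_values[i - 1]:
            ranks.append(ranks[-1])
        else:
            ranks.append(ranks[-1] + 1 if ranks else 1)
    return ranks

quantities = [36, 30, 30, 12, 6, 6]   # already ordered descending

print(rank(quantities))         # [1, 2, 2, 4, 5, 5]
print(dense_rank(quantities))   # [1, 2, 2, 3, 4, 4]
```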
Grouping Sets
Thus far in this chapter, we’ve seen simple group-by expressions that we can
use to aggregate on a set of columns with the values in those columns.
However, sometimes we want something a bit more complete—an
aggregation across multiple groups. We achieve this by using grouping sets.
Grouping sets are a low-level tool for combining sets of aggregations
together. They give you the ability to create arbitrary aggregations in your
group-by statements.
Let’s work through an example to gain a better understanding. Here, we
would like to get the total quantity for each combination of customer and
stock code. To do so, we’ll use the following SQL expression:
// in Scala
val dfNoNull = dfWithDate.na.drop()
dfNoNull.createOrReplaceTempView("dfNoNull")
# in Python
dfNoNull = dfWithDate.na.drop()
dfNoNull.createOrReplaceTempView("dfNoNull")
-- in SQL
SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
GROUP BY customerId, stockCode
ORDER BY CustomerId DESC, stockCode DESC
+----------+---------+-------------+
|CustomerId|stockCode|sum(Quantity)|
+----------+---------+-------------+
| 18287| 85173| 48|
| 18287| 85040A| 48|
| 18287| 85039B| 120|
...
| 18287| 23269| 36|
+----------+---------+-------------+
You can do the exact same thing by using a grouping set:
-- in SQL
SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
GROUP BY customerId, stockCode GROUPING SETS((customerId, stockCode))
ORDER BY CustomerId DESC, stockCode DESC
+----------+---------+-------------+
|CustomerId|stockCode|sum(Quantity)|
+----------+---------+-------------+
| 18287| 85173| 48|
| 18287| 85040A| 48|
| 18287| 85039B| 120|
...
| 18287| 23269| 36|
+----------+---------+-------------+
WARNING
Grouping sets depend on null values for aggregation levels. If you do not filter out null
values from the input, you will get incorrect results. This applies to cubes, rollups, and grouping sets.
Simple enough, but what if you also want to include the total number of
items, regardless of customer or stock code? With a conventional group-by
statement, this would be impossible. But, it’s simple with grouping sets: we
simply specify that we would like to aggregate at that level, as well, in our
grouping set. This is, effectively, the union of several different groupings
together:
-- in SQL
SELECT CustomerId, stockCode, sum(Quantity) FROM dfNoNull
GROUP BY customerId, stockCode GROUPING SETS((customerId, stockCode),())
ORDER BY CustomerId DESC, stockCode DESC
+----------+---------+-------------+
|customerId|stockCode|sum(Quantity)|
+----------+---------+-------------+
| 18287| 85173| 48|
| 18287| 85040A| 48|
| 18287| 85039B| 120|
...
| 18287| 23269| 36|
+----------+---------+-------------+
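One way to picture this union is the plain-Python sketch below. The rows and the aggregate helper are hypothetical, and the sketch models only the result, not Spark's execution. It also shows why nulls in the input are a problem: columns that are aggregated away are reported as None, so an input row that already contained None would be indistinguishable from a total.

```python
from collections import defaultdict

# Hypothetical (customerId, stockCode, Quantity) rows with no nulls.
rows = [
    (18287, "85173", 48),
    (18287, "85040A", 48),
    (18286, "85173", 12),
]
columns = ("customerId", "stockCode")

def aggregate(rows, grouping_set):
    """Sum Quantity per group; columns outside the grouping set become None."""
    totals = defaultdict(int)
    for customer_id, stock_code, quantity in rows:
        values = dict(zip(columns, (customer_id, stock_code)))
        key = tuple(values[c] if c in grouping_set else None for c in columns)
        totals[key] += quantity
    return dict(totals)

# GROUPING SETS((customerId, stockCode), ()) behaves like the union of two group-bys.
result = {}
for grouping_set in [("customerId", "stockCode"), ()]:
    result.update(aggregate(rows, grouping_set))

print(result)
```

The entry keyed by (None, None) holds the total across all customers and stock codes.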
The GROUPING SETS operator is available only in SQL. To do the same
with DataFrames, you use the rollup and cube operators, which
produce the same kinds of results. Let’s go through those.
Rollups
Thus far, we’ve been looking at explicit groupings. When we set our
grouping keys of multiple columns, Spark looks at those as well as the actual
combinations that are visible in the dataset. A rollup is a multidimensional
aggregation that performs a variety of group-by style calculations for us.
Let’s create a rollup that looks across time (with our new Date column) and
space (with the Country column) and creates a new DataFrame that includes
the grand total over all dates, the subtotal for each date in the DataFrame,
and the subtotal for each country on each date in the DataFrame:
// in Scala
import org.apache.spark.sql.functions.sum
val rolledUpDF = dfNoNull.rollup("Date", "Country").agg(sum("Quantity"))
.selectExpr("Date", "Country", "`sum(Quantity)` as total_quantity")
.orderBy("Date")
rolledUpDF.show()
# in Python
from pyspark.sql.functions import sum
rolledUpDF = dfNoNull.rollup("Date", "Country").agg(sum("Quantity"))\
.selectExpr("Date", "Country", "`sum(Quantity)` as total_quantity")\
.orderBy("Date")
rolledUpDF.show()
+----------+--------------+--------------+
| Date| Country|total_quantity|
+----------+--------------+--------------+
| null| null| 5176450|
|2010-12-01|United Kingdom| 23949|
|2010-12-01| Germany| 117|
|2010-12-01| France| 449|
...
|2010-12-03| France| 239|
|2010-12-03| Italy| 164|
|2010-12-03| Belgium| 528|
+----------+--------------+--------------+
The rows with null values hold the totals. A null in Country alone marks the
total for that date across all countries, and a null in both rollup columns
marks the grand total across both of those columns:
rolledUpDF.where("Country IS NULL").show()
rolledUpDF.where("Date IS NULL").show()
+----+-------+--------------+
|Date|Country|total_quantity|
+----+-------+--------------+
|null| null| 5176450|
+----+-------+--------------+
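A rollup treats its columns as a hierarchy, so it is equivalent to grouping by every prefix of the column list. The helper below is a hypothetical plain-Python sketch that lists those grouping sets.

```python
def rollup_sets(columns):
    """rollup(a, b) groups by (a, b), then (a,), then (): every prefix of the list."""
    return [tuple(columns[:i]) for i in range(len(columns), -1, -1)]

print(rollup_sets(["Date", "Country"]))
# [('Date', 'Country'), ('Date',), ()]
```

Note that there is no ("Country",) entry: a rollup on Date and Country never produces a per-country total across all dates.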
Cube
A cube takes the rollup to a level deeper. Rather than treating elements
hierarchically, a cube does the same thing across all dimensions. This means
that it produces totals not just by date across all countries, but also by
country across all dates.
To pose this as a question again, can you make a table that includes the
following?
The total across all dates and countries
The total for each date across all countries
The total for each country on each date
The total for each country across all dates
The method call is quite similar, but instead of calling rollup, we call cube:
// in Scala
dfNoNull.cube("Date", "Country").agg(sum(col("Quantity")))
.select("Date", "Country", "sum(Quantity)").orderBy("Date").show()
# in Python
from pyspark.sql.functions import col, sum
dfNoNull.cube("Date", "Country").agg(sum(col("Quantity")))\
.select("Date", "Country", "sum(Quantity)").orderBy("Date").show()
+----+--------------------+-------------+
|Date| Country|sum(Quantity)|
+----+--------------------+-------------+
|null| Japan| 25218|
|null| Portugal| 16180|
|null| Unspecified| 3300|
|null| null| 5176450|
|null| Australia| 83653|
...
|null| Norway| 19247|
|null| Hong Kong| 4769|
|null| Spain| 26824|
|null| Czech Republic| 592|
+----+--------------------+-------------+
This is a quick and easily accessible summary of nearly all of the information
in our table, and it’s a great way to create a quick summary table that others
can use later on.
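Where a rollup uses only the prefixes of its column list, a cube uses every subset. The helper below is a hypothetical plain-Python sketch that enumerates those grouping sets, which correspond to the four totals listed earlier.

```python
from itertools import combinations

def cube_sets(columns):
    """cube(a, b) groups by every subset of the columns, from all of them to none."""
    return [
        subset
        for size in range(len(columns), -1, -1)
        for subset in combinations(columns, size)
    ]

print(cube_sets(["Date", "Country"]))
# [('Date', 'Country'), ('Date',), ('Country',), ()]
```

The number of grouping sets doubles with each added column, so cubes over many columns can become expensive.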
Grouping Metadata
Sometimes when using cubes and rollups, you want to be able to query the
aggregation levels so that you can easily filter them down accordingly. We
can do this by using the grouping_id, which gives us a column specifying
the level of aggregation that we have in our result set. The query in the
example that follows returns four distinct grouping IDs:
Table 7-1. Purpose of grouping IDs
Grouping ID   Description
3             This appears for the highest-level aggregation, which gives us the
              total quantity regardless of customerId and stockCode.
2             This appears for all aggregations of individual stock codes. This
              gives us the total quantity per stock code, regardless of customer.
1             This gives us the total quantity on a per-customer basis, regardless
              of item purchased.
0             This gives us the total quantity for individual customerId and
              stockCode combinations.
This is a bit abstract, so it’s well worth trying out to understand the behavior
yourself:
// in Scala
import org.apache.spark.sql.functions.{grouping_id, sum, expr}
dfNoNull.cube("customerId", "stockCode").agg(grouping_id(), sum("Quantity"))
.orderBy(expr("grouping_id()").desc)
.show()
+----------+---------+-------------+-------------+
|customerId|stockCode|grouping_id()|sum(Quantity)|
+----------+---------+-------------+-------------+
| null| null| 3| 5176450|
| null| 23217| 2| 1309|
| null| 90059E| 2| 19|
...
+----------+---------+-------------+-------------+
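The IDs in Table 7-1 are consistent with a simple bit mask: one bit per grouping column, with the first column in the highest bit, set to 1 when that column has been aggregated away. The plain-Python sketch below reproduces the table under that reading; the helper is hypothetical and is not Spark's implementation.

```python
def grouping_id(columns, grouping_set):
    """One bit per column, first column in the highest bit; 1 means aggregated away."""
    gid = 0
    for column in columns:
        gid = (gid << 1) | (0 if column in grouping_set else 1)
    return gid

columns = ("customerId", "stockCode")
for grouping_set in [(), ("stockCode",), ("customerId",), columns]:
    print(grouping_set, grouping_id(columns, grouping_set))
```

Grouping by nothing yields 3, by stockCode alone 2, by customerId alone 1, and by both columns 0.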
Pivot
Pivots make it possible for you to convert rows into columns. For example,
in our current data we have a Country column. With a pivot, we can
aggregate according to some function for each of those given countries and
display them in an easy-to-query way:
// in Scala
val pivoted = dfWithDate.groupBy("date").pivot("Country").sum()
# in Python
pivoted = dfWithDate.groupBy("date").pivot("Country").sum()
This DataFrame will now have a column for every combination of country and
numeric variable, plus a column specifying the date. For example, for USA
we have the following columns: USA_sum(Quantity),
USA_sum(UnitPrice), USA_sum(CustomerID). This represents one for each
numeric column in our dataset (because we just performed an aggregation
over all of them).
Here’s an example query and result from this data:
pivoted.where("date > '2011-12-05'").select("date"
,"`USA_sum(Quantity)`").show()
+----------+-----------------+
| date|USA_sum(Quantity)|
+----------+-----------------+
|2011-12-06| null|
|2011-12-09| null|
|2011-12-08| -196|
|2011-12-07| null|
+----------+-----------------+
Now all of the columns can be calculated with single groupings, but the value
of a pivot comes down to how you would like to explore the data. If a
column has low enough cardinality, it can be useful to transform its values
into columns so that users can see the schema and immediately know what to
query for.
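The reshaping that a pivot performs can be sketched in plain Python. The rows below are hypothetical, and the column naming simply imitates the USA_sum(Quantity) pattern shown above; this is a model of the result, not of how Spark computes it.

```python
from collections import defaultdict

# Hypothetical (date, Country, Quantity) rows.
rows = [
    ("2011-12-08", "USA", -196),
    ("2011-12-08", "France", 120),
    ("2011-12-09", "France", 80),
    ("2011-12-09", "France", 20),
]

# Every distinct country becomes its own column; missing combinations stay None.
countries = sorted({country for _, country, _ in rows})
pivoted = defaultdict(lambda: {f"{c}_sum(Quantity)": None for c in countries})

for date, country, quantity in rows:
    column = f"{country}_sum(Quantity)"
    current = pivoted[date][column]
    pivoted[date][column] = quantity if current is None else current + quantity

for date, columns in sorted(pivoted.items()):
    print(date, columns)
```

A date with no rows for a given country keeps None in that column, which mirrors the null values in the Spark output.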
User-Defined Aggregation Functions
User-defined aggregation functions (UDAFs) are a way for users to define
their own aggregation functions based on custom formulae or business rules.
You can use UDAFs to compute custom calculations over groups of input
data (as opposed to single rows). For every group of input data, Spark
maintains a single AggregationBuffer to store intermediate
results.
To create a UDAF, you must inherit from the
UserDefinedAggregateFunction base class and implement the following
methods:
inputSchema represents input arguments as a StructType
bufferSchema represents intermediate UDAF results as a
StructType
dataType represents the return DataType
deterministic is a Boolean value that specifies whether this
UDAF will return the same result for a given input
initialize allows you to initialize values of an aggregation buffer
update describes how you should update the internal buffer based
on a given row
merge describes how two aggregation buffers should be merged
evaluate will generate the final result of the aggregation
The following example implements a BoolAnd, which tells us whether
all the rows (for a given column) are true; if any row is false, it returns false:
// in Scala
import org.apache.spark.sql.expressions.MutableAggregationBuffer
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction
import org.apache.spark.sql.Row
import org.apache.spark.sql.types._
class BoolAnd extends UserDefinedAggregateFunction {
  def inputSchema: StructType =
    StructType(StructField("value", BooleanType) :: Nil)
  def bufferSchema: StructType = StructType(
    StructField("result", BooleanType) :: Nil
  )
  def dataType: DataType = BooleanType
  def deterministic: Boolean = true
  def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = true
  }
  def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    buffer(0) = buffer.getAs[Boolean](0) && input.getAs[Boolean](0)
  }
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getAs[Boolean](0) && buffer2.getAs[Boolean](0)
  }
  def evaluate(buffer: Row): Any = {
    buffer(0)
  }
}
Now, we simply instantiate our class and/or register it as a function:
// in Scala
val ba = new BoolAnd
spark.udf.register("booland", ba)
import org.apache.spark.sql.functions._
spark.range(1)
.selectExpr("explode(array(TRUE, TRUE, TRUE)) as t")
.selectExpr("explode(array(TRUE, FALSE, TRUE)) as f", "t")
.select(ba(col("t")), expr("booland(f)"))
.show()
+----------+----------+
|booland(t)|booland(f)|
+----------+----------+
| true| false|
+----------+----------+
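To see how the initialize, update, merge, and evaluate methods cooperate, here is a plain-Python sketch of the same life cycle. The class name, the run helper, and the way the data is split into partitions are all hypothetical; this is only a model of how partial buffers might be built and combined, not Spark's actual execution.

```python
from functools import reduce

class BoolAndSketch:
    """Plain-Python stand-in for the buffer methods of the Scala BoolAnd."""

    def initialize(self):
        return True                    # identity element for logical AND

    def update(self, buffer, value):
        return buffer and value        # fold one input row into the buffer

    def merge(self, buffer1, buffer2):
        return buffer1 and buffer2     # combine two partial buffers

    def evaluate(self, buffer):
        return buffer                  # final result for the group

def run(udaf, partitions):
    # Build one partial buffer per partition, then merge the partials.
    partials = [reduce(udaf.update, part, udaf.initialize()) for part in partitions]
    return udaf.evaluate(reduce(udaf.merge, partials, udaf.initialize()))

print(run(BoolAndSketch(), [[True, True], [True]]))    # True
print(run(BoolAndSketch(), [[True, False], [True]]))   # False
```

Starting every buffer at true matters: it is the value that leaves a logical AND unchanged, so an empty partition does not affect the result.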
UDAFs are currently available only in Scala or Java. However, in Spark 2.3,
you will also be able to call Scala or Java UDFs and UDAFs by registering
the function just as we showed in the UDF section in Chapter 6. For more
information, go to SPARK-19439.
Conclusion
This chapter walked through the different types and kinds of aggregations
that you can perform in Spark. You learned about everything from simple
grouping to window functions, as well as rollups and cubes. Chapter 8 discusses how to
perform joins to combine different data sources together.
Chapter 8. Joins
Chapter 7 covered aggregating single datasets, which is helpful, but more
often than not, your Spark applications are going to bring together a large
number of different datasets. For this reason, joins are an essential part of
nearly all Spark workloads. Spark’s ability to talk to many different data
sources means that you can tap into a variety of data across your company.
This chapter covers not just what joins exist in Spark and how to use them,
but some of the basic internals so that you can think about how Spark
actually goes about executing the join on the cluster. This basic knowledge
can help you avoid running out of memory and tackle problems that you
could not solve before.
Join Expressions
A join brings together two sets of data, the left and the right, by comparing
the value of one or more keys of the left and right and evaluating the result of
a join expression that determines whether Spark should bring together the left
set of data with the right set of data. The most common join expression, an
equi-join, compares whether the specified keys in your left and right
datasets are equal. If they are equal, Spark will combine the left and right
datasets. The opposite is true for keys that do not match; Spark discards the
rows that do not have matching keys. Spark also allows for much more
sophisticated join policies in addition to equi-joins. We can even use complex
types and perform something like checking whether a key exists within an
array when we perform a join.
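Conceptually, a join expression is just a predicate evaluated on pairs of rows. The plain-Python sketch below uses a naive nested loop to show an equi-join next to an "array contains" join. The join helper is hypothetical, the sample rows are modeled on the small datasets used in this chapter, and a nested loop is not how Spark normally executes joins.

```python
def join(left, right, predicate):
    """Nested-loop inner join: keep every (l, r) pair that the predicate accepts."""
    return [(l, r) for l in left for r in right if predicate(l, r)]

people = [
    {"name": "Bill Chambers", "graduate_program": 0, "spark_status": [100]},
    {"name": "Matei Zaharia", "graduate_program": 1, "spark_status": [500, 250, 100]},
]
programs = [{"id": 0, "degree": "Masters"}, {"id": 1, "degree": "Ph.D."}]
statuses = [{"id": 500, "status": "Vice President"}, {"id": 100, "status": "Contributor"}]

# Equi-join: the keys must be equal.
equi = join(people, programs, lambda p, g: p["graduate_program"] == g["id"])

# Non-equi join: the right key must appear in the left row's array.
contains = join(people, statuses, lambda p, s: s["id"] in p["spark_status"])

print(len(equi), len(contains))   # 2 3
```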
Join Types
Whereas the join expression determines whether two rows should join, the
join type determines what should be in the result set. There are a variety of
different join types available in Spark for you to use:
Inner joins (keep rows with keys that exist in the left and right
datasets)
Outer joins (keep rows with keys in either the left or right datasets)
Left outer joins (keep rows with keys in the left dataset)
Right outer joins (keep rows with keys in the right dataset)
Left semi joins (keep the rows in the left, and only the left, dataset
where the key appears in the right dataset)
Left anti joins (keep the rows in the left, and only the left, dataset
where they do not appear in the right dataset)
Natural joins (perform a join by implicitly matching the columns
between the two datasets with the same names)
Cross (or Cartesian) joins (match every row in the left dataset with
every row in the right dataset)
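As a quick mental model of which rows each join type keeps, the plain-Python sketch below works on two small dictionaries keyed by the join key. The data is hypothetical, and the sketch assumes each key appears at most once per side; real joins also handle duplicate keys.

```python
left = {0: "Bill", 1: "Matei", 3: "Unmatched left"}        # key -> left row
right = {0: "Masters", 1: "Ph.D.", 2: "Unmatched right"}   # key -> right row

inner = {k: (left[k], right[k]) for k in left.keys() & right.keys()}
outer = {k: (left.get(k), right.get(k)) for k in left.keys() | right.keys()}
left_outer = {k: (left[k], right.get(k)) for k in left}
right_outer = {k: (left.get(k), right[k]) for k in right}
left_semi = {k: left[k] for k in left if k in right}        # left columns only
left_anti = {k: left[k] for k in left if k not in right}    # left columns only

print(sorted(inner), sorted(outer), sorted(left_anti))
# [0, 1] [0, 1, 2, 3] [3]
```

Here None plays the role of the null that Spark inserts when one side has no matching row.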
If you have ever interacted with a relational database system, or even an
Excel spreadsheet, the concept of joining different datasets together should
not be too abstract. Let’s move on to showing examples of each join type.
This will make it easy to understand exactly how you can apply these to your
own problems. To do this, let’s create some simple datasets that we can use
in our examples:
// in Scala
val person = Seq(
(0, "Bill Chambers", 0, Seq(100)),
(1, "Matei Zaharia", 1, Seq(500, 250, 100)),
(2, "Michael Armbrust", 1, Seq(250, 100)))
.toDF("id", "name", "graduate_program", "spark_status")
val graduateProgram = Seq(
(0, "Masters", "School of Information", "UC Berkeley"),
(2, "Masters", "EECS", "UC Berkeley"),
(1, "Ph.D.", "EECS", "UC Berkeley"))
.toDF("id", "degree", "department", "school")
val sparkStatus = Seq(
(500, "Vice President"),
(250, "PMC Member"),
(100, "Contributor"))
.toDF("id", "status")
# in Python
person = spark.createDataFrame([
(0, "Bill Chambers", 0, [100]),
(1, "Matei Zaharia", 1, [500, 250, 100]),
(2, "Michael Armbrust", 1, [250, 100])])\
.toDF("id", "name", "graduate_program", "spark_status")
graduateProgram = spark.createDataFrame([
(0, "Masters", "School of Information", "UC Berkeley"),
(2, "Masters", "EECS", "UC Berkeley"),
(1, "Ph.D.", "EECS", "UC Berkeley")])\
.toDF("id", "degree", "department", "school")
sparkStatus = spark.createDataFrame([
(500, "Vice President"),
(250, "PMC Member"),
(100, "Contributor")])\
.toDF("id", "status")
Next, let’s register these as tables so that we can use them throughout the chapter:
person.createOrReplaceTempView("person")
graduateProgram.createOrReplaceTempView("graduateProgram")
sparkStatus.createOrReplaceTempView("sparkStatus")
Inner Joins
Inner joins evaluate the keys in both of the DataFrames or tables and include
(and join together) only the rows that evaluate to true. In the following
example, we join the graduateProgram DataFrame with the person
DataFrame to create a new DataFrame:
// in Scala
val joinExpression = person.col("graduate_program") ===
graduateProgram.col("id")
# in Python
joinExpression = person["graduate_program"] == graduateProgram['id']
Keys that do not exist in both DataFrames will not show in the resulting
DataFrame. For example, the following expression would result in zero
rows in the resulting DataFrame:
// in Scala
val wrongJoinExpression = person.col("name") === graduateProgram.col("school")
# in Python
wrongJoinExpression = person["name"] == graduateProgram["school"]
Inner joins are the default join, so we just need to call join on our left
DataFrame and pass in the right DataFrame and the join expression:
person.join(graduateProgram, joinExpression).show()
-- in SQL
SELECT * FROM person JOIN graduateProgram
ON person.graduate_program = graduateProgram.id
+---+----------------+----------------+---------------+---+-------+----------+---
| id|            name|graduate_program|   spark_status| id| degree|department|...
+---+----------------+----------------+---------------+---+-------+----------+---
|  0|   Bill Chambers|               0|          [100]|  0|Masters| School...|...
|  1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|      EECS|...
|  2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|      EECS|...
+---+----------------+----------------+---------------+---+-------+----------+---
We can also specify this explicitly by passing in a third parameter, the
joinType:
// in Scala
var joinType = "inner"
# in Python
joinType = "inner"
person.join(graduateProgram, joinExpression, joinType).show()
-- in SQL
SELECT * FROM person INNER JOIN graduateProgram
ON person.graduate_program = graduateProgram.id
+---+----------------+----------------+---------------+---+-------+--------------
| id|            name|graduate_program|   spark_status| id| degree| department...
+---+----------------+----------------+---------------+---+-------+--------------
|  0|   Bill Chambers|               0|          [100]|  0|Masters|     School...
|  1|   Matei Zaharia|               1|[500, 250, 100]|  1|  Ph.D.|       EECS...
|  2|Michael Armbrust|               1|     [250, 100]|  1|  Ph.D.|       EECS...
+---+----------------+----------------+---------------+---+-------+--------------
Outer Joins
Outer joins evaluate the keys in both of the DataFrames or tables and
include (and join together) the rows that evaluate to true or false. If there is
no equivalent row in either the left or right DataFrame, Spark will insert
null:
joinType = "outer"
person.join(graduateProgram, joinExpression, joinType).show()