4th Edition, Revised & Updated
Hadoop: The Definitive Guide
Using Hadoop 2 exclusively, author Tom White presents new chapters
on YARN and several Hadoop-related projects such as Parquet, Flume,
Crunch, and Spark. You’ll learn about recent changes to Hadoop, and
explore new case studies on Hadoop’s role in healthcare systems and
genomics data processing.
■ Learn fundamental components such as MapReduce, HDFS, and YARN
■ Explore MapReduce in depth, including steps for developing applications with it
■ Set up and maintain a Hadoop cluster running HDFS and MapReduce on YARN
■ Learn two data formats: Avro for data serialization and Parquet for nested data
■ Use data ingestion tools such as Flume (for streaming data) and Sqoop (for bulk data transfer)
■ Understand how high-level data processing tools like Pig, Hive, Crunch, and Spark work with Hadoop
■ Learn the HBase distributed database and the ZooKeeper distributed configuration service
“Now you have the opportunity to learn about Hadoop from a master—not only of the technology, but also of common sense and plain talk.”
—Doug Cutting
Cloudera
Tom White, an engineer at Cloudera and member of the Apache Software
Foundation, has been an Apache Hadoop committer since 2007. He has written
numerous articles for oreilly.com, java.net, and IBM’s developerWorks, and speaks
regularly about Hadoop at industry conferences.
US $49.99
Twitter: @oreillymedia
facebook.com/oreilly
PROGRAMMING LANGUAGES / HADOOP
FOURTH EDITION
Hadoop:
The Definitive Guide
Get ready to unlock the power of your data. With the fourth edition of
this comprehensive guide, you’ll learn how to build and maintain reliable,
scalable, distributed systems with Apache Hadoop. This book is ideal for
programmers looking to analyze datasets of any size, and for administrators
who want to set up and run Hadoop clusters.
Hadoop
The Definitive Guide
STORAGE AND ANALYSIS AT INTERNET SCALE
CAN $57.99
ISBN: 978-1-491-90163-2
Tom White
FOURTH EDITION
Hadoop: The Definitive Guide
Tom White
Hadoop: The Definitive Guide, Fourth Edition
by Tom White
Copyright © 2015 Tom White. All rights reserved.
Printed in the United States of America.
Published by O’Reilly Media, Inc., 1005 Gravenstein Highway North, Sebastopol, CA 95472.
O’Reilly books may be purchased for educational, business, or sales promotional use. Online editions are
also available for most titles (http://safaribooksonline.com). For more information, contact our corporate/
institutional sales department: 800-998-9938 or corporate@oreilly.com.
Editors: Mike Loukides and Meghan Blanchette
Production Editor: Matthew Hacker
Copyeditor: Jasmine Kwityn
Proofreader: Rachel Head
Indexer: Lucie Haskins
Cover Designer: Ellie Volckhausen
Interior Designer: David Futato
Illustrator: Rebecca Demarest

June 2009: First Edition
October 2010: Second Edition
May 2012: Third Edition
April 2015: Fourth Edition
Revision History for the Fourth Edition:
2015-03-19: First release
2015-04-17: Second release
See http://oreilly.com/catalog/errata.csp?isbn=9781491901632 for release details.
The O’Reilly logo is a registered trademark of O’Reilly Media, Inc. Hadoop: The Definitive Guide, the cover
image of an African elephant, and related trade dress are trademarks of O’Reilly Media, Inc.
Many of the designations used by manufacturers and sellers to distinguish their products are claimed as
trademarks. Where those designations appear in this book, and O’Reilly Media, Inc. was aware of a trademark
claim, the designations have been printed in caps or initial caps.
While the publisher and the author have used good faith efforts to ensure that the information and instruc‐
tions contained in this work are accurate, the publisher and the author disclaim all responsibility for errors
or omissions, including without limitation responsibility for damages resulting from the use of or reliance
on this work. Use of the information and instructions contained in this work is at your own risk. If any code
samples or other technology this work contains or describes is subject to open source licenses or the intel‐
lectual property rights of others, it is your responsibility to ensure that your use thereof complies with such
licenses and/or rights.
ISBN: 978-1-491-90163-2
[LSI]
For Eliane, Emilia, and Lottie
Table of Contents
Foreword. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . xvii
Preface. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . xix
Part I. Hadoop Fundamentals
1. Meet Hadoop. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 3
Data!
Data Storage and Analysis
Querying All Your Data
Beyond Batch
Comparison with Other Systems
Relational Database Management Systems
Grid Computing
Volunteer Computing
A Brief History of Apache Hadoop
What’s in This Book?
2. MapReduce. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 19
A Weather Dataset
Data Format
Analyzing the Data with Unix Tools
Analyzing the Data with Hadoop
Map and Reduce
Java MapReduce
Scaling Out
Data Flow
Combiner Functions
Running a Distributed MapReduce Job
Hadoop Streaming
Ruby
Python
3. The Hadoop Distributed Filesystem. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 43
The Design of HDFS
HDFS Concepts
Blocks
Namenodes and Datanodes
Block Caching
HDFS Federation
HDFS High Availability
The Command-Line Interface
Basic Filesystem Operations
Hadoop Filesystems
Interfaces
The Java Interface
Reading Data from a Hadoop URL
Reading Data Using the FileSystem API
Writing Data
Directories
Querying the Filesystem
Deleting Data
Data Flow
Anatomy of a File Read
Anatomy of a File Write
Coherency Model
Parallel Copying with distcp
Keeping an HDFS Cluster Balanced
4. YARN. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 79
Anatomy of a YARN Application Run
Resource Requests
Application Lifespan
Building YARN Applications
YARN Compared to MapReduce 1
Scheduling in YARN
Scheduler Options
Capacity Scheduler Configuration
Fair Scheduler Configuration
Delay Scheduling
Dominant Resource Fairness
Further Reading
5. Hadoop I/O. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 97
Data Integrity
Data Integrity in HDFS
LocalFileSystem
ChecksumFileSystem
Compression
Codecs
Compression and Input Splits
Using Compression in MapReduce
Serialization
The Writable Interface
Writable Classes
Implementing a Custom Writable
Serialization Frameworks
File-Based Data Structures
SequenceFile
MapFile
Other File Formats and Column-Oriented Formats
Part II. MapReduce
6. Developing a MapReduce Application. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 141
The Configuration API
Combining Resources
Variable Expansion
Setting Up the Development Environment
Managing Configuration
GenericOptionsParser, Tool, and ToolRunner
Writing a Unit Test with MRUnit
Mapper
Reducer
Running Locally on Test Data
Running a Job in a Local Job Runner
Testing the Driver
Running on a Cluster
Packaging a Job
Launching a Job
The MapReduce Web UI
Retrieving the Results
Debugging a Job
Hadoop Logs
Remote Debugging
Tuning a Job
Profiling Tasks
MapReduce Workflows
Decomposing a Problem into MapReduce Jobs
JobControl
Apache Oozie
7. How MapReduce Works. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 185
Anatomy of a MapReduce Job Run
Job Submission
Job Initialization
Task Assignment
Task Execution
Progress and Status Updates
Job Completion
Failures
Task Failure
Application Master Failure
Node Manager Failure
Resource Manager Failure
Shuffle and Sort
The Map Side
The Reduce Side
Configuration Tuning
Task Execution
The Task Execution Environment
Speculative Execution
Output Committers
8. MapReduce Types and Formats. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 209
MapReduce Types
The Default MapReduce Job
Input Formats
Input Splits and Records
Text Input
Binary Input
Multiple Inputs
Database Input (and Output)
Output Formats
Text Output
Binary Output
Multiple Outputs
Lazy Output
Database Output
9. MapReduce Features. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 247
Counters
Built-in Counters
User-Defined Java Counters
User-Defined Streaming Counters
Sorting
Preparation
Partial Sort
Total Sort
Secondary Sort
Joins
Map-Side Joins
Reduce-Side Joins
Side Data Distribution
Using the Job Configuration
Distributed Cache
MapReduce Library Classes
Part III. Hadoop Operations
10. Setting Up a Hadoop Cluster. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 283
Cluster Specification
Cluster Sizing
Network Topology
Cluster Setup and Installation
Installing Java
Creating Unix User Accounts
Installing Hadoop
Configuring SSH
Configuring Hadoop
Formatting the HDFS Filesystem
Starting and Stopping the Daemons
Creating User Directories
Hadoop Configuration
Configuration Management
Environment Settings
Important Hadoop Daemon Properties
Hadoop Daemon Addresses and Ports
Other Hadoop Properties
Security
Kerberos and Hadoop
Delegation Tokens
Other Security Enhancements
Benchmarking a Hadoop Cluster
Hadoop Benchmarks
User Jobs
11. Administering Hadoop. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 317
HDFS
Persistent Data Structures
Safe Mode
Audit Logging
Tools
Monitoring
Logging
Metrics and JMX
Maintenance
Routine Administration Procedures
Commissioning and Decommissioning Nodes
Upgrades
Part IV. Related Projects
12. Avro. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 345
Avro Data Types and Schemas
In-Memory Serialization and Deserialization
The Specific API
Avro Datafiles
Interoperability
Python API
Avro Tools
Schema Resolution
Sort Order
Avro MapReduce
Sorting Using Avro MapReduce
Avro in Other Languages
13. Parquet. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 367
Data Model
Nested Encoding
Parquet File Format
Parquet Configuration
Writing and Reading Parquet Files
Avro, Protocol Buffers, and Thrift
Parquet MapReduce
14. Flume. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 381
Installing Flume
An Example
Transactions and Reliability
Batching
The HDFS Sink
Partitioning and Interceptors
File Formats
Fan Out
Delivery Guarantees
Replicating and Multiplexing Selectors
Distribution: Agent Tiers
Delivery Guarantees
Sink Groups
Integrating Flume with Applications
Component Catalog
Further Reading
15. Sqoop. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 401
Getting Sqoop
Sqoop Connectors
A Sample Import
Text and Binary File Formats
Generated Code
Additional Serialization Systems
Imports: A Deeper Look
Controlling the Import
Imports and Consistency
Incremental Imports
Direct-Mode Imports
Working with Imported Data
Imported Data and Hive
Importing Large Objects
Performing an Export
Exports: A Deeper Look
Exports and Transactionality
Exports and SequenceFiles
Further Reading
16. Pig. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 423
Installing and Running Pig
Execution Types
Running Pig Programs
Grunt
Pig Latin Editors
An Example
Generating Examples
Comparison with Databases
Pig Latin
Structure
Statements
Expressions
Types
Schemas
Functions
Macros
User-Defined Functions
A Filter UDF
An Eval UDF
A Load UDF
Data Processing Operators
Loading and Storing Data
Filtering Data
Grouping and Joining Data
Sorting Data
Combining and Splitting Data
Pig in Practice
Parallelism
Anonymous Relations
Parameter Substitution
Further Reading
17. Hive. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 471
Installing Hive
The Hive Shell
An Example
Running Hive
Configuring Hive
Hive Services
The Metastore
Comparison with Traditional Databases
Schema on Read Versus Schema on Write
Updates, Transactions, and Indexes
SQL-on-Hadoop Alternatives
HiveQL
Data Types
Operators and Functions
Tables
Managed Tables and External Tables
Partitions and Buckets
Storage Formats
Importing Data
Altering Tables
Dropping Tables
Querying Data
Sorting and Aggregating
MapReduce Scripts
Joins
Subqueries
Views
User-Defined Functions
Writing a UDF
Writing a UDAF
Further Reading
18. Crunch. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 519
An Example
The Core Crunch API
Primitive Operations
Types
Sources and Targets
Functions
Materialization
Pipeline Execution
Running a Pipeline
Stopping a Pipeline
Inspecting a Crunch Plan
Iterative Algorithms
Checkpointing a Pipeline
Crunch Libraries
Further Reading
19. Spark. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 549
Installing Spark
An Example
Spark Applications, Jobs, Stages, and Tasks
A Scala Standalone Application
A Java Example
A Python Example
Resilient Distributed Datasets
Creation
Transformations and Actions
Persistence
Serialization
Shared Variables
Broadcast Variables
Accumulators
Anatomy of a Spark Job Run
Job Submission
DAG Construction
Task Scheduling
Task Execution
Executors and Cluster Managers
Spark on YARN
Further Reading
20. HBase. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 575
HBasics
Backdrop
Concepts
Whirlwind Tour of the Data Model
Implementation
Installation
Test Drive
Clients
Java
MapReduce
REST and Thrift
Building an Online Query Application
Schema Design
Loading Data
Online Queries
HBase Versus RDBMS
Successful Service
HBase
Praxis
HDFS
UI
Metrics
Counters
Further Reading
21. ZooKeeper. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 603
Installing and Running ZooKeeper
An Example
Group Membership in ZooKeeper
Creating the Group
Joining a Group
Listing Members in a Group
Deleting a Group
The ZooKeeper Service
Data Model
Operations
Implementation
Consistency
Sessions
States
Building Applications with ZooKeeper
A Configuration Service
The Resilient ZooKeeper Application
A Lock Service
More Distributed Data Structures and Protocols
ZooKeeper in Production
Resilience and Performance
Configuration
Further Reading
Part V. Case Studies
22. Composable Data at Cerner. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 643
From CPUs to Semantic Integration
Enter Apache Crunch
Building a Complete Picture
Integrating Healthcare Data
Composability over Frameworks
Moving Forward
23. Biological Data Science: Saving Lives with Software. . . . . . . . . . . . . . . . . . . . . . . . . . . . 653
The Structure of DNA
The Genetic Code: Turning DNA Letters into Proteins
Thinking of DNA as Source Code
The Human Genome Project and Reference Genomes
Sequencing and Aligning DNA
ADAM, A Scalable Genome Analysis Platform
Literate programming with the Avro interface description language (IDL)
Column-oriented access with Parquet
A simple example: k-mer counting using Spark and ADAM
From Personalized Ads to Personalized Medicine
Join In
24. Cascading. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 669
Fields, Tuples, and Pipes
Operations
Taps, Schemes, and Flows
Cascading in Practice
Flexibility
Hadoop and Cascading at ShareThis
Summary
A. Installing Apache Hadoop. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 685
B. Cloudera’s Distribution Including Apache Hadoop. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 691
C. Preparing the NCDC Weather Data. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 693
D. The Old and New Java MapReduce APIs. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 697
Index. . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . . 701
Foreword
Hadoop got its start in Nutch. A few of us were attempting to build an open source web
search engine and having trouble managing computations running on even a handful
of computers. Once Google published its GFS and MapReduce papers, the route became
clear. They’d devised systems to solve precisely the problems we were having with Nutch.
So we started, two of us, half-time, to try to re-create these systems as a part of Nutch.
We managed to get Nutch limping along on 20 machines, but it soon became clear that
to handle the Web’s massive scale, we’d need to run it on thousands of machines, and
moreover, that the job was bigger than two half-time developers could handle.
Around that time, Yahoo! got interested, and quickly put together a team that I joined.
We split off the distributed computing part of Nutch, naming it Hadoop. With the help
of Yahoo!, Hadoop soon grew into a technology that could truly scale to the Web.
In 2006, Tom White started contributing to Hadoop. I already knew Tom through an
excellent article he’d written about Nutch, so I knew he could present complex ideas in
clear prose. I soon learned that he could also develop software that was as pleasant to
read as his prose.
From the beginning, Tom’s contributions to Hadoop showed his concern for users and
for the project. Unlike most open source contributors, Tom is not primarily interested
in tweaking the system to better meet his own needs, but rather in making it easier for
anyone to use.
Initially, Tom specialized in making Hadoop run well on Amazon’s EC2 and S3 services.
Then he moved on to tackle a wide variety of problems, including improving the Map‐
Reduce APIs, enhancing the website, and devising an object serialization framework.
In all cases, Tom presented his ideas precisely. In short order, Tom earned the role of
Hadoop committer and soon thereafter became a member of the Hadoop Project Man‐
agement Committee.
Tom is now a respected senior member of the Hadoop developer community. Though
he’s an expert in many technical corners of the project, his specialty is making Hadoop
easier to use and understand.
Given this, I was very pleased when I learned that Tom intended to write a book about
Hadoop. Who could be better qualified? Now you have the opportunity to learn about
Hadoop from a master—not only of the technology, but also of common sense and
plain talk.
—Doug Cutting, April 2009
Shed in the Yard, California
Preface
Martin Gardner, the mathematics and science writer, once said in an interview:
Beyond calculus, I am lost. That was the secret of my column’s success. It took me so long
to understand what I was writing about that I knew how to write in a way most readers
would understand.1
In many ways, this is how I feel about Hadoop. Its inner workings are complex, resting
as they do on a mixture of distributed systems theory, practical engineering, and com‐
mon sense. And to the uninitiated, Hadoop can appear alien.
But it doesn’t need to be like this. Stripped to its core, the tools that Hadoop provides
for working with big data are simple. If there’s a common theme, it is about raising the
level of abstraction—to create building blocks for programmers who have lots of data
to store and analyze, and who don’t have the time, the skill, or the inclination to become
distributed systems experts to build the infrastructure to handle it.
With such a simple and generally applicable feature set, it seemed obvious to me when
I started using it that Hadoop deserved to be widely used. However, at the time (in early
2006), setting up, configuring, and writing programs to use Hadoop was an art. Things
have certainly improved since then: there is more documentation, there are more ex‐
amples, and there are thriving mailing lists to go to when you have questions. And yet
the biggest hurdle for newcomers is understanding what this technology is capable of,
where it excels, and how to use it. That is why I wrote this book.
The Apache Hadoop community has come a long way. Since the publication of the first
edition of this book, the Hadoop project has blossomed. “Big data” has become a house‐
hold term. 2 In this time, the software has made great leaps in adoption, performance,
reliability, scalability, and manageability. The number of things being built and run on
the Hadoop platform has grown enormously. In fact, it’s difficult for one person to keep
track. To gain even wider adoption, I believe we need to make Hadoop even easier to
use. This will involve writing more tools; integrating with even more systems; and
writing new, improved APIs. I’m looking forward to being a part of this, and I hope this
book will encourage and enable others to do so, too.

1. Alex Bellos, “The science of fun,” The Guardian, May 31, 2008.
2. It was added to the Oxford English Dictionary in 2013.
Administrative Notes
During discussion of a particular Java class in the text, I often omit its package name to
reduce clutter. If you need to know which package a class is in, you can easily look it up
in the Java API documentation for Hadoop (linked to from the Apache Hadoop home
page), or the relevant project. Or if you’re using an integrated development environment
(IDE), its auto-complete mechanism can help find what you’re looking for.
Similarly, although it deviates from usual style guidelines, program listings that import
multiple classes from the same package may use the asterisk wildcard character to save
space (for example, import org.apache.hadoop.io.*).
The sample programs in this book are available for download from the book’s website.
You will also find instructions there for obtaining the datasets that are used in examples
throughout the book, as well as further notes for running the programs in the book and
links to updates, additional resources, and my blog.
What’s New in the Fourth Edition?
The fourth edition covers Hadoop 2 exclusively. The Hadoop 2 release series is the
current active release series and contains the most stable versions of Hadoop.
There are new chapters covering YARN (Chapter 4), Parquet (Chapter 13), Flume
(Chapter 14), Crunch (Chapter 18), and Spark (Chapter 19). There’s also a new section
to help readers navigate different pathways through the book (“What’s in This Book?”
on page 15).
This edition includes two new case studies (Chapters 22 and 23): one on how Hadoop
is used in healthcare systems, and another on using Hadoop technologies for genomics
data processing. Case studies from the previous editions can now be found online.
Many corrections, updates, and improvements have been made to existing chapters to
bring them up to date with the latest releases of Hadoop and its related projects.
What’s New in the Third Edition?
The third edition covers the 1.x (formerly 0.20) release series of Apache Hadoop, as well
as the newer 0.22 and 2.x (formerly 0.23) series. With a few exceptions, which are noted
in the text, all the examples in this book run against these versions.
This edition uses the new MapReduce API for most of the examples. Because the old
API is still in widespread use, it continues to be discussed in the text alongside the new
API, and the equivalent code using the old API can be found on the book’s website.
The major change in Hadoop 2.0 is the new MapReduce runtime, MapReduce 2, which
is built on a new distributed resource management system called YARN. This edition
includes new sections covering MapReduce on YARN: how it works (Chapter 7) and
how to run it (Chapter 10).
There is more MapReduce material, too, including development practices such as pack‐
aging MapReduce jobs with Maven, setting the user’s Java classpath, and writing tests
with MRUnit (all in Chapter 6). In addition, there is more depth on features such as
output committers and the distributed cache (both in Chapter 9), as well as task memory
monitoring (Chapter 10). There is a new section on writing MapReduce jobs to process
Avro data (Chapter 12), and one on running a simple MapReduce workflow in Oozie
(Chapter 6).
The chapter on HDFS (Chapter 3) now has introductions to high availability, federation,
and the new WebHDFS and HttpFS filesystems.
The chapters on Pig, Hive, Sqoop, and ZooKeeper have all been expanded to cover the
new features and changes in their latest releases.
In addition, numerous corrections and improvements have been made throughout the
book.
What’s New in the Second Edition?
The second edition has two new chapters on Sqoop and Hive (Chapters 15 and 17,
respectively), a new section covering Avro (in Chapter 12), an introduction to the new
security features in Hadoop (in Chapter 10), and a new case study on analyzing massive
network graphs using Hadoop.
This edition continues to describe the 0.20 release series of Apache Hadoop, because
this was the latest stable release at the time of writing. New features from later releases
are occasionally mentioned in the text, however, with reference to the version that they
were introduced in.
Conventions Used in This Book
The following typographical conventions are used in this book:
Italic
Indicates new terms, URLs, email addresses, filenames, and file extensions.
Constant width
Used for program listings, as well as within paragraphs to refer to commands and
command-line options and to program elements such as variable or function
names, databases, data types, environment variables, statements, and keywords.
Constant width bold
Shows commands or other text that should be typed literally by the user.
Constant width italic
Shows text that should be replaced with user-supplied values or by values deter‐
mined by context.
This icon signifies a general note.
This icon signifies a tip or suggestion.
This icon indicates a warning or caution.
Using Code Examples
Supplemental material (code, examples, exercises, etc.) is available for download at this
book’s website and on GitHub.
This book is here to help you get your job done. In general, you may use the code in
this book in your programs and documentation. You do not need to contact us for
permission unless you’re reproducing a significant portion of the code. For example,
writing a program that uses several chunks of code from this book does not require
permission. Selling or distributing a CD-ROM of examples from O’Reilly books does
require permission. Answering a question by citing this book and quoting example code
does not require permission. Incorporating a significant amount of example code from
this book into your product’s documentation does require permission.
We appreciate, but do not require, attribution. An attribution usually includes the title,
author, publisher, and ISBN. For example: “Hadoop: The Definitive Guide, Fourth Ed‐
ition, by Tom White (O’Reilly). Copyright 2015 Tom White, 978-1-491-90163-2.”
If you feel your use of code examples falls outside fair use or the permission given here,
feel free to contact us at permissions@oreilly.com.
Safari® Books Online
Safari Books Online is an on-demand digital library that
delivers expert content in both book and video form from
the world’s leading authors in technology and business.
Technology professionals, software developers, web designers, and business and crea‐
tive professionals use Safari Books Online as their primary resource for research, prob‐
lem solving, learning, and certification training.
Safari Books Online offers a range of plans and pricing for enterprise, government,
education, and individuals.
Members have access to thousands of books, training videos, and prepublication manu‐
scripts in one fully searchable database from publishers like O’Reilly Media, Prentice
Hall Professional, Addison-Wesley Professional, Microsoft Press, Sams, Que, Peachpit
Press, Focal Press, Cisco Press, John Wiley & Sons, Syngress, Morgan Kaufmann, IBM
Redbooks, Packt, Adobe Press, FT Press, Apress, Manning, New Riders, McGraw-Hill,
Jones & Bartlett, Course Technology, and hundreds more. For more information about
Safari Books Online, please visit us online.
How to Contact Us
Please address comments and questions concerning this book to the publisher:
O’Reilly Media, Inc.
1005 Gravenstein Highway North
Sebastopol, CA 95472
800-998-9938 (in the United States or Canada)
707-829-0515 (international or local)
707-829-0104 (fax)
We have a web page for this book, where we list errata, examples, and any additional
information. You can access this page at http://bit.ly/hadoop_tdg_4e.
To comment or ask technical questions about this book, send email to
bookquestions@oreilly.com.
For more information about our books, courses, conferences, and news, see our website
at http://www.oreilly.com.
Find us on Facebook: http://facebook.com/oreilly
Follow us on Twitter: http://twitter.com/oreillymedia
Watch us on YouTube: http://www.youtube.com/oreillymedia
Acknowledgments
I have relied on many people, both directly and indirectly, in writing this book. I would
like to thank the Hadoop community, from whom I have learned, and continue to learn,
a great deal.
In particular, I would like to thank Michael Stack and Jonathan Gray for writing the
chapter on HBase. Thanks also go to Adrian Woodhead, Marc de Palol, Joydeep Sen
Sarma, Ashish Thusoo, Andrzej Białecki, Stu Hood, Chris K. Wensel, and Owen
O’Malley for contributing case studies.
I would like to thank the following reviewers who contributed many helpful suggestions
and improvements to my drafts: Raghu Angadi, Matt Biddulph, Christophe Bisciglia,
Ryan Cox, Devaraj Das, Alex Dorman, Chris Douglas, Alan Gates, Lars George, Patrick
Hunt, Aaron Kimball, Peter Krey, Hairong Kuang, Simon Maxen, Olga Natkovich,
Benjamin Reed, Konstantin Shvachko, Allen Wittenauer, Matei Zaharia, and Philip
Zeyliger. Ajay Anand kept the review process flowing smoothly. Philip (“flip”) Kromer
kindly helped me with the NCDC weather dataset featured in the examples in this book.
Special thanks to Owen O’Malley and Arun C. Murthy for explaining the intricacies of
the MapReduce shuffle to me. Any errors that remain are, of course, to be laid at my
door.
For the second edition, I owe a debt of gratitude for the detailed reviews and feedback
from Jeff Bean, Doug Cutting, Glynn Durham, Alan Gates, Jeff Hammerbacher, Alex
Kozlov, Ken Krugler, Jimmy Lin, Todd Lipcon, Sarah Sproehnle, Vinithra Varadharajan,
and Ian Wrigley, as well as all the readers who submitted errata for the first edition. I
would also like to thank Aaron Kimball for contributing the chapter on Sqoop, and
Philip (“flip”) Kromer for the case study on graph processing.
For the third edition, thanks go to Alejandro Abdelnur, Eva Andreasson, Eli Collins,
Doug Cutting, Patrick Hunt, Aaron Kimball, Aaron T. Myers, Brock Noland, Arvind
Prabhakar, Ahmed Radwan, and Tom Wheeler for their feedback and suggestions. Rob
Weltman kindly gave very detailed feedback for the whole book, which greatly improved
the final manuscript. Thanks also go to all the readers who submitted errata for the
second edition.
For the fourth edition, I would like to thank Jodok Batlogg, Meghan Blanchette, Ryan
Blue, Jarek Jarcec Cecho, Jules Damji, Dennis Dawson, Matthew Gast, Karthik Kam‐
batla, Julien Le Dem, Brock Noland, Sandy Ryza, Akshai Sarma, Ben Spivey, Michael
Stack, Kate Ting, Josh Walter, Josh Wills, and Adrian Woodhead for all of their invaluable
review feedback. Ryan Brush, Micah Whitacre, and Matt Massie kindly contributed new
case studies for this edition. Thanks again to all the readers who submitted errata.
I am particularly grateful to Doug Cutting for his encouragement, support, and friend‐
ship, and for contributing the Foreword.
Thanks also go to the many others with whom I have had conversations or email
discussions over the course of writing the book.
Halfway through writing the first edition of this book, I joined Cloudera, and I want to
thank my colleagues for being incredibly supportive in allowing me the time to write
and to get it finished promptly.
I am grateful to my editors, Mike Loukides and Meghan Blanchette, and their colleagues
at O’Reilly for their help in the preparation of this book. Mike and Meghan have been
there throughout to answer my questions, to read my first drafts, and to keep me on
schedule.
Finally, the writing of this book has been a great deal of work, and I couldn’t have done
it without the constant support of my family. My wife, Eliane, not only kept the home
going, but also stepped in to help review, edit, and chase case studies. My daughters,
Emilia and Lottie, have been very understanding, and I’m looking forward to spending
lots more time with all of them.
PART I
Hadoop Fundamentals
CHAPTER 1
Meet Hadoop
In pioneer days they used oxen for heavy pulling, and when one ox couldn’t budge a log,
they didn’t try to grow a larger ox. We shouldn’t be trying for bigger computers, but for
more systems of computers.
—Grace Hopper
Data!
We live in the data age. It’s not easy to measure the total volume of data stored elec‐
tronically, but an IDC estimate put the size of the “digital universe” at 4.4 zettabytes in
2013 and is forecasting a tenfold growth by 2020 to 44 zettabytes.1 A zettabyte is 10²¹
bytes, or equivalently one thousand exabytes, one million petabytes, or one billion
terabytes. That’s more than one disk drive for every person in the world.
This flood of data is coming from many sources. Consider the following:2
• The New York Stock Exchange generates about 4−5 terabytes of data per day.
• Facebook hosts more than 240 billion photos, growing at 7 petabytes per month.
• Ancestry.com, the genealogy site, stores around 10 petabytes of data.
• The Internet Archive stores around 18.5 petabytes of data.
• The Large Hadron Collider near Geneva, Switzerland, produces about 30 petabytes
of data per year.

1. These statistics were reported in a study entitled “The Digital Universe of Opportunities: Rich Data and the
Increasing Value of the Internet of Things.”
2. All figures are from 2013 or 2014. For more information, see Tom Groenfeldt, “At NYSE, The Data Deluge
Overwhelms Traditional Databases”; Rich Miller, “Facebook Builds Exabyte Data Centers for Cold Storage”;
Ancestry.com’s “Company Facts”; Archive.org’s “Petabox”; and the Worldwide LHC Computing Grid
project’s welcome page.
So there’s a lot of data out there. But you are probably wondering how it affects you.
Most of the data is locked up in the largest web properties (like search engines) or in
scientific or financial institutions, isn’t it? Does the advent of big data affect smaller
organizations or individuals?
I argue that it does. Take photos, for example. My wife’s grandfather was an avid photographer
and took photographs throughout his adult life. His entire corpus of medium-format,
slide, and 35mm film, when scanned in at high resolution, occupies around 10
gigabytes. Compare this to the digital photos my family took in 2008, which take up
about 5 gigabytes of space. My family is producing photographic data at 35 times the
rate my wife’s grandfather did, and the rate is increasing every year as it becomes easier
to take more and more photos.
More generally, the digital streams that individuals are producing are growing apace.
Microsoft Research’s MyLifeBits project gives a glimpse of the archiving of personal
information that may become commonplace in the near future. MyLifeBits was an ex‐
periment where an individual’s interactions—phone calls, emails, documents—were
captured electronically and stored for later access. The data gathered included a photo
taken every minute, which resulted in an overall data volume of 1 gigabyte per month.
When storage costs come down enough to make it feasible to store continuous audio
and video, the data volume for a future MyLifeBits service will be many times that.
The trend is for every individual’s data footprint to grow, but perhaps more significantly,
the amount of data generated by machines as a part of the Internet of Things will be
even greater than that generated by people. Machine logs, RFID readers, sensor net‐
works, vehicle GPS traces, retail transactions—all of these contribute to the growing
mountain of data.
The volume of data being made publicly available increases every year, too. Organiza‐
tions no longer have to merely manage their own data; success in the future will be
dictated to a large extent by their ability to extract value from other organizations’ data.
Initiatives such as Public Data Sets on Amazon Web Services and Infochimps.org exist
to foster the “information commons,” where data can be freely (or for a modest price)
shared for anyone to download and analyze. Mashups between different information
sources make for unexpected and hitherto unimaginable applications.
Take, for example, the Astrometry.net project, which watches the Astrometry group on
Flickr for new photos of the night sky. It analyzes each image and identifies which part
of the sky it is from, as well as any interesting celestial bodies, such as stars or galaxies.
This project shows the kinds of things that are possible when data (in this case, tagged
photographic images) is made available and used for something (image analysis) that
was not anticipated by the creator.
It has been said that “more data usually beats better algorithms,” which is to say that for
some problems (such as recommending movies or music based on past preferences),
however fiendish your algorithms, often they can be beaten simply by having more data
(and a less sophisticated algorithm).3
The good news is that big data is here. The bad news is that we are struggling to store
and analyze it.
Data Storage and Analysis
The problem is simple: although the storage capacities of hard drives have increased
massively over the years, access speeds—the rate at which data can be read from drives—
have not kept up. One typical drive from 1990 could store 1,370 MB of data and had a
transfer speed of 4.4 MB/s,4 so you could read all the data from a full drive in around
five minutes. Over 20 years later, 1-terabyte drives are the norm, but the transfer speed
is around 100 MB/s, so it takes more than two and a half hours to read all the data off
the disk.
This is a long time to read all data on a single drive—and writing is even slower. The
obvious way to reduce the time is to read from multiple disks at once. Imagine if we had
100 drives, each holding one hundredth of the data. Working in parallel, we could read
the data in under two minutes.
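
To make the arithmetic concrete, here is a back-of-the-envelope sketch in Python (an illustration added for this discussion, not Hadoop code), using the same assumed figures of 1 terabyte of data and a 100 MB/s transfer rate per drive:

    # Back-of-the-envelope read times: one 1 TB drive versus 100 drives in parallel.
    DATASET_MB = 1_000_000      # 1 TB expressed in megabytes
    DRIVE_MB_PER_S = 100        # sustained transfer rate of one drive
    NUM_DRIVES = 100            # drives working in parallel, each holding 1/100th

    single_drive_s = DATASET_MB / DRIVE_MB_PER_S
    parallel_s = single_drive_s / NUM_DRIVES

    print(f"One drive:  {single_drive_s / 3600:.1f} hours")   # ~2.8 hours
    print(f"100 drives: {parallel_s / 60:.1f} minutes")       # ~1.7 minutes
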
Using only one hundredth of a disk may seem wasteful. But we can store 100 datasets,
each of which is 1 terabyte, and provide shared access to them. We can imagine that the
users of such a system would be happy to share access in return for shorter analysis
times, and statistically, that their analysis jobs would be likely to be spread over time,
so they wouldn’t interfere with each other too much.
There’s more to being able to read and write data in parallel to or from multiple disks,
though.
The first problem to solve is hardware failure: as soon as you start using many pieces of
hardware, the chance that one will fail is fairly high. A common way of avoiding data
loss is through replication: redundant copies of the data are kept by the system so that
in the event of failure, there is another copy available. This is how RAID works, for
instance, although Hadoop’s filesystem, the Hadoop Distributed Filesystem (HDFS),
takes a slightly different approach, as you shall see later.
3. The quote is from Anand Rajaraman’s blog post “More data usually beats better algorithms,” in which he
writes about the Netflix Challenge. Alon Halevy, Peter Norvig, and Fernando Pereira make the same point
in “The Unreasonable Effectiveness of Data,” IEEE Intelligent Systems, March/April 2009.
4. These specifications are for the Seagate ST-41600n.
The second problem is that most analysis tasks need to be able to combine the data in
some way, and data read from one disk may need to be combined with data from any
of the other 99 disks. Various distributed systems allow data to be combined from mul‐
tiple sources, but doing this correctly is notoriously challenging. MapReduce provides
a programming model that abstracts the problem from disk reads and writes, trans‐
forming it into a computation over sets of keys and values. We look at the details of this
model in later chapters, but the important point for the present discussion is that there
are two parts to the computation—the map and the reduce—and it’s the interface be‐
tween the two where the “mixing” occurs. Like HDFS, MapReduce has built-in
reliability.
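
To get a feel for the shape of this model, the following sketch mimics the two-part computation in plain Python (an illustration only; Hadoop’s real Java MapReduce API is covered in Chapter 2). The map function emits key-value pairs, the grouping step in the middle is where the “mixing” happens, and the reduce function combines the values collected for each key:

    from collections import defaultdict

    def map_fn(line):
        # Map: turn one input record into (key, value) pairs.
        for word in line.split():
            yield word.lower(), 1

    def reduce_fn(key, values):
        # Reduce: combine all the values collected for one key.
        return key, sum(values)

    def run(records):
        groups = defaultdict(list)
        for record in records:                        # "map" phase
            for key, value in map_fn(record):
                groups[key].append(value)
        # The shuffle between map and reduce groups values by key.
        return [reduce_fn(k, v) for k, v in sorted(groups.items())]   # "reduce" phase

    print(run(["the quick brown fox", "the lazy dog"]))
    # [('brown', 1), ('dog', 1), ('fox', 1), ('lazy', 1), ('quick', 1), ('the', 2)]
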
In a nutshell, this is what Hadoop provides: a reliable, scalable platform for storage and
analysis. What’s more, because it runs on commodity hardware and is open source,
Hadoop is affordable.
Querying All Your Data
The approach taken by MapReduce may seem like a brute-force approach. The premise
is that the entire dataset—or at least a good portion of it—can be processed for each
query. But this is its power. MapReduce is a batch query processor, and the ability to
run an ad hoc query against your whole dataset and get the results in a reasonable time
is transformative. It changes the way you think about data and unlocks data that was
previously archived on tape or disk. It gives people the opportunity to innovate with
data. Questions that took too long to get answered before can now be answered, which
in turn leads to new questions and new insights.
For example, Mailtrust, Rackspace’s mail division, used Hadoop for processing email
logs. One ad hoc query they wrote was to find the geographic distribution of their users.
In their words:
This data was so useful that we’ve scheduled the MapReduce job to run monthly and we
will be using this data to help us decide which Rackspace data centers to place new mail
servers in as we grow.
By bringing several hundred gigabytes of data together and having the tools to analyze
it, the Rackspace engineers were able to gain an understanding of the data that they
otherwise would never have had, and furthermore, they were able to use what they had
learned to improve the service for their customers.
Beyond Batch
For all its strengths, MapReduce is fundamentally a batch processing system, and is not
suitable for interactive analysis. You can’t run a query and get results back in a few
seconds or less. Queries typically take minutes or more, so it’s best for offline use, where
there isn’t a human sitting in the processing loop waiting for results.
However, since its original incarnation, Hadoop has evolved beyond batch processing.
Indeed, the term “Hadoop” is sometimes used to refer to a larger ecosystem of projects,
not just HDFS and MapReduce, that fall under the umbrella of infrastructure for dis‐
tributed computing and large-scale data processing. Many of these are hosted by the
Apache Software Foundation, which provides support for a community of open source
software projects, including the original HTTP Server from which it gets its name.
The first component to provide online access was HBase, a key-value store that uses
HDFS for its underlying storage. HBase provides both online read/write access of in‐
dividual rows and batch operations for reading and writing data in bulk, making it a
good solution for building applications on.
The real enabler for new processing models in Hadoop was the introduction of YARN
(which stands for Yet Another Resource Negotiator) in Hadoop 2. YARN is a cluster
resource management system, which allows any distributed program (not just MapRe‐
duce) to run on data in a Hadoop cluster.
In the last few years, there has been a flowering of different processing patterns that
work with Hadoop. Here is a sample:
Interactive SQL
By dispensing with MapReduce and using a distributed query engine that uses
dedicated “always on” daemons (like Impala) or container reuse (like Hive on Tez),
it’s possible to achieve low-latency responses for SQL queries on Hadoop while still
scaling up to large dataset sizes.
Iterative processing
Many algorithms—such as those in machine learning—are iterative in nature, so
it’s much more efficient to hold each intermediate working set in memory, com‐
pared to loading from disk on each iteration. The architecture of MapReduce does
not allow this, but it’s straightforward with Spark, for example, and it enables a
highly exploratory style of working with datasets.
Stream processing
Streaming systems like Storm, Spark Streaming, or Samza make it possible to run
real-time, distributed computations on unbounded streams of data and emit results
to Hadoop storage or external systems.
Search
The Solr search platform can run on a Hadoop cluster, indexing documents as they
are added to HDFS, and serving search queries from indexes stored in HDFS.
Despite the emergence of different processing frameworks on Hadoop, MapReduce still
has a place for batch processing, and it is useful to understand how it works since it
introduces several concepts that apply more generally (like the idea of input formats,
or how a dataset is split into pieces).
Comparison with Other Systems
Hadoop isn’t the first distributed system for data storage and analysis, but it has some
unique properties that set it apart from other systems that may seem similar. Here we
look at some of them.
Relational Database Management Systems
Why can’t we use databases with lots of disks to do large-scale analysis? Why is Hadoop
needed?
The answer to these questions comes from another trend in disk drives: seek time is
improving more slowly than transfer rate. Seeking is the process of moving the disk’s
head to a particular place on the disk to read or write data. It characterizes the latency
of a disk operation, whereas the transfer rate corresponds to a disk’s bandwidth.
If the data access pattern is dominated by seeks, it will take longer to read or write large
portions of the dataset than streaming through it, which operates at the transfer rate.
On the other hand, for updating a small proportion of records in a database, a traditional
B-Tree (the data structure used in relational databases, which is limited by the rate at
which it can perform seeks) works well. For updating the majority of a database, a B-Tree is less efficient than MapReduce, which uses Sort/Merge to rebuild the database.
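
A rough numeric sketch in Python illustrates the trade-off; the 10 ms seek time, 100-byte record size, and 100 MB/s streaming rate below are hypothetical figures chosen for illustration, not measurements from the book:

    # Hypothetical figures: 10 ms per seek, 100 MB/s streaming, 1 TB of 100-byte records.
    SEEK_S = 0.010
    STREAM_MB_PER_S = 100
    DATASET_MB = 1_000_000
    RECORDS = DATASET_MB * 1_000_000 // 100        # ten billion 100-byte records

    def seek_update_hours(fraction_updated):
        # Updating records in place costs roughly one seek per updated record.
        return RECORDS * fraction_updated * SEEK_S / 3600

    def stream_rewrite_hours():
        # Rebuilding the dataset streams it once in and once out.
        return 2 * DATASET_MB / STREAM_MB_PER_S / 3600

    print(f"Seek-update 0.01% of records: {seek_update_hours(0.0001):.1f} h")   # ~2.8 h
    print(f"Seek-update 1% of records:    {seek_update_hours(0.01):.1f} h")     # ~277.8 h
    print(f"Stream-rewrite everything:    {stream_rewrite_hours():.1f} h")      # ~5.6 h

With these assumptions, seek-based updates win only when a tiny fraction of records changes; once a larger share must be touched, streaming the whole dataset is faster.
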
In many ways, MapReduce can be seen as a complement to a Relational Database Man‐
agement System (RDBMS). (The differences between the two systems are shown in
Table 1-1.) MapReduce is a good fit for problems that need to analyze the whole dataset
in a batch fashion, particularly for ad hoc analysis. An RDBMS is good for point queries
or updates, where the dataset has been indexed to deliver low-latency retrieval and
update times of a relatively small amount of data. MapReduce suits applications where
the data is written once and read many times, whereas a relational database is good for
datasets that are continually updated.5
Table 1-1. RDBMS compared to MapReduce

              Traditional RDBMS            MapReduce
Data size     Gigabytes                    Petabytes
Access        Interactive and batch        Batch
Updates       Read and write many times    Write once, read many times
Transactions  ACID                         None
Structure     Schema-on-write              Schema-on-read
Integrity     High                         Low
Scaling       Nonlinear                    Linear

5. In January 2007, David J. DeWitt and Michael Stonebraker caused a stir by publishing “MapReduce: A major
step backwards,” in which they criticized MapReduce for being a poor substitute for relational databases.
Many commentators argued that it was a false comparison (see, for example, Mark C. Chu-Carroll’s “Databases
are hammers; MapReduce is a screwdriver”), and DeWitt and Stonebraker followed up with “MapReduce II,”
where they addressed the main topics brought up by others.
However, the differences between relational databases and Hadoop systems are blurring.
Relational databases have started incorporating some of the ideas from Hadoop, and
from the other direction, Hadoop systems such as Hive are becoming more interactive
(by moving away from MapReduce) and adding features like indexes and transactions
that make them look more and more like traditional RDBMSs.
Another difference between Hadoop and an RDBMS is the amount of structure in the
datasets on which they operate. Structured data is organized into entities that have a
defined format, such as XML documents or database tables that conform to a particular
predefined schema. This is the realm of the RDBMS. Semi-structured data, on the other
hand, is looser, and though there may be a schema, it is often ignored, so it may be used
only as a guide to the structure of the data: for example, a spreadsheet, in which the
structure is the grid of cells, although the cells themselves may hold any form of data.
Unstructured data does not have any particular internal structure: for example, plain
text or image data. Hadoop works well on unstructured or semi-structured data because
it is designed to interpret the data at processing time (so called schema-on-read). This
provides flexibility and avoids the costly data loading phase of an RDBMS, since in
Hadoop it is just a file copy.
Relational data is often normalized to retain its integrity and remove redundancy.
Normalization poses problems for Hadoop processing because it makes reading a record
a nonlocal operation, and one of the central assumptions that Hadoop makes is that it
is possible to perform (high-speed) streaming reads and writes.
A web server log is a good example of a set of records that is not normalized (for example,
the client hostnames are specified in full each time, even though the same client may
appear many times), and this is one reason that logfiles of all kinds are particularly well
suited to analysis with Hadoop. Note that Hadoop can perform joins; it’s just that they
are not used as much as in the relational world.
MapReduce—and the other processing models in Hadoop—scales linearly with the size
of the data. Data is partitioned, and the functional primitives (like map and reduce) can
work in parallel on separate partitions. This means that if you double the size of the
input data, a job will run twice as slowly. But if you also double the size of the cluster, a
job will run as fast as the original one. This is not generally true of SQL queries.
Grid Computing
The high-performance computing (HPC) and grid computing communities have been
doing large-scale data processing for years, using such application program interfaces
(APIs) as the Message Passing Interface (MPI). Broadly, the approach in HPC is to
distribute the work across a cluster of machines, which access a shared filesystem, hosted
by a storage area network (SAN). This works well for predominantly compute-intensive
jobs, but it becomes a problem when nodes need to access larger data volumes (hundreds
of gigabytes, the point at which Hadoop really starts to shine), since the network band‐
width is the bottleneck and compute nodes become idle.
Hadoop tries to co-locate the data with the compute nodes, so data access is fast because
it is local.6 This feature, known as data locality, is at the heart of data processing in
Hadoop and is the reason for its good performance. Recognizing that network band‐
width is the most precious resource in a data center environment (it is easy to saturate
network links by copying data around), Hadoop goes to great lengths to conserve it by
explicitly modeling network topology. Notice that this arrangement does not preclude
high-CPU analyses in Hadoop.
MPI gives great control to programmers, but it requires that they explicitly handle the
mechanics of the data flow, exposed via low-level C routines and constructs such as
sockets, as well as the higher-level algorithms for the analyses. Processing in Hadoop
operates only at the higher level: the programmer thinks in terms of the data model
(such as key-value pairs for MapReduce), while the data flow remains implicit.
Coordinating the processes in a large-scale distributed computation is a challenge. The
hardest aspect is gracefully handling partial failure—when you don’t know whether or
not a remote process has failed—and still making progress with the overall computation.
Distributed processing frameworks like MapReduce spare the programmer from having
to think about failure, since the implementation detects failed tasks and reschedules
replacements on machines that are healthy. MapReduce is able to do this because it is a
shared-nothing architecture, meaning that tasks have no dependence on one another. (This
is a slight oversimplification, since the output from mappers is fed to the reducers, but
this is under the control of the MapReduce system; in this case, it needs to take more
care rerunning a failed reducer than rerunning a failed map, because it has to make sure
it can retrieve the necessary map outputs and, if not, regenerate them by running the
relevant maps again.) So from the programmer’s point of view, the order in which the
tasks run doesn’t matter. By contrast, MPI programs have to explicitly manage their own
checkpointing and recovery, which gives more control to the programmer but makes
them more difficult to write.
6. Jim Gray was an early advocate of putting the computation near the data. See “Distributed Computing Economics,” March 2003.
Volunteer Computing
When people first hear about Hadoop and MapReduce they often ask, “How is it dif‐
ferent from SETI@home?” SETI, the Search for Extra-Terrestrial Intelligence, runs a
project called SETI@home in which volunteers donate CPU time from their otherwise
idle computers to analyze radio telescope data for signs of intelligent life outside Earth.
SETI@home is the most well known of many volunteer computing projects; others in‐
clude the Great Internet Mersenne Prime Search (to search for large prime numbers)
and Folding@home (to understand protein folding and how it relates to disease).
Volunteer computing projects work by breaking the problems they are trying to
solve into chunks called work units, which are sent to computers around the world to
be analyzed. For example, a SETI@home work unit is about 0.35 MB of radio telescope
data, and takes hours or days to analyze on a typical home computer. When the analysis
is completed, the results are sent back to the server, and the client gets another work
unit. As a precaution to combat cheating, each work unit is sent to three different ma‐
chines and needs at least two results to agree to be accepted.
Although SETI@home may be superficially similar to MapReduce (breaking a problem
into independent pieces to be worked on in parallel), there are some significant differ‐
ences. The SETI@home problem is very CPU-intensive, which makes it suitable for
running on hundreds of thousands of computers across the world7 because the time to
transfer the work unit is dwarfed by the time to run the computation on it. Volunteers
are donating CPU cycles, not bandwidth.
7. In January 2008, SETI@home was reported to be processing 300 gigabytes a day, using 320,000 computers
(most of which are not dedicated to SETI@home; they are used for other things, too).
MapReduce is designed to run jobs that last minutes or hours on trusted, dedicated
hardware running in a single data center with very high aggregate bandwidth
interconnects. By contrast, SETI@home runs a perpetual computation on untrusted
machines on the Internet with highly variable connection speeds and no data locality.
A Brief History of Apache Hadoop
Hadoop was created by Doug Cutting, the creator of Apache Lucene, the widely used
text search library. Hadoop has its origins in Apache Nutch, an open source web search
engine, itself a part of the Lucene project.
The Origin of the Name “Hadoop”
The name Hadoop is not an acronym; it’s a made-up name. The project’s creator, Doug
Cutting, explains how the name came about:
The name my kid gave a stuffed yellow elephant. Short, relatively easy to spell and
pronounce, meaningless, and not used elsewhere: those are my naming criteria. Kids
are good at generating such. Googol is a kid’s term.
Projects in the Hadoop ecosystem also tend to have names that are unrelated to their
function, often with an elephant or other animal theme (“Pig,” for example). Smaller
components are given more descriptive (and therefore more mundane) names. This is
a good principle, as it means you can generally work out what something does from its
name. For example, the namenode8 manages the filesystem namespace.
Building a web search engine from scratch was an ambitious goal, for not only is the
software required to crawl and index websites complex to write, but it is also a challenge
to run without a dedicated operations team, since there are so many moving parts. It’s
expensive, too: Mike Cafarella and Doug Cutting estimated a system supporting a
one-billion-page index would cost around $500,000 in hardware, with a monthly run‐
ning cost of $30,000.9 Nevertheless, they believed it was a worthy goal, as it would open
up and ultimately democratize search engine algorithms.
Nutch was started in 2002, and a working crawler and search system quickly emerged.
However, its creators realized that their architecture wouldn’t scale to the billions of
pages on the Web. Help was at hand with the publication of a paper in 2003 that described
the architecture of Google’s distributed filesystem, called GFS, which was being used in
8. In this book, we use the lowercase form, “namenode,” to denote the entity when it’s being referred to generally,
and the CamelCase form NameNode to denote the Java class that implements it.
9. See Mike Cafarella and Doug Cutting, “Building Nutch: Open Source Search,” ACM Queue, April 2004.
production at Google.10 GFS, or something like it, would solve their storage needs for
the very large files generated as a part of the web crawl and indexing process. In par‐
ticular, GFS would free up time being spent on administrative tasks such as managing
storage nodes. In 2004, Nutch’s developers set about writing an open source implemen‐
tation, the Nutch Distributed Filesystem (NDFS).
In 2004, Google published the paper that introduced MapReduce to the world.11 Early
in 2005, the Nutch developers had a working MapReduce implementation in Nutch,
and by the middle of that year all the major Nutch algorithms had been ported to run
using MapReduce and NDFS.
NDFS and the MapReduce implementation in Nutch were applicable beyond the realm
of search, and in February 2006 they moved out of Nutch to form an independent
subproject of Lucene called Hadoop. At around the same time, Doug Cutting joined
Yahoo!, which provided a dedicated team and the resources to turn Hadoop into a
system that ran at web scale (see the following sidebar). This was demonstrated in Feb‐
ruary 2008 when Yahoo! announced that its production search index was being gener‐
ated by a 10,000-core Hadoop cluster.12
Hadoop at Yahoo!
Building Internet-scale search engines requires huge amounts of data and therefore large
numbers of machines to process it. Yahoo! Search consists of four primary components:
the Crawler, which downloads pages from web servers; the WebMap, which builds a
graph of the known Web; the Indexer, which builds a reverse index to the best pages;
and the Runtime, which answers users’ queries. The WebMap is a graph that consists of
roughly 1 trillion (10¹²) edges, each representing a web link, and 100 billion (10¹¹) nodes,
each representing distinct URLs. Creating and analyzing such a large graph requires a
large number of computers running for many days. In early 2005, the infrastructure for
the WebMap, named Dreadnaught, needed to be redesigned to scale up to more nodes.
Dreadnaught had successfully scaled from 20 to 600 nodes, but required a complete
redesign to scale out further. Dreadnaught is similar to MapReduce in many ways, but
provides more flexibility and less structure. In particular, each fragment in a Dread‐
naught job could send output to each of the fragments in the next stage of the job, but
the sort was all done in library code. In practice, most of the WebMap phases were pairs
that corresponded to MapReduce. Therefore, the WebMap applications would not re‐
quire extensive refactoring to fit into MapReduce.
10. Sanjay Ghemawat, Howard Gobioff, and Shun-Tak Leung, “The Google File System,” October 2003.
11. Jeffrey Dean and Sanjay Ghemawat, “MapReduce: Simplified Data Processing on Large Clusters,” December
2004.
12. “Yahoo! Launches World’s Largest Hadoop Production Application,” February 19, 2008.
Eric Baldeschwieler (aka Eric14) created a small team, and we started designing and
prototyping a new framework, written in C++ and modeled after GFS and MapReduce,
to replace Dreadnaught. Although the immediate need was for a new framework for
WebMap, it was clear that standardization of the batch platform across Yahoo! Search
was critical and that by making the framework general enough to support other users,
we could better leverage investment in the new platform.
At the same time, we were watching Hadoop, which was part of Nutch, and its progress.
In January 2006, Yahoo! hired Doug Cutting, and a month later we decided to abandon
our prototype and adopt Hadoop. The advantage of Hadoop over our prototype and
design was that it was already working with a real application (Nutch) on 20 nodes. That
allowed us to bring up a research cluster two months later and start helping real cus‐
tomers use the new framework much sooner than we could have otherwise. Another
advantage, of course, was that since Hadoop was already open source, it was easier
(although far from easy!) to get permission from Yahoo!’s legal department to work in
open source. So, we set up a 200-node cluster for the researchers in early 2006 and put
the WebMap conversion plans on hold while we supported and improved Hadoop for
the research users.
—Owen O’Malley, 2009
In January 2008, Hadoop was made its own top-level project at Apache, confirming its
success and its diverse, active community. By this time, Hadoop was being used by many
other companies besides Yahoo!, such as Last.fm, Facebook, and the New York Times.
In one well-publicized feat, the New York Times used Amazon’s EC2 compute cloud to
crunch through 4 terabytes of scanned archives from the paper, converting them to
PDFs for the Web.13 The processing took less than 24 hours to run using 100 machines,
and the project probably wouldn’t have been embarked upon without the combination
of Amazon’s pay-by-the-hour model (which allowed the NYT to access a large number
of machines for a short period) and Hadoop’s easy-to-use parallel programming model.
In April 2008, Hadoop broke a world record to become the fastest system to sort an
entire terabyte of data. Running on a 910-node cluster, Hadoop sorted 1 terabyte in 209
seconds (just under 3.5 minutes), beating the previous year’s winner of 297 seconds.14
In November of the same year, Google reported that its MapReduce implementation
sorted 1 terabyte in 68 seconds.15 Then, in April 2009, it was announced that a team at
Yahoo! had used Hadoop to sort 1 terabyte in 62 seconds.16
13. Derek Gottfrid, “Self-Service, Prorated Super Computing Fun!” November 1, 2007.
14. Owen O’Malley, “TeraByte Sort on Apache Hadoop,” May 2008.
15. Grzegorz Czajkowski, “Sorting 1PB with MapReduce,” November 21, 2008.
16. Owen O’Malley and Arun C. Murthy, “Winning a 60 Second Dash with a Yellow Elephant,” April 2009.
The trend since then has been to sort even larger volumes of data at ever faster rates. In
the 2014 competition, a team from Databricks were joint winners of the Gray Sort
benchmark. They used a 207-node Spark cluster to sort 100 terabytes of data in 1,406
seconds, a rate of 4.27 terabytes per minute.17
Today, Hadoop is widely used in mainstream enterprises. Hadoop’s role as a general-purpose storage and analysis platform for big data has been recognized by the industry,
and this fact is reflected in the number of products that use or incorporate Hadoop in
some way. Commercial Hadoop support is available from large, established enterprise
vendors, including EMC, IBM, Microsoft, and Oracle, as well as from specialist Hadoop
companies such as Cloudera, Hortonworks, and MapR.
What’s in This Book?
The book is divided into five main parts: Parts I to III are about core Hadoop, Part IV
covers related projects in the Hadoop ecosystem, and Part V contains Hadoop case
studies. You can read the book from cover to cover, but there are alternative pathways
through the book that allow you to skip chapters that aren’t needed to read later ones.
See Figure 1-1.
Part I is made up of five chapters that cover the fundamental components in Hadoop
and should be read before tackling later chapters. Chapter 1 (this chapter) is a high-level
introduction to Hadoop. Chapter 2 provides an introduction to MapReduce. Chap‐
ter 3 looks at Hadoop filesystems, and in particular HDFS, in depth. Chapter 4 discusses
YARN, Hadoop’s cluster resource management system. Chapter 5 covers the I/O build‐
ing blocks in Hadoop: data integrity, compression, serialization, and file-based data
structures.
Part II has four chapters that cover MapReduce in depth. They provide useful under‐
standing for later chapters (such as the data processing chapters in Part IV), but could
be skipped on a first reading. Chapter 6 goes through the practical steps needed to
develop a MapReduce application. Chapter 7 looks at how MapReduce is implemented
in Hadoop, from the point of view of a user. Chapter 8 is about the MapReduce pro‐
gramming model and the various data formats that MapReduce can work with. Chap‐
ter 9 is on advanced MapReduce topics, including sorting and joining data.
Part III concerns the administration of Hadoop: Chapters 10 and 11 describe how to
set up and maintain a Hadoop cluster running HDFS and MapReduce on YARN.
Part IV of the book is dedicated to projects that build on Hadoop or are closely related
to it. Each chapter covers one project and is largely independent of the other chapters
in this part, so they can be read in any order.
17. Reynold Xin et al., “GraySort on Apache Spark by Databricks,” November 2014.
The first two chapters in this part are about data formats. Chapter 12 looks at Avro, a
cross-language data serialization library for Hadoop, and Chapter 13 covers Parquet,
an efficient columnar storage format for nested data.
The next two chapters look at data ingestion, or how to get your data into Hadoop.
Chapter 14 is about Flume, for high-volume ingestion of streaming data. Chapter 15 is
about Sqoop, for efficient bulk transfer of data between structured data stores (like
relational databases) and HDFS.
The common theme of the next four chapters is data processing, and in particular using
higher-level abstractions than MapReduce. Pig (Chapter 16) is a data flow language for
exploring very large datasets. Hive (Chapter 17) is a data warehouse for managing data
stored in HDFS and provides a query language based on SQL. Crunch (Chapter 18) is
a high-level Java API for writing data processing pipelines that can run on MapReduce
or Spark. Spark (Chapter 19) is a cluster computing framework for large-scale data
processing; it provides a directed acyclic graph (DAG) engine, and APIs in Scala, Java,
and Python.
Chapter 20 is an introduction to HBase, a distributed column-oriented real-time data‐
base that uses HDFS for its underlying storage. And Chapter 21 is about ZooKeeper, a
distributed, highly available coordination service that provides useful primitives for
building distributed applications.
Finally, Part V is a collection of case studies contributed by people using Hadoop in
interesting ways.
Supplementary information about Hadoop, such as how to install it on your machine,
can be found in the appendixes.
Figure 1-1. Structure of the book: there are various pathways through the content
CHAPTER 2
MapReduce
MapReduce is a programming model for data processing. The model is simple, yet not
too simple to express useful programs in. Hadoop can run MapReduce programs written
in various languages; in this chapter, we look at the same program expressed in Java,
Ruby, and Python. Most importantly, MapReduce programs are inherently parallel, thus
putting very large-scale data analysis into the hands of anyone with enough machines
at their disposal. MapReduce comes into its own for large datasets, so let’s start by looking
at one.
A Weather Dataset
For our example, we will write a program that mines weather data. Weather sensors
collect data every hour at many locations across the globe and gather a large volume of
log data, which is a good candidate for analysis with MapReduce because we want to
process all the data, and the data is semi-structured and record-oriented.
Data Format
The data we will use is from the National Climatic Data Center, or NCDC. The data is
stored using a line-oriented ASCII format, in which each line is a record. The format
supports a rich set of meteorological elements, many of which are optional or with
variable data lengths. For simplicity, we focus on the basic elements, such as temperature,
which are always present and are of fixed width.
Example 2-1 shows a sample line with some of the salient fields annotated. The line has
been split into multiple lines to show each field; in the real file, fields are packed into
one line with no delimiters.
Example 2-1. Format of a National Climatic Data Center record
0057
332130     # USAF weather station identifier
99999      # WBAN weather station identifier
19500101   # observation date
0300       # observation time
4
+51317     # latitude (degrees x 1000)
+028783    # longitude (degrees x 1000)
FM-12
+0171      # elevation (meters)
99999
V020
320        # wind direction (degrees)
1          # quality code
N
0072
1
00450      # sky ceiling height (meters)
1          # quality code
C
N
010000     # visibility distance (meters)
1          # quality code
N
9
-0128      # air temperature (degrees Celsius x 10)
1          # quality code
-0139      # dew point temperature (degrees Celsius x 10)
1          # quality code
10268      # atmospheric pressure (hectopascals x 10)
1          # quality code
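To show how those scaled fields decode, here is a small standalone sketch (the class name is made up, and this is not one of the book's examples). It reads raw NCDC lines on standard input and uses the same character positions that the MapReduce code later in this chapter relies on: year in columns 16–19, air temperature in columns 88–92 (sign in column 88), and quality code in column 93.

import java.io.BufferedReader;
import java.io.InputStreamReader;

public class NcdcDecodeSketch {
  public static void main(String[] args) throws Exception {
    BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
    String line;
    while ((line = in.readLine()) != null) {
      String year = line.substring(15, 19);
      // Temperature is stored as tenths of a degree Celsius, so -0128 means -12.8 degrees.
      int tempTimesTen = Integer.parseInt(
          line.charAt(87) == '+' ? line.substring(88, 92) : line.substring(87, 92));
      String quality = line.substring(92, 93);
      System.out.printf("%s\t%.1f\tquality=%s%n", year, tempTimesTen / 10.0, quality);
    }
  }
}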
Datafiles are organized by date and weather station. There is a directory for each year
from 1901 to 2001, each containing a gzipped file for each weather station with its
readings for that year. For example, here are the first entries for 1990:
% ls raw/1990 | head
010010-99999-1990.gz
010014-99999-1990.gz
010015-99999-1990.gz
010016-99999-1990.gz
010017-99999-1990.gz
010030-99999-1990.gz
010040-99999-1990.gz
010080-99999-1990.gz
010100-99999-1990.gz
010150-99999-1990.gz
There are tens of thousands of weather stations, so the whole dataset is made up of a
large number of relatively small files. It’s generally easier and more efficient to process
a smaller number of relatively large files, so the data was preprocessed so that each year’s
readings were concatenated into a single file. (The means by which this was carried out
is described in Appendix C.)
Analyzing the Data with Unix Tools
What’s the highest recorded global temperature for each year in the dataset? We will
answer this first without using Hadoop, as this information will provide a performance
baseline and a useful means to check our results.
The classic tool for processing line-oriented data is awk. Example 2-2 is a small script
to calculate the maximum temperature for each year.
Example 2-2. A program for finding the maximum recorded temperature by year from
NCDC weather records
#!/usr/bin/env bash
for year in all/*
do
  echo -ne `basename $year .gz`"\t"
  gunzip -c $year | \
    awk '{ temp = substr($0, 88, 5) + 0;
           q = substr($0, 93, 1);
           if (temp != 9999 && q ~ /[01459]/ && temp > max) max = temp }
         END { print max }'
done
The script loops through the compressed year files, first printing the year, and then
processing each file using awk. The awk script extracts two fields from the data: the air
temperature and the quality code. The air temperature value is turned into an integer
by adding 0. Next, a test is applied to see whether the temperature is valid (the value
9999 signifies a missing value in the NCDC dataset) and whether the quality code in‐
dicates that the reading is not suspect or erroneous. If the reading is OK, the value is
compared with the maximum value seen so far, which is updated if a new maximum is
found. The END block is executed after all the lines in the file have been processed, and
it prints the maximum value.
Here is the beginning of a run:
% ./max_temperature.sh
1901 317
1902 244
1903 289
1904 256
1905 283
...
The temperature values in the source file are scaled by a factor of 10, so this works out
as a maximum temperature of 31.7°C for 1901 (there were very few readings at the
beginning of the century, so this is plausible). The complete run for the century took 42
minutes in one run on a single EC2 High-CPU Extra Large instance.
To speed up the processing, we need to run parts of the program in parallel. In theory,
this is straightforward: we could process different years in different processes, using all
the available hardware threads on a machine. There are a few problems with this,
however.
First, dividing the work into equal-size pieces isn’t always easy or obvious. In this case,
the file size for different years varies widely, so some processes will finish much earlier
than others. Even if they pick up further work, the whole run is dominated by the longest
file. A better approach, although one that requires more work, is to split the input into
fixed-size chunks and assign each chunk to a process.
Second, combining the results from independent processes may require further pro‐
cessing. In this case, the result for each year is independent of other years, and they may
be combined by concatenating all the results and sorting by year. If using the fixed-size
chunk approach, the combination is more delicate. For this example, data for a particular
year will typically be split into several chunks, each processed independently. We’ll end
up with the maximum temperature for each chunk, so the final step is to look for the
highest of these maximums for each year.
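A sketch of that final combination step (the class, method, and sample figures are hypothetical): each chunk contributes its own year-to-maximum map, and we keep the highest value seen for each year.

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CombineChunkMaxima {
  // Fold the per-chunk results together, keeping the highest reading for each year.
  static Map<String, Integer> merge(List<Map<String, Integer>> perChunkMaxima) {
    Map<String, Integer> result = new HashMap<String, Integer>();
    for (Map<String, Integer> chunk : perChunkMaxima) {
      for (Map.Entry<String, Integer> e : chunk.entrySet()) {
        Integer current = result.get(e.getKey());
        if (current == null || e.getValue() > current) {
          result.put(e.getKey(), e.getValue());
        }
      }
    }
    return result;
  }

  public static void main(String[] args) {
    Map<String, Integer> chunk1 = new HashMap<String, Integer>();
    chunk1.put("1901", 314); // made-up per-chunk maximum
    Map<String, Integer> chunk2 = new HashMap<String, Integer>();
    chunk2.put("1901", 317); // made-up per-chunk maximum
    System.out.println(merge(Arrays.asList(chunk1, chunk2))); // {1901=317}
  }
}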
Third, you are still limited by the processing capacity of a single machine. If the best
time you can achieve is 20 minutes with the number of processors you have, then that’s
it. You can’t make it go faster. Also, some datasets grow beyond the capacity of a single
machine. When we start using multiple machines, a whole host of other factors come
into play, mainly falling into the categories of coordination and reliability. Who runs
the overall job? How do we deal with failed processes?
So, although it’s feasible to parallelize the processing, in practice it’s messy. Using a
framework like Hadoop to take care of these issues is a great help.
Analyzing the Data with Hadoop
To take advantage of the parallel processing that Hadoop provides, we need to express
our query as a MapReduce job. After some local, small-scale testing, we will be able to
run it on a cluster of machines.
Map and Reduce
MapReduce works by breaking the processing into two phases: the map phase and the
reduce phase. Each phase has key-value pairs as input and output, the types of which
may be chosen by the programmer. The programmer also specifies two functions: the
map function and the reduce function.
The input to our map phase is the raw NCDC data. We choose a text input format that
gives us each line in the dataset as a text value. The key is the offset of the beginning of
the line from the beginning of the file, but as we have no need for this, we ignore it.
Our map function is simple. We pull out the year and the air temperature, because these
are the only fields we are interested in. In this case, the map function is just a data
preparation phase, setting up the data in such a way that the reduce function can do its
work on it: finding the maximum temperature for each year. The map function is also
a good place to drop bad records: here we filter out temperatures that are missing,
suspect, or erroneous.
To visualize the way the map works, consider the following sample lines of input data
(some unused columns have been dropped to fit the page, indicated by ellipses):
0067011990999991950051507004...9999999N9+00001+99999999999...
0043011990999991950051512004...9999999N9+00221+99999999999...
0043011990999991950051518004...9999999N9-00111+99999999999...
0043012650999991949032412004...0500001N9+01111+99999999999...
0043012650999991949032418004...0500001N9+00781+99999999999...
These lines are presented to the map function as the key-value pairs:
(0, 0067011990999991950051507004...9999999N9+00001+99999999999...)
(106, 0043011990999991950051512004...9999999N9+00221+99999999999...)
(212, 0043011990999991950051518004...9999999N9-00111+99999999999...)
(318, 0043012650999991949032412004...0500001N9+01111+99999999999...)
(424, 0043012650999991949032418004...0500001N9+00781+99999999999...)
The keys are the line offsets within the file, which we ignore in our map function. The
map function merely extracts the year and the air temperature (indicated in bold text),
and emits them as its output (the temperature values have been interpreted as
integers):
(1950, 0)
(1950, 22)
(1950, −11)
(1949, 111)
(1949, 78)
The output from the map function is processed by the MapReduce framework before
being sent to the reduce function. This processing sorts and groups the key-value pairs
by key. So, continuing the example, our reduce function sees the following input:
(1949, [111, 78])
(1950, [0, 22, −11])
Each year appears with a list of all its air temperature readings. All the reduce function
has to do now is iterate through the list and pick up the maximum reading:
(1949, 111)
(1950, 22)
This is the final output: the maximum global temperature recorded in each year.
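Before turning to the Hadoop API, a plain in-memory Java sketch of the same logical flow may help (the class is invented for illustration; the pairs are the ones from the walkthrough above): map output is grouped by key, and each group is reduced to its maximum.

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class LogicalDataFlowSketch {
  public static void main(String[] args) {
    // Output of the map phase: (year, air temperature) pairs from the example above.
    String[][] mapOutput = {
      {"1950", "0"}, {"1950", "22"}, {"1950", "-11"}, {"1949", "111"}, {"1949", "78"}
    };

    // Shuffle: the framework sorts and groups the pairs by key.
    Map<String, List<Integer>> grouped = new TreeMap<String, List<Integer>>();
    for (String[] pair : mapOutput) {
      if (!grouped.containsKey(pair[0])) {
        grouped.put(pair[0], new ArrayList<Integer>());
      }
      grouped.get(pair[0]).add(Integer.parseInt(pair[1]));
    }

    // Reduce: pick the maximum reading for each year.
    for (Map.Entry<String, List<Integer>> entry : grouped.entrySet()) {
      int max = Integer.MIN_VALUE;
      for (int temp : entry.getValue()) {
        max = Math.max(max, temp);
      }
      System.out.println("(" + entry.getKey() + ", " + max + ")"); // (1949, 111) then (1950, 22)
    }
  }
}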
The whole data flow is illustrated in Figure 2-1. At the bottom of the diagram is a Unix
pipeline, which mimics the whole MapReduce flow and which we will see again later in
this chapter when we look at Hadoop Streaming.
Figure 2-1. MapReduce logical data flow
Java MapReduce
Having run through how the MapReduce program works, the next step is to express it
in code. We need three things: a map function, a reduce function, and some code to run
the job. The map function is represented by the Mapper class, which declares an abstract
map() method. Example 2-3 shows the implementation of our map function.
Example 2-3. Mapper for the maximum temperature example
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MaxTemperatureMapper
extends Mapper<LongWritable, Text, Text, IntWritable> {
private static final int MISSING = 9999;
@Override
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
String line = value.toString();
String year = line.substring(15, 19);
int airTemperature;
if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
airTemperature = Integer.parseInt(line.substring(88, 92));
} else {
airTemperature = Integer.parseInt(line.substring(87, 92));
}
String quality = line.substring(92, 93);
if (airTemperature != MISSING && quality.matches("[01459]")) {
context.write(new Text(year), new IntWritable(airTemperature));
}
}
}
The Mapper class is a generic type, with four formal type parameters that specify the
input key, input value, output key, and output value types of the map function. For the
present example, the input key is a long integer offset, the input value is a line of text,
the output key is a year, and the output value is an air temperature (an integer). Rather
than using built-in Java types, Hadoop provides its own set of basic types that are op‐
timized for network serialization. These are found in the org.apache.hadoop.io pack‐
age. Here we use LongWritable, which corresponds to a Java Long, Text (like Java
String), and IntWritable (like Java Integer).
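As a quick aside, the snippet below (not one of the book's examples) shows how these box types behave; each simply wraps a plain Java value:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

public class WritablesSketch {
  public static void main(String[] args) {
    LongWritable offset = new LongWritable(106L); // like a Java Long
    Text year = new Text("1950");                 // like a Java String
    IntWritable temp = new IntWritable(22);       // like a Java Integer
    System.out.println(offset.get() + "\t" + year.toString() + "\t" + temp.get());
  }
}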
The map() method is passed a key and a value. We convert the Text value containing
the line of input into a Java String, then use its substring() method to extract the
columns we are interested in.
The map() method also provides an instance of Context to write the output to. In this
case, we write the year as a Text object (since we are just using it as a key), and the
temperature is wrapped in an IntWritable. We write an output record only if the tem‐
perature is present and the quality code indicates the temperature reading is OK.
The reduce function is similarly defined using a Reducer, as illustrated in Example 2-4.
Example 2-4. Reducer for the maximum temperature example
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class MaxTemperatureReducer
extends Reducer<Text, IntWritable, Text, IntWritable> {
@Override
public void reduce(Text key, Iterable<IntWritable> values, Context context)
throws IOException, InterruptedException {
int maxValue = Integer.MIN_VALUE;
for (IntWritable value : values) {
maxValue = Math.max(maxValue, value.get());
}
context.write(key, new IntWritable(maxValue));
}
}
Again, four formal type parameters are used to specify the input and output types, this
time for the reduce function. The input types of the reduce function must match the
output types of the map function: Text and IntWritable. And in this case, the output
types of the reduce function are Text and IntWritable, for a year and its maximum
temperature, which we find by iterating through the temperatures and comparing each
with a record of the highest found so far.
The third piece of code runs the MapReduce job (see Example 2-5).
Example 2-5. Application to find the maximum temperature in the weather dataset
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class MaxTemperature {
public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: MaxTemperature