Kafka: The Definitive Guide Confluent Kafka

Page Count: 322

Neha Narkhede,
Gwen Shapira & Todd Palino
Kafka
The Definitive Guide
REAL-TIME DATA AND STREAM PROCESSING AT SCALE
Neha Narkhede, Gwen Shapira, and Todd Palino
Kafka: The Denitive Guide
Real-Time Data and Stream Processing at Scale
Boston Farnham Sebastopol Tokyo
Beijing Boston Farnham Sebastopol Tokyo
Beijing
978-1-491-99065-0
[LSI]
Kafka: The Denitive Guide
by Neha Narkhede, Gwen Shapira, and Todd Palino
Copyright © 2017 Neha Narkhede, Gwen Shapira, Todd Palino. All rights reserved.
Printed in the United States of America.
Published by O’Reilly Media, Inc., 1005 Gravenstein Highway North, Sebastopol, CA 95472.
O’Reilly books may be purchased for educational, business, or sales promotional use. Online editions are
also available for most titles (http://oreilly.com/safari). For more information, contact our
corporate/institutional sales department: 800-998-9938 or corporate@oreilly.com.
Editor: Shannon Cutt
Production Editor: Shiny Kalapurakkel
Copyeditor: Christina Edwards
Proofreader: Amanda Kersey
Indexer: WordCo Indexing Services, Inc.
Interior Designer: David Futato
Cover Designer: Karen Montgomery
Illustrator: Rebecca Demarest
July 2017: First Edition
Revision History for the First Edition
2017-07-07: First Release
See http://oreilly.com/catalog/errata.csp?isbn=9781491936160 for release details.
The O’Reilly logo is a registered trademark of O’Reilly Media, Inc. Kafka: The Definitive Guide, the cover
image, and related trade dress are trademarks of O’Reilly Media, Inc.
While the publisher and the authors have used good faith efforts to ensure that the information and
instructions contained in this work are accurate, the publisher and the authors disclaim all responsibility
for errors or omissions, including without limitation responsibility for damages resulting from the use of
or reliance on this work. Use of the information and instructions contained in this work is at your own
risk. If any code samples or other technology this work contains or describes is subject to open source
licenses or the intellectual property rights of others, it is your responsibility to ensure that your use
thereof complies with such licenses and/or rights.
Table of Contents
Foreword
Preface
1. Meet Kafka
Publish/Subscribe Messaging
How It Starts
Individual Queue Systems
Enter Kafka
Messages and Batches
Schemas
Topics and Partitions
Producers and Consumers
Brokers and Clusters
Multiple Clusters
Why Kafka?
Multiple Producers
Multiple Consumers
Disk-Based Retention
Scalable
High Performance
The Data Ecosystem
Use Cases
Kafka’s Origin
LinkedIn’s Problem
The Birth of Kafka
Open Source
The Name
Getting Started with Kafka
2. Installing Kafka
First Things First
Choosing an Operating System
Installing Java
Installing Zookeeper
Installing a Kafka Broker
Broker Configuration
General Broker
Topic Defaults
Hardware Selection
Disk Throughput
Disk Capacity
Memory
Networking
CPU
Kafka in the Cloud
Kafka Clusters
How Many Brokers?
Broker Configuration
OS Tuning
Production Concerns
Garbage Collector Options
Datacenter Layout
Colocating Applications on Zookeeper
Summary
3. Kafka Producers: Writing Messages to Kafka
Producer Overview
Constructing a Kafka Producer
Sending a Message to Kafka
Sending a Message Synchronously
Sending a Message Asynchronously
Configuring Producers
Serializers
Custom Serializers
Serializing Using Apache Avro
Using Avro Records with Kafka
Partitions
Old Producer APIs
Summary
4. Kafka Consumers: Reading Data from Kafka
Kafka Consumer Concepts
Consumers and Consumer Groups
Consumer Groups and Partition Rebalance
Creating a Kafka Consumer
Subscribing to Topics
The Poll Loop
Configuring Consumers
Commits and Offsets
Automatic Commit
Commit Current Offset
Asynchronous Commit
Combining Synchronous and Asynchronous Commits
Commit Specified Offset
Rebalance Listeners
Consuming Records with Specific Offsets
But How Do We Exit?
Deserializers
Standalone Consumer: Why and How to Use a Consumer Without a Group
Older Consumer APIs
Summary
5. Kafka Internals
Cluster Membership
The Controller
Replication
Request Processing
Produce Requests
Fetch Requests
Other Requests
Physical Storage
Partition Allocation
File Management
File Format
Indexes
Compaction
How Compaction Works
Deleted Events
When Are Topics Compacted?
Summary
6. Reliable Data Delivery
Reliability Guarantees
Replication
Broker Configuration
Replication Factor
Unclean Leader Election
Minimum In-Sync Replicas
Using Producers in a Reliable System
Send Acknowledgments
Configuring Producer Retries
Additional Error Handling
Using Consumers in a Reliable System
Important Consumer Configuration Properties for Reliable Processing
Explicitly Committing Offsets in Consumers
Validating System Reliability
Validating Configuration
Validating Applications
Monitoring Reliability in Production
Summary
7. Building Data Pipelines
Considerations When Building Data Pipelines
Timeliness
Reliability
High and Varying Throughput
Data Formats
Transformations
Security
Failure Handling
Coupling and Agility
When to Use Kafka Connect Versus Producer and Consumer
Kafka Connect
Running Connect
Connector Example: File Source and File Sink
Connector Example: MySQL to Elasticsearch
A Deeper Look at Connect
Alternatives to Kafka Connect
Ingest Frameworks for Other Datastores
GUI-Based ETL Tools
Stream-Processing Frameworks
Summary
8. Cross-Cluster Data Mirroring
Use Cases of Cross-Cluster Mirroring
Multicluster Architectures
Some Realities of Cross-Datacenter Communication
Hub-and-Spokes Architecture
Active-Active Architecture
Active-Standby Architecture
Stretch Clusters
Apache Kafka’s MirrorMaker
How to Configure
Deploying MirrorMaker in Production
Tuning MirrorMaker
Other Cross-Cluster Mirroring Solutions
Uber uReplicator
Confluent’s Replicator
Summary
9. Administering Kafka
Topic Operations
Creating a New Topic
Adding Partitions
Deleting a Topic
Listing All Topics in a Cluster
Describing Topic Details
Consumer Groups
List and Describe Groups
Delete Group
Offset Management
Dynamic Configuration Changes
Overriding Topic Configuration Defaults
Overriding Client Configuration Defaults
Describing Configuration Overrides
Removing Configuration Overrides
Partition Management
Preferred Replica Election
Changing a Partition’s Replicas
Changing Replication Factor
Dumping Log Segments
Replica Verification
Consuming and Producing
Console Consumer
Console Producer
Client ACLs
Unsafe Operations
Moving the Cluster Controller
Killing a Partition Move
Removing Topics to Be Deleted
Deleting Topics Manually
Summary
10. Monitoring Kafka
Metric Basics
Where Are the Metrics?
Internal or External Measurements
Application Health Checks
Metric Coverage
Kafka Broker Metrics
Under-Replicated Partitions
Broker Metrics
Topic and Partition Metrics
JVM Monitoring
OS Monitoring
Logging
Client Monitoring
Producer Metrics
Consumer Metrics
Quotas
Lag Monitoring
End-to-End Monitoring
Summary
11. Stream Processing
What Is Stream Processing?
Stream-Processing Concepts
Time
State
Stream-Table Duality
Time Windows
Stream-Processing Design Patterns
Single-Event Processing
Processing with Local State
Multiphase Processing/Repartitioning
Processing with External Lookup: Stream-Table Join
Streaming Join
Out-of-Sequence Events
Reprocessing
Kafka Streams by Example
Word Count
Stock Market Statistics
Click Stream Enrichment
Kafka Streams: Architecture Overview
Building a Topology
Scaling the Topology
Surviving Failures
Stream Processing Use Cases
How to Choose a Stream-Processing Framework
Summary
A. Installing Kafka on Other Operating Systems
Index
Foreword
It’s an exciting time for Apache Kafka. Kafka is being used by tens of thousands of
organizations, including over a third of the Fortune 500 companies. It’s among the
fastest growing open source projects and has spawned an immense ecosystem around
it. It’s at the heart of a movement towards managing and processing streams of data.
So where did Kafka come from? Why did we build it? And what exactly is it?
Kafka got its start as an internal infrastructure system we built at LinkedIn. Our
observation was really simple: there were lots of databases and other systems built to
store data, but what was missing in our architecture was something that would help
us to handle the continuous ow of data. Prior to building Kafka, we experimented
with all kinds of off the shelf options; from messaging systems to log aggregation and
ETL tools, but none of them gave us what we wanted.
We eventually decided to build something from scratch. Our idea was that instead of
focusing on holding piles of data like our relational databases, key-value stores, search
indexes, or caches, we would focus on treating data as a continually evolving and ever
growing stream, and build a data system—and indeed a data architecture—oriented
around that idea.
This idea turned out to be even more broadly applicable than we expected. Though
Kafka got its start powering real-time applications and data flow behind the scenes of
a social network, you can now see it at the heart of next-generation architectures in
every industry imaginable. Big retailers are re-working their fundamental business
processes around continuous data streams; car companies are collecting and processing
real-time data streams from internet-connected cars; and banks are rethinking
their fundamental processes and systems around Kafka as well.
So what is this Kafka thing all about? How does it compare to the systems you already
know and use?
We’ve come to think of Kafka as a streaming platform: a system that lets you publish
and subscribe to streams of data, store them, and process them, and that is exactly
what Apache Kafka is built to be. Getting used to this way of thinking about data
might be a little different than what you’re used to, but it turns out to be an incredibly
powerful abstraction for building applications and architectures. Kafka is often compared
to a few existing technology categories: enterprise messaging systems, big
data systems like Hadoop, and data integration or ETL tools. Each of these comparisons
has some validity but also falls a little short.
Kafka is like a messaging system in that it lets you publish and subscribe to streams of
messages. In this way, it is similar to products like ActiveMQ, RabbitMQ, IBM’s
MQSeries, and other products. But even with these similarities, Kafka has a number
of core differences from traditional messaging systems that make it another kind of
animal entirely. Here are the three big differences. First, it works as a modern distributed
system that runs as a cluster and can scale to handle all the applications in
even the most massive of companies. Rather than running dozens of individual messaging
brokers, hand wired to different apps, this lets you have a central platform that
can scale elastically to handle all the streams of data in a company. Second, Kafka is
a true storage system built to store data for as long as you might like. This has huge
advantages in using it as a connecting layer as it provides real delivery guarantees: its
data is replicated, persistent, and can be kept around as long as you like. Finally, the
world of stream processing raises the level of abstraction quite significantly. Messaging
systems mostly just hand out messages. The stream processing capabilities in
Kafka let you compute derived streams and datasets dynamically off of your streams
with far less code. These differences make Kafka enough of its own thing that it
doesn’t really make sense to think of it as “yet another queue.”
Another view on Kafka—and one of our motivating lenses in designing and building
it—was to think of it as a kind of real-time version of Hadoop. Hadoop lets you store
and periodically process file data at a very large scale. Kafka lets you store and continuously
process streams of data, also at a large scale. At a technical level, there are definitely
similarities, and many people see the emerging area of stream processing as a
superset of the kind of batch processing people have done with Hadoop and its various
processing layers. What this comparison misses is that the use cases that continuous,
low-latency processing opens up are quite different from those that naturally fall
on a batch processing system. Whereas Hadoop and big data targeted analytics applications,
often in the data warehousing space, the low-latency nature of Kafka makes it
applicable for the kind of core applications that directly power a business. This makes
sense: events in a business are happening all the time and the ability to react to them
as they occur makes it much easier to build services that directly power the operation
of the business, feed back into customer experiences, and so on.
The final area Kafka gets compared to is ETL or data integration tools. After all, these
tools move data around, and Kafka moves data around. There is some validity to this
as well, but I think the core difference is that Kafka has inverted the problem. Rather
than a tool for scraping data out of one system and inserting it into another, Kafka is
a platform oriented around real-time streams of events. This means that not only can
it connect off-the-shelf applications and data systems, it can power custom applications
built to trigger off of these same data streams. We think this architecture centered
around streams of events is a really important thing. In some ways these flows
of data are the most central aspect of a modern digital company, as important as the
cash flows you’d see in a financial statement.
The ability to combine these three areas—to bring all the streams of data together
across all the use cases—is what makes the idea of a streaming platform so appealing
to people.
Still, all of this is a bit different, and learning how to think and build applications oriented
around continuous streams of data is quite a mindshift if you are coming from
the world of request/response style applications and relational databases. This book is
absolutely the best way to learn about Kafka, from internals to APIs, written by some
of the people who know it best. I hope you enjoy reading it as much as I have!
— Jay Kreps
Cofounder and CEO at Conuent
Foreword | xv
Preface
The greatest compliment you can give an author of a technical book is “This is the
book I wish I had when I got started with this subject.” This is the goal we set for ourselves
when we started writing this book. We looked back at our experience writing
Kafka, running Kafka in production, and helping many companies use Kafka to build
software architectures and manage their data pipelines, and we asked ourselves,
“What are the most useful things we can share with new users to take them from
beginner to expert?” This book is a reflection of the work we do every day: run
Apache Kafka and help others use it in the best ways.
We included what we believe you need to know in order to successfully run Apache
Kafka in production and build robust and performant applications on top of it. We
highlighted the popular use cases: message bus for event-driven microservices,
stream-processing applications, and large-scale data pipelines. We also focused on
making the book general and comprehensive enough so it will be useful to anyone
using Kafka, no matter the use case or architecture. We cover practical matters such
as how to install and configure Kafka and how to use the Kafka APIs, and we also
dedicated space to Kafka’s design principles and reliability guarantees, and explore
several of Kafka’s delightful architecture details: the replication protocol, controller,
and storage layer. We believe that knowledge of Kafka’s design and internals is not
only a fun read for those interested in distributed systems, but it is also incredibly
useful for those who are seeking to make informed decisions when they deploy Kafka
in production and design applications that use Kafka. The better you understand how
Kafka works, the more you can make informed decisions regarding the many trade-offs
that are involved in engineering.
One of the problems in software engineering is that there is always more than one
way to do anything. Platforms such as Apache Kafka provide plenty of flexibility,
which is great for experts but makes for a steep learning curve for beginners. Very
often, Apache Kafka tells you how to use a feature but not why you should or
shouldn’t use it. Whenever possible, we try to clarify the existing choices, the trade-offs
involved, and when you should and shouldn’t use the different options presented
by Apache Kafka.
Who Should Read This Book
Kaa: e Denitive Guide was written for software engineers who develop applica‐
tions that use Kafkas APIs and for production engineers (also called SREs, devops, or
sysadmins) who install, configure, tune, and monitor Kafka in production. We also
wrote the book with data architects and data engineers in mind—those responsible
for designing and building an organizations entire data infrastructure. Some of the
chapters, especially chapters 3, 4, and 11 are geared toward Java developers. Those
chapters assume that the reader is familiar with the basics of the Java programming
language, including topics such as exception handling and concurrency. Other chap‐
ters, especially chapters 2, 8, 9, and 10, assume the reader has some experience run‐
ning Linux and some familiarity with storage and network configuration in Linux.
The rest of the book discusses Kafka and software architectures in more general
terms and does not assume special knowledge.
Another category of people who may find this book interesting are the managers and
architects who don’t work directly with Kafka but work with the people who do. It is
just as important that they understand the guarantees that Kafka provides and the
trade-offs that their employees and coworkers will need to make while building
Kafka-based systems. The book can provide ammunition to managers who would
like to get their staff trained in Apache Kafka or ensure that their teams know what
they need to know.
Conventions Used in This Book
The following typographical conventions are used in this book:
Italic
Indicates new terms, URLs, email addresses, filenames, and file extensions.
Constant width
Used for program listings, as well as within paragraphs to refer to program elements
such as variable or function names, databases, data types, environment
variables, statements, and keywords.
Constant width bold
Shows commands or other text that should be typed literally by the user.
Constant width italic
Shows text that should be replaced with user-supplied values or by values determined
by context.
This element signifies a tip or suggestion.
This element signifies a general note.
This element indicates a warning or caution.
Using Code Examples
This book is here to help you get your job done. In general, if example code is offered
with this book, you may use it in your programs and documentation. You do not
need to contact us for permission unless you’re reproducing a significant portion of
the code. For example, writing a program that uses several chunks of code from this
book does not require permission. Selling or distributing a CD-ROM of examples
from O’Reilly books does require permission. Answering a question by citing this
book and quoting example code does not require permission. Incorporating a significant
amount of example code from this book into your product’s documentation does
require permission.
We appreciate, but do not require, attribution. An attribution usually includes the
title, author, publisher, and ISBN. For example: “Kafka: The Definitive Guide by Neha
Narkhede, Gwen Shapira, and Todd Palino (O’Reilly). Copyright 2017 Neha Narkhede,
Gwen Shapira, and Todd Palino, 978-1-491-93616-0.”
If you feel your use of code examples falls outside fair use or the permission given
above, feel free to contact us at permissions@oreilly.com.
O’Reilly Safari
Safari (formerly Safari Books Online) is a membership-based
training and reference platform for enterprise, government,
educators, and individuals.
Members have access to thousands of books, training videos, Learning Paths, interactive
tutorials, and curated playlists from over 250 publishers, including O’Reilly
Media, Harvard Business Review, Prentice Hall Professional, Addison-Wesley Professional,
Microsoft Press, Sams, Que, Peachpit Press, Adobe, Focal Press, Cisco Press,
John Wiley & Sons, Syngress, Morgan Kaufmann, IBM Redbooks, Packt, Adobe
Press, FT Press, Apress, Manning, New Riders, McGraw-Hill, Jones & Bartlett, and
Course Technology, among others.
For more information, please visit http://oreilly.com/safari.
How to Contact Us
Please address comments and questions concerning this book to the publisher:
O’Reilly Media, Inc.
1005 Gravenstein Highway North
Sebastopol, CA 95472
800-998-9938 (in the United States or Canada)
707-829-0515 (international or local)
707-829-0104 (fax)
We have a web page for this book, where we list errata, examples, and any additional
information. You can access this page at http://oreil.ly/2tVmYjk.
To comment or ask technical questions about this book, send email to
bookquestions@oreilly.com.
For more information about our books, courses, conferences, and news, see our website
at http://www.oreilly.com.
Find us on Facebook: http://facebook.com/oreilly
Follow us on Twitter: http://twitter.com/oreillymedia
Watch us on YouTube: http://www.youtube.com/oreillymedia
Acknowledgments
We would like to thank the many contributors to Apache Kafka and its ecosystem.
Without their work, this book would not exist. Special thanks to Jay Kreps, Neha Narkhede,
and Jun Rao, as well as their colleagues and the leadership at LinkedIn, for
cocreating Kafka and contributing it to the Apache Software Foundation.
Many people provided valuable feedback on early versions of the book and we appreciate
their time and expertise: Apurva Mehta, Arseniy Tashoyan, Dylan Scott, Ewen
Cheslack-Postava, Grant Henke, Ismael Juma, James Cheng, Jason Gustafson, Jeff
Holoman, Joel Koshy, Jonathan Seidman, Matthias Sax, Michael Noll, Paolo Castagna,
and Jesse Anderson. We also want to thank the many readers who left comments and
feedback via the rough-cuts feedback site.
Many reviewers helped us out and greatly improved the quality of this book, so any
mistakes left are our own.
We’d like to thank our O’Reilly editor Shannon Cutt for her encouragement and
patience, and for being far more on top of things than we were. Working with
O’Reilly is a great experience for an author: the support they provide, from tools to
book signings, is unparalleled. We are grateful to everyone involved in making this happen
and we appreciate their choice to work with us.
And we’d like to thank our managers and colleagues for enabling and encouraging us
while writing the book.
Gwen wants to thank her husband, Omer Shapira, for his support and patience during
the many months spent writing yet another book; her cats, Luke and Lea, for being
cuddly; and her dad, Lior Shapira, for teaching her to always say yes to opportunities,
even when it seems daunting.
Todd would be nowhere without his wife, Marcy, and daughters, Bella and Kaylee,
behind him all the way. Their support for all the extra time writing, and long hours
running to clear his head, keeps him going.
CHAPTER 1
Meet Kafka
Every enterprise is powered by data. We take information in, analyze it, manipulate it,
and create more as output. Every application creates data, whether it is log messages,
metrics, user activity, outgoing messages, or something else. Every byte of data has a
story to tell, something of importance that will inform the next thing to be done. In
order to know what that is, we need to get the data from where it is created to where
it can be analyzed. We see this every day on websites like Amazon, where our clicks
on items of interest to us are turned into recommendations that are shown to us a
little later.
The faster we can do this, the more agile and responsive our organizations can be.
The less effort we spend on moving data around, the more we can focus on the core
business at hand. This is why the pipeline is a critical component in the data-driven
enterprise. How we move the data becomes nearly as important as the data itself.
Any time scientists disagree, it’s because we have insufficient data. Then we can agree
on what kind of data to get; we get the data; and the data solves the problem. Either I’m
right, or you’re right, or we’re both wrong. And we move on.
—Neil deGrasse Tyson
Publish/Subscribe Messaging
Before discussing the specifics of Apache Kafka, it is important for us to understand
the concept of publish/subscribe messaging and why it is important. Publish/subscribe
messaging is a pattern that is characterized by the sender (publisher) of a piece of data
(message) not specifically directing it to a receiver. Instead, the publisher classifies the
message somehow, and the receiver (subscriber) subscribes to receive certain classes
of messages. Pub/sub systems often have a broker, a central point where messages are
published, to facilitate this.
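The pattern can be sketched in a few lines of Python. This is a minimal in-memory illustration of publish/subscribe in general, not Kafka's API: the `Broker` class, its `subscribe` and `publish` methods, and the `"metrics"` class name are all hypothetical names chosen for this example.

```python
from collections import defaultdict
from typing import Callable, DefaultDict, List

Handler = Callable[[bytes], None]


class Broker:
    """Hypothetical in-memory broker; illustrates the pattern, not Kafka's API."""

    def __init__(self) -> None:
        # Maps a message class (called a topic here) to its subscriber callbacks.
        self._subscribers: DefaultDict[str, List[Handler]] = defaultdict(list)

    def subscribe(self, topic: str, handler: Handler) -> None:
        """Register interest in one class of messages."""
        self._subscribers[topic].append(handler)

    def publish(self, topic: str, message: bytes) -> int:
        """Deliver a message to every subscriber of its class.

        The publisher never names a receiver. Returns the number of deliveries.
        """
        handlers = self._subscribers.get(topic, [])
        for handler in handlers:
            handler(message)
        return len(handlers)


# Usage: a dashboard subscribes to metrics; the publisher only names the class.
dashboard: List[bytes] = []
broker = Broker()
broker.subscribe("metrics", dashboard.append)
broker.publish("metrics", b"cpu=0.42")
print(dashboard)
```

The point of the sketch is the decoupling: adding a second subscriber requires no change to the publisher.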
How It Starts
Many use cases for publish/subscribe start out the same way: with a simple message
queue or interprocess communication channel. For example, you create an application
that needs to send monitoring information somewhere, so you write in a direct
connection from your application to an app that displays your metrics on a dashboard,
and push metrics over that connection, as seen in Figure 1-1.
Figure 1-1. A single, direct metrics publisher
This is a simple solution to a simple problem that works when you are getting started
with monitoring. Before long, you decide you would like to analyze your metrics over
a longer term, and that doesn’t work well in the dashboard. You start a new service
that can receive metrics, store them, and analyze them. In order to support this, you
modify your application to write metrics to both systems. By now you have three
more applications that are generating metrics, and they all make the same connections
to these two services. Your coworker thinks it would be a good idea to do active
polling of the services for alerting as well, so you add a server on each of the applications
to provide metrics on request. After a while, you have more applications that
are using those servers to get individual metrics and use them for various purposes.
This architecture can look much like Figure 1-2, with connections that are even
harder to trace.
Figure 1-2. Many metrics publishers, using direct connections
The technical debt built up here is obvious, so you decide to pay some of it back. You
set up a single application that receives metrics from all the applications out there,
and provide a server to query those metrics for any system that needs them. This
reduces the complexity of the architecture to something similar to Figure 1-3. Congratulations,
you have built a publish/subscribe messaging system!
Figure 1-3. A metrics publish/subscribe system
Individual Queue Systems
At the same time that you have been waging this war with metrics, one of your coworkers
has been doing similar work with log messages. Another has been working on
tracking user behavior on the frontend website and providing that information to
developers who are working on machine learning, as well as creating some reports for
management. You have all followed a similar path of building out systems that decouple
the publishers of the information from the subscribers to that information.
Figure 1-4 shows such an infrastructure, with three separate pub/sub systems.
Figure 1-4. Multiple publish/subscribe systems
This is certainly a lot better than utilizing point-to-point connections (as in
Figure 1-2), but there is a lot of duplication. Your company is maintaining multiple
systems for queuing data, all of which have their own individual bugs and limitations.
You also know that there will be more use cases for messaging coming soon. What
you would like to have is a single centralized system that allows for publishing generic
types of data, which will grow as your business grows.
Enter Kafka
Apache Kafka is a publish/subscribe messaging system designed to solve this problem.
It is often described as a “distributed commit log” or more recently as a “distributed
streaming platform.” A filesystem or database commit log is designed to
provide a durable record of all transactions so that they can be replayed to consistently
build the state of a system. Similarly, data within Kafka is stored durably, in
order, and can be read deterministically. In addition, the data can be distributed
within the system to provide additional protections against failures, as well as significant
opportunities for scaling performance.
Messages and Batches
The unit of data within Kafka is called a message. If you are approaching Kafka from a
database background, you can think of this as similar to a row or a record. A message
is simply an array of bytes as far as Kafka is concerned, so the data contained within it
does not have a specific format or meaning to Kafka. A message can have an optional
bit of metadata, which is referred to as a key. The key is also a byte array and, as with
the message, has no specific meaning to Kafka. Keys are used when messages are to
be written to partitions in a more controlled manner. The simplest such scheme is to
generate a consistent hash of the key, and then select the partition number for that
message by taking the result of the hash modulo the total number of partitions in the
topic. This ensures that messages with the same key are always written to the same
partition. Keys are discussed in more detail in Chapter 3.
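The hash-modulo scheme described above can be sketched in a few lines of Python. This is only an illustration: the function name choose_partition is hypothetical, and zlib.crc32 stands in for whatever hash function a real client uses.

```python
import zlib


def choose_partition(key: bytes, num_partitions: int) -> int:
    """Map a message key to a partition number using hash modulo partition count."""
    if num_partitions <= 0:
        raise ValueError("num_partitions must be positive")
    # zlib.crc32 is stable across runs, unlike Python's built-in hash() for bytes.
    # It is an assumption made for this sketch, not the hash a Kafka client uses.
    return zlib.crc32(key) % num_partitions


# The same key always lands on the same partition of a four-partition topic.
partitions = [choose_partition(k, 4) for k in (b"user-42", b"user-7", b"user-42")]
print(partitions)
```

Because the partition number depends on the modulus, the mapping holds only while the number of partitions in the topic stays the same.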
For efficiency, messages are written into Kafka in batches. A batch is just a collection
of messages, all of which are being produced to the same topic and partition. An indi‐
vidual roundtrip across the network for each message would result in excessive over‐
head, and collecting messages together into a batch reduces this. Of course, this is a
tradeoff between latency and throughput: the larger the batches, the more messages
that can be handled per unit of time, but the longer it takes an individual message to
propagate. Batches are also typically compressed, providing more efficient data trans‐
fer and storage at the cost of some processing power.
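As a rough sketch of the idea, the following Python groups messages into one batch per topic and partition and compresses a batch as a single unit. The helper names and the length-prefixed framing are assumptions made for illustration; they are not Kafka's wire format.

```python
import json
import zlib
from collections import defaultdict


def build_batches(messages):
    """Group (topic, partition, payload) tuples into one batch per topic-partition."""
    batches = defaultdict(list)
    for topic, partition, payload in messages:
        batches[(topic, partition)].append(payload)
    return dict(batches)


def compress_batch(payloads):
    """Length-prefix each payload, join them, and compress the whole batch once."""
    framed = b"".join(len(p).to_bytes(4, "big") + p for p in payloads)
    return zlib.compress(framed)


# One hundred similar metric messages for partition 0, plus one for partition 1.
messages = [
    ("metrics", 0, json.dumps({"host": "web1", "cpu": i}).encode())
    for i in range(100)
]
messages.append(("metrics", 1, b'{"host": "web2", "cpu": 5}'))

batches = build_batches(messages)
raw_size = sum(len(p) for p in batches[("metrics", 0)])
compressed_size = len(compress_batch(batches[("metrics", 0)]))
print(raw_size, compressed_size)
```

Compressing the batch as a whole lets the compressor exploit repetition across messages, which is where most of the savings come from for small, similar payloads.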
Schemas
While messages are opaque byte arrays to Kafka itself, it is recommended that addi‐
tional structure, or schema, be imposed on the message content so that it can be easily
understood. There are many options available for message schema, depending on
your application’s individual needs. Simplistic systems, such as JavaScript Object
Notation (JSON) and Extensible Markup Language (XML), are easy to use and
human-readable. However, they lack features such as robust type handling and
compatibility between schema versions. Many Kafka developers favor the use of Apache
Avro, which is a serialization framework originally developed for Hadoop. Avro pro‐
vides a compact serialization format; schemas that are separate from the message pay‐
loads and that do not require code to be generated when they change; and strong data
typing and schema evolution, with both backward and forward compatibility.
A consistent data format is important in Kafka, as it allows writing and reading mes‐
sages to be decoupled. When these tasks are tightly coupled, applications that sub‐
scribe to messages must be updated to handle the new data format, in parallel with
the old format. Only then can the applications that publish the messages be updated
to utilize the new format. By using well-defined schemas and storing them in a com‐
mon repository, the messages in Kafka can be understood without coordination.
Schemas and serialization are covered in more detail in Chapter 3.
Topics and Partitions
Messages in Kafka are categorized into topics. The closest analogies for a topic are a
database table or a folder in a filesystem. Topics are additionally broken down into a
number of partitions. Going back to the “commit log” description, a partition is a sin‐
gle log. Messages are written to it in an append-only fashion, and are read in order
from beginning to end. Note that as a topic typically has multiple partitions, there is
no guarantee of message time-ordering across the entire topic, just within a single
partition. Figure 1-5 shows a topic with four partitions, with writes being appended
to the end of each one. Partitions are also the way that Kafka provides redundancy
and scalability. Each partition can be hosted on a different server, which means that a
single topic can be scaled horizontally across multiple servers to provide performance
far beyond the ability of a single server.
Figure 1-5. Representation of a topic with multiple partitions
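To make the log analogy concrete, here is a minimal in-memory model of a topic whose partitions are independent append-only logs. The Partition and Topic classes are hypothetical teaching aids, not part of any Kafka API.

```python
class Partition:
    """An append-only log; a message's position in the list is its offset."""

    def __init__(self):
        self._log = []

    def append(self, message: bytes) -> int:
        """Append a message to the end of the log and return its offset."""
        self._log.append(message)
        return len(self._log) - 1

    def read(self, offset: int, max_messages: int = 10):
        """Read messages in order, starting at the given offset."""
        return self._log[offset:offset + max_messages]


class Topic:
    """A named collection of independent partitions."""

    def __init__(self, name: str, num_partitions: int):
        self.name = name
        self.partitions = [Partition() for _ in range(num_partitions)]


topic = Topic("page-views", 4)
first = topic.partitions[0].append(b"a")
second = topic.partitions[0].append(b"b")
other = topic.partitions[1].append(b"c")
```

Each partition numbers its own messages, so the model can say that b"a" precedes b"b" in partition 0 but records nothing about how either relates in time to b"c" in partition 1.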
The term stream is often used when discussing data within systems like Kafka. Most
often, a stream is considered to be a single topic of data, regardless of the number of
partitions. This represents a single stream of data moving from the producers to the
consumers. This way of referring to messages is most common when discussing
stream processing, which is when frameworks—some of which are Kafka Streams,
Apache Samza, and Storm—operate on the messages in real time. This method of
operation can be compared to the way offline frameworks, namely Hadoop, are
designed to work on bulk data at a later time. An overview of stream processing is
provided in Chapter 11.
Producers and Consumers
Kafka clients are users of the system, and there are two basic types: producers and
consumers. There are also advanced client APIs—Kafka Connect API for data inte‐
gration and Kafka Streams for stream processing. The advanced clients use producers
and consumers as building blocks and provide higher-level functionality on top.
Producers create new messages. In other publish/subscribe systems, these may be
called publishers or writers. In general, a message will be produced to a specific topic.
By default, the producer does not care what partition a specific message is written to
and will balance messages over all partitions of a topic evenly. In some cases, the pro‐
ducer will direct messages to specific partitions. This is typically done using the mes‐
sage key and a partitioner that will generate a hash of the key and map it to a specific
partition. This assures that all messages produced with a given key will get written to
the same partition. The producer could also use a custom partitioner that follows
other business rules for mapping messages to partitions. Producers are covered in
more detail in Chapter 3.
Consumers read messages. In other publish/subscribe systems, these clients may be
called subscribers or readers. The consumer subscribes to one or more topics and
reads the messages in the order in which they were produced. The consumer keeps
track of which messages it has already consumed by keeping track of the offset of
messages. The offset, an integer value that continually increases, is another bit of
metadata that Kafka adds to each message as it is produced. Each message in a given
partition has a unique offset. By storing the offset of the last consumed message for
each partition, either in Zookeeper or in Kafka itself, a consumer can stop and restart
without losing its place.
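The stop-and-restart behavior can be sketched as follows. The OffsetStore class is a stand-in for wherever offsets are kept, and all names here are hypothetical; following the description above, the sketch stores the offset of the last consumed message.

```python
class OffsetStore:
    """Stand-in for the place where consumed offsets are stored."""

    def __init__(self):
        self._committed = {}

    def commit(self, group, partition, offset):
        self._committed[(group, partition)] = offset

    def fetch(self, group, partition):
        # -1 means nothing has been consumed from this partition yet.
        return self._committed.get((group, partition), -1)


def consume(log, store, group, partition, limit):
    """Process up to `limit` messages that follow the last stored offset."""
    start = store.fetch(group, partition) + 1
    processed = []
    for offset in range(start, min(start + limit, len(log))):
        processed.append(log[offset])
        store.commit(group, partition, offset)
    return processed


log = ["m0", "m1", "m2", "m3", "m4"]
store = OffsetStore()
first_run = consume(log, store, "reports", 0, limit=2)    # consumer stops after two messages
second_run = consume(log, store, "reports", 0, limit=10)  # a restarted consumer resumes
```

The second call picks up at "m2" because the store, not the consumer process, remembers the position.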
Consumers work as part of a consumer group, which is one or more consumers that
work together to consume a topic. The group assures that each partition is only con‐
sumed by one member. In Figure 1-6, there are three consumers in a single group
consuming a topic. Two of the consumers are working from one partition each, while
the third consumer is working from two partitions. The mapping of a consumer to a
partition is often called ownership of the partition by the consumer.
In this way, consumers can horizontally scale to consume topics with a large number
of messages. Additionally, if a single consumer fails, the remaining members of the
group will rebalance the partitions being consumed to take over for the missing
member. Consumers and consumer groups are discussed in more detail in Chapter 4.
Figure 1-6. A consumer group reading from a topic
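Partition ownership and rebalancing can be illustrated with a simple round-robin assignment. This is a sketch under the assumption that partitions are dealt out evenly; the function name is hypothetical, and the assignment strategies Kafka actually offers are discussed in Chapter 4.

```python
def assign_partitions(partitions, consumers):
    """Deal partitions out round-robin so each partition has exactly one owner."""
    if not consumers:
        raise ValueError("a consumer group needs at least one member")
    ownership = {consumer: [] for consumer in consumers}
    for index, partition in enumerate(sorted(partitions)):
        owner = consumers[index % len(consumers)]
        ownership[owner].append(partition)
    return ownership


# Four partitions and three consumers: one consumer owns two partitions.
before = assign_partitions(range(4), ["c1", "c2", "c3"])

# Consumer c2 fails; recomputing the assignment hands its partition to a survivor.
after = assign_partitions(range(4), ["c1", "c3"])
print(before, after)
```

With four partitions and three consumers, one member necessarily owns two partitions, which matches the arrangement in Figure 1-6.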
Brokers and Clusters
A single Kafka server is called a broker. The broker receives messages from producers,
assigns offsets to them, and commits the messages to storage on disk. It also services
consumers, responding to fetch requests for partitions and responding with the mes‐
sages that have been committed to disk. Depending on the specific hardware and its
performance characteristics, a single broker can easily handle thousands of partitions
and millions of messages per second.
Kafka brokers are designed to operate as part of a cluster. Within a cluster of brokers,
one broker will also function as the cluster controller (elected automatically from the
live members of the cluster). The controller is responsible for administrative operations,
including assigning partitions to brokers and monitoring for broker failures. A
partition is owned by a single broker in the cluster, and that broker is called the leader
of the partition. A partition may be assigned to multiple brokers, which will result in
the partition being replicated (as seen in Figure 1-7). This provides redundancy of
messages in the partition, such that another broker can take over leadership if there is
a broker failure. However, all consumers and producers operating on that partition
must connect to the leader. Cluster operations, including partition replication, are
covered in detail in Chapter 6.
Figure 1-7. Replication of partitions in a cluster
A key feature of Apache Kafka is that of retention, which is the durable storage of
messages for some period of time. Kafka brokers are configured with a default reten‐
tion setting for topics, either retaining messages for some period of time (e.g., 7 days)
or until the topic reaches a certain size in bytes (e.g., 1 GB). Once these limits are
reached, messages are expired and deleted so that the retention configuration is a
minimum amount of data available at any time. Individual topics can also be config‐
ured with their own retention settings so that messages are stored for only as long as
they are useful. For example, a tracking topic might be retained for several days,
whereas application metrics might be retained for only a few hours. Topics can also
be configured as log compacted, which means that Kafka will retain only the last mes‐
sage produced with a specific key. This can be useful for changelog-type data, where
only the last update is interesting.
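The three retention behaviors described above (by age, by size, and log compaction) can be sketched on a list of messages. This is a simplified model with hypothetical function names: it expires individual messages, whereas a real broker removes data in larger units (log segments, covered in Chapter 2), which is why the configured limit is a minimum rather than an exact amount.

```python
def apply_retention(messages, now, max_age_seconds=None, max_bytes=None):
    """Expire old messages; `messages` is a list of (timestamp, key, value), oldest first."""
    kept = list(messages)
    if max_age_seconds is not None:
        kept = [m for m in kept if now - m[0] <= max_age_seconds]
    if max_bytes is not None:
        # Drop the oldest messages until the remaining values fit in the size limit.
        while kept and sum(len(m[2]) for m in kept) > max_bytes:
            kept.pop(0)
    return kept


def compact(messages):
    """Keep only the most recent message for each key, preserving log order."""
    last_index = {key: index for index, (_, key, _) in enumerate(messages)}
    return [m for index, m in enumerate(messages) if last_index[m[1]] == index]


log = [(0, "a", b"1"), (50, "b", b"22"), (90, "a", b"333")]
by_age = apply_retention(log, now=100, max_age_seconds=60)
by_size = apply_retention(log, now=100, max_bytes=5)
compacted = compact(log)
```

In this small example all three policies happen to discard the same message, the oldest update for key "a", but for different reasons: it is too old, it pushes the log over the size limit, and it has been superseded by a newer message with the same key.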
Multiple Clusters
As Kafka deployments grow, it is often advantageous to have multiple clusters. There
are several reasons why this can be useful:
• Segregation of types of data
• Isolation for security requirements
• Multiple datacenters (disaster recovery)
When working with multiple datacenters in particular, it is often required that mes‐
sages be copied between them. In this way, online applications can have access to user
activity at both sites. For example, if a user changes public information in their pro‐
file, that change will need to be visible regardless of the datacenter in which search
results are displayed. Or, monitoring data can be collected from many sites into a sin‐
gle central location where the analysis and alerting systems are hosted. The replica‐
tion mechanisms within the Kafka clusters are designed only to work within a single
cluster, not between multiple clusters.
The Kafka project includes a tool called MirrorMaker, used for this purpose. At its
core, MirrorMaker is simply a Kafka consumer and producer, linked together with a
queue. Messages are consumed from one Kafka cluster and produced for another.
Figure 1-8 shows an example of an architecture that uses MirrorMaker, aggregating
messages from two local clusters into an aggregate cluster, and then copying that
cluster to other datacenters. The simple nature of the application belies its power in
creating sophisticated data pipelines, which will be detailed further in Chapter 7.
Figure 1-8. Multiple datacenter architecture
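The consumer-queue-producer structure of MirrorMaker can be modeled with two threads and a bounded queue. This is a conceptual sketch only: plain Python lists stand in for the source and destination clusters, and none of the names below come from MirrorMaker itself.

```python
import queue
import threading


def mirror(source, destination, buffer_size=100):
    """Copy messages from source to destination through a bounded in-memory queue."""
    buffer = queue.Queue(maxsize=buffer_size)
    done = object()  # sentinel that tells the producer side to stop

    def consume():
        # Consumer side: read from the source "cluster" and hand off to the queue.
        for message in source:
            buffer.put(message)
        buffer.put(done)

    def produce():
        # Producer side: take messages off the queue and write them to the destination.
        while True:
            message = buffer.get()
            if message is done:
                break
            destination.append(message)

    threads = [threading.Thread(target=consume), threading.Thread(target=produce)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()


local_cluster = ["event-1", "event-2", "event-3"]
aggregate_cluster = []
mirror(local_cluster, aggregate_cluster)
```

The bounded queue decouples the two sides: if the destination is slow, the consumer side blocks once the buffer fills instead of accumulating messages without limit.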
Why Kafka?
There are many choices for publish/subscribe messaging systems, so what makes
Apache Kafka a good choice?
Multiple Producers
Kafka is able to seamlessly handle multiple producers, whether those clients are using
many topics or the same topic. This makes the system ideal for aggregating data from
many frontend systems and making it consistent. For example, a site that serves con‐
tent to users via a number of microservices can have a single topic for page views that
all services can write to using a common format. Consumer applications can then
receive a single stream of page views for all applications on the site without having to
coordinate consuming from multiple topics, one for each application.
Multiple Consumers
In addition to multiple producers, Kafka is designed for multiple consumers to read
any single stream of messages without interfering with each other. This is in contrast
to many queuing systems where once a message is consumed by one client, it is not
available to any other. Multiple Kafka consumers can choose to operate as part of a
group and share a stream, assuring that the entire group processes a given message
only once.
Disk-Based Retention
Not only can Kafka handle multiple consumers, but durable message retention means
that consumers do not always need to work in real time. Messages are committed to
disk, and will be stored with configurable retention rules. These options can be
selected on a per-topic basis, allowing for different streams of messages to have differ‐
ent amounts of retention depending on the consumer needs. Durable retention
means that if a consumer falls behind, either due to slow processing or a burst in traf‐
fic, there is no danger of losing data. It also means that maintenance can be per‐
formed on consumers, taking applications offline for a short period of time, with no
concern about messages backing up on the producer or getting lost. Consumers can
be stopped, and the messages will be retained in Kafka. This allows them to restart
and pick up processing messages where they left off with no data loss.
Scalable
Kafka’s flexible scalability makes it easy to handle any amount of data. Users can start
with a single broker as a proof of concept, expand to a small development cluster of
three brokers, and move into production with a larger cluster of tens or even hundreds
of brokers that grows over time as the data scales up. Expansions can be
performed while the cluster is online, with no impact on the availability of the system as
a whole. This also means that a cluster of multiple brokers can handle the failure of
an individual broker, and continue servicing clients. Clusters that need to tolerate
more simultaneous failures can be configured with higher replication factors. Repli‐
cation is discussed in more detail in Chapter 6.
High Performance
All of these features come together to make Apache Kafka a publish/subscribe mes‐
saging system with excellent performance under high load. Producers, consumers,
and brokers can all be scaled out to handle very large message streams with ease. This
can be done while still providing subsecond message latency from producing a mes‐
sage to availability to consumers.
The Data Ecosystem
Many applications participate in the environments we build for data processing. We
have defined inputs in the form of applications that create data or otherwise intro‐
duce it to the system. We have defined outputs in the form of metrics, reports, and
other data products. We create loops, with some components reading data from the
system, transforming it using data from other sources, and then introducing it back
into the data infrastructure to be used elsewhere. This is done for numerous types of
data, with each having unique qualities of content, size, and usage.
Apache Kafka provides the circulatory system for the data ecosystem, as shown in
Figure 1-9. It carries messages between the various members of the infrastructure,
providing a consistent interface for all clients. When coupled with a system to pro‐
vide message schemas, producers and consumers no longer require tight coupling or
direct connections of any sort. Components can be added and removed as business
cases are created and dissolved, and producers do not need to be concerned about
who is using the data or the number of consuming applications.
Figure 1-9. A big data ecosystem
Use Cases
Activity tracking
The original use case for Kafka, as it was designed at LinkedIn, is that of user activity
tracking. A website’s users interact with frontend applications, which generate messages
regarding actions the user is taking. This can be passive information, such as
page views and click tracking, or it can be more complex actions, such as information
that a user adds to their profile. The messages are published to one or more topics,
which are then consumed by applications on the backend. These applications may be
generating reports, feeding machine learning systems, updating search results, or per‐
forming other operations that are necessary to provide a rich user experience.
Messaging
Kafka is also used for messaging, where applications need to send notifications (such
as emails) to users. Those applications can produce messages without needing to be
concerned about formatting or how the messages will actually be sent. A single appli‐
cation can then read all the messages to be sent and handle them consistently,
including:
• Formatting the messages (also known as decorating) using a common look and feel
• Collecting multiple messages into a single notification to be sent
• Applying a user’s preferences for how they want to receive messages
Using a single application for this avoids the need to duplicate functionality in multiple
applications, and it allows operations like aggregation that would not otherwise
be possible.
Metrics and logging
Kafka is also ideal for collecting application and system metrics and logs. This is a use
case in which the ability to have multiple applications producing the same type of
message shines. Applications publish metrics on a regular basis to a Kafka topic, and
those metrics can be consumed by systems for monitoring and alerting. They can also
be used in an offline system like Hadoop to perform longer-term analysis, such as
growth projections. Log messages can be published in the same way, and can be
routed to dedicated log search systems like Elasticsearch or security analysis applications.
Another added benefit of Kafka is that when the destination system needs to
change (e.g., it’s time to update the log storage system), there is no need to alter the
frontend applications or the means of aggregation.
Commit log
Since Kafka is based on the concept of a commit log, database changes can be pub‐
lished to Kafka and applications can easily monitor this stream to receive live updates
as they happen. This changelog stream can also be used for replicating database
updates to a remote system, or for consolidating changes from multiple applications
into a single database view. Durable retention is useful here for providing a buffer for
the changelog, meaning it can be replayed in the event of a failure of the consuming
applications. Alternately, log-compacted topics can be used to provide longer reten‐
tion by only retaining a single change per key.
Stream processing
Another area that provides numerous types of applications is stream processing.
While almost all usage of Kafka can be thought of as stream processing, the term is
typically used to refer to applications that provide similar functionality to map/reduce
processing in Hadoop. Hadoop usually relies on aggregation of data over a long time
frame, either hours or days. Stream processing operates on data in real time, as
quickly as messages are produced. Stream frameworks allow users to write small
applications to operate on Kafka messages, performing tasks such as counting met‐
rics, partitioning messages for efficient processing by other applications, or trans‐
forming messages using data from multiple sources. Stream processing is covered in
Chapter 11.
Kafka’s Origin
Kafka was created to address the data pipeline problem at LinkedIn. It was designed
to provide a high-performance messaging system that can handle many types of data
and provide clean, structured data about user activity and system metrics in real time.
Data really powers everything that we do.
—Jeff Weiner, CEO of LinkedIn
LinkedIn’s Problem
Similar to the example described at the beginning of this chapter, LinkedIn had a sys‐
tem for collecting system and application metrics that used custom collectors and
open source tools for storing and presenting data internally. In addition to traditional
metrics, such as CPU usage and application performance, there was a sophisticated
request-tracing feature that used the monitoring system and could provide introspec‐
tion into how a single user request propagated through internal applications. The
monitoring system had many faults, however. This included metrics collection based
on polling, large intervals between metrics, and no ability for application owners to
manage their own metrics. The system was high-touch, requiring human interven‐
tion for most simple tasks, and inconsistent, with differing metric names for the same
measurement across different systems.
At the same time, there was a system created for tracking user activity information.
This was an HTTP service that frontend servers would connect to periodically and
publish a batch of messages (in XML format) to the HTTP service. These batches
were then moved to offline processing, which is where the files were parsed and colla‐
ted. This system had many faults. The XML formatting was inconsistent, and parsing
it was computationally expensive. Changing the type of user activity that was tracked
required a significant amount of coordinated work between frontends and offline
processing. Even then, the system would break constantly due to changing schemas.
Tracking was built on hourly batching, so it could not be used in real time.
Monitoring and user-activity tracking could not use the same backend service. The
monitoring service was too clunky, the data format was not oriented for activity
tracking, and the polling model for monitoring was not compatible with the push
model for tracking. At the same time, the tracking service was too fragile to use for
metrics, and the batch-oriented processing was not the right model for real-time
monitoring and alerting. However, the monitoring and tracking data shared many
traits, and correlation of the information (such as how specific types of user activity
affected application performance) was highly desirable. A drop in specific types of
user activity could indicate problems with the application that serviced it, but hours
of delay in processing activity batches meant a slow response to these types of issues.
At first, existing off-the-shelf open source solutions were thoroughly investigated to
find a new system that would provide real-time access to the data and scale out to
handle the amount of message traffic needed. Prototype systems were set up using
ActiveMQ, but at the time it could not handle the scale. It was also a fragile solution
for the way LinkedIn needed to use it: the team discovered many flaws in ActiveMQ that
would cause the brokers to pause. This would back up connections to clients and
interfere with the ability of the applications to serve requests to users. The decision
was made to move forward with a custom infrastructure for the data pipeline.
The Birth of Kafka
The development team at LinkedIn was led by Jay Kreps, a principal software engi‐
neer who was previously responsible for the development and open source release of
Voldemort, a distributed key-value storage system. The initial team also included
Neha Narkhede and, later, Jun Rao. Together, they set out to create a messaging sys‐
tem that could meet the needs of both the monitoring and tracking systems, and scale
for the future. The primary goals were to:
• Decouple producers and consumers by using a push-pull model
• Provide persistence for message data within the messaging system to allow multiple consumers
• Optimize for high throughput of messages
• Allow for horizontal scaling of the system to grow as the data streams grew
The result was a publish/subscribe messaging system that had an interface typical of
messaging systems but a storage layer more like a log-aggregation system. Combined
with the adoption of Apache Avro for message serialization, Kafka was effective for
handling both metrics and user-activity tracking at a scale of billions of messages per
day. The scalability of Kafka has helped LinkedIn’s usage grow in excess of one trillion
messages produced (as of August 2015) and over a petabyte of data consumed daily.
Open Source
Kafka was released as an open source project on GitHub in late 2010. As it started to
gain attention in the open source community, it was proposed and accepted as an
Apache Software Foundation incubator project in July of 2011. Apache Kafka gradu‐
ated from the incubator in October of 2012. Since then, it has continuously been
worked on and has found a robust community of contributors and committers out‐
side of LinkedIn. Kafka is now used in some of the largest data pipelines in the world.
In the fall of 2014, Jay Kreps, Neha Narkhede, and Jun Rao left LinkedIn to found
Confluent, a company centered around providing development, enterprise support,
and training for Apache Kafka. The two companies, along with ever-growing contributions
from others in the open source community, continue to develop and maintain
Kafka, making it the first choice for big data pipelines.
The Name
People often ask how Kafka got its name and if it has anything to do with the applica‐
tion itself. Jay Kreps offered the following insight:
I thought that since Kafka was a system optimized for writing, using a writer’s name
would make sense. I had taken a lot of lit classes in college and liked Franz Kafka. Plus
the name sounded cool for an open source project.
So basically there is not much of a relationship.
Getting Started with Kafka
Now that we know all about Kafka and its history, we can set it up and build our own
data pipeline. In the next chapter, we will explore installing and configuring Kafka.
We will also cover selecting the right hardware to run Kafka on, and some things to
keep in mind when moving to production operations.
CHAPTER 2
Installing Kafka
This chapter describes how to get started with the Apache Kafka broker, including
how to set up Apache Zookeeper, which is used by Kafka for storing metadata for the
brokers. The chapter will also cover the basic configuration options for a Kafka
deployment, as well as criteria for selecting the correct hardware to run the brokers
on. Finally, we cover how to install multiple Kafka brokers as part of a single cluster
and some specific concerns when using Kafka in a production environment.
First Things First
There are a few things that need to happen before using Apache Kafka. The following
sections tell you what those things are.
Choosing an Operating System
Apache Kafka is a Java application, and can run on many operating systems. This
includes Windows, MacOS, Linux, and others. The installation steps in this chapter
will be focused on setting up and using Kafka in a Linux environment, as this is the
most common OS on which it is installed. This is also the recommended OS for
deploying Kafka for general use. For information on installing Kafka on Windows
and MacOS, see Appendix A.
Installing Java
Prior to installing either Zookeeper or Kafka, you will need a Java environment set up
and functioning. This should be a Java 8 version, and can be the version provided by
your OS or one directly downloaded from java.com. Though Zookeeper and Kafka
will work with a runtime edition of Java, it may be more convenient when developing
tools and applications to have the full Java Development Kit (JDK). The installation
steps will assume you have installed JDK version 8 update 51 in /usr/java/jdk1.8.0_51.
Installing Zookeeper
Apache Kafka uses Zookeeper to store metadata about the Kafka cluster, as well as
consumer client details, as shown in Figure 2-1. While it is possible to run a Zoo‐
keeper server using scripts contained in the Kafka distribution, it is trivial to install a
full version of Zookeeper from the distribution.
Figure 2-1. Kaa and Zookeeper
Kafka has been tested extensively with the stable 3.4.6 release of Zookeeper, which
can be downloaded from apache.org at http://bit.ly/2sDWSgJ.
Standalone Server
The following example installs Zookeeper with a basic configuration in
/usr/local/zookeeper, storing its data in /var/lib/zookeeper:
# tar -zxf zookeeper-3.4.6.tar.gz
# mv zookeeper-3.4.6 /usr/local/zookeeper
# mkdir -p /var/lib/zookeeper
# cat > /usr/local/zookeeper/conf/zoo.cfg << EOF
> tickTime=2000
> dataDir=/var/lib/zookeeper
> clientPort=2181
> EOF
# export JAVA_HOME=/usr/java/jdk1.8.0_51
# /usr/local/zookeeper/bin/zkServer.sh start
JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
#
You can now validate that Zookeeper is running correctly in standalone mode by
connecting to the client port and sending the four-letter command srvr:
# telnet localhost 2181
Trying ::1...
Connected to localhost.
Escape character is '^]'.
srvr
Zookeeper version: 3.4.6-1569965, built on 02/20/2014 09:09 GMT
Latency min/avg/max: 0/0/0
Received: 1
Sent: 0
Connections: 1
Outstanding: 0
Zxid: 0x0
Mode: standalone
Node count: 4
Connection closed by foreign host.
#
Zookeeper ensemble
A Zookeeper cluster is called an ensemble. Due to the algorithm used, it is recom‐
mended that ensembles contain an odd number of servers (e.g., 3, 5, etc.) as a major‐
ity of ensemble members (a quorum) must be working in order for Zookeeper to
respond to requests. This means that in a three-node ensemble, you can run with one
node missing. With a five-node ensemble, you can run with two nodes missing.
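The majority rule reduces to simple arithmetic, sketched below with two hypothetical helper functions. It also shows why odd sizes are recommended: adding a fourth node to a three-node ensemble raises the quorum size without allowing any additional failures.

```python
def quorum_size(ensemble_size: int) -> int:
    """Smallest number of servers that forms a majority of the ensemble."""
    return ensemble_size // 2 + 1


def tolerated_failures(ensemble_size: int) -> int:
    """Number of servers that can be down while a quorum is still available."""
    return ensemble_size - quorum_size(ensemble_size)


for size in (3, 4, 5, 7):
    print(size, quorum_size(size), tolerated_failures(size))
```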
Sizing Your Zookeeper Ensemble
Consider running Zookeeper in a five-node ensemble. In order to
make configuration changes to the ensemble, including swapping a
node, you will need to reload nodes one at a time. If your ensemble
cannot tolerate more than one node being down, doing mainte‐
nance work introduces additional risk. It is also not recommended
to run more than seven nodes, as performance can start to degrade
due to the nature of the consensus protocol.
To configure Zookeeper servers in an ensemble, they must have a common configu‐
ration that lists all servers, and each server needs a myid file in the data directory that
specifies the ID number of the server. If the hostnames of the servers in the ensemble
are zoo1.example.com, zoo2.example.com, and zoo3.example.com, the configura‐
tion file might look like this:
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
initLimit=20
syncLimit=5
server.1=zoo1.example.com:2888:3888
server.2=zoo2.example.com:2888:3888
server.3=zoo3.example.com:2888:3888
In this configuration, the initLimit is the amount of time to allow followers to con‐
nect with a leader. The syncLimit value limits how out-of-sync followers can be with
the leader. Both values are a number of tickTime units, which makes the initLimit
20 * 2000 ms, or 40 seconds. The configuration also lists each server in the ensemble.
The servers are specified in the format server.X=hostname:peerPort:leaderPort, with
the following parameters:
X
The ID number of the server. This must be an integer, but it does not need to be
zero-based or sequential.
hostname
The hostname or IP address of the server.
peerPort
The TCP port over which servers in the ensemble communicate with each other.
leaderPort
The TCP port over which leader election is performed.
Clients only need to be able to connect to the ensemble over the clientPort, but the
members of the ensemble must be able to communicate with each other over all three
ports.
In addition to the shared configuration file, each server must have a file in the
dataDir directory with the name myid. This file must contain the ID number of the server,
which must match the configuration file. Once these steps are complete, the servers
will start up and communicate with each other in an ensemble.
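As a rough check of the settings above, the following Python sketch parses the example configuration, converts initLimit and syncLimit from ticks to seconds, and looks up what a given host's myid file should contain. The helper names (parse_zoo_cfg, limit_seconds, myid_for) are hypothetical and exist only for this illustration; Zookeeper itself does none of this for you.

```python
ZOO_CFG = """\
tickTime=2000
dataDir=/var/lib/zookeeper
clientPort=2181
initLimit=20
syncLimit=5
server.1=zoo1.example.com:2888:3888
server.2=zoo2.example.com:2888:3888
server.3=zoo3.example.com:2888:3888
"""

def parse_zoo_cfg(text):
    """Split key=value lines into plain settings and a {server_id: hostname} map."""
    settings, servers = {}, {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, value = line.split("=", 1)
        if key.startswith("server."):
            servers[int(key.split(".", 1)[1])] = value.split(":")[0]
        else:
            settings[key] = value
    return settings, servers

def limit_seconds(settings, name):
    """Convert a limit expressed in ticks (initLimit, syncLimit) to seconds."""
    return int(settings[name]) * int(settings["tickTime"]) / 1000

def myid_for(servers, hostname):
    """Return the text that belongs in <dataDir>/myid on the given host."""
    for server_id, host in servers.items():
        if host == hostname:
            return str(server_id)
    raise KeyError(hostname)

settings, servers = parse_zoo_cfg(ZOO_CFG)
print(limit_seconds(settings, "initLimit"))   # 40.0
print(limit_seconds(settings, "syncLimit"))   # 10.0
print(myid_for(servers, "zoo2.example.com"))  # 2
```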
Installing a Kafka Broker
Once Java and Zookeeper are configured, you are ready to install Apache Kafka. The
current release of Kafka can be downloaded at http://kafka.apache.org/downloads.html.
At press time, that version is 0.9.0.1 running under Scala version 2.11.0.
The following example installs Kafka in /usr/local/kafka, configured to use the
Zookeeper server started previously and to store the message log segments
in /tmp/kafka-logs:
# tar -zxf kafka_2.11-0.9.0.1.tgz
# mv kafka_2.11-0.9.0.1 /usr/local/kafka
# mkdir /tmp/kafka-logs
# export JAVA_HOME=/usr/java/jdk1.8.0_51
# /usr/local/kafka/bin/kafka-server-start.sh -daemon \
    /usr/local/kafka/config/server.properties
#
Once the Kafka broker is started, we can verify that it is working by performing some
simple operations against the cluster: creating a test topic, producing some messages,
and consuming the same messages.
Create and verify a topic:
# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 \
    --replication-factor 1 --partitions 1 --topic test
Created topic "test".
# /usr/local/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 \
    --describe --topic test
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
#
Produce messages to a test topic:
# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list \
    localhost:9092 --topic test
Test Message 1
Test Message 2
^D
#
Consume messages from a test topic:
# /usr/local/kafka/bin/kafka-console-consumer.sh --zookeeper \
    localhost:2181 --topic test --from-beginning
Test Message 1
Test Message 2
^C
Consumed 2 messages
#
Broker Conguration
The example configuration provided with the Kafka distribution is sufficient to run a
standalone server as a proof of concept, but it will not be sufficient for most installa‐
tions. There are numerous configuration options for Kafka that control all aspects of
setup and tuning. Many options can be left to the default settings, as they deal with
tuning aspects of the Kafka broker that will not be applicable until you have a specific
use case that requires adjusting these settings.
General Broker
There are several broker configurations that should be reviewed when deploying
Kafka for any environment other than a standalone broker on a single server. These
parameters deal with the basic configuration of the broker, and most of them must be
changed to run properly in a cluster with other brokers.
Broker Conguration | 21
broker.id
Every Kafka broker must have an integer identifier, which is set using the broker.id
configuration. By default, this integer is set to 0, but it can be any value. The most
important thing is that the integer must be unique within a single Kafka cluster. The
selection of this number is arbitrary, and it can be moved between brokers if neces‐
sary for maintenance tasks. A good guideline is to set this value to something intrin‐
sic to the host so that when performing maintenance it is not onerous to map broker
ID numbers to hosts. For example, if your hostnames contain a unique number (such
as host1.example.com, host2.example.com, etc.), that is a good choice for the
broker.id value.
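One way to apply this guideline is to derive the ID from the hostname when generating the broker configuration. The sketch below assumes the convention from the example, where the short hostname contains exactly one meaningful number; the function name broker_id_from_hostname is hypothetical, and Kafka does not do this automatically.

```python
import re

def broker_id_from_hostname(hostname: str) -> int:
    """Use the first run of digits in the short hostname as the broker ID."""
    short_name = hostname.split(".", 1)[0]
    match = re.search(r"\d+", short_name)
    if match is None:
        raise ValueError(f"no number found in hostname {hostname!r}")
    return int(match.group())

print(broker_id_from_hostname("host1.example.com"))  # 1
print(broker_id_from_hostname("host2.example.com"))  # 2
```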
port
The example configuration file starts Kafka with a listener on TCP port 9092. This
can be set to any available port by changing the port configuration parameter. Keep
in mind that if a port lower than 1024 is chosen, Kafka must be started as root. Run‐
ning Kafka as root is not a recommended configuration.
zookeeper.connect
The location of the Zookeeper used for storing the broker metadata is set using the
zookeeper.connect configuration parameter. The example configuration uses a Zoo‐
keeper running on port 2181 on the local host, which is specified as localhost:2181.
The format for this parameter is a comma-separated list of hostname:port strings,
followed by an optional /path, where:
• hostname is the hostname or IP address of the Zookeeper server.
• port is the client port number for the server.
• /path is an optional Zookeeper path to use as a chroot environment for the Kafka
cluster. If it is omitted, the root path is used.
If a chroot path is specified and does not exist, it will be created by the broker when it
starts up.
Why Use a Chroot Path
It is generally considered to be good practice to use a chroot path
for the Kafka cluster. This allows the Zookeeper ensemble to be
shared with other applications, including other Kafka clusters,
without a conflict. It is also best to specify multiple Zookeeper
servers (which are all part of the same ensemble) in this configura‐
tion. This allows the Kafka broker to connect to another member
of the Zookeeper ensemble in the event of server failure.
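To make both recommendations concrete, here is a minimal Python sketch that assembles a zookeeper.connect value from several ensemble members and a chroot path. It assumes the form Kafka accepts, which is comma-separated host:port pairs with the chroot path appended once at the end. The function name and the /kafka-cluster path are hypothetical examples.

```python
def zookeeper_connect(servers, chroot=""):
    """Join (hostname, port) pairs with commas and append an optional chroot path."""
    if chroot and not chroot.startswith("/"):
        raise ValueError("chroot path must start with '/'")
    hosts = ",".join(f"{host}:{port}" for host, port in servers)
    return hosts + chroot

ensemble = [("zoo1.example.com", 2181),
            ("zoo2.example.com", 2181),
            ("zoo3.example.com", 2181)]
print("zookeeper.connect=" + zookeeper_connect(ensemble, "/kafka-cluster"))
```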
log.dirs
Kafka persists all messages to disk, and these log segments are stored in the directories
specified in the log.dirs configuration. This is a comma-separated list of paths on
the local system. If more than one path is specified, the broker will store partitions on
them in a “least-used” fashion, with one partition’s log segments stored within the
same path. Note that the broker will place a new partition in the path that has the
least number of partitions currently stored in it, not the least amount of disk space
used.
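The placement rule can be illustrated with a small Python sketch. It models only the choice described above (fewest partitions, ignoring disk space); the directory names are hypothetical, and breaking ties by path name is our assumption rather than documented broker behavior.

```python
def pick_log_dir(partitions_per_dir):
    """Return the path holding the fewest partitions (ties broken by path name)."""
    return min(sorted(partitions_per_dir), key=lambda path: partitions_per_dir[path])

# Hypothetical partition counts for three configured log directories.
counts = {"/data/kafka1": 12, "/data/kafka2": 9, "/data/kafka3": 11}
chosen = pick_log_dir(counts)
counts[chosen] += 1  # the new partition now lives in the chosen path
print(chosen)  # /data/kafka2
```

Because only partition counts are compared, a directory holding a few very large partitions can still be chosen over one holding many small ones.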
num.recovery.threads.per.data.dir
Kafka uses a configurable pool of threads for handling log segments. Currently, this
thread pool is used:
• When starting normally, to open each partition’s log segments
• When starting after a failure, to check and truncate each partition’s log segments
• When shutting down, to cleanly close log segments
By default, only one thread per log directory is used. As these threads are only used
during startup and shutdown, it is reasonable to set a larger number of threads in
order to parallelize operations. Specifically, when recovering from an unclean shut‐
down, this can mean the difference of several hours when restarting a broker with a
large number of partitions! When setting this parameter, remember that the number
configured is per log directory specified with log.dirs. This means that if
num.recovery.threads.per.data.dir is set to 8, and there are 3 paths specified in log.dirs,
this is a total of 24 threads.
auto.create.topics.enable
The default Kafka configuration specifies that the broker should automatically create
a topic under the following circumstances:
• When a producer starts writing messages to the topic
• When a consumer starts reading messages from the topic
• When any client requests metadata for the topic
In many situations, this can be undesirable behavior, especially as there is no way to
validate the existence of a topic through the Kafka protocol without causing it to be
created. If you are managing topic creation explicitly, whether manually or through a
provisioning system, you can set the auto.create.topics.enable configuration to
false.
Broker Conguration | 23
Topic Defaults
The Kafka server configuration specifies many default configurations for topics that
are created. Several of these parameters, including partition counts and message
retention, can be set per-topic using the administrative tools (covered in Chapter 9).
The defaults in the server configuration should be set to baseline values that are
appropriate for the majority of the topics in the cluster.
Using Per-Topic Overrides
In previous versions of Kafka, it was possible to specify per-topic
overrides for these configurations in the broker configuration
using the parameters log.retention.hours.per.topic,
log.retention.bytes.per.topic, and log.segment.bytes.per.topic.
These parameters are no longer supported, and overrides must be specified
using the administrative tools.
num.partitions
The num.partitions parameter determines how many partitions a new topic is cre‐
ated with, primarily when automatic topic creation is enabled (which is the default
setting). This parameter defaults to one partition. Keep in mind that the number of
partitions for a topic can only be increased, never decreased. This means that if a
topic needs to have fewer partitions than num.partitions, care will need to be taken
to manually create the topic (discussed in Chapter 9).
As described in Chapter 1, partitions are the way a topic is scaled within a Kafka clus‐
ter, which makes it important to use partition counts that will balance the message
load across the entire cluster as brokers are added. Many users will have the partition
count for a topic be equal to, or a multiple of, the number of brokers in the cluster.
This allows the partitions to be evenly distributed to the brokers, which will evenly
distribute the message load. This is not a requirement, however, as you can also bal‐
ance message load by having multiple topics.
How to Choose the Number of Partitions
There are several factors to consider when choosing the number of
partitions:
• What is the throughput you expect to achieve for the topic?
For example, do you expect to write 100 KB per second or 1
GB per second?
• What is the maximum throughput you expect to achieve when
consuming from a single partition? You will always have, at
most, one consumer reading from a partition, so if you know
that your slower consumer writes the data to a database and
this database never handles more than 50 MB per second from
each thread writing to it, then you know you are limited to
50 MB per second of throughput when consuming from a partition.
• You can go through the same exercise to estimate the maximum
throughput per producer for a single partition, but since
producers are typically much faster than consumers, it is usually
safe to skip this.
• If you are sending messages to partitions based on keys,
adding partitions later can be very challenging, so calculate
throughput based on your expected future usage, not the current
usage.
• Consider the number of partitions you will place on each
broker and available diskspace and network bandwidth per
broker.
• Avoid overestimating, as each partition uses memory and
other resources on the broker and will increase the time for
leader elections.
With all this in mind, it’s clear that you want many partitions but
not too many. If you have some estimate regarding the target
throughput of the topic and the expected throughput of the con‐
sumers, you can divide the target throughput by the expected con‐
sumer throughput and derive the number of partitions this way. So
if I want to be able to write and read 1 GB/sec from a topic, and I
know each consumer can only process 50 MB/s, then I know I need
at least 20 partitions. This way, I can have 20 consumers reading
from the topic and achieve 1 GB/sec.
If you don’t have this detailed information, our experience suggests
that limiting the size of the partition on the disk to less than 6 GB
per day of retention often gives satisfactory results.
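The division described above can be written as a short Python sketch. It covers only the throughput calculation, not the other factors in the list, and the function name partitions_needed is ours.

```python
import math

def partitions_needed(target_mb_per_sec, consumer_mb_per_sec):
    """Smallest partition count that lets consumers keep up with the target rate."""
    if consumer_mb_per_sec <= 0:
        raise ValueError("consumer throughput must be positive")
    return math.ceil(target_mb_per_sec / consumer_mb_per_sec)

# 1 GB/sec (taken as 1000 MB/sec) with consumers that each handle 50 MB/sec.
print(partitions_needed(1000, 50))  # 20
```

Rounding up matters when the rates do not divide evenly: a 1000 MB/sec target with 60 MB/sec consumers needs 17 partitions, not 16.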
Broker Conguration | 25
log.retention.ms
The most common configuration for how long Kafka will retain messages is by time.
The default is specified in the configuration file using the log.retention.hours
parameter, and it is set to 168 hours, or one week. However, there are two other
parameters allowed, log.retention.minutes and log.retention.ms. All three of
these specify the same configuration (the amount of time after which messages may
be deleted), but the recommended parameter to use is log.retention.ms, as the
smaller unit size will take precedence if more than one is specified. This will make
sure that the value set for log.retention.ms is always the one used.
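The precedence rule can be modeled in a few lines of Python. This is a sketch of the rule as stated in the text, not the broker's own code; the function name effective_retention_ms is hypothetical, and the configuration is represented as a plain dictionary.

```python
def effective_retention_ms(config):
    """Resolve the retention time when several units are set; smallest unit wins."""
    if "log.retention.ms" in config:
        return int(config["log.retention.ms"])
    if "log.retention.minutes" in config:
        return int(config["log.retention.minutes"]) * 60 * 1000
    # Fall back to hours, using the 168-hour default named in the text.
    return int(config.get("log.retention.hours", 168)) * 60 * 60 * 1000

print(effective_retention_ms({}))  # 604800000
print(effective_retention_ms({"log.retention.hours": 24,
                              "log.retention.ms": 3600000}))  # 3600000
```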
Retention By Time and Last Modified Times
Retention by time is performed by examining the last modified
time (mtime) on each log segment file on disk. Under normal clus‐
ter operations, this is the time that the log segment was closed, and
represents the timestamp of the last message in the file. However,
when using administrative tools to move partitions between brok‐
ers, this time is not accurate and will result in excess retention for
these partitions. More information on this is provided in Chapter 9
when discussing partition moves.
log.retention.bytes
Another way to expire messages is based on the total number of bytes of messages
retained. This value is set using the log.retention.bytes parameter, and it is
applied per-partition. This means that if you have a topic with 8 partitions, and
log.retention.bytes is set to 1 GB, the amount of data retained for the topic will be
8 GB at most. Note that all retention is performed for individual partitions, not the
topic. This means that should the number of partitions for a topic be expanded, the
retention will also increase if log.retention.bytes is used.
Conguring Retention by Size and Time
If you have specified a value for both log.retention.bytes and
log.retention.ms (or another parameter for retention by time),
messages may be removed when either criterion is met. For example,
if log.retention.ms is set to 86400000 (1 day) and log.retention.bytes
is set to 1000000000 (1 GB), it is possible for messages
that are less than 1 day old to get deleted if the total volume of messages
over the course of the day is greater than 1 GB. Conversely, if
the volume is less than 1 GB, messages can be deleted after 1 day
even if the total size of the partition is less than 1 GB.
log.segment.bytes
The log-retention settings previously mentioned operate on log segments, not indi‐
vidual messages. As messages are produced to the Kafka broker, they are appended to
the current log segment for the partition. Once the log segment has reached the size
specified by the log.segment.bytes parameter, which defaults to 1 GB, the log seg‐
ment is closed and a new one is opened. Once a log segment has been closed, it can be
considered for expiration. A smaller log-segment size means that files must be closed
and allocated more often, which reduces the overall efficiency of disk writes.
Adjusting the size of the log segments can be important if topics have a low produce
rate. For example, if a topic receives only 100 megabytes per day of messages, and
log.segment.bytes is set to the default, it will take 10 days to fill one segment. As
messages cannot be expired until the log segment is closed, if log.retention.ms is
set to 604800000 (1 week), there will actually be up to 17 days of messages retained
until the closed log segment expires. This is because once the log segment is closed
with the current 10 days of messages, that log segment must be retained for 7 days
before it expires based on the time policy (as the segment cannot be removed until
the last message in the segment can be expired).
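The 17-day figure follows from adding the time needed to fill a segment to the retention period. A minimal Python sketch of that arithmetic is shown below; it uses round decimal units (a 1000 MB segment) to match the example, and the function name is ours.

```python
def max_message_age_days(segment_mb, produce_mb_per_day, retention_days):
    """Worst-case age of the oldest message when only closed segments can expire."""
    days_to_fill_segment = segment_mb / produce_mb_per_day
    return days_to_fill_segment + retention_days

print(max_message_age_days(segment_mb=1000, produce_mb_per_day=100, retention_days=7))  # 17.0
```

Lowering the segment size to 100 MB in the same scenario would reduce the worst case to 8 days.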
Retrieving Osets by Timestamp
The size of the log segment also affects the behavior of fetching off‐
sets by timestamp. When requesting offsets for a partition at a spe‐
cific timestamp, Kafka finds the log segment file that was being
written at that time. It does this by using the creation and last
modified time of the file, and looking for a file that was created
before the timestamp specified and last modified after the time‐
stamp. The offset at the beginning of that log segment (which is
also the filename) is returned in the response.
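The lookup can be pictured with the following Python sketch. It is a simplified model of the behavior described above, not Kafka's implementation: segments are represented by a hypothetical Segment tuple, the times are made-up epoch seconds, and treating the boundaries as inclusive is our assumption.

```python
from collections import namedtuple

# base_offset doubles as the segment's filename; times are epoch seconds.
Segment = namedtuple("Segment", "base_offset created modified")

def offset_for_timestamp(segments, timestamp):
    """Return the base offset of the segment that was being written at `timestamp`."""
    for segment in segments:
        if segment.created <= timestamp <= segment.modified:
            return segment.base_offset
    return None  # no segment covers that time

segments = [
    Segment(base_offset=0,    created=1000, modified=1999),
    Segment(base_offset=5000, created=2000, modified=2999),
    Segment(base_offset=9200, created=3000, modified=3500),
]
print(offset_for_timestamp(segments, 2500))  # 5000
```

The result is always the first offset of a segment, so larger segments give coarser answers.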
log.segment.ms
Another way to control when log segments are closed is by using the log.segment.ms
parameter, which specifies the amount of time after which a log segment should be
closed. As with the log.retention.bytes and log.retention.ms parameters,
log.segment.bytes and log.segment.ms are not mutually exclusive properties.
Kafka will close a log segment either when the size limit is reached or when the time
limit is reached, whichever comes first. By default, there is no setting for
log.segment.ms, which results in only closing log segments by size.
Broker Conguration | 27
Disk Performance When Using Time-Based Segments
When using a time-based log segment limit, it is important to con‐
sider the impact on disk performance when multiple log segments
are closed simultaneously. This can happen when there are many
partitions that never reach the size limit for log segments, as the
clock for the time limit will start when the broker starts and will
always execute at the same time for these low-volume partitions.
message.max.bytes
The Kafka broker limits the maximum size of a message that can be produced, con‐
figured by the message.max.bytes parameter, which defaults to 1000000, or 1 MB. A
producer that tries to send a message larger than this will receive an error back from
the broker, and the message will not be accepted. As with all byte sizes specified on
the broker, this configuration deals with compressed message size, which means that
producers can send messages that are much larger than this value uncompressed,
provided they compress to under the configured message.max.bytes size.
There are noticeable performance impacts from increasing the allowable message
size. Larger messages will mean that the broker threads that deal with processing net‐
work connections and requests will be working longer on each request. Larger mes‐
sages also increase the size of disk writes, which will impact I/O throughput.
Coordinating Message Size Configurations
The message size configured on the Kafka broker must be coordi‐
nated with the fetch.message.max.bytes configuration on con‐
sumer clients. If this value is smaller than message.max.bytes,
then consumers that encounter larger messages will fail to fetch
those messages, resulting in a situation where the consumer gets
stuck and cannot proceed. The same rule applies to the
replica.fetch.max.bytes configuration on the brokers when configured
in a cluster.
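A configuration review script could check this rule before deployment. The sketch below is one possible shape for such a check, with settings held in plain dictionaries; the function name message_size_problems and the sample values are hypothetical.

```python
def message_size_problems(broker, consumer):
    """List fetch limits that are smaller than the broker's message.max.bytes."""
    limit = int(broker.get("message.max.bytes", 1000000))  # default from the text
    problems = []
    checks = (("fetch.message.max.bytes", consumer),
              ("replica.fetch.max.bytes", broker))
    for name, settings in checks:
        if name in settings and int(settings[name]) < limit:
            problems.append(f"{name}={settings[name]} is below message.max.bytes={limit}")
    return problems

# Sample values: the replica fetch size was not raised along with message.max.bytes.
broker = {"message.max.bytes": 2000000, "replica.fetch.max.bytes": 1048576}
consumer = {"fetch.message.max.bytes": 4000000}
for problem in message_size_problems(broker, consumer):
    print(problem)
```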
Hardware Selection
Selecting an appropriate hardware configuration for a Kafka broker can be more art
than science. Kafka itself has no strict requirement on a specific hardware configura‐
tion, and will run without issue on any system. Once performance becomes a con‐
cern, however, there are several factors that will contribute to the overall
performance: disk throughput and capacity, memory, networking, and CPU. Once
you have determined which types of performance are the most critical for your envi‐
ronment, you will be able to select an optimized hardware configuration that fits
within your budget.
Disk Throughput
The performance of producer clients will be most directly influenced by the through‐
put of the broker disk that is used for storing log segments. Kafka messages must be
committed to local storage when they are produced, and most clients will wait until at
least one broker has confirmed that messages have been committed before consider‐
ing the send successful. This means that faster disk writes will equal lower produce
latency.
The obvious decision when it comes to disk throughput is whether to use traditional
spinning hard drives (HDD) or solid-state disks (SSD). SSDs have drastically lower
seek and access times and will provide the best performance. HDDs, on the other
hand, are more economical and provide more capacity per unit. You can also improve
the performance of HDDs by using more of them in a broker, whether by having
multiple data directories or by setting up the drives in a redundant array of independ‐
ent disks (RAID) configuration. Other factors, such as the specific drive technology
(e.g., serial attached SCSI or serial ATA), as well as the quality of the drive controller,
will affect throughput.
Disk Capacity
Capacity is the other side of the storage discussion. The amount of disk capacity that
is needed is determined by how many messages need to be retained at any time. If the
broker is expected to receive 1 TB of traffic each day, with 7 days of retention, then
the broker will need a minimum of 7 TB of useable storage for log segments. You
should also factor in at least 10% overhead for other files, in addition to any buffer
that you wish to maintain for fluctuations in traffic or growth over time.
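The sizing arithmetic can be sketched as follows. The function name is ours, the 10% overhead is the minimum suggested above, and any additional buffer for growth is left to the reader.

```python
def required_storage_tb(daily_traffic_tb, retention_days, overhead=0.10):
    """Log-segment storage for the retention window plus a fractional overhead."""
    return daily_traffic_tb * retention_days * (1 + overhead)

# 1 TB per day, 7 days of retention, 10% overhead for other files.
print(round(required_storage_tb(1, 7), 2))  # 7.7
```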
Storage capacity is one of the factors to consider when sizing a Kafka cluster and
determining when to expand it. The total traffic for a cluster can be balanced across it
by having multiple partitions per topic, which will allow additional brokers to aug‐
ment the available capacity if the density on a single broker will not suffice. The deci‐
sion on how much disk capacity is needed will also be informed by the replication
strategy chosen for the cluster (which is discussed in more detail in Chapter 6).
Memory
The normal mode of operation for a Kafka consumer is reading from the end of the
partitions, where the consumer is caught up and lagging behind the producers very
little, if at all. In this situation, the messages the consumer is reading are optimally
stored in the system’s page cache, resulting in faster reads than if the broker has to
reread the messages from disk. Therefore, having more memory available to the sys‐
tem for page cache will improve the performance of consumer clients.
Kafka itself does not need much heap memory configured for the Java Virtual
Machine (JVM). Even a broker that is handling X messages per second and a data rate
of X megabits per second can run with a 5 GB heap. The rest of the system memory
will be used by the page cache and will benefit Kafka by allowing the system to cache
log segments in use. This is the main reason it is not recommended to have Kafka
collocated on a system with any other significant application, as they will have to
share the use of the page cache. This will decrease the consumer performance for
Kafka.
Networking
The available network throughput will specify the maximum amount of traffic that
Kafka can handle. This is often the governing factor, combined with disk storage, for
cluster sizing. This is complicated by the inherent imbalance between inbound and
outbound network usage that is created by Kafka’s support for multiple consumers. A
producer may write 1 MB per second for a given topic, but there could be any num‐
ber of consumers that create a multiplier on the outbound network usage. Other
operations such as cluster replication (covered in Chapter 6) and mirroring (dis‐
cussed in Chapter 8) will also increase requirements. Should the network interface
become saturated, it is not uncommon for cluster replication to fall behind, which
can leave the cluster in a vulnerable state.
CPU
Processing power is not as important as disk and memory, but it will affect overall
performance of the broker to some extent. Ideally, clients should compress messages
to optimize network and disk usage. The Kafka broker must decompress all message
batches, however, in order to validate the checksum of the individual messages and
assign offsets. It then needs to recompress the message batch in order to store it on
disk. This is where the majority of Kafka’s requirement for processing power comes
from. This should not be the primary factor in selecting hardware, however.
Kafka in the Cloud
A common installation for Kafka is within cloud computing environments, such as
Amazon Web Services (AWS). AWS provides many compute instances, each with a
different combination of CPU, memory, and disk, and so the various performance
characteristics of Kafka must be prioritized in order to select the correct instance
configuration to use. A good place to start is with the amount of data retention
required, followed by the performance needed from the producers. If very low
latency is necessary, I/O optimized instances that have local SSD storage might be
required. Otherwise, network-attached storage (such as the AWS Elastic Block Store) might
be sufficient. Once these decisions are made, the CPU and memory options available
will be appropriate for the performance.
In real terms, this will mean that for AWS either the m4 or r3 instance types are a
common choice. The m4 instance will allow for greater retention periods, but the
throughput to the disk will be less because it is on elastic block storage. The r3
instance will have much better throughput with local SSD drives, but those drives will
limit the amount of data that can be retained. For the best of both worlds, it is neces‐
sary to move up to either the i2 or d2 instance types, which are significantly more
expensive.
Kafka Clusters
A single Kafka server works well for local development work, or for a proof-of-
concept system, but there are significant benefits to having multiple brokers config‐
ured as a cluster, as shown in Figure 2-2. The biggest benefit is the ability to scale the
load across multiple servers. A close second is using replication to guard against data
loss due to single system failures. Replication will also allow for performing mainte‐
nance work on Kafka or the underlying systems while still maintaining availability for
clients. This section focuses on configuring only a Kafka cluster. Chapter 6 contains
more information on replication of data.
Figure 2-2. A simple Kafka cluster
How Many Brokers?
The appropriate size for a Kafka cluster is determined by several factors. The first fac‐
tor to consider is how much disk capacity is required for retaining messages and how
much storage is available on a single broker. If the cluster is required to retain 10 TB
of data and a single broker can store 2 TB, then the minimum cluster size is five brok‐
ers. In addition, using replication will increase the storage requirements by at least
100%, depending on the replication factor chosen (see Chapter 6). This means that
this same cluster, configured with replication, now needs to contain at least 10 brok‐
ers.
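The storage-driven minimum can be computed directly. The sketch below considers disk capacity only and ignores the request-handling factors discussed next; the function name min_brokers_for_storage is ours.

```python
import math

def min_brokers_for_storage(retained_tb, per_broker_tb, replication_factor=1):
    """Brokers needed to hold every replica of the retained data."""
    return math.ceil(retained_tb * replication_factor / per_broker_tb)

print(min_brokers_for_storage(10, 2))                        # 5
print(min_brokers_for_storage(10, 2, replication_factor=2))  # 10
```

A replication factor of 3 in the same scenario would raise the minimum to 15 brokers.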
The other factor to consider is the capacity of the cluster to handle requests. For
example, what is the capacity of the network interfaces, and can they handle the client
traffic if there are multiple consumers of the data or if the traffic is not consistent
over the retention period of the data (e.g., bursts of traffic during peak times)? If the
network interface on a single broker is used to 80% capacity at peak, and there are
two consumers of that data, the consumers will not be able to keep up with peak traf‐
fic unless there are two brokers. If replication is being used in the cluster, this is an
additional consumer of the data that must be taken into account. It may also be desir‐
able to scale out to more brokers in a cluster in order to handle performance con‐
cerns caused by lesser disk throughput or system memory available.
Broker Conguration
There are only two requirements in the broker configuration to allow multiple Kafka
brokers to join a single cluster. The first is that all brokers must have the same config‐
uration for the zookeeper.connect parameter. This specifies the Zookeeper ensemble
and path where the cluster stores metadata. The second requirement is that all brok‐
ers in the cluster must have a unique value for the broker.id parameter. If two brok‐
ers attempt to join the same cluster with the same broker.id, the second broker will
log an error and fail to start. There are other configuration parameters used when
running a cluster—specifically, parameters that control replication, which are covered
in later chapters.
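Both requirements are easy to verify before brokers are started. The following Python sketch checks a list of broker configurations held as dictionaries; the function name cluster_config_errors, the hostnames, and the /kafka-cluster chroot path are hypothetical, and this is a pre-deployment check of our own rather than anything Kafka provides.

```python
from collections import Counter

def cluster_config_errors(broker_configs):
    """Check the two rules: shared zookeeper.connect and unique broker.id values."""
    errors = []
    connect_strings = {cfg["zookeeper.connect"] for cfg in broker_configs}
    if len(connect_strings) > 1:
        errors.append(f"brokers disagree on zookeeper.connect: {sorted(connect_strings)}")
    id_counts = Counter(cfg["broker.id"] for cfg in broker_configs)
    for broker_id, count in sorted(id_counts.items()):
        if count > 1:
            errors.append(f"broker.id {broker_id} is used by {count} brokers")
    return errors

zk = "zoo1.example.com:2181,zoo2.example.com:2181/kafka-cluster"
brokers = [
    {"broker.id": 1, "zookeeper.connect": zk},
    {"broker.id": 2, "zookeeper.connect": zk},
    {"broker.id": 2, "zookeeper.connect": zk},  # duplicate ID
]
print(cluster_config_errors(brokers))
```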
OS Tuning
While most Linux distributions have an out-of-the-box configuration for the kernel-
tuning parameters that will work fairly well for most applications, there are a few
changes that can be made for a Kafka broker that will improve performance. These
primarily revolve around the virtual memory and networking subsystems, as well as
specific concerns for the disk mount point that is used for storing log segments.
These parameters are typically configured in the /etc/sysctl.conf file, but you should
refer to your Linux distribution’s documentation for specific details regarding how to
adjust the kernel configuration.
Virtual Memory
In general, the Linux virtual memory system will automatically adjust itself for the
workload of the system. We can make some adjustments to both how swap space is
handled, as well as to dirty memory pages, to tune it for Kafka’s workload.
As with most applications—specifically ones where throughput is a concern—it is
best to avoid swapping at (almost) all costs. The cost incurred by having pages of
memory swapped to disk will show up as a noticeable impact on all aspects of perfor‐
mance in Kafka. In addition, Kafka makes heavy use of the system page cache, and if
the VM system is swapping to disk, there is not enough memory being allocated to
page cache.
One way to avoid swapping is just to not configure any swap space at all. Having swap
is not a requirement, but it does provide a safety net if something catastrophic hap‐
pens on the system. Having swap can prevent the OS from abruptly killing a process
due to an out-of-memory condition. For this reason, the recommendation is to set
the vm.swappiness parameter to a very low value, such as 1. The parameter is a per‐
centage of how likely the VM subsystem is to use swap space rather than dropping
pages from the page cache. It is preferable to reduce the size of the page cache rather
than swap.
Why Not Set Swappiness to Zero?
Previously, the recommendation for vm.swappiness was always to
set it to 0. This value used to have the meaning “do not swap unless
there is an out-of-memory condition.” However, the meaning of
this value changed as of Linux kernel version 3.5-rc1, and that
change was backported into many distributions, including Red Hat
Enterprise Linux kernels as of version 2.6.32-303. This changed the
meaning of the value 0 to “never swap under any circumstances.” It
is for this reason that a value of 1 is now recommended.
There is also a benefit to adjusting how the kernel handles dirty pages that must be
flushed to disk. Kafka relies on disk I/O performance to provide good response times
to producers. This is also the reason that the log segments are usually put on a fast
disk, whether that is an individual disk with a fast response time (e.g., SSD) or a disk
subsystem with significant NVRAM for caching (e.g., RAID). The result is that the
number of dirty pages that are allowed, before the flush background process starts
writing them to disk, can be reduced. This is accomplished by setting the
vm.dirty_background_ratio value lower than the default of 10. The value is a percentage
of the total amount of system memory, and setting this value to 5 is appropriate
in many situations. This setting should not be set to zero, however, as that would
cause the kernel to continually flush pages, which would then eliminate the ability of
the kernel to buffer disk writes against temporary spikes in the underlying device per‐
formance.
The total number of dirty pages that are allowed before the kernel forces synchronous
operations to flush them to disk can also be increased by changing the value of
vm.dirty_ratio, increasing it to above the default of 20 (also a percentage of total
system memory). There is a wide range of possible values for this setting, but between
60 and 80 is a reasonable number. This setting does introduce a small amount of risk,
both in regards to the amount of unflushed disk activity as well as the potential for
long I/O pauses if synchronous flushes are forced. If a higher setting for
vm.dirty_ratio is chosen, it is highly recommended that replication be used in the
Kafka cluster to guard against system failures.
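The virtual memory guidance in this section can be collected into a small review function. This Python sketch simply compares a dictionary of vm.* values with the numbers suggested in the text; the function name review_vm_settings is hypothetical, the sample input is illustrative, and the thresholds are starting points rather than firm rules.

```python
def review_vm_settings(settings):
    """Compare vm.* values with the guidance in the text and list the differences."""
    notes = []
    if settings.get("vm.swappiness") != 1:
        notes.append("vm.swappiness: the text recommends 1")
    background = settings.get("vm.dirty_background_ratio", 10)
    if not 0 < background < 10:
        notes.append("vm.dirty_background_ratio: use a nonzero value "
                     "below the default of 10, such as 5")
    if not 60 <= settings.get("vm.dirty_ratio", 20) <= 80:
        notes.append("vm.dirty_ratio: the text suggests a value between 60 and 80")
    return notes

# Sample input: dirty_background_ratio already tuned, the other two left alone.
sample = {"vm.swappiness": 60, "vm.dirty_background_ratio": 5, "vm.dirty_ratio": 20}
for note in review_vm_settings(sample):
    print(note)
```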
When choosing values for these parameters, it is wise to review the number of dirty
pages over time while the Kafka cluster is running under load, whether in production
or simulated. The current number of dirty pages can be determined by checking
the /proc/vmstat file:
# cat /proc/vmstat | egrep "dirty|writeback"
nr_dirty 3875
nr_writeback 29
nr_writeback_temp 0
#
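To review these counters over time rather than by hand, the same filter can be applied from a small program. The following is a minimal sketch in Java, assuming a Linux host. The VmstatDirtyPages class and its parse method are hypothetical names introduced for illustration and are not part of Kafka:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper, not part of Kafka: extracts the dirty-page and
// writeback counters from the lines of /proc/vmstat.
final class VmstatDirtyPages {

    // Keeps only "name value" lines whose name mentions dirty or writeback,
    // mirroring the egrep filter used in the command above.
    static Map<String, Long> parse(List<String> lines) {
        Map<String, Long> counters = new LinkedHashMap<>();
        for (String line : lines) {
            String[] parts = line.trim().split("\\s+");
            if (parts.length != 2) {
                continue; // skip anything that is not a "name value" pair
            }
            if (parts[0].contains("dirty") || parts[0].contains("writeback")) {
                counters.put(parts[0], Long.parseLong(parts[1]));
            }
        }
        return counters;
    }

    public static void main(String[] args) throws IOException {
        Path vmstat = Paths.get("/proc/vmstat");
        if (!Files.isReadable(vmstat)) {
            System.err.println("/proc/vmstat is not available on this system");
            return;
        }
        parse(Files.readAllLines(vmstat))
            .forEach((name, value) -> System.out.println(name + " " + value));
    }
}
```

Running this periodically while the cluster is under load, and recording the values, gives the history needed to judge whether the chosen ratios are reasonable.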
Disk
Outside of selecting the disk device hardware, as well as the configuration of RAID if
it is used, the choice of filesystem used for this disk can have the next largest impact
on performance. There are many different filesystems available, but the most com‐
mon choices for local filesystems are either EXT4 (fourth extended file system) or
Extents File System (XFS). Recently, XFS has become the default filesystem for many
Linux distributions, and this is for good reason—it outperforms EXT4 for most
workloads with minimal tuning required. EXT4 can perform well, but it requires
using tuning parameters that are considered less safe. This includes setting the commit
interval to a longer time than the default of five seconds to force less frequent flushes.
EXT4 also introduced delayed allocation of blocks, which brings with it a greater
chance of data loss and filesystem corruption in the case of a system failure. The XFS
filesystem also uses a delayed allocation algorithm, but it is generally safer than the
one used by EXT4. XFS also has better performance for Kafka’s workload without
requiring tuning beyond the automatic tuning performed by the filesystem. It is also
more efficient when batching disk writes, all of which combine to give better overall
I/O throughput.
Regardless of which filesystem is chosen for the mount that holds the log segments, it
is advisable to set the noatime mount option for the mount point. File metadata contains
three timestamps: change time (ctime), last modified time (mtime), and last
access time (atime). By default, the atime is updated every time a file is read. This
generates a large number of disk writes. The atime attribute is generally considered to
be of little use, unless an application needs to know if a file has been accessed since it
was last modified (in which case the relatime option can be used). The atime is not
used by Kafka at all, so disabling it is safe to do. Setting noatime on the mount will
prevent these timestamp updates from happening, but will not affect the proper han‐
dling of the ctime and mtime attributes.
Networking
Adjusting the default tuning of the Linux networking stack is common for any appli‐
cation that generates a high amount of network traffic, as the kernel is not tuned by
default for large, high-speed data transfers. In fact, the recommended changes for
Kafka are the same as those suggested for most web servers and other networking
applications. The first adjustment is to change the default and maximum amount of
memory allocated for the send and receive buffers for each socket. This will signifi‐
cantly increase performance for large transfers. The relevant parameters for the send
and receive buffer default size per socket are net.core.wmem_default and
net.core.rmem_default, and a reasonable setting for these parameters is 131072, or
128 KiB. The parameters for the send and receive buffer maximum sizes are
net.core.wmem_max and net.core.rmem_max, and a reasonable setting is 2097152, or
2 MiB. Keep in mind that the maximum size does not indicate that every socket will
have this much buffer space allocated; it only allows up to that much if needed.
In addition to the socket settings, the send and receive buffer sizes for TCP sockets
must be set separately using the net.ipv4.tcp_wmem and net.ipv4.tcp_rmem param‐
eters. These are set using three space-separated integers that specify the minimum,
default, and maximum sizes, respectively. The maximum size cannot be larger than
the values specified for all sockets using net.core.wmem_max and
net.core.rmem_max. An example setting for each of these parameters is “4096 65536
2048000,” which is a 4 KiB minimum, 64 KiB default, and just under 2 MiB maximum buffer.
Based on the actual workload of your Kafka brokers, you may want to increase the
maximum sizes to allow for greater buffering of the network connections.
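The relationship between these settings can be checked before they are applied. The sketch below is illustrative only: TcpBufferCheck and its methods are hypothetical names, and the rule it encodes is simply the one stated in the text (minimum, default, and maximum in ascending order, with the maximum no larger than the corresponding net.core limit):

```java
// Hypothetical helper, not part of Kafka or of any kernel tooling.
final class TcpBufferCheck {

    // Parses a "min default max" triple such as "4096 65536 2048000".
    static long[] parseTriple(String setting) {
        String[] parts = setting.trim().split("\\s+");
        if (parts.length != 3) {
            throw new IllegalArgumentException("expected three integers: " + setting);
        }
        long[] values = new long[3];
        for (int i = 0; i < 3; i++) {
            values[i] = Long.parseLong(parts[i]);
        }
        return values;
    }

    // True when min <= default <= max and max does not exceed the
    // per-socket ceiling (net.core.wmem_max or net.core.rmem_max).
    static boolean isConsistent(String tcpSetting, long coreMax) {
        long[] v = parseTriple(tcpSetting);
        return v[0] <= v[1] && v[1] <= v[2] && v[2] <= coreMax;
    }

    public static void main(String[] args) {
        long coreMax = 2097152L; // the 2 MiB value suggested in the text
        String tcpWmem = "4096 65536 2048000";
        System.out.println(tcpWmem + " consistent with " + coreMax + ": "
            + isConsistent(tcpWmem, coreMax));
    }
}
```

If the TCP maximum is raised for a heavier workload, the same check makes it obvious that the net.core maximum has to be raised with it.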
There are several other network tuning parameters that are useful to set. Enabling
TCP window scaling by setting net.ipv4.tcp_window_scaling to 1 will allow clients
to transfer data more efficiently, and allow that data to be buffered on the broker side.
Increasing the value of net.ipv4.tcp_max_syn_backlog above the default of 1024
will allow a greater number of simultaneous connections to be accepted. Increasing
the value of net.core.netdev_max_backlog to greater than the default of 1000 can
assist with bursts of network traffic, specifically when using multigigabit network
connection speeds, by allowing more packets to be queued for the kernel to process
them.
Production Concerns
Once you are ready to move your Kafka environment out of testing and into your
production operations, there are a few more things to think about that will assist with
setting up a reliable messaging service.
Garbage Collector Options
Tuning the Java garbage-collection options for an application has always been some‐
thing of an art, requiring detailed information about how the application uses mem‐
ory and a significant amount of observation and trial and error. Thankfully, this has
changed with Java 7 and the introduction of the Garbage First (or G1) garbage collec‐
tor. G1 is designed to automatically adjust to different workloads and provide consis‐
tent pause times for garbage collection over the lifetime of the application. It also
handles large heap sizes with ease by segmenting the heap into smaller zones and not
collecting over the entire heap in each pause.
G1 does all of this with a minimal amount of configuration in normal operation.
There are two configuration options for G1 used to adjust its performance:
MaxGCPauseMillis
This option specifies the preferred pause time for each garbage-collection cycle.
It is not a fixed maximum—G1 can and will exceed this time if it is required. This
value defaults to 200 milliseconds. This means that G1 will attempt to schedule
the frequency of GC cycles, as well as the number of zones that are collected in
each cycle, such that each cycle will take approximately 200ms.
InitiatingHeapOccupancyPercent
This option specifies the percentage of the total heap that may be in use before
G1 will start a collection cycle. The default value is 45. This means that G1 will
not start a collection cycle until after 45% of the heap is in use. This includes both
the new (Eden) and old zone usage in total.
The Kafka broker is fairly efficient with the way it utilizes heap memory and creates
garbage objects, so it is possible to set these options lower. The GC tuning options
provided in this section have been found to be appropriate for a server with 64 GB of
memory, running Kafka in a 5GB heap. For MaxGCPauseMillis, this broker can be
configured with a value of 20 ms. The value for InitiatingHeapOccupancyPercent is
set to 35, which causes garbage collection to run slightly earlier than with the default
value.
The start script for Kafka does not use the G1 collector, instead defaulting to using
parallel new and concurrent mark and sweep garbage collection. The change is easy
to make via environment variables. Using the start command from earlier in the
chapter, modify it as follows:
# export JAVA_HOME=/usr/java/jdk1.8.0_51
# export KAFKA_JVM_PERFORMANCE_OPTS="-server -XX:+UseG1GC
-XX:MaxGCPauseMillis=20 -XX:InitiatingHeapOccupancyPercent=35
-XX:+DisableExplicitGC -Djava.awt.headless=true"
# /usr/local/kafka/bin/kafka-server-start.sh -daemon /usr/local/kafka/config/server.properties
#
Datacenter Layout
For development systems, the physical location of the Kafka brokers within a data‐
center is not as much of a concern, as there is not as severe an impact if the cluster is
partially or completely unavailable for short periods of time. When serving produc‐
tion traffic, however, downtime means dollars lost, whether through loss of services
to users or loss of telemetry on what the users are doing. This is when it becomes crit‐
ical to configure replication within the Kafka cluster (see Chapter 6), which is also
when it is important to consider the physical location of brokers in their racks in the
datacenter. If not addressed prior to deploying Kafka, expensive maintenance to
move servers around may be needed.
The Kafka broker has no rack-awareness when assigning new partitions to brokers.
This means that it cannot take into account that two brokers may be located in the
same physical rack, or in the same availability zone (if running in a cloud service like
AWS), and therefore can easily assign all replicas for a partition to brokers that share
the same power and network connections in the same rack. Should that rack have a
failure, these partitions would be offline and inaccessible to clients. In addition, it can
result in additional lost data on recovery due to an unclean leader election (more
about this in Chapter 6).
The best practice is to have each Kafka broker in a cluster installed in a different rack,
or at the very least not share single points of failure for infrastructure services such as
power and network. This typically means at least deploying the servers that will run
brokers with dual power connections (to two different circuits) and dual network
switches (with a bonded interface on the servers themselves to failover seamlessly).
Even with dual connections, there is a benefit to having brokers in completely separate
racks. From time to time, it may be necessary to perform physical maintenance
on a rack or cabinet that requires it to be offline (such as moving servers around, or
rewiring power connections).
Colocating Applications on Zookeeper
Kafka utilizes Zookeeper for storing metadata information about the brokers, topics,
and partitions. Writes to Zookeeper are only performed on changes to the member‐
ship of consumer groups or on changes to the Kafka cluster itself. This amount of
traffic is minimal, and it does not justify the use of a dedicated Zookeeper ensemble
for a single Kafka cluster. In fact, many deployments will use a single Zookeeper
ensemble for multiple Kafka clusters (using a chroot Zookeeper path for each cluster,
as described earlier in this chapter).
Kafka Consumers and Zookeeper
Prior to Apache Kafka 0.9.0.0, consumers, in addition to the brok‐
ers, utilized Zookeeper to directly store information about the
composition of the consumer group, what topics it was consuming,
and to periodically commit offsets for each partition being con‐
sumed (to enable failover between consumers in the group). With
version 0.9.0.0, a new consumer interface was introduced which
allows this to be managed directly with the Kafka brokers. This is
the consumer discussed in Chapter 4.
However, there is a concern with consumers and Zookeeper under certain configura‐
tions. Consumers have a configurable choice to use either Zookeeper or Kafka for
committing offsets, and they can also configure the interval between commits. If the
consumer uses Zookeeper for offsets, each consumer will perform a Zookeeper write
at every interval for every partition it consumes. A reasonable interval for offset com‐
mits is 1 minute, as this is the period of time over which a consumer group will read
duplicate messages in the case of a consumer failure. These commits can be a signifi‐
cant amount of Zookeeper traffic, especially in a cluster with many consumers, and
will need to be taken into account. It may be necessary to use a longer commit interval
if the Zookeeper ensemble is not able to handle the traffic. However, it is recommended
that consumers using the latest Kafka libraries use Kafka for committing
offsets, removing the dependency on Zookeeper.
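To get a feel for the write load this implies, the arithmetic can be worked through directly. The sketch below assumes that every partition consumed by a group is committed exactly once per interval. The ZkOffsetLoad class and the figures of 50 consumer groups and 200 partitions per group are made up for illustration:

```java
// Hypothetical back-of-the-envelope calculator; the numbers are assumptions.
final class ZkOffsetLoad {

    // Average Zookeeper writes per second when each consumed partition is
    // committed once per commit interval.
    static double writesPerSecond(long consumerGroups,
                                  long partitionsPerGroup,
                                  long commitIntervalSeconds) {
        if (commitIntervalSeconds <= 0) {
            throw new IllegalArgumentException("commit interval must be positive");
        }
        return (double) (consumerGroups * partitionsPerGroup) / commitIntervalSeconds;
    }

    public static void main(String[] args) {
        // Same consumers, two different commit intervals.
        System.out.printf("1-minute interval: %.1f writes/sec%n",
            writesPerSecond(50, 200, 60));
        System.out.printf("5-minute interval: %.1f writes/sec%n",
            writesPerSecond(50, 200, 300));
    }
}
```

Lengthening the interval reduces the write rate proportionally, at the cost of a longer window of duplicate messages after a consumer failure.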
Outside of using a single ensemble for multiple Kafka clusters, it is not recommended
to share the ensemble with other applications, if it can be avoided. Kafka is sensitive
to Zookeeper latency and timeouts, and an interruption in communications with the
ensemble will cause the brokers to behave unpredictably. This can easily cause multi‐
ple brokers to go offline at the same time, should they lose Zookeeper connections,
which will result in offline partitions. It also puts stress on the cluster controller,
which can show up as subtle errors long after the interruption has passed, such as
when trying to perform a controlled shutdown of a broker. Other applications that
can put stress on the Zookeeper ensemble, either through heavy usage or improper
operations, should be segregated to their own ensemble.
Summary
In this chapter we learned how to get Apache Kafka up and running. We also covered
picking the right hardware for your brokers, and specific concerns around getting set
up in a production environment. Now that you have a Kafka cluster, we will walk
through the basics of Kafka client applications. The next two chapters will cover how
to create clients for both producing messages to Kafka (Chapter 3), as well as con‐
suming those messages out again (Chapter 4).
CHAPTER 3
Kafka Producers: Writing Messages to Kafka
Whether you use Kafka as a queue, message bus, or data storage platform, you will
always use Kafka by writing a producer that writes data to Kafka, a consumer that
reads data from Kafka, or an application that serves both roles.
For example, in a credit card transaction processing system, there will be a client
application, perhaps an online store, responsible for sending each transaction to
Kafka immediately when a payment is made. Another application is responsible for
immediately checking this transaction against a rules engine and determining
whether the transaction is approved or denied. The approve/deny response can then
be written back to Kafka and the response can propagate back to the online store
where the transaction was initiated. A third application can read both transactions
and the approval status from Kafka and store them in a database where analysts can
later review the decisions and perhaps improve the rules engine.
Apache Kafka ships with built-in client APIs that developers can use when developing
applications that interact with Kafka.
In this chapter we will learn how to use the Kafka producer, starting with an overview
of its design and components. We will show how to create KafkaProducer and
ProducerRecord objects, how to send records to Kafka, and how to handle the errors that
Kafka may return. We’ll then review the most important configuration options used
to control the producer behavior. We’ll conclude with a deeper look at how to use dif‐
ferent partitioning methods and serializers, and how to write your own serializers
and partitioners.
In Chapter 4 we will look at Kafka’s consumer client and reading data from Kafka.
Third-Party Clients
In addition to the built-in clients, Kafka has a binary wire protocol.
This means that it is possible for applications to read messages
from Kafka or write messages to Kafka simply by sending the correct
byte sequences to Kafka’s network port. There are multiple clients
that implement Kafka’s wire protocol in different
programming languages, giving simple ways to use Kafka not just
in Java applications but also in languages like C++, Python, Go,
and many more. Those clients are not part of the Apache Kafka
project, but a list of non-Java clients is maintained in the project
wiki. The wire protocol and the external clients are outside the
scope of the chapter.
Producer Overview
There are many reasons an application might need to write messages to Kafka:
recording user activities for auditing or analysis, recording metrics, storing log mes‐
sages, recording information from smart appliances, communicating asynchronously
with other applications, buffering information before writing to a database, and much
more.
Those diverse use cases also imply diverse requirements: is every message critical, or
can we tolerate loss of messages? Are we OK with accidentally duplicating messages?
Are there any strict latency or throughput requirements we need to support?
In the credit card transaction processing example we introduced earlier, we can see
that it is critical to never lose a single message nor duplicate any messages. Latency
should be low but latencies up to 500ms can be tolerated, and throughput should be
very high—we expect to process up to a million messages a second.
A different use case might be to store click information from a website. In that case,
some message loss or a few duplicates can be tolerated; latency can be high as long as
there is no impact on the user experience. In other words, we don’t mind if it takes a
few seconds for the message to arrive at Kafka, as long as the next page loads immedi‐
ately after the user clicked on a link. Throughput will depend on the level of activity
we anticipate on our website.
The different requirements will influence the way you use the producer API to write
messages to Kafka and the configuration you use.
While the producer APIs are very simple, there is a bit more that goes on under the
hood of the producer when we send data. Figure 3-1 shows the main steps involved in
sending data to Kafka.
Figure 3-1. High-level overview of Kafka producer components
We start producing messages to Kafka by creating a ProducerRecord, which must
include the topic we want to send the record to and a value. Optionally, we can also
specify a key and/or a partition. Once we send the ProducerRecord, the first thing the
producer will do is serialize the key and value objects to ByteArrays so they can be
sent over the network.
Next, the data is sent to a partitioner. If we specified a partition in the
ProducerRecord, the partitioner doesn’t do anything and simply returns the partition
we specified. If we didn’t, the partitioner will choose a partition for us, usually based
on the ProducerRecord key. Once a partition is selected, the producer knows which
topic and partition the record will go to. It then adds the record to a batch of records
that will also be sent to the same topic and partition. A separate thread is responsible
for sending those batches of records to the appropriate Kafka brokers.
When the broker receives the messages, it sends back a response. If the messages
were successfully written to Kafka, it will return a RecordMetadata object with the
topic, partition, and the offset of the record within the partition. If the broker failed
to write the messages, it will return an error. When the producer receives an error, it
may retry sending the message a few more times before giving up and returning an
error.
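The partition-selection step described above can be sketched in a few lines. This is a simplified model and not the client’s actual partitioner: the PartitionSketch class is a hypothetical name, Arrays.hashCode stands in for whatever hash function the real client applies to the serialized key, and spreading keyless records round-robin is an assumption made for the sake of the example:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative stand-in for the partitioning step; not Kafka's real partitioner.
final class PartitionSketch {

    private static final AtomicInteger ROUND_ROBIN = new AtomicInteger();

    // explicitPartition is null when the record does not name a partition;
    // serializedKey is null when the record has no key.
    static int choosePartition(Integer explicitPartition,
                               byte[] serializedKey,
                               int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive");
        }
        if (explicitPartition != null) {
            return explicitPartition; // the caller already picked the partition
        }
        if (serializedKey != null) {
            // Assumption: Arrays.hashCode stands in for the client's hash.
            // Masking the sign bit keeps the result non-negative.
            return (Arrays.hashCode(serializedKey) & 0x7fffffff) % numPartitions;
        }
        // Assumption: records without a key are spread round-robin.
        return (ROUND_ROBIN.getAndIncrement() & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        byte[] key = "Precision Products".getBytes(StandardCharsets.UTF_8);
        System.out.println("explicit: " + choosePartition(3, key, 8));
        System.out.println("by key:   " + choosePartition(null, key, 8));
        System.out.println("no key:   " + choosePartition(null, null, 8));
    }
}
```

The property that matters is that the same key always maps to the same partition for a fixed partition count, which is what keeps records with the same key in order.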
Constructing a Kafka Producer
The first step in writing messages to Kafka is to create a producer object with the
properties you want to pass to the producer. A Kafka producer has three mandatory
properties:
bootstrap.servers
List of host:port pairs of brokers that the producer will use to establish initial
connection to the Kafka cluster. This list doesn’t need to include all brokers, since
the producer will get more information after the initial connection. But it is rec‐
ommended to include at least two, so in case one broker goes down, the producer
will still be able to connect to the cluster.
key.serializer
Name of a class that will be used to serialize the keys of the records we will pro‐
duce to Kafka. Kafka brokers expect byte arrays as keys and values of messages.
However, the producer interface allows, using parameterized types, any Java
object to be sent as a key and value. This makes for very readable code, but it also
means that the producer has to know how to convert these objects to byte arrays.
key.serializer should be set to a name of a class that implements the
org.apache.kafka.common.serialization.Serializer interface. The producer
will use this class to serialize the key object to a byte array. The Kafka client pack‐
age includes ByteArraySerializer (which doesn’t do much),
StringSerializer, and IntegerSerializer, so if you use common types, there
is no need to implement your own serializers. Setting key.serializer is
required even if you intend to send only values.
value.serializer
Name of a class that will be used to serialize the values of the records we will pro‐
duce to Kafka. The same way you set key.serializer to a name of a class that
will serialize the message key object to a byte array, you set value.serializer to
a class that will serialize the message value object.
The following code snippet shows how to create a new producer by setting just the
mandatory parameters and using defaults for everything else:
private Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "broker1:9092,broker2:9092");
kafkaProps.put("key.serializer",
    "org.apache.kafka.common.serialization.StringSerializer");
kafkaProps.put("value.serializer",
    "org.apache.kafka.common.serialization.StringSerializer");
producer = new KafkaProducer<String, String>(kafkaProps);
We start with a Properties object.
Since we plan on using strings for message key and value, we use the built-in
StringSerializer.
Here we create a new producer by setting the appropriate key and value types
and passing the Properties object.
With such a simple interface, it is clear that most of the control over producer behav‐
ior is done by setting the correct configuration properties. Apache Kafka documenta‐
tion covers all the configuration options, and we will go over the important ones later
in this chapter.
Once we instantiate a producer, it is time to start sending messages. There are three
primary methods of sending messages:
Fire-and-forget
We send a message to the server and don’t really care if it arrives successfully or
not. Most of the time, it will arrive successfully, since Kafka is highly available
and the producer will retry sending messages automatically. However, some mes‐
sages will get lost using this method.
Synchronous send
We send a message, the send() method returns a Future object, and we use get()
to wait on the future and see if the send() was successful or not.
Asynchronous send
We call the send() method with a callback function, which gets triggered when it
receives a response from the Kafka broker.
In the examples that follow, we will see how to send messages using these methods
and how to handle the different types of errors that might occur.
While all the examples in this chapter are single threaded, a producer object can be
used by multiple threads to send messages. You will probably want to start with one
producer and one thread. If you need better throughput, you can add more threads
that use the same producer. Once this ceases to increase throughput, you can add
more producers to the application to achieve even higher throughput.
Sending a Message to Kafka
The simplest way to send a message is as follows:
ProducerRecord<String, String> record =
    new ProducerRecord<>("CustomerCountry", "Precision Products",
        "France");
try {
    producer.send(record);
} catch (Exception e) {
    e.printStackTrace();
}
The producer accepts ProducerRecord objects, so we start by creating one.
ProducerRecord has multiple constructors, which we will discuss later. Here we
use one that requires the name of the topic we are sending data to, which is
always a string, and the key and value we are sending to Kafka, which in this case
are also strings. The types of the key and value must match our serializer and
producer objects.
We use the producer object send() method to send the ProducerRecord. As
we’ve seen in the producer architecture diagram in Figure 3-1, the message will
be placed in a buffer and will be sent to the broker in a separate thread. The
send() method returns a Java Future object with RecordMetadata, but since we
simply ignore the returned value, we have no way of knowing whether the mes‐
sage was sent successfully or not. This method of sending messages can be used
when dropping a message silently is acceptable. This is not typically the case in
production applications.
While we ignore errors that may occur while sending messages to Kafka brokers
or in the brokers themselves, we may still get an exception if the producer
encountered errors before sending the message to Kafka. Those can be a
SerializationException when it fails to serialize the message, a
BufferExhaustedException or TimeoutException if the buffer is full, or an InterruptException
if the sending thread was interrupted.
Sending a Message Synchronously
The simplest way to send a message synchronously is as follows:
ProducerRecord<String, String> record =
    new ProducerRecord<>("CustomerCountry", "Precision Products", "France");
try {
    producer.send(record).get();
} catch (Exception e) {
    e.printStackTrace();
}
Here, we are using Future.get() to wait for a reply from Kafka. This method
will throw an exception if the record is not sent successfully to Kafka. If there
were no errors, we will get a RecordMetadata object that we can use to retrieve
the offset the message was written to.
If there were any errors before sending data to Kafka or while sending, if the Kafka
brokers returned a nonretriable exception, or if we exhausted the available
retries, we will encounter an exception. In this case, we just print any exception
we ran into.
KafkaProducer has two types of errors. Retriable errors are those that can be resolved
by sending the message again. For example, a connection error can be resolved
because the connection may get reestablished. A “no leader” error can be resolved
when a new leader is elected for the partition. KafkaProducer can be configured to
retry those errors automatically, so the application code will get retriable exceptions
only when the number of retries was exhausted and the error was not resolved. Some
errors will not be resolved by retrying. For example, “message size too large.” In those
cases, KafkaProducer will not attempt a retry and will return the exception immedi‐
ately.
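The difference between the two error types can be modeled with a short retry loop. The sketch below is a simplified model of the behavior described above and not the client’s actual implementation. RetrySketch, RetriableError, and FatalError are hypothetical stand-ins, since the real client has its own exception hierarchy:

```java
import java.util.function.Supplier;

// Simplified model of retriable versus nonretriable errors.
final class RetrySketch {

    // Stand-in for errors that may succeed if tried again (e.g., "no leader").
    static class RetriableError extends RuntimeException {
        RetriableError(String message) { super(message); }
    }

    // Stand-in for errors that retrying cannot fix (e.g., "message size too large").
    static class FatalError extends RuntimeException {
        FatalError(String message) { super(message); }
    }

    // Makes one initial attempt plus up to `retries` further attempts.
    // A FatalError is not caught, so it reaches the caller immediately.
    static <T> T sendWithRetries(Supplier<T> attempt, int retries, long backoffMs) {
        if (retries < 0) {
            throw new IllegalArgumentException("retries must not be negative");
        }
        RetriableError last = null;
        for (int i = 0; i <= retries; i++) {
            try {
                return attempt.get();
            } catch (RetriableError e) {
                last = e;
                if (i < retries) {
                    pause(backoffMs);
                }
            }
        }
        throw last; // retries exhausted: surface the retriable error
    }

    private static void pause(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while backing off", e);
        }
    }

    public static void main(String[] args) {
        int[] calls = {0};
        String result = sendWithRetries(() -> {
            if (calls[0]++ < 2) {
                throw new RetriableError("no leader");
            }
            return "written";
        }, 3, 100L);
        System.out.println(result + " after " + calls[0] + " attempts");
    }
}
```

Application code only ever sees the exception thrown at the end of this loop, which is why it needs to handle nonretriable errors and exhausted retries but not the intermediate failures.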
Sending a Message Asynchronously
Suppose the network roundtrip time between our application and the Kafka cluster is
10ms. If we wait for a reply after sending each message, sending 100 messages will
take around 1 second. On the other hand, if we just send all our messages and not
wait for any replies, then sending 100 messages will barely take any time at all. In
most cases, we really don’t need a reply—Kafka sends back the topic, partition, and
offset of the record after it was written, which is usually not required by the sending
app. On the other hand, we do need to know when we failed to send a message com‐
pletely so we can throw an exception, log an error, or perhaps write the message to an
“errors” file for later analysis.
In order to send messages asynchronously and still handle error scenarios, the pro‐
ducer supports adding a callback when sending a record. Here is an example of how
we use a callback:
private class DemoProducerCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if (e != null) {
            e.printStackTrace();
        }
    }
}

ProducerRecord<String, String> record =
    new ProducerRecord<>("CustomerCountry", "Biomedical Materials", "USA");
producer.send(record, new DemoProducerCallback());
To use callbacks, you need a class that implements the
org.apache.kafka.clients.producer.Callback interface, which has a single function,
onCompletion().
If Kafka returned an error, onCompletion() will have a nonnull exception. Here
we “handle” it by printing, but production code will probably have more robust
error handling functions.
The records are the same as before.
And we pass a Callback object along when sending the record.
Conguring Producers
So far we’ve seen very few configuration parameters for the producers—just the
mandatory bootstrap.servers URI and serializers.
The producer has a large number of configuration parameters; most are documented
in Apache Kafka documentation and many have reasonable defaults so there is no
reason to tinker with every single parameter. However, some of the parameters have a
significant impact on memory use, performance, and reliability of the producers. We
will review those here.
acks
The acks parameter controls how many partition replicas must receive the record
before the producer can consider the write successful. This option has a significant
impact on how likely messages are to be lost. There are three allowed values for the
acks parameter:
If acks=0, the producer will not wait for a reply from the broker before assuming
the message was sent successfully. This means that if something went wrong and
the broker did not receive the message, the producer will not know about it and
the message will be lost. However, because the producer is not waiting for any
response from the server, it can send messages as fast as the network will support,
so this setting can be used to achieve very high throughput.
If acks=1, the producer will receive a success response from the broker the
moment the leader replica received the message. If the message can’t be written
to the leader (e.g., if the leader crashed and a new leader was not elected yet), the
producer will receive an error response and can retry sending the message,
avoiding potential loss of data. The message can still get lost if the leader crashes
and a replica without this message gets elected as the new leader (via unclean
leader election). In this case, throughput depends on whether we send messages
synchronously or asynchronously. If our client code waits for a reply from the
server (by calling the get() method of the Future object returned when sending
a message) it will obviously increase latency significantly (at least by a network
roundtrip). If the client uses callbacks, latency will be hidden, but throughput will
be limited by the number of in-flight messages (i.e., how many messages the pro‐
ducer will send before receiving replies from the server).
If acks=all, the producer will receive a success response from the broker once all
in-sync replicas received the message. This is the safest mode since you can make
sure more than one broker has the message and that the message will survive
even in the case of crash (more information on this in Chapter 5). However, the
latency we discussed in the acks=1 case will be even higher, since we will be wait‐
ing for more than just one broker to receive the message.
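The choice between these values usually follows from how costly a lost message is. As a minimal sketch, the two extremes could be captured as separate configuration profiles. The AcksProfiles class and its method names are hypothetical, and the broker addresses are the placeholder values used earlier in this chapter:

```java
import java.util.Properties;

// Hypothetical configuration profiles; only standard java.util.Properties
// is used, so the result can be passed to a producer constructor as-is.
final class AcksProfiles {

    private static Properties base() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "broker1:9092,broker2:9092");
        props.setProperty("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.setProperty("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    // For data such as payment transactions: wait for all in-sync replicas.
    static Properties forCriticalData() {
        Properties props = base();
        props.setProperty("acks", "all");
        return props;
    }

    // For data such as click tracking: do not wait for any broker reply.
    static Properties forBestEffortData() {
        Properties props = base();
        props.setProperty("acks", "0");
        return props;
    }

    public static void main(String[] args) {
        System.out.println("critical:    acks=" + forCriticalData().getProperty("acks"));
        System.out.println("best effort: acks=" + forBestEffortData().getProperty("acks"));
    }
}
```

Keeping the profiles in one place makes the durability trade-off explicit instead of leaving it buried in individual applications.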
buer.memory
This sets the amount of memory the producer will use to buffer messages waiting to
be sent to brokers. If messages are sent by the application faster than they can be
delivered to the server, the producer may run out of space and additional send() calls
will either block or throw an exception, based on the block.on.buffer.full param‐
eter (replaced with max.block.ms in release 0.9.0.0, which allows blocking for a cer‐
tain time and then throwing an exception).
compression.type
By default, messages are sent uncompressed. This parameter can be set to snappy,
gzip, or lz4, in which case the corresponding compression algorithms will be used to
compress the data before sending it to the brokers. Snappy compression was invented
by Google to provide decent compression ratios with low CPU overhead and good
performance, so it is recommended in cases where both performance and bandwidth
are a concern. Gzip compression will typically use more CPU and time but result in
better compression ratios, so it is recommended in cases where network bandwidth is
Conguring Producers | 49
more restricted. By enabling compression, you reduce network utilization and stor‐
age, which is often a bottleneck when sending messages to Kafka.
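To get a feel for how much a batch of similar records can shrink, here is a minimal sketch that gzips a made-up batch in memory with the JDK's own GZIPOutputStream. The class name, the sample records, and the record count are assumptions for illustration; the producer performs its own compression internally when compression.type is set, so this only demonstrates the effect, not the producer's code path.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

// Hypothetical helper, not part of the Kafka client.
final class CompressionSketch {

    // Gzip-compress a byte array in memory and return the compressed bytes.
    static byte[] gzip(byte[] input) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(out)) {
            gz.write(input);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }

    // Build a made-up batch of similar JSON-like records.
    static byte[] sampleBatch(int records) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < records; i++) {
            sb.append("{\"customerId\":").append(i)
              .append(",\"country\":\"USA\"}\n");
        }
        return sb.toString().getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] raw = sampleBatch(1000);
        byte[] compressed = gzip(raw);
        System.out.println("raw bytes: " + raw.length
            + ", gzip bytes: " + compressed.length);
    }
}
```

Repetitive records like these compress well; real ratios depend entirely on your data, so measure with a sample of your own messages.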
retries
When the producer receives an error message from the server, the error could be
transient (e.g., a lack of leader for a partition). In this case, the value of the retries
parameter will control how many times the producer will retry sending the message
before giving up and notifying the client of an issue. By default, the producer will wait
100ms between retries, but you can control this using the retry.backoff.ms parame‐
ter. We recommend testing how long it takes to recover from a crashed broker (i.e.,
how long until all partitions get new leaders) and setting the number of retries and
delay between them such that the total amount of time spent retrying will be longer
than the time it takes the Kafka cluster to recover from the crash—otherwise, the pro‐
ducer will give up too soon. Not all errors will be retried by the producer. Some errors
are not transient and will not cause retries (e.g., “message too large” error). In general,
because the producer handles retries for you, there is no point in handling retries
within your own application logic. You will want to focus your efforts on handling
nonretriable errors or cases where retry attempts were exhausted.
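The sizing advice above is simple arithmetic, sketched below. The class and method names are hypothetical, and the calculation counts only the backoff waits between attempts (it ignores the time each request itself takes), so treat the result as a rough lower bound rather than a tuned value.

```java
// Hypothetical helper for sizing the retries setting; not part of Kafka.
final class RetryBudget {

    // Smallest number of retries whose backoff waits add up to strictly
    // more than the measured recovery time.
    static int minRetries(long recoveryMs, long retryBackoffMs) {
        if (recoveryMs < 0 || retryBackoffMs <= 0) {
            throw new IllegalArgumentException("times must be positive");
        }
        return (int) (recoveryMs / retryBackoffMs) + 1;
    }

    public static void main(String[] args) {
        // Assumed measurement: the cluster elects new leaders within 10 seconds.
        long recoveryMs = 10_000;
        long retryBackoffMs = 100; // the default mentioned above
        System.out.println("retries needed: "
            + minRetries(recoveryMs, retryBackoffMs));
    }
}
```

In practice you would more likely raise retry.backoff.ms than configure a very large number of retries; the same arithmetic applies either way.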
batch.size
When multiple records are sent to the same partition, the producer will batch them
together. This parameter controls the amount of memory in bytes (not messages!)
that will be used for each batch. When the batch is full, all the messages in the batch
will be sent. However, this does not mean that the producer will wait for the batch to
become full. The producer will send half-full batches and even batches with just a sin‐
gle message in them. Therefore, setting the batch size too large will not cause delays
in sending messages; it will just use more memory for the batches. Setting the batch
size too small will add some overhead because the producer will need to send mes‐
sages more frequently.
linger.ms
linger.ms controls the amount of time to wait for additional messages before send‐
ing the current batch. KafkaProducer sends a batch of messages either when the cur‐
rent batch is full or when the linger.ms limit is reached. By default, the producer will
send messages as soon as there is a sender thread available to send them, even if
there's just one message in the batch. By setting linger.ms higher than 0, we instruct
the producer to wait a few milliseconds to add additional messages to the batch
before sending it to the brokers. This increases latency but also increases throughput
(because we send more messages at once, there is less overhead per message).
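The interaction between batch.size and linger.ms can be illustrated with a toy model. This is not the producer's actual accumulator logic: the class name, the fixed message size, and the rule that a batch is checked only when the next message arrives are all simplifying assumptions.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of "send when the batch is full or when linger.ms is reached".
final class LingerSketch {

    // Returns the number of messages in each batch that would be sent.
    static List<Integer> batchCounts(long[] arrivalMs, int messageBytes,
                                     int batchSizeBytes, long lingerMs) {
        List<Integer> batches = new ArrayList<>();
        int count = 0;
        int bytes = 0;
        long openedAt = 0;
        for (long t : arrivalMs) {
            // The linger time ran out before this message arrived:
            // the open batch has already been sent.
            if (count > 0 && t - openedAt >= lingerMs) {
                batches.add(count);
                count = 0;
                bytes = 0;
            }
            if (count == 0) {
                openedAt = t;
            }
            count++;
            bytes += messageBytes;
            // The batch is full: send it without waiting.
            if (bytes >= batchSizeBytes) {
                batches.add(count);
                count = 0;
                bytes = 0;
            }
        }
        if (count > 0) {
            batches.add(count);
        }
        return batches;
    }

    public static void main(String[] args) {
        long[] arrivals = {0, 1, 2, 3, 10, 11};
        System.out.println("linger 0:  " + batchCounts(arrivals, 100, 16384, 0));
        System.out.println("linger 5:  " + batchCounts(arrivals, 100, 16384, 5));
        System.out.println("small batch: " + batchCounts(arrivals, 100, 200, 5));
    }
}
```

With no lingering every message travels alone, a small linger groups messages that arrive close together, and a small batch size forces a send as soon as the byte limit is hit.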
client.id
This can be any string, and will be used by the brokers to identify messages sent from
the client. It is used in logging and metrics, and for quotas.
max.in.ight.requests.per.connection
This controls how many messages the producer will send to the server without
receiving responses. Setting this high can increase memory usage while improving
throughput, but setting it too high can reduce throughput as batching becomes less
efficient. Setting this to 1 will guarantee that messages will be written to the broker in
the order in which they were sent, even when retries occur.
timeout.ms, request.timeout.ms, and metadata.fetch.timeout.ms
These parameters control how long the producer will wait for a reply from the server
when sending data (request.timeout.ms) and when requesting metadata such as the
current leaders for the partitions we are writing to (metadata.fetch.timeout.ms). If
the timeout is reached without reply, the producer will either retry sending or
respond with an error (either through exception or the send callback). timeout.ms
controls the time the broker will wait for in-sync replicas to acknowledge the message
in order to meet the acks configuration—the broker will return an error if the time
elapses without the necessary acknowledgments.
max.block.ms
This parameter controls how long the producer will block when calling send() and
when explicitly requesting metadata via partitionsFor(). Those methods block
when the producer’s send buffer is full or when metadata is not available. When
max.block.ms is reached, a timeout exception is thrown.
max.request.size
This setting controls the size of a produce request sent by the producer. It caps both
the size of the largest message that can be sent and the number of messages that the
producer can send in one request. For example, with a default maximum request size
of 1 MB, the largest message you can send is 1 MB or the producer can batch 1,000
messages of size 1 K each into one request. In addition, the broker has its own limit
on the size of the largest message it will accept (message.max.bytes). It is usually a
good idea to have these configurations match, so the producer will not attempt to
send messages of a size that will be rejected by the broker.
receive.buer.bytes and send.buer.bytes
These are the sizes of the TCP send and receive buffers used by the sockets when
writing and reading data. If these are set to -1, the OS defaults will be used. It is a
Conguring Producers | 51
good idea to increase those when producers or consumers communicate with brokers
in a different datacenter because those network links typically have higher latency and
lower bandwidth.
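To tie the parameters in this section together, here is a sketch of a Properties object that sets several of them at once. The property names are the ones discussed above; every value is an illustrative assumption rather than a recommendation, and the class and method names are hypothetical. The sketch only builds the configuration, so it runs without a Kafka client on the classpath.

```java
import java.util.Properties;

// Hypothetical helper that collects producer settings discussed in this section.
final class ProducerConfigSketch {

    static Properties reliableProducerProps(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
            "org.apache.kafka.common.serialization.StringSerializer");
        props.put("acks", "all");              // wait for all in-sync replicas
        props.put("retries", "5");             // assumed value; size it to your recovery time
        props.put("retry.backoff.ms", "100");
        props.put("compression.type", "snappy");
        props.put("batch.size", "16384");      // bytes, not messages
        props.put("linger.ms", "5");           // trade a little latency for throughput
        props.put("buffer.memory", "33554432");
        props.put("max.in.flight.requests.per.connection", "1"); // preserve order on retry
        props.put("client.id", "example-producer"); // made-up client name
        return props;
    }

    public static void main(String[] args) {
        reliableProducerProps("localhost:9092").list(System.out);
    }
}
```

The resulting object would be passed to the KafkaProducer constructor in the same way as in the earlier examples in this chapter.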
Ordering Guarantees
Apache Kafka preserves the order of messages within a partition.
This means that if messages were sent from the producer in a spe‐
cific order, the broker will write them to a partition in that order
and all consumers will read them in that order. For some use cases,
order is very important. There is a big difference between deposit‐
ing $100 in an account and later withdrawing it, and the other way
around! However, some use cases are less sensitive.
Setting the retries parameter to nonzero and the
max.in.flight.requests.per.connection to more than one means
that it is possible that the broker will fail to write the first batch of
messages, succeed in writing the second (which was already in-
flight), and then retry the first batch and succeed, thereby reversing
the order.
Usually, setting the number of retries to zero is not an option in a
reliable system, so if guaranteeing order is critical, we recommend
setting max.in.flight.requests.per.connection=1 to make sure that
while a batch of messages is retrying, additional messages will not
be sent (because this has the potential to reverse the correct order).
This will severely limit the throughput of the producer, so only use
this when order is important.
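The reordering scenario described above can be reproduced with a small simulation. This is a toy model under stated assumptions, not the producer's real networking code: batches are numbered from 1, a chosen set of batches fails on the first attempt only, and failed batches are retried ahead of anything not yet sent. All names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy simulation of retries combined with several in-flight batches.
final class OrderingSketch {

    // Returns the order in which batches end up written to the partition.
    static List<Integer> writtenOrder(int batches, Set<Integer> failFirstAttempt,
                                      int maxInFlight) {
        if (maxInFlight < 1) {
            throw new IllegalArgumentException("maxInFlight must be at least 1");
        }
        Deque<Integer> queue = new ArrayDeque<>();
        for (int b = 1; b <= batches; b++) {
            queue.addLast(b);
        }
        Set<Integer> alreadyFailed = new HashSet<>();
        List<Integer> log = new ArrayList<>();
        while (!queue.isEmpty()) {
            // Send up to maxInFlight batches without waiting for responses.
            List<Integer> inFlight = new ArrayList<>();
            while (inFlight.size() < maxInFlight && !queue.isEmpty()) {
                inFlight.add(queue.pollFirst());
            }
            List<Integer> toRetry = new ArrayList<>();
            for (int b : inFlight) {
                if (failFirstAttempt.contains(b) && alreadyFailed.add(b)) {
                    toRetry.add(b);   // transient failure on the first attempt
                } else {
                    log.add(b);       // the broker appends the batch
                }
            }
            // Retried batches go back to the head of the queue.
            for (int i = toRetry.size() - 1; i >= 0; i--) {
                queue.addFirst(toRetry.get(i));
            }
        }
        return log;
    }

    public static void main(String[] args) {
        Set<Integer> failing = Collections.singleton(1);
        System.out.println("in-flight 2: " + writtenOrder(3, failing, 2));
        System.out.println("in-flight 1: " + writtenOrder(3, failing, 1));
    }
}
```

With two batches in flight, the second batch lands before the retried first one; with a single batch in flight, the original order is kept at the cost of throughput.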
Serializers
As seen in previous examples, producer configuration includes mandatory serializers.
We’ve seen how to use the default String serializer. Kafka also includes serializers for
integers and ByteArrays, but this does not cover most use cases. Eventually, you will
want to be able to serialize more generic records.
We will start by showing how to write your own serializer and then introduce the
Avro serializer as a recommended alternative.
Custom Serializers
When the object you need to send to Kafka is not a simple string or integer, you have
a choice of either using a generic serialization library like Avro, Thrift, or Protobuf to
create records, or creating a custom serialization for objects you are already using. We
highly recommend using a generic serialization library. In order to understand how
the serializers work and why it is a good idea to use a serialization library, let’s see
what it takes to write your own custom serializer.
Suppose that instead of recording just the customer name, you create a simple class to
represent customers:
public class Customer {
    private int customerID;
    private String customerName;

    public Customer(int ID, String name) {
        this.customerID = ID;
        this.customerName = name;
    }

    public int getID() {
        return customerID;
    }

    public String getName() {
        return customerName;
    }
}
Now suppose we want to create a custom serializer for this class. It will look some‐
thing like this:
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.nio.ByteBuffer;
import java.util.Map;

public class CustomerSerializer implements Serializer<Customer> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure
    }

    /**
     * We are serializing Customer as:
     * 4 byte int representing customerId
     * 4 byte int representing length of customerName in UTF-8 bytes
     *   (0 if name is null)
     * N bytes representing customerName in UTF-8
     */
    @Override
    public byte[] serialize(String topic, Customer data) {
        try {
            byte[] serializedName;
            int stringSize;
            if (data == null)
                return null;
            else {
                if (data.getName() != null) {
                    serializedName = data.getName().getBytes("UTF-8");
                    stringSize = serializedName.length;
                } else {
                    serializedName = new byte[0];
                    stringSize = 0;
                }
            }
            // id (4 bytes) + name length (4 bytes) + name bytes
            ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize);
            buffer.putInt(data.getID());
            buffer.putInt(stringSize);
            buffer.put(serializedName);
            return buffer.array();
        } catch (Exception e) {
            throw new SerializationException(
                "Error when serializing Customer to byte[] " + e);
        }
    }

    @Override
    public void close() {
        // nothing to close
    }
}
Configuring a producer with this CustomerSerializer will allow you to define
ProducerRecord<String, Customer> and pass Customer objects directly to the
producer. This example is pretty simple, but you can see how
fragile the code is. If we ever have too many customers, for example, and need to
change customerID to Long, or if we ever decide to add a startDate field to
Customer, we will have a serious issue in maintaining compatibility between old and new
messages. Debugging compatibility issues between different versions of serializers
and deserializers is fairly challenging—you need to compare arrays of raw bytes. To
make matters even worse, if multiple teams in the same company end up writing
Customer data to Kafka, they will all need to use the same serializers and modify the code
at the exact same time.
For these reasons, we recommend using existing serializers and deserializers such as
JSON, Apache Avro, Thrift, or Protobuf. In the following section we will describe
Apache Avro and then show how to serialize Avro records and send them to Kafka.
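The fragility is easy to demonstrate without Kafka at all. The sketch below reproduces the same byte layout with plain ByteBuffer calls, adds a matching decoder, and then shows what happens when a hypothetical "version 2" writer widens the ID to a long while readers still expect the old layout. The class and method names are assumptions for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Stand-alone illustration of the hand-rolled Customer wire format.
final class CustomerWireFormat {

    // Version 1 layout: 4-byte id, 4-byte name length, name bytes.
    static byte[] encode(int id, String name) {
        byte[] nameBytes = name == null
            ? new byte[0] : name.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + nameBytes.length);
        buffer.putInt(id);
        buffer.putInt(nameBytes.length);
        buffer.put(nameBytes);
        return buffer.array();
    }

    // Hypothetical version 2 layout: the id grows to 8 bytes.
    static byte[] encodeWithLongId(long id, String name) {
        byte[] nameBytes = name == null
            ? new byte[0] : name.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(8 + 4 + nameBytes.length);
        buffer.putLong(id);
        buffer.putInt(nameBytes.length);
        buffer.put(nameBytes);
        return buffer.array();
    }

    // Version 1 readers: they know nothing about the wider id.
    static int decodeId(byte[] data) {
        return ByteBuffer.wrap(data).getInt();
    }

    static String decodeName(byte[] data) {
        ByteBuffer buffer = ByteBuffer.wrap(data);
        buffer.getInt();                      // skip the id
        byte[] nameBytes = new byte[buffer.getInt()];
        buffer.get(nameBytes);
        return new String(nameBytes, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] v1 = encode(42, "Ann");
        System.out.println(decodeId(v1) + " " + decodeName(v1));
        byte[] v2 = encodeWithLongId(42L, "Ann");
        // The old reader sees the high half of the long as the id.
        System.out.println("old reader, new bytes, id = " + decodeId(v2));
    }
}
```

The old reader silently returns the wrong ID for the new bytes, and reading the name fails outright because the low half of the long is misread as a length. Nothing in the bytes themselves says which layout was used.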
Serializing Using Apache Avro
Apache Avro is a language-neutral data serialization format. The project was created
by Doug Cutting to provide a way to share data files with a large audience.
Avro data is described in a language-independent schema. The schema is usually
described in JSON and the serialization is usually to binary files, although serializing
to JSON is also supported. Avro assumes that the schema is present when reading and
writing files, usually by embedding the schema in the files themselves.
One of the most interesting features of Avro, and what makes it a good fit for use in a
messaging system like Kafka, is that when the application that is writing messages
switches to a new schema, the applications reading the data can continue processing
messages without requiring any change or update.
Suppose the original schema was:
{"namespace": "customerManagement.avro",
 "type": "record",
 "name": "Customer",
 "fields": [
     {"name": "id", "type": "int"},
     {"name": "name", "type": "string"},
     {"name": "faxNumber", "type": ["null", "string"], "default": null}
 ]
}
id and name fields are mandatory, while fax number is optional and defaults to
null.
We used this schema for a few months and generated a few terabytes of data in this
format. Now suppose that we decide that in the new version, we will upgrade to the
twenty-first century and will no longer include a fax number field and will instead use
an email field.
The new schema would be:
{"namespace": "customerManagement.avro",
 "type": "record",
 "name": "Customer",
 "fields": [
     {"name": "id", "type": "int"},
     {"name": "name", "type": "string"},
     {"name": "email", "type": ["null", "string"], "default": null}
 ]
}
Now, after upgrading to the new version, old records will contain “faxNumber” and
new records will contain “email.” In many organizations, upgrades are done slowly
and over many months. So we need to consider how preupgrade applications that still
use the fax numbers and postupgrade applications that use email will be able to han‐
dle all the events in Kafka.
The reading application will contain calls to methods similar to getName(), getId(),
and getFaxNumber(). If it encounters a message written with the new schema,
getName() and getId() will continue working with no modification, but
getFaxNumber() will return null because the message will not contain a fax number.
Now suppose we upgrade our reading application and it no longer has the
getFaxNumber() method but rather getEmail(). If it encounters a message written with the
old schema, getEmail() will return null because the older messages do not contain
an email address.
This example illustrates the benefit of using Avro: even though we changed the
schema in the messages without changing all the applications reading the data, there
will be no exceptions or breaking errors and no need for expensive updates of exist‐
ing data.
However, there are two caveats to this scenario:
• The schema used for writing the data and the schema expected by the reading
  application must be compatible. The Avro documentation includes compatibility
  rules.
• The deserializer will need access to the schema that was used when writing the
  data, even when it is different than the schema expected by the application that
  accesses the data. In Avro files, the writing schema is included in the file itself,
  but there is a better way to handle this for Kafka messages. We will look at that
  next.
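The fax-number and email scenario can be imitated with plain maps to make the resolution rule concrete. This is a toy model and not the Avro API: a record is a map of field names to values, the reader's schema is a map of field names to defaults, and fields are matched by name. Real Avro additionally checks types and raises an error when a reader field is missing from the data and has no default.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Toy imitation of reader/writer schema resolution by field name.
final class SchemaResolutionSketch {

    // Keep the fields the reader knows about, fill in the reader's default
    // for anything the writer did not include, and drop everything else.
    static Map<String, Object> resolve(Map<String, Object> written,
                                       Map<String, Object> readerFieldDefaults) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> field : readerFieldDefaults.entrySet()) {
            String name = field.getKey();
            result.put(name, written.containsKey(name)
                ? written.get(name) : field.getValue());
        }
        return result;
    }

    public static void main(String[] args) {
        // A record written with the new schema (email instead of faxNumber).
        Map<String, Object> newRecord = new LinkedHashMap<>();
        newRecord.put("id", 1);
        newRecord.put("name", "Ann");
        newRecord.put("email", "ann@example.com");

        // A pre-upgrade reader that still expects faxNumber, defaulting to null.
        Map<String, Object> oldReader = new LinkedHashMap<>();
        oldReader.put("id", null);
        oldReader.put("name", null);
        oldReader.put("faxNumber", null);

        System.out.println(resolve(newRecord, oldReader));
    }
}
```

The old reader still gets id and name, sees a null faxNumber, and never learns that an email field exists, which mirrors the behavior described above.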
Using Avro Records with Kafka
Unlike Avro files, where storing the entire schema in the data file is associated with a
fairly reasonable overhead, storing the entire schema in each record will usually more
than double the record size. However, Avro still requires the entire schema to be
present when reading the record, so we need to locate the schema elsewhere. To ach‐
ieve this, we follow a common architecture pattern and use a Schema Registry. The
Schema Registry is not part of Apache Kafka but there are several open source
options to choose from. We'll use the Confluent Schema Registry for this example.
You can find the Schema Registry code on GitHub, or you can install it as part of the
Confluent Platform. If you decide to use the Schema Registry, then we recommend
checking the documentation.
The idea is to store all the schemas used to write data to Kafka in the registry. Then
we simply store the identifier for the schema in the record we produce to Kafka. The
consumers can then use the identifier to pull the schema out of the schema registry
and deserialize the data. The key is that all this work—storing the schema in the reg‐
istry and pulling it up when required—is done in the serializers and deserializers. The
code that produces data to Kafka simply uses the Avro serializer just like it would any
other serializer. Figure 3-2 demonstrates this process.
Figure 3-2. Flow diagram of serialization and deserialization of Avro records
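The idea of sending an identifier instead of the whole schema can be sketched with an in-memory stand-in for the registry. Everything here is an assumption for illustration: the class is hypothetical, the registry is a local list rather than a remote service, and the framing (a 4-byte ID followed by the payload) is a simplification of what a real serializer writes.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// In-memory stand-in for a schema registry; not the Confluent client API.
final class ToySchemaRegistry {
    private final Map<String, Integer> idsBySchema = new HashMap<>();
    private final List<String> schemasById = new ArrayList<>();

    // Store the schema once and return its identifier.
    int register(String schema) {
        Integer existing = idsBySchema.get(schema);
        if (existing != null) {
            return existing;
        }
        schemasById.add(schema);
        int id = schemasById.size() - 1;
        idsBySchema.put(schema, id);
        return id;
    }

    String lookup(int id) {
        return schemasById.get(id);
    }

    // Producer side: prefix the serialized record with the schema ID.
    static byte[] frame(int schemaId, byte[] payload) {
        return ByteBuffer.allocate(4 + payload.length)
            .putInt(schemaId).put(payload).array();
    }

    // Consumer side: read the ID back so the schema can be fetched.
    static int schemaIdOf(byte[] message) {
        return ByteBuffer.wrap(message).getInt();
    }

    public static void main(String[] args) {
        ToySchemaRegistry registry = new ToySchemaRegistry();
        String schema = "{\"type\": \"record\", \"name\": \"Customer\"}";
        int id = registry.register(schema);
        byte[] message = frame(id, "payload".getBytes(StandardCharsets.UTF_8));
        System.out.println("message carries " + message.length
            + " bytes; schema found: " + registry.lookup(schemaIdOf(message)));
    }
}
```

Each message pays only four extra bytes here, no matter how large the schema is.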
Here is an example of how to produce generated Avro objects to Kafka (see the Avro
Documentation for how to use code generation with Avro):
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer",
    "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer",
    "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", schemaUrl);

String topic = "customerContacts";
int wait = 500;

Producer<String, Customer> producer =
    new KafkaProducer<String, Customer>(props);

// We keep producing new events until someone ctrl-c
while (true) {
    Customer customer = CustomerGenerator.getNext();
    System.out.println("Generated customer " + customer.toString());
    // The record key type is String, so the numeric ID is converted
    ProducerRecord<String, Customer> record =
        new ProducerRecord<>(topic,
            String.valueOf(customer.getId()), customer);
    producer.send(record);
}
We use the KafkaAvroSerializer to serialize our objects with Avro. Note that
the KafkaAvroSerializer can also handle primitives, which is why we can later use
String as the record key and our Customer object as the value.
schema.registry.url is a new parameter. This simply points to where we store
the schemas.
Customer is our generated object. We tell the producer that our records will con‐
tain Customer as the value.
We also instantiate ProducerRecord with Customer as the value type, and pass a
Customer object when creating the new record.
That's it. We send the record with our Customer object and
KafkaAvroSerializer will handle the rest.
What if you prefer to use generic Avro objects rather than the generated Avro objects?
No worries. In this case, you just need to provide the schema:
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer",
    "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer",
    "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", url);

String schemaString =
    "{\"namespace\": \"customerManagement.avro\"," +
    "\"type\": \"record\", " +
    "\"name\": \"Customer\"," +
    "\"fields\": [" +
    "{\"name\": \"id\", \"type\": \"int\"}," +
    "{\"name\": \"name\", \"type\": \"string\"}," +
    "{\"name\": \"email\", \"type\": [\"null\",\"string\"], " +
    "\"default\": null}" +
    "]}";

Producer<String, GenericRecord> producer =
    new KafkaProducer<String, GenericRecord>(props);

Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(schemaString);

// customers (the number of records to send) is defined elsewhere
for (int nCustomers = 0; nCustomers < customers; nCustomers++) {
    String name = "exampleCustomer" + nCustomers;
    String email = "example" + nCustomers + "@example.com";

    GenericRecord customer = new GenericData.Record(schema);
    customer.put("id", nCustomers);
    customer.put("name", name);
    customer.put("email", email);

    ProducerRecord<String, GenericRecord> data =
        new ProducerRecord<String, GenericRecord>(
            "customerContacts", name, customer);
    producer.send(data);
}
We still use the same KafkaAvroSerializer.
And we provide the URI of the same schema registry.
But now we also need to provide the Avro schema, since it is not provided by the
Avro-generated object.
Our object type is an Avro GenericRecord, which we initialize with our schema
and the data we want to write.
Then the value of the ProducerRecord is simply a GenericRecord that contains
our schema and data. The serializer will know how to get the schema from this
record, store it in the schema registry, and serialize the object data.
Partitions
In previous examples, the ProducerRecord objects we created included a topic name,
key, and value. Kafka messages are key-value pairs and while it is possible to create a
ProducerRecord with just a topic and a value, with the key set to null by default,
most applications produce records with keys. Keys serve two goals: they are addi‐
tional information that gets stored with the message, and they are also used to decide
which one of the topic partitions the message will be written to. All messages with the
same key will go to the same partition. This means that if a process is reading only a
subset of the partitions in a topic (more on that in Chapter 4), all the records for a
single key will be read by the same process. To create a key-value record, you simply
create a ProducerRecord as follows:
ProducerRecord<String, String> record =
    new ProducerRecord<>("CustomerCountry", "Laboratory Equipment", "USA");
When creating messages with a null key, you can simply leave the key out:
ProducerRecord<String, String> record =
    new ProducerRecord<>("CustomerCountry", "USA");
Here, the key will simply be set to null, which may indicate that a customer
name was missing on a form.
When the key is null and the default partitioner is used, the record will be sent to
one of the available partitions of the topic at random. A round-robin algorithm will
be used to balance the messages among the partitions.
If a key exists and the default partitioner is used, Kafka will hash the key (using its
own hash algorithm, so hash values will not change when Java is upgraded), and use
the result to map the message to a specific partition. Since it is important that a key is
always mapped to the same partition, we use all the partitions in the topic to calculate
the mapping—not just the available partitions. This means that if a specific partition
is unavailable when you write data to it, you might get an error. This is fairly rare, as
you will see in Chapter 6 when we discuss Kafka's replication and availability.
The mapping of keys to partitions is consistent only as long as the number of parti‐
tions in a topic does not change. So as long as the number of partitions is constant,
you can be sure that, for example, records regarding user 045189 will always get writ‐
ten to partition 34. This allows all kinds of optimization when reading data from par‐
titions. However, the moment you add new partitions to the topic, this is no longer
guaranteed—the old records will stay in partition 34 while new records will get writ‐
ten to a different partition. When partitioning keys is important, the easiest solution
is to create topics with sufficient partitions (Chapter 2 includes suggestions for how
to determine a good number of partitions) and never add partitions.
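A short experiment shows why adding partitions breaks the key-to-partition mapping. The sketch below hashes each key and takes the result modulo the partition count. Note the assumptions: it uses the JDK's CRC32 as a stand-in hash (Kafka's default partitioner uses its own hash function), and the key names and partition counts are made up.

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Toy key-to-partition mapping: hash(key) modulo the number of partitions.
final class KeyPartitionSketch {

    static int partitionFor(String key, int numPartitions) {
        CRC32 crc = new CRC32(); // stand-in hash, not the one Kafka uses
        crc.update(key.getBytes(StandardCharsets.UTF_8));
        // getValue() is never negative, so the result is a valid index.
        return (int) (crc.getValue() % numPartitions);
    }

    // Counts how many of the generated keys land on a different partition
    // once the partition count changes.
    static int countMoved(int keys, int partitionsBefore, int partitionsAfter) {
        int moved = 0;
        for (int i = 0; i < keys; i++) {
            String key = "user-" + i;
            if (partitionFor(key, partitionsBefore)
                    != partitionFor(key, partitionsAfter)) {
                moved++;
            }
        }
        return moved;
    }

    public static void main(String[] args) {
        System.out.println("user-045189 with 50 partitions: "
            + partitionFor("user-045189", 50));
        System.out.println("user-045189 with 51 partitions: "
            + partitionFor("user-045189", 51));
        System.out.println("keys that moved out of 1000: "
            + countMoved(1000, 50, 51));
    }
}
```

As long as the partition count stays the same, a key always maps to the same partition; adding even one partition sends most keys somewhere new.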
Implementing a custom partitioning strategy
So far, we have discussed the traits of the default partitioner, which is the one most
commonly used. However, Kafka does not limit you to just hash partitions, and
sometimes there are good reasons to partition data differently. For example, suppose
that you are a B2B vendor and your biggest customer is a company that manufactures
handheld devices called Bananas. Suppose that you do so much business with cus‐
tomer “Banana” that over 10% of your daily transactions are with this customer. If
you use default hash partitioning, the Banana records will get allocated to the same
partition as other accounts, resulting in one partition being about twice as large as the
rest. This can cause servers to run out of space, processing to slow down, etc. What
we really want is to give Banana its own partition and then use hash partitioning to
map the rest of the accounts to partitions.
Here is an example of a custom partitioner:
import java.util.List;
import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.utils.Utils;

public class BananaPartitioner implements Partitioner {

    public void configure(Map<String, ?> configs) {}

    public int partition(String topic, Object key, byte[] keyBytes,
                         Object value, byte[] valueBytes,
                         Cluster cluster) {
        List<PartitionInfo> partitions =
            cluster.partitionsForTopic(topic);
        int numPartitions = partitions.size();

        if ((keyBytes == null) || (!(key instanceof String)))
            throw new InvalidRecordException(
                "We expect all messages to have customer name as key");

        if (((String) key).equals("Banana"))
            // Banana will always go to the last partition
            // (partition numbers start at 0)
            return numPartitions - 1;

        // Other records will get hashed to the rest of the partitions.
        // Masking the sign bit keeps the result from being negative.
        return (Utils.murmur2(keyBytes) & 0x7fffffff) % (numPartitions - 1);
    }

    public void close() {}
}
Partitioner interface includes configure, partition, and close methods. Here
we only implement partition, although we really should have passed the special
customer name through configure instead of hard-coding it in partition.
We only expect String keys, so we throw an exception if that is not the case.
Old Producer APIs
In this chapter we've discussed the Java producer client that is part of the
org.apache.kafka.clients package. However, Apache Kafka still has two older
clients written in Scala that are part of the kafka.producer package and the core Kafka
module. These producers are called SyncProducer (which, depending on the value
of the acks parameter, may wait for the server to ack each message or batch of
messages before sending additional messages) and AsyncProducer (which batches
messages in the background, sends them in a separate thread, and does not provide
feedback regarding success to the client).
Because the current producer supports both behaviors and provides much more reli‐
ability and control to the developer, we will not discuss the older APIs. If you are
interested in using them, think twice and then refer to Apache Kafka documentation
to learn more.
Summary
We began this chapter with a simple example of a producer—just 10 lines of code that
send events to Kafka. We added to the simple example by adding error handling and
experimenting with synchronous and asynchronous producing. We then explored the
most important producer configuration parameters and saw how they modify the
behavior of the producers. We discussed serializers, which let us control the format of
the events we write to Kafka. We looked in-depth at Avro, one of many ways to serial‐
ize events, but one that is very commonly used with Kafka. We concluded the chapter
with a discussion of partitioning in Kafka and an example of an advanced custom
partitioning technique.
Now that we know how to write events to Kafka, in Chapter 4 we'll learn all about
consuming events from Kafka.
CHAPTER 4
Kafka Consumers: Reading Data from Kafka
Applications that need to read data from Kafka use a KafkaConsumer to subscribe to
Kafka topics and receive messages from these topics. Reading data from Kafka is a bit
different than reading data from other messaging systems, and there are a few unique
concepts and ideas involved. It is difficult to understand how to use the consumer
API without understanding these concepts first. We'll start by explaining some of the
important concepts, and then we'll go through some examples that show the different
ways consumer APIs can be used to implement applications with varying require‐
ments.
Kafka Consumer Concepts
In order to understand how to read data from Kafka, you first need to understand its
consumers and consumer groups. The following sections cover those concepts.
Consumers and Consumer Groups
Suppose you have an application that needs to read messages from a Kafka topic, run
some validations against them, and write the results to another data store. In this case
your application will create a consumer object, subscribe to the appropriate topic, and
start receiving messages, validating them and writing the results. This may work well
for a while, but what if the rate at which producers write messages to the topic
exceeds the rate at which your application can validate them? If you are limited to a
single consumer reading and processing the data, your application may fall farther
and farther behind, unable to keep up with the rate of incoming messages. Obviously
there is a need to scale consumption from topics. Just like multiple producers can
write to the same topic, we need to allow multiple consumers to read from the same
topic, splitting the data between them.
Kafka consumers are typically part of a consumer group. When multiple consumers
are subscribed to a topic and belong to the same consumer group, each consumer in
the group will receive messages from a different subset of the partitions in the topic.
Let's take topic T1 with four partitions. Now suppose we created a new consumer, C1,
which is the only consumer in group G1, and use it to subscribe to topic T1.
Consumer C1 will get all messages from all four T1 partitions. See Figure 4-1.
Figure 4-1. One Consumer group with four partitions
If we add another consumer, C2, to group G1, each consumer will only get messages
from two partitions. Perhaps messages from partition 0 and 2 go to C1 and messages
from partitions 1 and 3 go to consumer C2. See Figure 4-2.
Figure 4-2. Four partitions split between two consumers in a group
If G1 has four consumers, then each will read messages from a single partition. See
Figure 4-3.
Figure 4-3. Four consumers in a group with one partition each
If we add more consumers to a single group with a single topic than we have parti‐
tions, some of the consumers will be idle and get no messages at all. See Figure 4-4.
Figure 4-4. More consumers in a group than partitions means idle consumers
The main way we scale data consumption from a Kafka topic is by adding more con‐
sumers to a consumer group. It is common for Kafka consumers to do high-latency
operations such as writing to a database or running a time-consuming computation on the data.
In these cases, a single consumer can’t possibly keep up with the rate data flows into a
topic, and adding more consumers that share the load by having each consumer own
just a subset of the partitions and messages is our main method of scaling. This is a
good reason to create topics with a large number of partitions—it allows adding more
consumers when the load increases. Keep in mind that there is no point in adding
more consumers than you have partitions in a topic—some of the consumers will just
be idle. Chapter 2 includes some suggestions on how to choose the number of parti‐
tions in a topic.
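The way partitions are divided among the consumers of one group can be sketched as a simple round-robin over the group's members. This is an illustrative model only; the class name is hypothetical and the assignment strategies that ship with Kafka have their own rules.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy round-robin assignment of a topic's partitions within one group.
final class GroupAssignmentSketch {

    static Map<String, List<Integer>> assign(int numPartitions,
                                             List<String> consumers) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return assignment;
        }
        for (int p = 0; p < numPartitions; p++) {
            // Each partition is owned by exactly one consumer in the group.
            assignment.get(consumers.get(p % consumers.size())).add(p);
        }
        return assignment;
    }

    public static void main(String[] args) {
        System.out.println(assign(4, Arrays.asList("C1")));
        System.out.println(assign(4, Arrays.asList("C1", "C2")));
        System.out.println(assign(4, Arrays.asList("C1", "C2", "C3", "C4", "C5")));
    }
}
```

With five consumers and four partitions, the fifth consumer ends up with an empty list: it is a member of the group but has nothing to read.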
In addition to adding consumers in order to scale a single application, it is very com‐
mon to have multiple applications that need to read data from the same topic. In fact,
one of the main design goals in Kafka was to make the data produced to Kafka topics
available for many use cases throughout the organization. In those cases, we want
each application to get all of the messages, rather than just a subset. To make sure an
application gets all the messages in a topic, ensure the application has its own con‐
sumer group. Unlike many traditional messaging systems, Kafka scales to a large
number of consumers and consumer groups without reducing performance.
In the previous example, if we add a new consumer group G2 with a single consumer,
this consumer will get all the messages in topic T1 independent of what G1 is doing.
G2 can have more than a single consumer, in which case they will each get a subset of
partitions, just like we showed for G1, but G2 as a whole will still get all the messages
regardless of other consumer groups. See Figure 4-5.
Figure 4-5. Adding a new consumer group ensures no messages are missed
To summarize, you create a new consumer group for each application that needs all
the messages from one or more topics. You add consumers to an existing consumer
group to scale the reading and processing of messages from the topics, so each addi‐
tional consumer in a group will only get a subset of the messages.
Consumer Groups and Partition Rebalance
As we saw in the previous section, consumers in a consumer group share ownership
of the partitions in the topics they subscribe to. When we add a new consumer to the
group, it starts consuming messages from partitions previously consumed by another
consumer. The same thing happens when a consumer shuts down or crashes; it leaves
the group, and the partitions it used to consume will be consumed by one of the
remaining consumers. Reassignment of partitions to consumers also happens when
the topics the consumer group is consuming are modified (e.g., if an administrator
adds new partitions).
Moving partition ownership from one consumer to another is called a rebalance.
Rebalances are important because they provide the consumer group with high availa‐
bility and scalability (allowing us to easily and safely add and remove consumers), but
in the normal course of events they are fairly undesirable. During a rebalance, con‐
sumers can’t consume messages, so a rebalance is basically a short window of unavail‐
ability of the entire consumer group. In addition, when partitions are moved from
one consumer to another, the consumer loses its current state; if it was caching any
data, it will need to refresh its caches—slowing down the application until the con‐
sumer sets up its state again. Throughout this chapter we will discuss how to safely
handle rebalances and how to avoid unnecessary ones.
The way consumers maintain membership in a consumer group and ownership of
the partitions assigned to them is by sending heartbeats to a Kafka broker designated
as the group coordinator (this broker can be different for different consumer groups).
As long as the consumer is sending heartbeats at regular intervals, it is assumed to be
alive, well, and processing messages from its partitions. Heartbeats are sent when the
consumer polls (i.e., retrieves records) and when it commits records it has consumed.
If the consumer stops sending heartbeats for long enough, its session will time out
and the group coordinator will consider it dead and trigger a rebalance. If a consumer
crashed and stopped processing messages, it will take the group coordinator a few
seconds without heartbeats to decide it is dead and trigger the rebalance. During
those seconds, no messages will be processed from the partitions owned by the dead
consumer. When closing a consumer cleanly, the consumer will notify the group
coordinator that it is leaving, and the group coordinator will trigger a rebalance
immediately, reducing the gap in processing. Later in this chapter we will discuss con‐
figuration options that control heartbeat frequency and session timeouts and how to
set those to match your requirements.
Changes to Heartbeat Behavior in Recent Kafka Versions
In release 0.10.1, the Kafka community introduced a separate heartbeat thread that
will send heartbeats in between polls as well. This allows you to separate the heartbeat
frequency (and therefore how long it takes for the consumer group to detect that a
consumer crashed and is no longer sending heartbeats) from the frequency of polling
(which is determined by the time it takes to process the data returned from the brok‐
ers). With newer versions of Kafka, you can configure how long the application can
go without polling before it will leave the group and trigger a rebalance. This configu‐
ration is used to prevent a livelock, where the application did not crash but fails to
make progress for some reason. This configuration is separate from
session.timeout.ms, which controls how long it takes to detect that a consumer has
crashed and stopped sending heartbeats.
The rest of the chapter will discuss some of the challenges with older behaviors and
how the programmer can handle them. This chapter includes discussion about how
to handle applications that take longer to process records. This is less relevant to
readers running Apache Kafka 0.10.1 or later. If you are using a new version and need
to handle records that take longer to process, you simply need to tune
max.poll.interval.ms so it will handle longer delays between polling for new
records.
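To make the difference between the two timers concrete, here is a minimal sketch in plain Java. It is a simplified model for illustration, not Kafka's implementation: the class ConsumerLiveness, its method, and the timeout values used in main() are all assumptions chosen for the example.

```java
// Simplified model of the two liveness checks described above.
// ConsumerLiveness and check() are hypothetical names; this is not
// how the Kafka client or the group coordinator is implemented.
class ConsumerLiveness {
    enum Status { ALIVE, SESSION_TIMED_OUT, POLL_INTERVAL_EXCEEDED }

    static Status check(long nowMs, long lastHeartbeatMs, long lastPollMs,
                        long sessionTimeoutMs, long maxPollIntervalMs) {
        // No heartbeat within session.timeout.ms: the process is presumed crashed.
        if (nowMs - lastHeartbeatMs > sessionTimeoutMs) {
            return Status.SESSION_TIMED_OUT;
        }
        // Heartbeats still arrive, but the application stopped polling:
        // the livelock case that max.poll.interval.ms guards against.
        if (nowMs - lastPollMs > maxPollIntervalMs) {
            return Status.POLL_INTERVAL_EXCEEDED;
        }
        return Status.ALIVE;
    }

    public static void main(String[] args) {
        long sessionTimeoutMs = 10_000;    // illustrative value
        long maxPollIntervalMs = 300_000;  // illustrative value
        // Heartbeat 1 second ago, last poll 350 seconds ago.
        System.out.println(
            check(400_000, 399_000, 50_000, sessionTimeoutMs, maxPollIntervalMs));
    }
}
```

In this model a consumer whose heartbeat thread is healthy can still be removed from the group if its processing stalls, which is the separation the sidebar describes.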
How Does the Process of Assigning Partitions to Consumers Work?
When a consumer wants to join a group, it sends a JoinGroup
request to the group coordinator. The first consumer to join the
group becomes the group leader. The leader receives a list of all
consumers in the group from the group coordinator (this will
include all consumers that sent a heartbeat recently and which are
therefore considered alive) and is responsible for assigning a subset
of partitions to each consumer. It uses an implementation of
PartitionAssignor to decide which partitions should be handled by
which consumer.
Kafka has two built-in partition assignment policies, which we will
discuss in more depth in the configuration section. After deciding
on the partition assignment, the consumer leader sends the list of
assignments to the GroupCoordinator, which sends this information
to all the consumers. Each consumer sees only its own assignment;
the leader is the only client process that has the full list of
consumers in the group and their assignments. This process
repeats every time a rebalance happens.
Creating a Kafka Consumer
The first step to start consuming records is to create a KafkaConsumer instance. Cre‐
ating a KafkaConsumer is very similar to creating a KafkaProducer—you create a Java
Properties instance with the properties you want to pass to the consumer. We will
discuss all the properties in depth later in the chapter. To start we just need to use the
three mandatory properties: bootstrap.servers, key.deserializer, and
value.deserializer.
The first property, bootstrap.servers, is the connection string to a Kafka cluster. It
is used the exact same way as in KafkaProducer (you can refer to Chapter 3 for
details on how this is defined). The other two properties, key.deserializer and
value.deserializer, are similar to the serializers defined for the producer, but
rather than specifying classes that turn Java objects to byte arrays, you need to specify
classes that can take a byte array and turn it into a Java object.
There is a fourth property, which is not strictly mandatory, but for now we will pre‐
tend it is. The property is group.id and it specifies the consumer group the
KafkaConsumer instance belongs to. While it is possible to create consumers that do
not belong to any consumer group, this is uncommon, so for most of the chapter we
will assume the consumer is part of a group.
The following code snippet shows how to create a KafkaConsumer:
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
"org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String,
String>(props);
Most of what you see here should be familiar if you’ve read Chapter 3 on creating
producers. We assume that the records we consume will have String objects as both
the key and the value of the record. The only new property here is group.id, which is
the name of the consumer group this consumer belongs to.
Subscribing to Topics
Once we create a consumer, the next step is to subscribe to one or more topics. The
subscribe() method takes a list of topics as a parameter, so it’s pretty simple to use:
consumer.subscribe(Collections.singletonList("customerCountries"));
Here we simply create a list with a single element: the topic name
customerCountries.
It is also possible to call subscribe with a regular expression. The expression can
match multiple topic names, and if someone creates a new topic with a name that
matches, a rebalance will happen almost immediately and the consumers will start
consuming from the new topic. This is useful for applications that need to consume
from multiple topics and can handle the different types of data the topics will contain.
Subscribing to multiple topics using a regular expression is most commonly used in
applications that replicate data between Kafka and another system.
To subscribe to all test topics, we can call:
consumer.subscribe(Pattern.compile("test.*")); // java.util.regex.Pattern
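To see which topic names a pattern such as test.* selects, the following sketch uses only java.util.regex. The topic names are made up for the example, and the sketch assumes the pattern has to match the whole topic name; the class TopicPatternDemo is hypothetical and does not talk to a broker.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

class TopicPatternDemo {
    // Returns the topic names that the pattern matches in full.
    static List<String> matchingTopics(Pattern pattern, List<String> topics) {
        List<String> matched = new ArrayList<>();
        for (String topic : topics) {
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        // Hypothetical topic names.
        List<String> topics = Arrays.asList(
            "test.orders", "test", "customerCountries", "mytest.1");
        System.out.println(matchingTopics(Pattern.compile("test.*"), topics));
    }
}
```

Note that the unescaped dot is a regex wildcard, so this pattern also matches a topic named simply test. If only dotted names such as test.orders are wanted, a stricter expression like test\..* is one option.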
The Poll Loop
At the heart of the consumer API is a simple loop for polling the server for more data.
Once the consumer subscribes to topics, the poll loop handles all details of coordina‐
tion, partition rebalances, heartbeats, and data fetching, leaving the developer with a
clean API that simply returns available data from the assigned partitions. The main
body of a consumer will look as follows:
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
        {
            log.debug("topic = {}, partition = {}, offset = {}, " +
                "customer = {}, country = {}",
                record.topic(), record.partition(), record.offset(),
                record.key(), record.value());
            // Keep a running count per country (the record value).
            int updatedCount = 1;
            if (custCountryMap.containsKey(record.value())) {
                updatedCount = custCountryMap.get(record.value()) + 1;
            }
            custCountryMap.put(record.value(), updatedCount);
            JSONObject json = new JSONObject(custCountryMap);
            System.out.println(json.toString(4));
        }
    }
} finally {
    consumer.close();
}
This is indeed an infinite loop. Consumers are usually long-running applications
that continuously poll Kafka for more data. We will show later in the chapter how
to cleanly exit the loop and close the consumer.
This is the most important line in the chapter. The same way that sharks must
keep moving or they die, consumers must keep polling Kafka or they will be con‐
sidered dead and the partitions they are consuming will be handed to another
consumer in the group to continue consuming. The parameter we pass to poll()
is a timeout interval and controls how long poll() will block if data is not
available in the consumer buffer. If this is set to 0, poll() will return immediately;
otherwise, it will wait for the specified number of milliseconds for data to arrive
from the broker.
poll() returns a list of records. Each record contains the topic and partition the
record came from, the offset of the record within the partition, and of course the
key and the value of the record. Typically we want to iterate over the list and pro‐
cess the records individually. The poll() method takes a timeout parameter. This
specifies how long it will take poll to return, with or without data. The value is
typically driven by application needs for quick responses—how fast do you want
to return control to the thread that does the polling?
Processing usually ends in writing a result in a data store or updating a stored
record. Here, the goal is to keep a running count of customers from each country,
so we update a hashtable and print the result as JSON. A more realistic example
would store the updated result in a data store.
Always close() the consumer before exiting. This will close the network connec‐
tions and sockets. It will also trigger a rebalance immediately rather than wait for
the group coordinator to discover that the consumer stopped sending heartbeats
and is likely dead, which will take longer and therefore result in a longer period
of time in which consumers can’t consume messages from a subset of the parti‐
tions.
The poll loop does a lot more than just get data. The first time you call poll() with a
new consumer, it is responsible for finding the GroupCoordinator, joining the con‐
sumer group, and receiving a partition assignment. If a rebalance is triggered, it will
be handled inside the poll loop as well. And of course the heartbeats that keep con‐
sumers alive are sent from within the poll loop. For this reason, we try to make sure
that whatever processing we do between iterations is fast and efficient.
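The counting step from the loop body can be isolated and exercised without a broker. The sketch below is a hedged illustration: CountryCounter is a hypothetical class, the country names are sample data, and Map.merge() is used as a compact alternative to the explicit lookup-then-put in the example above.

```java
import java.util.HashMap;
import java.util.Map;

class CountryCounter {
    // Keyed by country name, which is the record value in the example.
    private final Map<String, Integer> custCountryMap = new HashMap<>();

    // Called once per consumed record value.
    void record(String country) {
        // merge() stores 1 for a new key, otherwise adds 1 to the current count.
        custCountryMap.merge(country, 1, Integer::sum);
    }

    int count(String country) {
        return custCountryMap.getOrDefault(country, 0);
    }

    public static void main(String[] args) {
        CountryCounter counter = new CountryCounter();
        for (String country : new String[] {"Norway", "Chile", "Norway"}) {
            counter.record(country);
        }
        System.out.println("Norway=" + counter.count("Norway")
            + ", Chile=" + counter.count("Chile"));
    }
}
```

Keeping this step small and in memory is in line with the advice above: the less time spent between calls to poll(), the lower the risk of an unwanted rebalance.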
Thread Safety
You can’t have multiple consumers that belong to the same group
in one thread, and you can’t have multiple threads safely use the
same consumer. One consumer per thread is the rule. To run multiple
consumers in the same group in one application, you will
need to run each in its own thread. It is useful to wrap the consumer
logic in its own object and then use Java’s ExecutorService
to start multiple threads each with its own consumer. The Conflu‐
ent blog has a tutorial that shows how to do just that.
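As a rough sketch of the one-consumer-per-thread rule, the following plain Java example starts several workers with an ExecutorService. It is not the tutorial mentioned above: FakeConsumer is a hypothetical stand-in for KafkaConsumer so the example runs without a broker, and the Worker and runWorkers names are assumptions.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ConsumerThreads {
    // Hypothetical stand-in for KafkaConsumer.
    static class FakeConsumer {
        final String name;
        FakeConsumer(String name) { this.name = name; }
    }

    static class Worker implements Runnable {
        private final int id;
        private final Set<String> started;

        Worker(int id, Set<String> started) {
            this.id = id;
            this.started = started;
        }

        @Override
        public void run() {
            // The consumer is created and used on this worker's thread only.
            FakeConsumer consumer = new FakeConsumer("consumer-" + id);
            // A real worker would subscribe and run its poll loop here.
            started.add(consumer.name);
        }
    }

    // Starts one worker (and therefore one consumer) per thread and
    // returns the names of the consumers that ran.
    static Set<String> runWorkers(int numConsumers) {
        Set<String> started = ConcurrentHashMap.newKeySet();
        ExecutorService executor = Executors.newFixedThreadPool(numConsumers);
        for (int i = 0; i < numConsumers; i++) {
            executor.submit(new Worker(i, started));
        }
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return started;
    }

    public static void main(String[] args) {
        System.out.println(runWorkers(3).size() + " consumers ran");
    }
}
```

The design point is that no consumer object is ever shared: each Worker builds its own inside run(), so no two threads can touch the same instance.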
Configuring Consumers
So far we have focused on learning the consumer API, but we’ve only looked at a few
of the configuration properties: just the mandatory bootstrap.servers, group.id,
key.deserializer, and value.deserializer. All the consumer configuration is
documented in the Apache Kafka documentation. Most of the parameters have reasonable
defaults and do not require modification, but some have implications for the
performance and availability of the consumers. Let’s take a look at some of the more
important properties.
fetch.min.bytes
This property allows a consumer to specify the minimum amount of data that it
wants to receive from the broker when fetching records. If a broker receives a request
for records from a consumer but the new records amount to fewer bytes than
fetch.min.bytes, the broker will wait until more messages are available before
sending the records back to the consumer. This reduces the load on both the consumer
and the broker as they have to handle fewer back-and-forth messages in cases where
the topics don’t have much new activity (or for lower activity hours of the day). You
will want to set this parameter higher than the default if the consumer is using too
much CPU when there isn’t much data available, or to reduce load on the brokers when
you have a large number of consumers.
fetch.max.wait.ms
By setting fetch.min.bytes, you tell Kafka to wait until it has enough data to send
before responding to the consumer. fetch.max.wait.ms lets you control how long to
wait. By default, Kafka will wait up to 500 ms. This results in up to 500 ms of extra
latency in case there is not enough data flowing to the Kafka topic to satisfy the mini‐
mum amount of data to return. If you want to limit the potential latency (usually due
to SLAs controlling the maximum latency of the application), you can set
fetch.max.wait.ms to a lower value. If you set fetch.max.wait.ms to 100 ms and
fetch.min.bytes to 1 MB, Kafka will receive a fetch request from the consumer and
will respond with data either when it has 1 MB of data to return or after 100 ms,
whichever happens first.
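The 1 MB / 100 ms combination described above could be expressed as consumer properties roughly as follows. This is a sketch only: FetchTuning and fetchProps() are hypothetical names, and in a real application these two entries would simply be added to the same Properties object that is passed to the KafkaConsumer constructor.

```java
import java.util.Properties;

class FetchTuning {
    static Properties fetchProps() {
        Properties props = new Properties();
        // Ask the broker to hold the response until 1 MB is available...
        props.put("fetch.min.bytes", String.valueOf(1024 * 1024));
        // ...but to respond after 100 ms even if less data has arrived.
        props.put("fetch.max.wait.ms", "100");
        return props;
    }

    public static void main(String[] args) {
        Properties props = fetchProps();
        System.out.println("fetch.min.bytes = " + props.getProperty("fetch.min.bytes"));
        System.out.println("fetch.max.wait.ms = " + props.getProperty("fetch.max.wait.ms"));
    }
}
```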
max.partition.fetch.bytes
This property controls the maximum number of bytes the server will return per parti‐
tion. The default is 1 MB, which means that when KafkaConsumer.poll() returns
ConsumerRecords, the record object will use at most max.partition.fetch.bytes
per partition assigned to the consumer. So if a topic has 20 partitions, and you have 5
consumers, each consumer will need to have 4 MB of memory available for
ConsumerRecords. In practice, you will want to allocate more memory as each consumer will
need to handle more partitions if other consumers in the group fail.
max.partition.fetch.bytes must be larger than the largest message a broker will accept
(determined by the message.max.bytes property in the broker configuration), or the
broker may have messages that the consumer will be unable to consume, in which
case the consumer will hang trying to read them. Another important consideration
when setting max.partition.fetch.bytes is the amount of time it takes the con‐
sumer to process data. As you recall, the consumer must call poll() frequently
enough to avoid session timeout and subsequent rebalance. If the amount of data a
single poll() returns is very large, it may take the consumer longer to process, which
means it will not get to the next iteration of the poll loop in time to avoid a session
timeout. If this occurs, the two options are either to lower
max.partition.fetch.bytes or to increase the session timeout.
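The memory arithmetic from the start of this section can be written down as a small worked example. The class and method names below are hypothetical, and the calculation is only the upper bound implied by the per-partition limit; it assumes partitions are spread as evenly as possible across consumers.

```java
class FetchMemoryEstimate {
    // Upper bound on the bytes one poll() may return, given the per-partition limit.
    static long bytesNeeded(int partitionsAssigned, long maxPartitionFetchBytes) {
        return partitionsAssigned * maxPartitionFetchBytes;
    }

    // Largest number of partitions any one consumer holds when they are
    // spread as evenly as possible (rounds up).
    static int partitionsPerConsumer(int partitions, int consumers) {
        return (partitions + consumers - 1) / consumers;
    }

    public static void main(String[] args) {
        long oneMb = 1024L * 1024L;
        int normal = partitionsPerConsumer(20, 5); // all five consumers alive
        int worst = partitionsPerConsumer(20, 1);  // only one consumer left
        System.out.println(bytesNeeded(normal, oneMb) / oneMb + " MB with 5 consumers, "
            + bytesNeeded(worst, oneMb) / oneMb + " MB if only one survives");
    }
}
```

With 20 partitions and the 1 MB default, this gives the 4 MB per consumer quoted above, and 20 MB for a single surviving consumer, which is why extra headroom is worth planning for.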
session.timeout.ms
The amount of time a consumer can be out of contact with the brokers while still
considered alive defaults to 10 seconds as of release 0.10.1. If more than session.timeout.ms passes
without the consumer sending a heartbeat to the group coordinator, it is considered
dead and the group coordinator will trigger a rebalance of the consumer group to
allocate partitions from the dead consumer to the other consumers in the group. This
property is closely related to heartbeat.interval.ms. heartbeat.interval.ms con‐
trols how frequently the KafkaConsumer poll() method will send a heartbeat to the
group coordinator, whereas session.timeout.ms controls how long a consumer can
go without sending a heartbeat. Therefore, those two properties are typically
modified together: heartbeat.interval.ms must be lower than session.timeout.ms, and
is usually set to one-third of the timeout value. So if session.timeout.ms is 3
seconds, heartbeat.interval.ms should be 1 second. Setting session.timeout.ms
lower than the default will allow consumer groups to detect and recover from failure
sooner, but may also cause unwanted rebalances as a result of consumers taking
longer to complete the poll loop or garbage collection. Setting session.timeout.ms
higher will reduce the chance of accidental rebalance, but also means it will take
longer to detect a real failure.
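One way to keep the two properties in step is to derive the heartbeat interval from the session timeout. The helper below is a hypothetical sketch of the one-third rule of thumb; the class name, method name, and the 3-second value are illustrative, not recommendations.

```java
import java.util.Properties;

class SessionTimeoutConfig {
    // Builds both properties from a single session timeout, using the
    // one-third rule of thumb for the heartbeat interval.
    static Properties withSessionTimeout(long sessionTimeoutMs) {
        Properties props = new Properties();
        props.put("session.timeout.ms", Long.toString(sessionTimeoutMs));
        props.put("heartbeat.interval.ms", Long.toString(sessionTimeoutMs / 3));
        return props;
    }

    public static void main(String[] args) {
        Properties props = withSessionTimeout(3000);
        System.out.println("session.timeout.ms = " + props.getProperty("session.timeout.ms"));
        System.out.println("heartbeat.interval.ms = " + props.getProperty("heartbeat.interval.ms"));
    }
}
```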
auto.offset.reset
This property controls the behavior of the consumer when it starts reading a partition
for which it doesn’t have a committed offset or if the committed offset it has is invalid
(usually because the consumer was down for so long that the record with that offset
was already aged out of the broker). The default is “latest,” which means that lacking a
valid offset, the consumer will start reading from the newest records (records that
were written after the consumer started running). The alternative is “earliest,” which
means that lacking a valid offset, the consumer will read all the data in the partition,
starting from the very beginning.
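The decision described here can be modeled in a few lines. This is a simplified sketch, not the client's actual logic: StartingOffset and its parameters are hypothetical, logStart stands for the oldest offset still on the broker, and logEnd for the offset the next written record will receive.

```java
class StartingOffset {
    // committed: the committed offset, or null if the group has none.
    // Returns the offset the consumer would start reading from in this model.
    static long resolve(Long committed, long logStart, long logEnd,
                        String autoOffsetReset) {
        boolean valid = committed != null
            && committed >= logStart && committed <= logEnd;
        if (valid) {
            return committed;          // normal case: resume where we left off
        }
        if ("earliest".equals(autoOffsetReset)) {
            return logStart;           // read everything still in the partition
        }
        if ("latest".equals(autoOffsetReset)) {
            return logEnd;             // read only records written from now on
        }
        throw new IllegalArgumentException("unsupported policy: " + autoOffsetReset);
    }

    public static void main(String[] args) {
        // Committed offset 50 has aged out; the log now starts at 100.
        System.out.println(resolve(50L, 100, 500, "earliest"));
        System.out.println(resolve(50L, 100, 500, "latest"));
    }
}
```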
enable.auto.commit
We discuss the different options for committing offsets later in this chapter. This
parameter controls whether the consumer will commit offsets automatically, and
defaults to true. Set it to false if you prefer to control when offsets are committed,
which is necessary to minimize duplicates and avoid missing data. If you set
enable.auto.commit to true, then you might also want to control how frequently
offsets will be committed using auto.commit.interval.ms.
partition.assignment.strategy
We learned that partitions are assigned to consumers in a consumer group. A
PartitionAssignor is a class that, given consumers and topics they subscribed to,
decides which partitions will be assigned to which consumer. By default, Kafka has
two assignment strategies:
Range
Assigns to each consumer a consecutive subset of partitions from each topic it
subscribes to. So if consumers C1 and C2 are subscribed to two topics, T1 and
T2, and each of the topics has three partitions, then C1 will be assigned partitions
0 and 1 from topics T1 and T2, while C2 will be assigned partition 2 from those
topics. Because each topic has an odd number of partitions and the assignment
is done for each topic independently, the first consumer ends up with more
partitions than the second. This happens whenever Range assignment is used and
the number of consumers does not divide the number of partitions in each topic
neatly.
RoundRobin
Takes all the partitions from all subscribed topics and assigns them to consumers
sequentially, one by one. If C1 and C2 described previously used RoundRobin
assignment, C1 would have partitions 0 and 2 from topic T1 and partition 1 from
topic T2. C2 would have partition 1 from topic T1 and partitions 0 and 2 from
topic T2. In general, if all consumers are subscribed to the same topics (a very
common scenario), RoundRobin assignment will end up with all consumers hav‐
ing the same number of partitions (or at most 1 partition difference).
The partition.assignment.strategy property allows you to choose a partition-assignment
strategy. The default is org.apache.kafka.clients.consumer.RangeAssignor,
which implements the Range strategy described above. You can replace it with
org.apache.kafka.clients.consumer.RoundRobinAssignor. A more advanced
option is to implement your own assignment strategy, in which case
partition.assignment.strategy should point to the name of your class.
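The C1/C2 example for the two strategies can be reproduced with a small simulation. The code below is a simplified re-implementation for illustration, not the code of RangeAssignor or RoundRobinAssignor: it assumes every consumer subscribes to every topic and every topic has the same number of partitions, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class AssignmentDemo {
    private static Map<String, List<String>> empty(List<String> consumers) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String c : consumers) {
            result.put(c, new ArrayList<>());
        }
        return result;
    }

    // Range: per topic, each consumer gets a consecutive block of partitions;
    // the first (partitions % consumers) consumers get one extra.
    static Map<String, List<String>> range(List<String> consumers,
                                           List<String> topics, int partitionsPerTopic) {
        Map<String, List<String>> result = empty(consumers);
        int base = partitionsPerTopic / consumers.size();
        int extra = partitionsPerTopic % consumers.size();
        for (String topic : topics) {
            int next = 0;
            for (int i = 0; i < consumers.size(); i++) {
                int count = base + (i < extra ? 1 : 0);
                for (int p = next; p < next + count; p++) {
                    result.get(consumers.get(i)).add(topic + "-" + p);
                }
                next += count;
            }
        }
        return result;
    }

    // RoundRobin: line up all partitions of all topics and deal them out in turn.
    static Map<String, List<String>> roundRobin(List<String> consumers,
                                                List<String> topics, int partitionsPerTopic) {
        Map<String, List<String>> result = empty(consumers);
        int i = 0;
        for (String topic : topics) {
            for (int p = 0; p < partitionsPerTopic; p++) {
                result.get(consumers.get(i % consumers.size())).add(topic + "-" + p);
                i++;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> consumers = Arrays.asList("C1", "C2");
        List<String> topics = Arrays.asList("T1", "T2");
        System.out.println("Range:      " + range(consumers, topics, 3));
        System.out.println("RoundRobin: " + roundRobin(consumers, topics, 3));
    }
}
```

Under Range, C1 ends up with four partitions and C2 with two; under RoundRobin each gets three, matching the descriptions above.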
client.id
This can be any string, and will be used by the brokers to identify messages sent from
the client. It is used in logging and metrics, and for quotas.
max.poll.records
This controls the maximum number of records that a single call to poll() will return.
This is useful to help control the amount of data your application will need to process
in the polling loop.
receive.buffer.bytes and send.buffer.bytes
These are the sizes of the TCP send and receive buffers used by the sockets when
writing and reading data. If these are set to -1, the OS defaults will be used. It can be a
good idea to increase those when producers or consumers communicate with brokers
in a different datacenter, because those network links typically have higher latency
and lower bandwidth.
Commits and Offsets
Whenever we call poll(), it returns records written to Kafka that consumers in our
group have not read yet. This means that we have a way of tracking which records
were read by a consumer of the group. As discussed before, one of Kafka’s unique
characteristics is that it does not track acknowledgments from consumers the way
many JMS queues do. Instead, it allows consumers to use Kafka to track their posi‐
tion (offset) in each partition.
We call the action of updating the current position in the partition a commit.
How does a consumer commit an offset? It produces a message to Kafka, to a special
__consumer_offsets topic, with the committed offset for each partition. As long as all
your consumers are up, running, and churning away, this will have no impact. How‐
ever, if a consumer crashes or a new consumer joins the consumer group, this will
trigger a rebalance. After a rebalance, each consumer may be assigned a different set of
partitions than the one it processed before. In order to know where to pick up the
work, the consumer will read the latest committed offset of each partition and con‐
tinue from there.
If the committed offset is smaller than the offset of the last message the client pro‐
cessed, the messages between the last processed offset and the committed offset will
be processed twice. See Figure 4-6.
Figure 4-6. Re-processed messages
If the committed offset is larger than the offset of the last message the client actually
processed, all messages between the last processed offset and the committed offset
will be missed by the consumer group. See Figure 4-7.
Figure 4-7. Missed messages between offsets
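The two cases shown in Figures 4-6 and 4-7 come down to simple arithmetic. The sketch below is a hypothetical model, under the assumption that the committed offset is the offset of the next message the group will read after a rebalance; the class and method names are made up for the example.

```java
class CommitGap {
    // Messages that will be processed a second time after a rebalance.
    static long reprocessed(long lastProcessedOffset, long committedOffset) {
        long nextToProcess = lastProcessedOffset + 1;
        return Math.max(0, nextToProcess - committedOffset);
    }

    // Messages that will be skipped entirely after a rebalance.
    static long missed(long lastProcessedOffset, long committedOffset) {
        long nextToProcess = lastProcessedOffset + 1;
        return Math.max(0, committedOffset - nextToProcess);
    }

    public static void main(String[] args) {
        // Processed up to offset 10, but only 5 was committed: 5..10 are read again.
        System.out.println("reprocessed: " + reprocessed(10, 5));
        // Processed up to offset 10, but 15 was committed: 11..14 are never read.
        System.out.println("missed: " + missed(10, 15));
    }
}
```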
Clearly, managing offsets has a big impact on the client application. The
KafkaConsumer API provides multiple ways of committing offsets:
Automatic Commit
The easiest way to commit offsets is to allow the consumer to do it for you. If you
configure enable.auto.commit=true, then every five seconds the consumer will
commit the largest offset your client received from poll(). The five-second interval
is the default and is controlled by setting auto.commit.interval.ms. Just like every‐
thing else in the consumer, the automatic commits are driven by the poll loop. When‐
ever you poll, the consumer checks if it is time to commit, and if it is, it will commit
the offsets it returned in the last poll.
Before using this convenient option, however, it is important to understand the con‐
sequences.
Consider that, by default, automatic commits occur every five seconds. Suppose that
we are three seconds after the most recent commit and a rebalance is triggered. After
the rebalancing, all consumers will start consuming from the last offset committed. In
this case, the offset is three seconds old, so all the events that arrived in those three
seconds will be processed twice. It is possible to configure the commit interval to
commit more frequently and reduce the window in which records will be duplicated,
but it is impossible to completely eliminate them.
With autocommit enabled, a call to poll will always commit the last offset returned by
the previous poll. It doesn’t know which events were actually processed, so it is critical
to always process all the events returned by poll() before calling poll() again. (Just
like poll(), close() also commits offsets automatically.) This is usually not an issue,
but pay attention when you handle exceptions or exit the poll loop prematurely.
Automatic commits are convenient, but they don’t give developers enough control to
avoid duplicate messages.
Commit Current Offset
Most developers exercise more control over the time at which offsets are committed
—both to eliminate the possibility of missing messages and to reduce the number of
messages duplicated during rebalancing. The consumer API has the option of com‐
mitting the current offset at a point that makes sense to the application developer
rather than based on a timer.
By setting enable.auto.commit=false, offsets will only be committed when the
application explicitly chooses to do so. The simplest and most reliable of the commit
APIs is commitSync(). This API will commit the latest offset returned by poll() and
return once the offset is committed, throwing an exception if commit fails for some
reason.
It is important to remember that commitSync() will commit the latest offset returned
by poll(), so make sure you call commitSync() after you are done processing all the
records in the collection, or you risk missing messages as described previously. When
rebalance is triggered, all the messages from the beginning of the most recent batch
until the time of the rebalance will be processed twice.
Here is how we would use commitSync to commit offsets after we finished processing
the latest batch of messages:
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
    {
        System.out.printf("topic = %s, partition = %s, offset = " +
            "%d, customer = %s, country = %s\n",
            record.topic(), record.partition(),
            record.offset(), record.key(), record.value());
    }
    try {
        consumer.commitSync();
    } catch (CommitFailedException e) {
        log.error("commit failed", e);
    }
}
Let’s assume that by printing the contents of a record, we are done processing it.
Your application will likely do a lot more with the records—modify them, enrich
them, aggregate them, display them on a dashboard, or notify users of important
events. You should determine when you are “done” with a record according to
your use case.
Once we are done “processing” all the records in the current batch, we call
commitSync to commit the last offset in the batch, before polling for additional
messages.
commitSync retries committing as long as there is no error that can’t be recovered.
If an unrecoverable error does happen, there is not much we can do except log an error.
Asynchronous Commit
One drawback of manual commit is that the application is blocked until the broker
responds to the commit request. This will limit the throughput of the application.
Throughput can be improved by committing less frequently, but then we are increas‐
ing the number of potential duplicates that a rebalance will create.
Another option is the asynchronous commit API. Instead of waiting for the broker to
respond to a commit, we just send the request and continue on:
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
    {
        System.out.printf("topic = %s, partition = %s, " +
            "offset = %d, customer = %s, country = %s\n",
            record.topic(), record.partition(), record.offset(),
            record.key(), record.value());
    }
    consumer.commitAsync();
}
Commit the last offset and carry on.
The drawback is that while commitSync() will retry the commit until it either suc‐
ceeds or encounters a nonretriable failure, commitAsync() will not retry. The reason
it does not retry is that by the time commitAsync() receives a response from the
server, there may have been a later commit that was already successful. Imagine that
we sent a request to commit offset 2000. There is a temporary communication prob‐
lem, so the broker never gets the request and therefore never responds. Meanwhile,
we processed another batch and successfully committed offset 3000. If
commitAsync() now retries the previously failed commit, it might succeed in committing
offset 2000 after offset 3000 was already processed and committed. In the case of a
rebalance, this will cause more duplicates.
We mention this complication and the importance of correct order of commits,
because commitAsync() also gives you an option to pass in a callback that will be trig‐
gered when the broker responds. It is common to use the callback to log commit
errors or to count them in a metric, but if you want to use the callback for retries, you
need to be aware of the problem with commit order:
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records) {
        System.out.printf("topic = %s, partition = %s, " +
            "offset = %d, customer = %s, country = %s\n",
            record.topic(), record.partition(), record.offset(),
            record.key(), record.value());
    }
    consumer.commitAsync(new OffsetCommitCallback() {
        public void onComplete(Map<TopicPartition,
                OffsetAndMetadata> offsets, Exception e) {
            if (e != null)
                log.error("Commit failed for offsets {}", offsets, e);
        }
    });
}
We send the commit and carry on, but if the commit fails, the failure and the off‐
sets will be logged.
Retrying Async Commits
A simple pattern to get commit order right for asynchronous
retries is to use a monotonically increasing sequence number.
Increase the sequence number every time you commit and add the
sequence number at the time of the commit to the commitAsync
callback. When you’re getting ready to send a retry, check if the
commit sequence number the callback got is equal to the instance
variable; if it is, there was no newer commit and it is safe to retry. If
the instance sequence number is higher, don’t retry because a
newer commit was already sent.
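The sequence-number check described in this sidebar might look like the following in plain Java. CommitSequence is a hypothetical helper, not part of the consumer API; in a real consumer, next() would be called just before commitAsync() and safeToRetry() from inside the failed commit's callback.

```java
import java.util.concurrent.atomic.AtomicLong;

class CommitSequence {
    // Incremented once for every commit that is sent.
    private final AtomicLong latest = new AtomicLong(0);

    // Call when sending a commit; hand the returned value to that commit's callback.
    long next() {
        return latest.incrementAndGet();
    }

    // Call from the callback of a failed commit: a retry is safe only if
    // no newer commit has been sent since this one.
    boolean safeToRetry(long sequenceAtCommit) {
        return sequenceAtCommit == latest.get();
    }

    public static void main(String[] args) {
        CommitSequence sequence = new CommitSequence();
        long first = sequence.next();   // commit of offset 2000 is sent
        long second = sequence.next();  // commit of offset 3000 is sent
        System.out.println("retry first commit?  " + sequence.safeToRetry(first));
        System.out.println("retry second commit? " + sequence.safeToRetry(second));
    }
}
```

In the offset 2000 / offset 3000 scenario from the text, the first commit's callback sees a newer sequence number and skips the retry, so the older offset can never overwrite the newer one.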
Combining Synchronous and Asynchronous Commits
Normally, occasional failures to commit without retrying are not a huge problem
because if the problem is temporary, the following commit will be successful. But if
we know that this is the last commit before we close the consumer, or before a reba‐
lance, we want to make extra sure that the commit succeeds.
Therefore, a common pattern is to combine commitAsync() with commitSync() just
before shutdown. Here is how it works (we will discuss how to commit just before
rebalance when we get to the section about rebalance listeners):
try {
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("topic = %s, partition = %s, offset = %d, " +
                "customer = %s, country = %s\n",
                record.topic(), record.partition(),
                record.offset(), record.key(), record.value());
        }
        consumer.commitAsync();
    }
} catch (Exception e) {
    log.error("Unexpected error", e);
} finally {
    try {
        consumer.commitSync();
    } finally {
        consumer.close();
    }
}
While everything is fine, we use commitAsync. It is faster, and if one commit fails,
the next commit will serve as a retry.
But if we are closing, there is no “next commit.” We call commitSync(), because it
will retry until it succeeds or suffers unrecoverable failure.
Commit Specified Offset
Committing the latest offset only allows you to commit as often as you finish process‐
ing batches. But what if you want to commit more frequently than that? What if
poll() returns a huge batch and you want to commit offsets in the middle of the
batch to avoid having to process all those rows again if a rebalance occurs? You can’t
just call commitSync() or commitAsync()—this will commit the last offset returned,
which you didn’t get to process yet.
Fortunately, the consumer API allows you to call commitSync() and commitAsync()
and pass a map of partitions and offsets that you wish to commit. If you are in the
middle of processing a batch of records, and the last message you got from partition 3
in topic “customers” has offset 5000, you can call commitSync() to commit offset
5001 (the offset of the next message you expect to read) for partition 3 in topic
“customers.” Since your consumer may be consuming more than a single partition,
you will need to track offsets on all of them, which adds complexity to your code.
Here is what a commit of specific offsets looks like:
private Map<TopicPartition, OffsetAndMetadata> currentOffsets =
    new HashMap<>();
int count = 0;
....
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
    {
        System.out.printf("topic = %s, partition = %s, offset = %d, " +
            "customer = %s, country = %s\n",
            record.topic(), record.partition(), record.offset(),
            record.key(), record.value());
        currentOffsets.put(new TopicPartition(record.topic(),
            record.partition()), new
            OffsetAndMetadata(record.offset()+1, "no metadata"));
        if (count % 1000 == 0)
            consumer.commitAsync(currentOffsets, null);
        count++;
    }
}
This is the map we will use to manually track offsets.
Remember, printf is a stand-in for whatever processing you do for the records
you consume.
After reading each record, we update the offsets map with the offset of the next
message we expect to process. This is where we’ll start reading next time we start.
Here, we decide to commit current offsets every 1,000 records. In your application,
you can commit based on time or perhaps content of the records.
I chose to call commitAsync(), but commitSync() is also completely valid here. Of
course, when committing specific offsets you still need to perform all the error
handling we’ve seen in previous sections.
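As an illustration of committing based on time as well as record count, here is a small library-free sketch of the decision logic. The CommitPolicy class, its constructor parameters, and the idea of passing the current time in explicitly are all assumptions made for this example; they are not part of the Kafka consumer API.

```java
/** Hypothetical policy: commit after N records or T milliseconds, whichever comes first. */
class CommitPolicy {
    private final int maxRecords;
    private final long maxIntervalMs;
    private int recordsSinceCommit = 0;
    private long lastCommitMs;

    CommitPolicy(int maxRecords, long maxIntervalMs, long nowMs) {
        this.maxRecords = maxRecords;
        this.maxIntervalMs = maxIntervalMs;
        this.lastCommitMs = nowMs;
    }

    /** Call after processing each record; returns true when offsets should be committed now. */
    boolean recordProcessed(long nowMs) {
        recordsSinceCommit++;
        boolean countReached = recordsSinceCommit >= maxRecords;
        boolean intervalElapsed = nowMs - lastCommitMs >= maxIntervalMs;
        if (countReached || intervalElapsed) {
            // Reset both triggers so the next commit is measured from this point.
            recordsSinceCommit = 0;
            lastCommitMs = nowMs;
            return true;
        }
        return false;
    }
}
```

Inside the poll loop, a call such as policy.recordProcessed(System.currentTimeMillis()) could replace the count % 1000 check, with the commit of the tracked offsets issued whenever it returns true.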
Commits and Osets | 81
Rebalance Listeners
As we mentioned in the previous section about committing offsets, a consumer will
want to do some cleanup work before exiting and also before partition rebalancing.
If you know your consumer is about to lose ownership of a partition, you will want to
commit offsets of the last event you’ve processed. If your consumer maintained a
buffer with events that it only processes occasionally (e.g., the currentRecords map
we used when explaining pause() functionality), you will want to process the events
you accumulated before losing ownership of the partition. Perhaps you also need to
close file handles, database connections, and such.
The consumer API allows you to run your own code when partitions are added or
removed from the consumer. You do this by passing a ConsumerRebalanceListener
when calling the subscribe() method we discussed previously.
ConsumerRebalanceListener has two methods you can implement:
public void onPartitionsRevoked(Collection<TopicPartition> partitions)
Called before the rebalancing starts and after the consumer stopped consuming
messages. This is where you want to commit offsets, so whoever gets this partition
next will know where to start.
public void onPartitionsAssigned(Collection<TopicPartition> partitions)
Called after partitions have been reassigned to the consumer, but before the
consumer starts consuming messages.
This example will show how to use onPartitionsRevoked() to commit offsets before
losing ownership of a partition. In the next section we will show a more involved
example that also demonstrates the use of onPartitionsAssigned():
private Map<TopicPartition, OffsetAndMetadata> currentOffsets =
    new HashMap<>();

private class HandleRebalance implements ConsumerRebalanceListener {
    public void onPartitionsAssigned(Collection<TopicPartition>
        partitions) {
    }

    public void onPartitionsRevoked(Collection<TopicPartition>
        partitions) {
        System.out.println("Lost partitions in rebalance. " +
            "Committing current offsets:" + currentOffsets);
        consumer.commitSync(currentOffsets);
    }
}

try {
    consumer.subscribe(topics, new HandleRebalance());
    while (true) {
        ConsumerRecords<String, String> records =
            consumer.poll(100);
        for (ConsumerRecord<String, String> record : records)
        {
            System.out.printf("topic = %s, partition = %s, offset = %d, " +
                "customer = %s, country = %s\n",
                record.topic(), record.partition(), record.offset(),
                record.key(), record.value());
            currentOffsets.put(new TopicPartition(record.topic(),
                record.partition()), new
                OffsetAndMetadata(record.offset()+1, "no metadata"));
        }
        consumer.commitAsync(currentOffsets, null);
    }
} catch (WakeupException e) {
    // ignore, we're closing
} catch (Exception e) {
    log.error("Unexpected error", e);
} finally {
    try {
        consumer.commitSync(currentOffsets);
    } finally {
        consumer.close();
        System.out.println("Closed consumer and we are done");
    }
}
We start by implementing a ConsumerRebalanceListener.
In this example we don’t need to do anything when we get a new partition; we’ll
just start consuming messages.
However, when we are about to lose a partition due to rebalancing, we need to
commit offsets. Note that we are committing the latest offsets we’ve processed,
not the latest offsets in the batch we are still processing. This is because a partition
could get revoked while we are still in the middle of a batch. We are committing
offsets for all partitions, not just the partitions we are about to lose. Because
the offsets are for events that were already processed, there is no harm in that.
And we are using commitSync() to make sure the offsets are committed before
the rebalance proceeds.
The most important part: pass the ConsumerRebalanceListener to the
subscribe() method so it will get invoked by the consumer.
Consuming Records with Specic Osets
So far we’ve seen how to use poll() to start consuming messages from the last committed
offset in each partition and to proceed in processing all messages in sequence.
However, sometimes you want to start reading at a different offset.
If you want to start reading all messages from the beginning of the partition, or you
want to skip all the way to the end of the partition and start consuming only new
messages, there are APIs specifically for that:
seekToBeginning(Collection<TopicPartition> tp) and
seekToEnd(Collection<TopicPartition> tp).
However, the Kafka API also lets you seek to a specific offset. This ability can be used in
a variety of ways; for example, to go back a few messages or skip ahead a few messages
(perhaps a time-sensitive application that is falling behind will want to skip
ahead to more relevant messages). The most exciting use case for this ability is when
offsets are stored in a system other than Kafka.
Think about this common scenario: Your application reads events from Kafka
(perhaps a clickstream of users in a website), processes the data (perhaps removing
records that indicate clicks from automated programs rather than users), and then
stores the results in a database, NoSQL store, or Hadoop. Suppose that we really don’t
want to lose any data, nor do we want to store the same results in the database twice.
In these cases, the consumer loop may look a bit like this:
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
    {
        // track the offset of the next record to read for this partition
        currentOffsets.put(new TopicPartition(record.topic(),
            record.partition()),
            new OffsetAndMetadata(record.offset()+1));
        processRecord(record);
        storeRecordInDB(record);
        consumer.commitAsync(currentOffsets, null);
    }
}
In this example, we are very paranoid, so we commit offsets after processing each
record. However, there is still a chance that our application will crash after the record
was stored in the database but before we committed offsets, causing the record to be
processed again and the database to contain duplicates.
This could be avoided if there was a way to store both the record and the offset in one
atomic action. Either both the record and the offset are committed, or neither of
them are committed. As long as the records are written to a database and the offsets
to Kafka, this is impossible.
But what if we wrote both the record and the offset to the database, in one transaction?
Then we’ll know that either we are done with the record and the offset is committed
or we are not and the record will be reprocessed.
Now the only problem is if the offset is stored in a database and not in Kafka, how
will our consumer know where to start reading when it is assigned a partition? This is
exactly what seek() can be used for. When the consumer starts or when new partitions
are assigned, it can look up the offset in the database and seek() to that location.
Here is a skeleton example of how this may work. We use ConsumerRebalanceListener
and seek() to make sure we start processing at the offsets stored in the database:
public class SaveOffsetsOnRebalance implements
    ConsumerRebalanceListener {

    public void onPartitionsRevoked(Collection<TopicPartition>
        partitions) {
        commitDBTransaction();
    }

    public void onPartitionsAssigned(Collection<TopicPartition>
        partitions) {
        for (TopicPartition partition : partitions)
            consumer.seek(partition, getOffsetFromDB(partition));
    }
}

consumer.subscribe(topics, new SaveOffsetsOnRebalance());
consumer.poll(0);

for (TopicPartition partition : consumer.assignment())
    consumer.seek(partition, getOffsetFromDB(partition));

while (true) {
    ConsumerRecords<String, String> records =
        consumer.poll(100);
    for (ConsumerRecord<String, String> record : records)
    {
        processRecord(record);
        storeRecordInDB(record);
        // store the offset of the next record to read, so that
        // seek() does not replay the record we just stored
        storeOffsetInDB(record.topic(), record.partition(),
            record.offset() + 1);
    }
    commitDBTransaction();
}
Consuming Records with Specic Osets | 85
We use an imaginary method here to commit the transaction in the database.
The idea here is that the database records and offsets will be inserted into the database
as we process the records, and we just need to commit the transactions
when we are about to lose the partition to make sure this information is persisted.
We also have an imaginary method to fetch the offsets from the database, and
then we seek() to those records when we get ownership of new partitions.
When the consumer first starts, after we subscribe to topics, we call poll() once
to make sure we join a consumer group and get assigned partitions, and then we
immediately seek() to the correct offset in the partitions we are assigned to.
Keep in mind that seek() only updates the position we are consuming from, so
the next poll() will fetch the right messages. If there was an error in seek()
(e.g., the offset does not exist), the exception will be thrown by poll().
Another imaginary method: this time we update a table storing the offsets in our
database. Here we assume that updating records is fast, so we do an update on
every record, but commits are slow, so we only commit at the end of the batch.
However, this can be optimized in different ways.
There are many different ways to implement exactly-once semantics by storing offsets
and data in an external store, but all of them will need to use the
ConsumerRebalanceListener and seek() to make sure offsets are stored in time and that
the consumer starts reading messages from the correct location.
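To make the “one atomic action” idea concrete, the following is a toy in-memory model of a store that stages results and offsets together and makes them visible only on commit. It is only a sketch: the class InMemoryTransactionalStore and its method names are hypothetical stand-ins for whatever transaction mechanism your database offers, and the string key used for a topic partition is an assumption made for brevity.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for a database that commits results and offsets in one transaction. */
class InMemoryTransactionalStore {
    private final Map<String, String> results = new HashMap<>();
    private final Map<String, Long> offsets = new HashMap<>();
    // Writes are staged here and become visible only when commit() is called.
    private final Map<String, String> pendingResults = new HashMap<>();
    private final Map<String, Long> pendingOffsets = new HashMap<>();

    void storeRecord(String key, String value) {
        pendingResults.put(key, value);
    }

    void storeOffset(String topicPartition, long nextOffset) {
        pendingOffsets.put(topicPartition, nextOffset);
    }

    /** Applies the staged results and offsets together, then clears the staging area. */
    void commit() {
        results.putAll(pendingResults);
        offsets.putAll(pendingOffsets);
        rollback();
    }

    /** Discards staged writes, as a crash before commit would. */
    void rollback() {
        pendingResults.clear();
        pendingOffsets.clear();
    }

    /** Offset to seek() to; 0 if nothing was ever committed for this partition. */
    long getOffset(String topicPartition) {
        return offsets.getOrDefault(topicPartition, 0L);
    }

    boolean hasResult(String key) {
        return results.containsKey(key);
    }
}
```

If the application fails before commit(), neither the result nor the offset is visible, so the consumer seeks back to the old offset and reprocesses the record without leaving a duplicate behind.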
But How Do We Exit?
Earlier in this chapter, when we discussed the poll loop, I told you not to worry about
the fact that the consumer polls in an infinite loop and that we would discuss how to
exit the loop cleanly. So, let’s discuss how to exit cleanly.
When you decide to exit the poll loop, you will need another thread to call
consumer.wakeup(). If you are running the consumer loop in the main thread, this can
be done from ShutdownHook. Note that consumer.wakeup() is the only consumer
method that is safe to call from a different thread. Calling wakeup() will cause poll()
to exit with WakeupException, or if consumer.wakeup() was called while the thread
was not waiting on poll(), the exception will be thrown on the next iteration when
poll() is called. The WakeupException doesn’t need to be handled, but before exiting
the thread, you must call consumer.close(). Closing the consumer will commit offsets
if needed and will send the group coordinator a message that the consumer is
leaving the group. The group coordinator will trigger rebalancing immediately
and you won’t need to wait for the session to time out before partitions from the
consumer you are closing will be assigned to another consumer in the group.
Here is what the exit code will look like if the consumer is running in the main appli‐
cation thread. This example is a bit truncated, but you can view the full example at
http://bit.ly/2u47e9A.
Runtime.getRuntime().addShutdownHook(new Thread() {
    public void run() {
        System.out.println("Starting exit...");
        consumer.wakeup();
        try {
            mainThread.join();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
});
...
try {
    // looping until ctrl-c, the shutdown hook will cleanup on exit
    while (true) {
        ConsumerRecords<String, String> records =
            consumer.poll(1000);
        System.out.println(System.currentTimeMillis() +
            " -- waiting for data...");
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("offset = %d, key = %s, value = %s\n",
                record.offset(), record.key(), record.value());
        }
        for (TopicPartition tp : consumer.assignment())
            System.out.println("Committing offset at position:" +
                consumer.position(tp));
        consumer.commitSync();
    }
} catch (WakeupException e) {
    // ignore for shutdown
} finally {
    consumer.close();
    System.out.println("Closed consumer and we are done");
}
ShutdownHook runs in a separate thread, so the only safe action we can take is to
call wakeup() to break out of the poll loop.
Another thread calling wakeup() will cause poll() to throw a WakeupException. You’ll
want to catch the exception to make sure your application doesn’t exit unexpectedly,
but there is no need to do anything with it.
Before exiting the consumer, make sure you close it cleanly.
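The control flow above can be modeled without a Kafka client to show why a wakeup requested between polls is still delivered on the next poll. Everything in this sketch is hypothetical: WakeableLoop imitates the behavior described for wakeup() and poll(), WakeupSignal stands in for WakeupException, and the wakeAfterPolls parameter replaces the second thread so that the example stays deterministic.

```java
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical model of a poll loop that exits via a wakeup flag and always closes. */
class WakeableLoop {
    /** Stand-in for WakeupException. */
    static class WakeupSignal extends RuntimeException {}

    private final AtomicBoolean wakeupRequested = new AtomicBoolean(false);
    private boolean closed = false;
    private int polls = 0;

    /** Safe to call from another thread: it only sets a flag. */
    void wakeup() {
        wakeupRequested.set(true);
    }

    /** Throws if a wakeup was requested, even if the request arrived between polls. */
    void poll() {
        if (wakeupRequested.getAndSet(false)) {
            throw new WakeupSignal();
        }
        polls++;
    }

    boolean isClosed() {
        return closed;
    }

    /** Polls until woken up; returns how many polls completed before the wakeup. */
    int run(int wakeAfterPolls) {
        try {
            while (true) {
                poll();
                if (polls == wakeAfterPolls) {
                    wakeup(); // in a real application, a shutdown hook thread does this
                }
            }
        } catch (WakeupSignal e) {
            // expected during shutdown, nothing to handle
        } finally {
            closed = true; // mirrors the consumer.close() call in the finally block
        }
        return polls;
    }
}
```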
Deserializers
As discussed in the previous chapter, Kafka producers require serializers to convert
objects into byte arrays that are then sent to Kafka. Similarly, Kafka consumers
require deserializers to convert byte arrays received from Kafka into Java objects. In
previous examples, we just assumed that both the key and the value of each message
are strings and we used the default StringDeserializer in the consumer configuration.
In Chapter 3 about the Kafka producer, we saw how to serialize custom types and
how to use Avro and AvroSerializers to generate Avro objects from schema defini‐
tions and then serialize them when producing messages to Kafka. We will now look at
how to create custom deserializers for your own objects and how to use Avro and its
deserializers.
It should be obvious that the serializer used to produce events to Kafka must match
the deserializer that will be used when consuming events. Serializing with
IntSerializer and then deserializing with StringDeserializer will not end well. This means
that as a developer you need to keep track of which serializers were used to write into
each topic, and make sure each topic only contains data that the deserializers you use
can interpret. This is one of the benefits of using Avro and the Schema Registry for
serializing and deserializing: the AvroSerializer can make sure that all the data
written to a specific topic is compatible with the schema of the topic, which means it
can be deserialized with the matching deserializer and schema. Any errors in
compatibility, on the producer or the consumer side, will be caught easily with an
appropriate error message, which means you will not need to try to debug byte arrays for
serialization errors.
We will start by quickly showing how to write a custom deserializer, even though this
is the less common method, and then we will move on to an example of how to use
Avro to deserialize message keys and values.
Custom deserializers
Let’s take the same custom object we serialized in Chapter 3, and write a deserializer
for it:
public class Customer {
    private int customerID;
    private String customerName;

    public Customer(int ID, String name) {
        this.customerID = ID;
        this.customerName = name;
    }

    public int getID() {
        return customerID;
    }

    public String getName() {
        return customerName;
    }
}
The custom deserializer will look as follows:
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class CustomerDeserializer implements
    Deserializer<Customer> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        // nothing to configure
    }

    @Override
    public Customer deserialize(String topic, byte[] data) {
        int id;
        int nameSize;
        String name;

        try {
            if (data == null)
                return null;
            // a 4-byte ID and a 4-byte name length are always present
            if (data.length < 8)
                throw new SerializationException("Size of data received by " +
                    "deserializer is shorter than expected");

            ByteBuffer buffer = ByteBuffer.wrap(data);
            id = buffer.getInt();
            nameSize = buffer.getInt();

            byte[] nameBytes = new byte[nameSize];
            buffer.get(nameBytes);
            name = new String(nameBytes, StandardCharsets.UTF_8);

            return new Customer(id, name);
        } catch (Exception e) {
            throw new SerializationException("Error when deserializing " +
                "byte[] to Customer " + e);
        }
    }

    @Override
    public void close() {
        // nothing to close
    }
}
The consumer also needs the implementation of the Customer class, and both the
class and the serializer need to match on the producing and consuming applica‐
tions. In a large organization with many consumers and producers sharing access
to the data, this can become challenging.
We are just reversing the logic of the serializer here—we get the customer ID and
name out of the byte array and use them to construct the object we need.
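The byte layout this deserializer reads can be checked with a small round trip that uses only the JDK. The sketch below assumes the layout implied by the code above (a 4-byte customer ID, a 4-byte name length, then the UTF-8 bytes of the name); the class CustomerWireFormat and its methods are hypothetical helpers for illustration, not the serializer from Chapter 3.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical helper showing the assumed layout: [4-byte id][4-byte name length][name bytes]. */
class CustomerWireFormat {
    static byte[] encode(int id, String name) {
        byte[] nameBytes = name.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + nameBytes.length);
        buffer.putInt(id);
        buffer.putInt(nameBytes.length);
        buffer.put(nameBytes);
        return buffer.array();
    }

    static int decodeId(byte[] data) {
        // The ID is the first 4 bytes of the payload.
        return ByteBuffer.wrap(data).getInt();
    }

    static String decodeName(byte[] data) {
        ByteBuffer buffer = ByteBuffer.wrap(data);
        buffer.getInt();                 // skip the ID
        int nameSize = buffer.getInt();  // length prefix of the name
        byte[] nameBytes = new byte[nameSize];
        buffer.get(nameBytes);
        return new String(nameBytes, StandardCharsets.UTF_8);
    }
}
```

A round trip like this makes the coupling visible: any change to the field order or the length prefix on the producing side has to be mirrored exactly on the consuming side.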
The consumer code that uses this deserializer will look similar to this example:
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
// use the fully qualified name of the CustomerDeserializer class shown above
props.put("value.deserializer",
    CustomerDeserializer.class.getName());

KafkaConsumer<String, Customer> consumer =
    new KafkaConsumer<>(props);

consumer.subscribe(Collections.singletonList("customerCountries"));

while (true) {
    ConsumerRecords<String, Customer> records =
        consumer.poll(100);
    for (ConsumerRecord<String, Customer> record : records)
    {
        System.out.println("current customer Id: " +
            record.value().getID() + " and current customer name: " +
            record.value().getName());
    }
}
Again, it is important to note that implementing a custom serializer and deserializer
is not recommended. It tightly couples producers and consumers and is fragile and
error-prone. A better solution would be to use a standard message format such as
JSON, Thrift, Protobuf, or Avro. We’ll now see how to use Avro deserializers with the
Kafka consumer. For background on Apache Avro, its schemas, and schema-
compatibility capabilities, refer back to Chapter 3.
Using Avro deserialization with Kafka consumer
Let’s assume we are using the implementation of the Customer class in Avro that was
shown in Chapter 3. In order to consume those objects from Kafka, you want to
implement a consuming application similar to this:
Properties props = new Properties();
props.put("bootstrap.servers", "broker1:9092,broker2:9092");
props.put("group.id", "CountryCounter");
props.put("key.deserializer",
    "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer",
    "io.confluent.kafka.serializers.KafkaAvroDeserializer");
props.put("schema.registry.url", schemaUrl);
// return the generated Customer class rather than a GenericRecord
props.put("specific.avro.reader", "true");
String topic = "customerContacts";

KafkaConsumer<String, Customer> consumer =
    new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(topic));

System.out.println("Reading topic:" + topic);

while (true) {
    ConsumerRecords<String, Customer> records =
        consumer.poll(1000);
    for (ConsumerRecord<String, Customer> record : records) {
        System.out.println("Current customer name is: " +
            record.value().getName());
    }
    consumer.commitSync();
}
We use KafkaAvroDeserializer to deserialize the Avro messages.
schema.registry.url is a new parameter. This simply points to where we store
the schemas. This way the consumer can use the schema that was registered by
the producer to deserialize the message.
We specify the generated class, Customer, as the type for the record value.
record.value() is a Customer instance and we can use it accordingly.
Standalone Consumer: Why and How to Use a Consumer Without a Group
So far, we have discussed consumer groups, which are where partitions are assigned
automatically to consumers and are rebalanced automatically when consumers are
added or removed from the group. Typically, this behavior is just what you want, but
in some cases you want something much simpler. Sometimes you know you have a
single consumer that always needs to read data from all the partitions in a topic, or
from a specific partition in a topic. In this case, there is no reason for groups or
rebalances: just assign the consumer the specific topic and/or partitions, consume
messages, and commit offsets on occasion.
When you know exactly which partitions the consumer should read, you don’t subscribe
to a topic; instead, you assign yourself a few partitions. A consumer can either
subscribe to topics (and be part of a consumer group), or assign itself partitions, but
not both at the same time.
Here is an example of how a consumer can assign itself all partitions of a specific
topic and consume from them:
List<PartitionInfo> partitionInfos = null;
List<TopicPartition> partitions = new ArrayList<>();
partitionInfos = consumer.partitionsFor("topic");

if (partitionInfos != null) {
    for (PartitionInfo partition : partitionInfos)
        partitions.add(new TopicPartition(partition.topic(),
            partition.partition()));
    consumer.assign(partitions);

    while (true) {
        ConsumerRecords<String, String> records =
            consumer.poll(1000);
        for (ConsumerRecord<String, String> record : records) {
            System.out.printf("topic = %s, partition = %s, offset = %d, " +
                "customer = %s, country = %s\n",
                record.topic(), record.partition(), record.offset(),
                record.key(), record.value());
        }
        consumer.commitSync();
    }
}
We start by asking the cluster for the partitions available in the topic. If you only
plan on consuming a specific partition, you can skip this part.
Once we know which partitions we want, we call assign() with the list.
Other than the lack of rebalances and the need to manually find the partitions, every‐
thing else is business as usual. Keep in mind that if someone adds new partitions to
the topic, the consumer will not be notified. You will need to handle this by checking
consumer.partitionsFor() periodically or simply by bouncing the application
whenever partitions are added.
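One way to handle the periodic check is to remember which partition numbers have already been assigned and compare them with the latest list. The sketch below uses plain integers and only the JDK; PartitionWatcher is a hypothetical helper, and in a real application the input would be derived from the result of consumer.partitionsFor().

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical helper that reports partitions not seen in earlier checks. */
class PartitionWatcher {
    private final Set<Integer> known = new TreeSet<>();

    /** Returns the newly discovered partition numbers in ascending order and remembers them. */
    List<Integer> update(Collection<Integer> currentPartitions) {
        List<Integer> added = new ArrayList<>();
        for (Integer partition : new TreeSet<>(currentPartitions)) {
            // Set.add returns true only for partitions we have not seen before.
            if (known.add(partition)) {
                added.add(partition);
            }
        }
        return added;
    }
}
```

When update() returns a non-empty list, the application could call assign() again with the full, updated set of partitions.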
Older Consumer APIs
In this chapter we discussed the Java KafkaConsumer client that is part of the
org.apache.kafka.clients package. At the time of writing, Apache Kafka still has
two older clients written in Scala that are part of the kafka.consumer package, which
is part of the core Kafka module. The first is called SimpleConsumer (which
is not very simple). SimpleConsumer is a thin wrapper around the Kafka APIs that
allows you to consume from specific partitions and offsets. The other old API is
called high-level consumer or ZookeeperConsumerConnector. The high-level consumer
is somewhat similar to the current consumer in that it has consumer groups
and it rebalances partitions, but it uses Zookeeper to manage consumer groups and
does not give you the same control over commits and rebalances as we have now.
Because the current consumer supports both behaviors and provides much more
reliability and control to the developer, we will not discuss the older APIs. If you are
interested in using them, please think twice and then refer to Apache Kafka
documentation to learn more.
Summary
We started this chapter with an in-depth explanation of Kafka’s consumer groups and
the way they allow multiple consumers to share the work of reading events from topics.
We followed the theoretical discussion with a practical example of a consumer
subscribing to a topic and continuously reading events. We then looked into the most
important consumer configuration parameters and how they affect consumer behavior.
We dedicated a large part of the chapter to discussing offsets and how consumers
keep track of them. Understanding how consumers commit offsets is critical when
writing reliable consumers, so we took time to explain the different ways this can be
done. We then discussed additional parts of the consumer APIs, handling rebalances
and closing the consumer.
We concluded by discussing the deserializers used by consumers to turn bytes stored
in Kafka into Java objects that the applications can process. We discussed Avro
deserializers in some detail, even though they are just one type of deserializer you can use,
because these are most commonly used with Kafka.
Now that you know how to produce and consume events with Kafka, the next chapter
explains some of the internals of a Kafka implementation.
CHAPTER 5
Kafka Internals
It is not strictly necessary to understand Kafka’s internals in order to run Kafka in
production or write applications that use it. However, knowing how Kafka works
does provide context when troubleshooting or trying to understand why Kafka
behaves the way it does. Since covering every single implementation detail and design
decision is beyond the scope of this book, in this chapter we focus on three topics that
are especially relevant to Kafka practitioners:
• How Kafka replication works
• How Kafka handles requests from producers and consumers
• How Kafka handles storage such as file format and indexes
Understanding these topics in-depth will be especially useful when tuning Kafka—
understanding the mechanisms that the tuning knobs control goes a long way toward
using them with precise intent rather than fiddling with them randomly.
Cluster Membership
Kafka uses Apache Zookeeper to maintain the list of brokers that are currently members
of a cluster. Every broker has a unique identifier that is either set in the broker
configuration file or automatically generated. Every time a broker process starts, it
registers itself with its ID in Zookeeper by creating an ephemeral node. Different
Kafka components subscribe to the /brokers/ids path in Zookeeper where brokers
are registered so they get notified when brokers are added or removed.
If you try to start another broker with the same ID, you will get an error—the new
broker will try to register, but fail because we already have a Zookeeper node for the
same broker ID.
When a broker loses connectivity to Zookeeper (usually as a result of the broker stop‐
ping, but this can also happen as a result of network partition or a long garbage-
collection pause), the ephemeral node that the broker created when starting will be
automatically removed from Zookeeper. Kafka components that are watching the list
of brokers will be notified that the broker is gone.
Even though the node representing the broker is gone when the broker is stopped,
the broker ID still exists in other data structures. For example, the list of replicas of
each topic (see “Replication”) contains the broker IDs for the replica. This
way, if you completely lose a broker and start a brand new broker with the ID of the
old one, it will immediately join the cluster in place of the missing broker with the
same partitions and topics assigned to it.
The Controller
The controller is one of the Kafka brokers that, in addition to the usual broker
functionality, is responsible for electing partition leaders (we’ll discuss partition leaders
and what they do in the next section). The first broker that starts in the cluster
becomes the controller by creating an ephemeral node in Zookeeper called /controller.
When other brokers start, they also try to create this node, but receive a “node
already exists” exception, which causes them to “realize” that the controller node
already exists and that the cluster already has a controller. The brokers create a
Zookeeper watch on the controller node so they get notified of changes to this node. This
way, we guarantee that the cluster will only have one controller at a time.
When the controller broker is stopped or loses connectivity to Zookeeper, the ephemeral
node will disappear. Other brokers in the cluster will be notified through the
Zookeeper watch that the controller is gone and will attempt to create the controller
node in Zookeeper themselves. The first node to create the new controller in
Zookeeper is the new controller, while the other nodes will receive a “node already exists”
exception and re-create the watch on the new controller node. Each time a controller
is elected, it receives a new, higher controller epoch number through a Zookeeper
conditional increment operation. The brokers know the current controller epoch and if
they receive a message from a controller with an older number, they know to ignore
it.
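The election rule can be modeled in a few lines without Zookeeper. In this sketch a ConcurrentHashMap stands in for the /controller ephemeral node and an AtomicInteger for the controller epoch; the class ControllerElection and all of its method names are hypothetical and only illustrate the create-if-absent and epoch-fencing logic described above.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical in-memory model of controller election with epoch fencing. */
class ControllerElection {
    private static final String CONTROLLER_PATH = "/controller";
    // Stand-in for Zookeeper: creating a node fails if the path already exists.
    private final ConcurrentHashMap<String, Integer> ephemeralNodes = new ConcurrentHashMap<>();
    private final AtomicInteger epoch = new AtomicInteger(0);

    /** Returns true if this broker created the node and therefore became the controller. */
    boolean tryBecomeController(int brokerId) {
        boolean created = ephemeralNodes.putIfAbsent(CONTROLLER_PATH, brokerId) == null;
        if (created) {
            epoch.incrementAndGet(); // every newly elected controller gets a higher epoch
        }
        return created;
    }

    /** Models the ephemeral node disappearing when its owner loses its session. */
    void sessionLost(int brokerId) {
        ephemeralNodes.remove(CONTROLLER_PATH, brokerId);
    }

    int currentEpoch() {
        return epoch.get();
    }

    /** Brokers ignore messages that carry an epoch older than the current one. */
    boolean acceptMessage(int messageEpoch) {
        return messageEpoch >= epoch.get();
    }
}
```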
When the controller notices that a broker left the cluster (by watching the relevant
Zookeeper path), it knows that all the partitions that had a leader on that broker will
need a new leader. It goes over all the partitions that need a new leader, determines
who the new leader should be (simply the next replica in the replica list of that
partition), and sends a request to all the brokers that contain either the new leaders or the
existing followers for those partitions. The request contains information on the new
leader and the followers for the partitions. Each new leader knows that it needs to
start serving producer and consumer requests from clients while the followers know
that they need to start replicating messages from the new leader.
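The “next replica in the replica list” rule can be sketched as a simple scan. This is a simplified model under stated assumptions: LeaderSelector and pickLeader are hypothetical names, brokers are identified by plain integers, and the sketch considers only whether a broker is alive, whereas a real controller also takes into account whether a replica is in sync.

```java
import java.util.List;
import java.util.Set;

/** Hypothetical helper modeling the choice of a new partition leader. */
class LeaderSelector {
    /**
     * Returns the first broker in the partition's replica list that is still alive,
     * or -1 if no replica is hosted on a live broker.
     */
    static int pickLeader(List<Integer> replicaList, Set<Integer> liveBrokers) {
        for (int brokerId : replicaList) {
            if (liveBrokers.contains(brokerId)) {
                return brokerId;
            }
        }
        return -1;
    }
}
```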
When the controller notices that a broker joined the cluster, it uses the broker ID to
check if there are replicas that exist on this broker. If there are, the controller notifies
both new and existing brokers of the change, and the replicas on the new broker start
replicating messages from the existing leaders.
To summarize, Kafka uses Zookeeper’s ephemeral node feature to elect a controller
and to notify the controller when nodes join and leave the cluster. The controller is
responsible for electing leaders among the partitions and replicas whenever it notices
nodes join and leave the cluster. The controller uses the epoch number to prevent a
“split brain” scenario where two nodes believe each is the current controller.
Replication
Replication is at the heart of Kafka’s architecture. The very first sentence in Kafka’s
documentation describes it as “a distributed, partitioned, replicated commit log
service.” Replication is critical because it is the way Kafka guarantees availability and
durability when individual nodes inevitably fail.
As we’ve already discussed, data in Kafka is organized by topics. Each topic is
partitioned, and each partition can have multiple replicas. Those replicas are stored on
brokers, and each broker typically stores hundreds or even thousands of replicas
belonging to different topics and partitions.
There are two types of replicas:
Leader replica
Each partition has a single replica designated as the leader. All produce and
consume requests go through the leader, in order to guarantee consistency.
Follower replica
All replicas for a partition that are not leaders are called followers. Followers
don’t serve client requests; their only job is to replicate messages from the leader
and stay up-to-date with the most recent messages the leader has. In the event
that a leader replica for a partition crashes, one of the follower replicas will be
promoted to become the new leader for the partition.
Another task the leader is responsible for is knowing which of the follower replicas is
up-to-date with the leader. Followers attempt to stay up-to-date by replicating all the
messages from the leader as the messages arrive, but they can fail to stay in sync for
various reasons, such as when network congestion slows down replication or when a
broker crashes and all replicas on that broker start falling behind until we start the
broker and they can start replicating again.
In order to stay in sync with the leader, the replicas send the leader Fetch requests,
the exact same type of requests that consumers send in order to consume messages.
In response to those requests, the leader sends the messages to the replicas. Those
Fetch requests contain the offset of the message that the replica wants to receive next,
and will always be in order.
A replica will request message 1, then message 2, and then message 3, and it will not
request message 4 before it gets all the previous messages. This means that the leader
can know that a replica got all messages up to message 3 when the replica requests
message 4. By looking at the last offset requested by each replica, the leader can tell
how far behind each replica is. If a replica hasn’t requested a message in more than 10
seconds or if it has requested messages but hasn’t caught up to the most recent mes‐
sage in more than 10 seconds, the replica is considered out of sync. If a replica fails to
keep up with the leader, it can no longer become the new leader in the event of failure
—after all, it does not contain all the messages.
Conversely, replicas that are consistently asking for the latest messages are called
in-sync replicas. Only in-sync replicas are eligible to be elected as partition
leaders in case the existing leader fails.
The amount of time a follower can be inactive or behind before it is considered out of
sync is controlled by the replica.lag.time.max.ms configuration parameter. This
allowed lag has implications on client behavior and data retention during leader elec‐
tion. We will discuss this in depth in Chapter 6, when we discuss reliability guaran‐
tees.
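The in-sync check described above can be sketched in a few lines of Python. This is an illustration under stated assumptions, not Kafka's implementation: FollowerState, record_fetch, and is_in_sync are hypothetical names, the leader's "end offset" is assumed to mean the offset of the next message to be written, and the 10-second default simply mirrors the figure quoted in the text.

```python
from dataclasses import dataclass

REPLICA_LAG_TIME_MAX_MS = 10_000  # mirrors the 10-second figure quoted in the text


@dataclass
class FollowerState:
    """What the leader remembers about one follower (hypothetical structure)."""
    last_fetch_ms: int       # when the follower last sent a Fetch request
    last_caught_up_ms: int   # when the follower last asked for the leader's end offset


def record_fetch(state, fetch_offset, leader_end_offset, now_ms):
    """Update the follower's state when one of its Fetch requests arrives.

    A request for offset N implies the follower already holds everything below N.
    """
    state.last_fetch_ms = now_ms
    if fetch_offset >= leader_end_offset:
        state.last_caught_up_ms = now_ms


def is_in_sync(state, now_ms, max_lag_ms=REPLICA_LAG_TIME_MAX_MS):
    """In sync means: fetched recently AND reached the leader's end offset recently."""
    fetched_recently = now_ms - state.last_fetch_ms <= max_lag_ms
    caught_up_recently = now_ms - state.last_caught_up_ms <= max_lag_ms
    return fetched_recently and caught_up_recently
```

A follower that keeps sending Fetch requests but never reaches the leader's end offset fails the second condition, which matches the "has requested messages but hasn't caught up" case in the text.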
In addition to the current leader, each partition has a preferred leader—the replica
that was the leader when the topic was originally created. It is preferred because when
partitions are first created, the leaders are balanced between brokers (we explain the
algorithm for distributing replicas and leaders among brokers later in the chapter).
As a result, we expect that when the preferred leader is indeed the leader for all parti‐
tions in the cluster, load will be evenly balanced between brokers. By default, Kafka is
configured with auto.leader.rebalance.enable=true, which will check if the pre‐
ferred leader replica is not the current leader but is in-sync and trigger leader election
to make the preferred leader the current leader.
Finding the Preferred Leaders
The best way to identify the current preferred leader is by looking
at the list of replicas for a partition. (You can see details of partitions
and replicas in the output of the kafka-topics.sh tool. We’ll
discuss this and other admin tools in Chapter 10.) The first replica in
the list is always the preferred leader. This is true no matter who is
the current leader and even if the replicas were reassigned to differ‐
ent brokers using the replica reassignment tool. In fact, if you man‐
ually reassign replicas, it is important to remember that the replica
you specify first will be the preferred replica, so make sure you
spread those around different brokers to avoid overloading some
brokers with leaders while other brokers are not handling their fair
share of the work.
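As a small illustration of the rule that the first replica in the list is the preferred leader, the following Python sketch checks whether a preferred-leader election would be triggered and counts preferred leaders per broker. The function names and the shape of the assignment dictionary (partition number mapped to a list of broker IDs) are assumptions made for this example.

```python
from collections import Counter


def needs_preferred_election(replicas, current_leader, in_sync):
    """True when the preferred leader is in sync but is not currently leading."""
    preferred = replicas[0]  # the first replica in the list is the preferred leader
    return preferred != current_leader and preferred in in_sync


def preferred_leader_counts(assignment):
    """Count how many partitions name each broker as their preferred leader."""
    return Counter(replicas[0] for replicas in assignment.values())
```

Running preferred_leader_counts over a manual reassignment plan before applying it is one way to spot a plan that piles preferred leaders onto a few brokers.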
Request Processing
Most of what a Kafka broker does is process requests sent to the partition leaders
from clients, partition replicas, and the controller. Kafka has a binary protocol (over
TCP) that specifies the format of the requests and how brokers respond to them—
both when the request is processed successfully and when the broker encounters errors
while processing the request. Clients always initiate connections and send requests,
and the broker processes the requests and responds to them. All requests sent to the
broker from a specific client will be processed in the order in which they were
received—this guarantee is what allows Kafka to behave as a message queue and pro‐
vide ordering guarantees on the messages it stores.
All requests have a standard header that includes:
• Request type (also called API key)
• Request version (so the brokers can handle clients of different versions and
  respond accordingly)
• Correlation ID: a number that uniquely identifies the request and also appears in
  the response and in the error logs (the ID is used for troubleshooting)
• Client ID: used to identify the application that sent the request
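To make the header concrete, here is a minimal Python sketch that packs and unpacks the four fields listed above. The field widths used here (two 16-bit integers, one 32-bit integer, and a length-prefixed UTF-8 string, all big-endian) are an assumption for illustration, and the function names are hypothetical; the Kafka protocol documentation is the authoritative description of the wire format.

```python
import struct

# Assumed layout: api_key (int16), api_version (int16),
# correlation_id (int32), client_id length (int16), then the client_id bytes.
_HEADER_FORMAT = ">hhih"
_HEADER_SIZE = struct.calcsize(_HEADER_FORMAT)


def encode_request_header(api_key, api_version, correlation_id, client_id):
    """Serialize the four header fields into bytes."""
    client_bytes = client_id.encode("utf-8")
    fixed = struct.pack(_HEADER_FORMAT, api_key, api_version,
                        correlation_id, len(client_bytes))
    return fixed + client_bytes


def decode_request_header(data):
    """Parse bytes produced by encode_request_header back into a tuple."""
    api_key, api_version, correlation_id, length = struct.unpack_from(
        _HEADER_FORMAT, data, 0)
    client_id = data[_HEADER_SIZE:_HEADER_SIZE + length].decode("utf-8")
    return api_key, api_version, correlation_id, client_id
```

Because the correlation ID is echoed back in the response, a client can keep several requests in flight on one connection and still match each response to the request that caused it.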
We will not describe the protocol here because it is described in significant detail in
the Kafka documentation. However, it is helpful to take a look at how requests are
processed by the broker—later, when we discuss how to monitor Kafka and the vari‐
ous configuration options, you will have context about which queues and threads the
metrics and configuration parameters refer to.
For each port the broker listens on, the broker runs an acceptor thread that creates a
connection and hands it over to a processor thread for handling. The number of pro‐
cessor threads (also called network threads) is configurable. The network threads are
responsible for taking requests from client connections, placing them in a request
queue, and picking up responses from a response queue and sending them back to cli‐
ents. See Figure 5-1 for a visual of this process.
Once requests are placed on the request queue, IO threads are responsible for picking
them up and processing them. The most common types of requests are:
Produce requests
Sent by producers and contain messages the clients write to Kafka brokers.
Fetch requests
Sent by consumers and follower replicas when they read messages from Kafka
brokers.
Figure 5-1. Request processing inside Apache Kafka
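The queue-and-thread arrangement in Figure 5-1 can be imitated with Python's standard library. This is a toy model, not broker code: process_requests is a hypothetical function, the "network thread" work is done inline by the caller, and each request is reduced to a (correlation ID, payload) pair.

```python
import queue
import threading


def process_requests(requests, handler, num_io_threads=2):
    """Push requests through a request queue, IO threads, and a response queue."""
    request_queue = queue.Queue()
    response_queue = queue.Queue()

    def io_worker():
        # IO threads pick requests off the request queue and process them.
        while True:
            item = request_queue.get()
            if item is None:  # sentinel: no more work for this thread
                return
            correlation_id, payload = item
            response_queue.put((correlation_id, handler(payload)))

    workers = [threading.Thread(target=io_worker) for _ in range(num_io_threads)]
    for worker in workers:
        worker.start()

    # A network thread would do this part: enqueue what clients sent.
    for item in requests:
        request_queue.put(item)
    for _ in workers:
        request_queue.put(None)
    for worker in workers:
        worker.join()

    # A network thread would also do this part: drain responses back to clients.
    responses = {}
    while not response_queue.empty():
        correlation_id, result = response_queue.get()
        responses[correlation_id] = result
    return responses
```

Separating the threads that talk to the network from the threads that do the work is what makes queue sizes and thread idle time meaningful metrics to watch.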
Both produce requests and fetch requests have to be sent to the leader replica of a
partition. If a broker receives a produce request for a specific partition and the leader
for this partition is on a different broker, the client that sent the produce request will
get an error response of “Not a Leader for Partition.” The same error will occur if a
fetch request for a specific partition arrives at a broker that does not have the leader
for that partition. Kafka’s clients are responsible for sending produce and fetch
requests to the broker that contains the leader for the relevant partition for the
request.
How do the clients know where to send the requests? Kafka clients use another
request type called a metadata request, which includes a list of topics the client is
interested in. The server response specifies which partitions exist in the topics, the
replicas for each partition, and which replica is the leader. Metadata requests can be
sent to any broker because all brokers have a metadata cache that contains this infor‐
mation.
Clients typically cache this information and use it to direct produce and fetch
requests to the correct broker for each partition. They also need to occasionally
refresh this information (refresh intervals are controlled by the
metadata.max.age.ms configuration parameter) by sending another metadata request so
they know if the topic metadata changed—for example, if a new broker was added or
some replicas were moved to a new broker (Figure 5-2). In addition, if a client
receives the “Not a Leader” error to one of its requests, it will refresh its metadata
before trying to send the request again, since the error indicates that the client is
using outdated information and is sending requests to the wrong broker.
Figure 5-2. Client routing requests
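The routing behavior can be sketched as a small client-side class. Everything here is hypothetical scaffolding for illustration: RoutingClient, the NotLeaderForPartition exception, and the two callables passed to the constructor stand in for real network calls, and the time-based refresh governed by metadata.max.age.ms is left out to keep the sketch short.

```python
class NotLeaderForPartition(Exception):
    """Stands in for the "Not a Leader for Partition" error response."""


class RoutingClient:
    def __init__(self, fetch_metadata, send_to_broker):
        # fetch_metadata() returns {(topic, partition): leader_broker_id}
        # send_to_broker(broker_id, topic, partition, payload) performs the request
        self._fetch_metadata = fetch_metadata
        self._send_to_broker = send_to_broker
        self._leaders = {}
        self.metadata_requests = 0

    def refresh_metadata(self):
        self._leaders = self._fetch_metadata()
        self.metadata_requests += 1

    def send(self, topic, partition, payload, max_attempts=3):
        for _ in range(max_attempts):
            if (topic, partition) not in self._leaders:
                self.refresh_metadata()
            broker = self._leaders[(topic, partition)]
            try:
                return self._send_to_broker(broker, topic, partition, payload)
            except NotLeaderForPartition:
                # Our cached metadata is stale: refresh it, then retry.
                self.refresh_metadata()
        raise NotLeaderForPartition(f"gave up on {topic}-{partition}")
```

The cache means the common path costs no extra round trip; a metadata request is only paid for on first use or after the cluster has told the client its information is out of date.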
Produce Requests
As we saw in Chapter 3, a configuration parameter called acks is the number of brok‐
ers who need to acknowledge receiving the message before it is considered a success‐
ful write. Producers can be configured to consider messages as “written successfully”
when the message was accepted by just the leader (acks=1), all in-sync replicas
(acks=all), or the moment the message was sent without waiting for the broker to
accept it at all (acks=0).
When the broker that contains the leader replica for a partition receives a produce
request for this partition, it will start by running a few validations:
• Does the user sending the data have write privileges on the topic?
• Is the number of acks specified in the request valid (only 0, 1, and “all” are
  allowed)?
• If acks is set to all, are there enough in-sync replicas for safely writing the
  message? (Brokers can be configured to refuse new messages if the number of in-sync
  replicas falls below a configurable number; we will discuss this in more detail in
  Chapter 6, when we discuss Kafka’s durability and reliability guarantees.)
Then it will write the new messages to local disk. On Linux, the messages are written
to the filesystem cache and there is no guarantee about when they will be written to
disk. Kafka does not wait for the data to get persisted to disk—it relies on replication
for message durability.
Once the message is written to the leader of the partition, the broker examines the
acks configuration—if acks is set to 0 or 1, the broker will respond immediately; if
acks is set to all, the request will be stored in a buffer called purgatory until the
leader observes that the follower replicas replicated the message, at which point a
response is sent to the client.
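The sequence of validations and the acks decision can be summarized as one function. This is a sketch of the control flow described in the text only: handle_produce, its parameters, and the returned status strings are all hypothetical, and the actual write to the local log is represented by a comment.

```python
VALID_ACKS = (0, 1, "all")


def handle_produce(acks, has_write_permission, in_sync_replicas,
                   min_in_sync_replicas):
    """Return what the leader does with a produce request (hypothetical statuses)."""
    # Validation 1: is the user allowed to write to this topic?
    if not has_write_permission:
        return "ERROR_NOT_AUTHORIZED"
    # Validation 2: is the acks value one of the allowed settings?
    if acks not in VALID_ACKS:
        return "ERROR_INVALID_ACKS"
    # Validation 3: with acks=all, are there enough in-sync replicas?
    if acks == "all" and in_sync_replicas < min_in_sync_replicas:
        return "ERROR_NOT_ENOUGH_REPLICAS"

    # (The messages would be appended to the local log here.)

    if acks in (0, 1):
        return "RESPOND_IMMEDIATELY"
    # acks=all: park the request until the followers have replicated the message.
    return "WAIT_IN_PURGATORY"
```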
Fetch Requests
Brokers process fetch requests in a way that is very similar to the way produce
requests are handled. The client sends a request, asking the broker to send messages
from a list of topics, partitions, and offsets—something like “Please send me messages
starting at offset 53 in partition 0 of topic Test and messages starting at offset 64 in
partition 3 of topic Test.” Clients also specify a limit to how much data the broker can
return for each partition. The limit is important because clients need to allocate
memory that will hold the response sent back from the broker. Without this limit,
brokers could send back replies large enough to cause clients to run out of memory.
As we’ve discussed earlier, the request has to arrive at the leaders of the partitions
specified in the request and the client will make the necessary metadata requests to
make sure it is routing the fetch requests correctly. When the leader receives the
request, it first checks if the request is valid—does this offset even exist for this partic‐
ular partition? If the client is asking for a message that is so old that it got deleted
from the partition or an offset that does not exist yet, the broker will respond with an
error.
If the offset exists, the broker will read messages from the partition, up to the limit set
by the client in the request, and send the messages to the client. Kafka famously uses a
zero-copy method to send the messages to the clients—this means that Kafka sends
messages from the file (or more likely, the Linux filesystem cache) directly to the net‐
work channel without any intermediate buffers. This is different than most databases
where data is stored in a local cache before being sent to clients. This technique
removes the overhead of copying bytes and managing buffers in memory, and results
in much improved performance.
In addition to setting an upper boundary on the amount of data the broker can
return, clients can also set a lower boundary on the amount of data returned. Setting
the lower boundary to 10K, for example, is the client’s way of telling the broker “Only
return results once you have at least 10K bytes to send me.” This is a great way to
reduce CPU and network utilization when clients are reading from topics that are not
seeing much traffic. Instead of the clients sending requests to the brokers every few
milliseconds asking for data and getting very few or no messages in return, the clients
send a request, the broker waits until there is a decent amount of data and returns the
data, and only then will the client ask for more (Figure 5-3). The same amount of data
is read overall but with much less back and forth and therefore less overhead.
Figure 5-3. Broker delaying response until enough data accumulated
Of course, we wouldn’t want clients to wait forever for the broker to have enough
data. After a while, it makes sense to just take the data that exists and process that
instead of waiting for more. Therefore, clients can also define a timeout to tell the
broker “If you didn’t satisfy the minimum amount of data to send within x
milliseconds, just send what you got.”
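The interplay of the lower boundary, the upper boundary, and the timeout comes down to two small decisions, sketched here in Python. The function and parameter names are hypothetical; in the Java consumer the corresponding settings are fetch.min.bytes, fetch.max.wait.ms, and max.partition.fetch.bytes.

```python
def should_respond(available_bytes, min_bytes, waited_ms, max_wait_ms):
    """Respond once enough data has accumulated or the client's timeout expires."""
    return available_bytes >= min_bytes or waited_ms >= max_wait_ms


def bytes_to_return(available_bytes, max_bytes):
    """Never return more than the client said it can hold."""
    return min(available_bytes, max_bytes)
```

With a 10K lower boundary and a 500 ms timeout, a quiet topic produces at most about two responses per second instead of one response per poll, while a busy topic is unaffected because the lower boundary is met almost immediately.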
It is also interesting to note that not all the data that exists on the leader of the parti‐
tion is available for clients to read. Most clients can only read messages that were
written to all in-sync replicas (follower replicas, even though they are consumers, are
exempt from this—otherwise replication would not work). We already discussed that
the leader of the partition knows which messages were replicated to which replica,
and until a message was written to all in-sync replicas, it will not be sent to consum‐
ers—attempts to fetch those messages will result in an empty response rather than an
error.
The reason for this behavior is that messages not replicated to enough replicas yet are
considered “unsafe”—if the leader crashes and another replica takes its place, these
messages will no longer exist in Kafka. If we allowed clients to read messages that
only exist on the leader, we could see inconsistent behavior. For example, if a con‐
sumer reads a message and the leader crashed and no other broker contained this
message, the message is gone. No other consumer will be able to read this message,
which can cause inconsistency with the consumer who did read it. Instead, we wait
until all the in-sync replicas get the message and only then allow consumers to read it
(Figure 5-4). This behavior also means that if replication between brokers is slow for
some reason, it will take longer for new messages to arrive to consumers (since we
wait for the messages to replicate first). This delay is limited to
replica.lag.time.max.ms—the amount of time a replica can be delayed in replicat‐
ing new messages while still being considered in-sync.
Figure 5-4. Consumers only see messages that were replicated to in-sync replicas
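One way to picture the rule in Figure 5-4 is as a boundary offset below which every in-sync replica has the data. The sketch below calls that boundary the high watermark, a term not used in the text above, and makes two simplifying assumptions: the log is a Python list indexed by offset, and each replica's "end offset" is the offset of the next message it expects to write.

```python
def high_watermark(isr_end_offsets):
    """Lowest end offset among the in-sync replicas (leader included)."""
    return min(isr_end_offsets.values())


def readable_messages(log, fetch_offset, isr_end_offsets):
    """Messages a consumer may see: only those every in-sync replica already has.

    Returns an empty list, not an error, when nothing is readable yet.
    """
    return log[fetch_offset:high_watermark(isr_end_offsets)]
```

If a slow follower is removed from the in-sync set, it drops out of the min() and the boundary can advance, which is why the delay is bounded by replica.lag.time.max.ms.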
Other Requests
We just discussed the most common types of requests used by Kafka clients:
Metadata, Produce, and Fetch. It is important to remember that we are talking about a
generic binary protocol used by clients over the network. Whereas Kafka includes
Java clients that were implemented and maintained by contributors to the Apache
Kafka project, there are also clients in other languages such as C, Python, Go, and
many others. You can see the full list on the Apache Kafka website and they all com‐
municate with Kafka brokers using this protocol.
In addition, the same protocol is used to communicate between the Kafka brokers
themselves. Those requests are internal and should not be used by clients. For
example, when the controller announces that a partition has a new leader, it sends a
LeaderAndIsr request to the new leader (so it will know to start accepting client requests)
and to the followers (so they will know to follow the new leader).
The Kafka protocol currently handles 20 different request types, and more will be
added. The protocol is ever-evolving—as we add more client capabilities, we need to
grow the protocol to match. For example, in the past, Kafka Consumers used Apache
Zookeeper to keep track of the offsets they receive from Kafka. So when a consumer
is started, it can check Zookeeper for the last offset that was read from its partitions
and know where to start processing. For various reasons, we decided to stop using
Zookeeper for this, and instead store those offsets in a special Kafka topic. In order to
do this, we had to add several requests to the protocol: OffsetCommitRequest,
OffsetFetchRequest, and ListOffsetsRequest. Now when an application calls the
commitOffset() client API, the client no longer writes to Zookeeper; instead, it sends
OffsetCommitRequest to Kafka.
Topic creation is still done by command-line tools that update the list of topics in
Zookeeper directly, and brokers watch the topic list in Zookeeper to know when new
topics are added. We are working on improving Kafka and adding a
CreateTopicRequest that will allow all clients (even in languages that don’t have a
Zookeeper library) to create topics by asking Kafka brokers directly.
In addition to evolving the protocol by adding new request types, we sometimes
choose to modify existing requests to add some capabilities. For example, between
Kafka 0.9.0 and Kafka 0.10.0, we decided to let clients know who the current control‐
ler is by adding the information to the Metadata response. As a result, we added a
new version to the Metadata request and response. Now, 0.9.0 clients send Metadata
requests of version 0 (because version 1 did not exist in 0.9.0 clients) and the brokers,
whether they are 0.9.0 or 0.10.0, know to respond with a version 0 response, which
does not have the controller information. This is fine, because 0.9.0 clients don’t
expect the controller information and wouldn’t know how to parse it anyway. If you
have the 0.10.0 client, it will send a version 1 Metadata request and 0.10.0 brokers will
respond with a version 1 response that contains the controller information, which the
0.10.0 clients can use. If a 0.10.0 client sends a version 1 Metadata request to a 0.9.0
broker, the broker will not know how to handle the newer version of the request and
will respond with an error. This is the reason we recommend upgrading the brokers
before upgrading any of the clients—new brokers know how to handle old requests,
but not vice versa.
In release 0.10.0 we added ApiVersionRequest, which allows clients to ask the broker
which versions of each request are supported and to use the correct version
accordingly. Clients that use this new capability correctly will be able to talk to older brokers
by using a version of the protocol that is supported by the broker they are connecting
to.
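Version negotiation of this kind amounts to intersecting two ranges. The following Python sketch assumes each side describes its support for a request type as an inclusive (min, max) pair; pick_version is a hypothetical helper, not a Kafka client API.

```python
def pick_version(client_range, broker_range):
    """Highest request version both sides support, or None if there is none.

    Each range is an inclusive (min_version, max_version) tuple.
    """
    lowest = max(client_range[0], broker_range[0])
    highest = min(client_range[1], broker_range[1])
    return highest if lowest <= highest else None
```

In the Metadata example above, a client supporting versions 0 to 1 talking to a broker that supports only version 0 would settle on version 0 instead of sending a version 1 request and receiving an error.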
Physical Storage
The basic storage unit of Kafka is a partition replica. Partitions cannot be split
between multiple brokers and not even between multiple disks on the same broker.
So the size of a partition is limited by the space available on a single mount point. (A
mount point will consist of either a single disk, if JBOD configuration is used, or mul‐
tiple disks, if RAID is configured. See Chapter 2.)
When configuring Kafka, the administrator defines a list of directories in which the
partitions will be stored—this is the log.dirs parameter (not to be confused with the
location in which Kafka stores its error log, which is configured in the log4j.properties
file). The usual configuration includes a directory for each mount point that Kafka
will use.
Let’s look at how Kafka uses the available directories to store data. First, we want to
look at how data is allocated to the brokers in the cluster and the directories in the
broker. Then we will look at how the broker manages the files—especially how the
retention guarantees are handled. We will then dive inside the files and look at the file
and index formats. Lastly we will look at Log Compaction, an advanced feature that
allows turning Kafka into a long-term data store, and describe how it works.
Partition Allocation
When you create a topic, Kafka first decides how to allocate the partitions between
brokers. Suppose you have 6 brokers and you decide to create a topic with 10 parti‐
tions and a replication factor of 3. Kafka now has 30 partition replicas to allocate to 6
brokers. When doing the allocations, the goals are:
• To spread replicas evenly among brokers (in our example, to make sure we
  allocate 5 replicas per broker).
• To make sure that for each partition, each replica is on a different broker. If
  partition 0 has the leader on broker 2, we can place the followers on brokers 3 and 4,
  but not on 2 and not both on 3.
• If the brokers have rack information (available in Kafka release 0.10.0 and
  higher), then assign the replicas for each partition to different racks if possible.
  This ensures that an event that causes downtime for an entire rack does not cause
  complete unavailability for partitions.
To do this, we start with a random broker (let’s say, 4) and start assigning partitions to
each broker in round-robin manner to determine the location for the leaders. So the
leader for partition 0 will be on broker 4, the leader for partition 1 will be on broker 5,
the leader for partition 2 will be on broker 0 (because we only have 6 brokers, numbered
0 to 5), and so on. Then, for each partition, we place the followers at increasing offsets
from the leader. If the leader for partition 0 is on broker 4, the first follower will be on
broker 5 and the second on broker 0. The leader for partition 1 is on broker 5, so the
first follower is on broker 0 and the second on broker 1.
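The round-robin placement just described can be written out directly. This Python sketch implements only the simplified rule from the text (leaders rotate through the brokers, followers sit on the next brokers in order); assign_replicas is a hypothetical function, and Kafka's actual implementation may differ in details.

```python
def assign_replicas(num_brokers, num_partitions, replication_factor, start_broker):
    """Map each partition to a list of broker IDs, leader first."""
    assignment = {}
    for partition in range(num_partitions):
        leader = (start_broker + partition) % num_brokers
        # Followers are placed at increasing offsets from the leader.
        assignment[partition] = [
            (leader + step) % num_brokers for step in range(replication_factor)
        ]
    return assignment
```

For 6 brokers, 10 partitions, a replication factor of 3, and a starting broker of 4, this yields [4, 5, 0] for partition 0 and [5, 0, 1] for partition 1, matching the example. Note that with this simplified rule the 30 replicas are spread close to evenly, not perfectly evenly, because 10 leaders do not divide equally among 6 brokers.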
When rack awareness is taken into account, instead of picking brokers in numerical
order, we prepare a rack-alternating broker list. Suppose that we know that brokers 0,
1, and 2 are on the same rack, and brokers 3, 4, and 5 are on a separate rack. Instead
of picking brokers in the order of 0 to 5, we order them as 0, 3, 1, 4, 2, 5—each broker
is followed by a broker from a different rack (Figure 5-5). In this case, if the leader for
partition 0 is on broker 4, the first replica will be on broker 2, which is on a com‐
pletely different rack. This is great, because if the first rack goes offline, we know that
we still have a surviving replica and therefore the partition is still available. This will
be true for all our replicas, so we have guaranteed availability in the case of rack fail‐
ure.
Figure 5-5. Partitions and replicas assigned to brokers on different racks
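Building a rack-alternating broker list is a matter of interleaving the brokers of each rack. The Python sketch below is one way to do it under the assumption that rack membership is given as a dictionary from broker ID to rack name; rack_alternating_order and the rack names are hypothetical.

```python
from itertools import zip_longest


def rack_alternating_order(broker_racks):
    """Order brokers so that neighbors in the list sit on different racks."""
    by_rack = {}
    for broker in sorted(broker_racks):
        by_rack.setdefault(broker_racks[broker], []).append(broker)
    groups = [by_rack[rack] for rack in sorted(by_rack)]

    ordered = []
    # Take one broker from each rack in turn; racks may differ in size.
    for row in zip_longest(*groups):
        ordered.extend(broker for broker in row if broker is not None)
    return ordered
```

With brokers 0, 1, and 2 on one rack and 3, 4, and 5 on another, the result is 0, 3, 1, 4, 2, 5, so the broker that follows broker 4 in the list is broker 2, as in the example above.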
Once we choose the correct brokers for each partition and replica, it is time to decide
which directory to use for the new partitions. We do this independently for each par‐
tition, and the rule is very simple: we count the number of partitions on each direc‐
tory and add the new partition to the directory with the fewest partitions. This means
that if you add a new disk, all the new partitions will be created on that disk. This is
because, until things balance out, the new disk will always have the fewest partitions.
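The directory rule is simple enough to show in full. In this Python sketch, the directory paths and function names are made up for illustration; the only logic taken from the text is "pick the directory with the fewest partitions."

```python
def pick_log_dir(partition_counts):
    """Choose the directory that currently holds the fewest partitions."""
    return min(partition_counts, key=partition_counts.get)


def place_partitions(partition_counts, new_partitions):
    """Place several new partitions one at a time, updating counts as we go."""
    counts = dict(partition_counts)
    placements = []
    for _ in range(new_partitions):
        chosen = pick_log_dir(counts)
        counts[chosen] += 1
        placements.append(chosen)
    return placements
```

Starting from two disks with 10 partitions each and a freshly added empty disk, the next five partitions all land on the new disk, which is the behavior the paragraph above describes.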
Mind the Disk Space
Note that the allocation of partitions to brokers does not take avail‐
able space or existing load into account, and that allocation of par‐
titions to disks takes the number of partitions into account, but not
the size of the partitions. This means that if some brokers have
more disk space than others (perhaps because the cluster is a mix
of older and newer servers), some partitions are abnormally large,
or you have disks of different sizes on the same broker, you need to
be careful with the partition allocation.
File Management
Retention is an important concept in Kafka—Kafka does not keep data forever, nor
does it wait for all consumers to read a message before deleting it. Instead, the Kafka
administrator configures a retention period for each topic—either the amount of time
to store messages before deleting them or how much data to store before older mes‐
sages are purged.
Because finding the messages that need purging in a large file and then deleting a
portion of the file is both time-consuming and error-prone, we instead split each par‐
tition into segments. By default, each segment contains either 1 GB of data or a week
of data, whichever is smaller. As a Kafka broker is writing to a partition, if the seg‐
ment limit is reached, we close the file and start a new one.
The segment we are currently writing to is called an active segment. The active seg‐
ment is never deleted, so if you set log retention to only store a day of data but each
segment contains five days of data, you will really keep data for five days because we
can’t delete the data before the segment is closed. If you choose to store data for a
week and roll a new segment every day, you will see that every day we will roll a new
segment while deleting the oldest segment—so most of the time the partition will
have seven segments.
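The effect of never deleting the active segment can be seen in a short Python sketch. It assumes, for illustration only, that a segment's age is measured from the time of its last write and that the final entry in the list is the active segment; expired_segments is a hypothetical function and times are expressed in days.

```python
def expired_segments(last_write_times, now, retention):
    """Indexes of closed segments whose newest data is older than the retention.

    The final entry is treated as the active segment and is never returned.
    """
    closed = last_write_times[:-1]
    return [i for i, written in enumerate(closed) if now - written > retention]
```

With a one-day retention and a single segment that has been open for five days, nothing is eligible for deletion, so all five days of data remain. With daily rolls and a seven-day retention, only the oldest closed segment expires each day.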
As you learned in Chapter 2, a Kafka broker will keep an open file handle to every
segment in every partition, even inactive segments. This leads to an unusually high
number of open file handles, and the OS must be tuned accordingly.
File Format
Each segment is stored in a single data file. Inside the file, we store Kafka messages
and their offsets. The format of the data on the disk is identical to the format of the
messages that we send from the producer to the broker and later from the broker to
the consumers. Using the same message format on disk and over the wire is what
allows Kafka to use zero-copy optimization when sending messages to consumers
and also avoid decompressing and recompressing messages that the producer already
compressed.
Each message contains—in addition to its key, value, and offset—things like the mes‐
sage size, checksum code that allows us to detect corruption, magic byte that indicates
the version of the message format, compression codec (Snappy, GZip, or LZ4), and a
timestamp (added in release 0.10.0). The timestamp is given either by the producer
when the message was sent or by the broker when the message arrived—depending
on configuration.
If the producer is sending compressed messages, all the messages in a single producer
batch are compressed together and sent as the “value” of a “wrapper message”
(Figure 5-6). So the broker receives a single message, which it sends to the consumer.
But when the consumer decompresses the message value, it will see all the messages
that were contained in the batch, with their own timestamps and offsets.
This means that if you are using compression on the producer (recommended!),
sending larger batches means better compression both over the network and on the
broker disks. This also means that if we decide to change the message format that
consumers use (e.g., add a timestamp to the message), both the wire protocol and the
on-disk format need to change, and Kafka brokers need to know how to handle cases
in which files contain messages of two formats due to upgrades.
Figure 5-6. A normal message and a wrapper message
Kafka brokers ship with the DumpLogSegments tool, which allows you to look at a
partition segment in the filesystem and examine its contents. It will show you the offset,
checksum, magic byte, size, and compression codec for each message. You can run
the tool using:
bin/kafka-run-class.sh kafka.tools.DumpLogSegments
If you choose the --deep-iteration parameter, it will show you information about
messages compressed inside the wrapper messages.
Indexes
Kafka allows consumers to start fetching messages from any available offset. This
means that if a consumer asks for 1 MB of messages starting at offset 100, the broker
must be able to quickly locate the message for offset 100 (which can be in any of the
segments for the partition) and start reading the messages from that offset on. In
order to help brokers quickly locate the message for a given offset, Kafka maintains
an index for each partition. The index maps offsets to segment files and positions
within the file.
Indexes are also broken into segments, so we can delete old index entries when the
messages are purged. Kafka does not attempt to maintain checksums of the index. If
the index becomes corrupted, it will get regenerated from the matching log segment
simply by rereading the messages and recording the offsets and locations. It is also
completely safe for an administrator to delete index segments if needed—they will be
regenerated automatically.
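An offset lookup of this kind is a natural fit for binary search. The Python sketch below rests on two assumptions that go beyond the text: that each segment is identified by the offset of its first message (its base offset), and that the index holds entries for only some offsets, so the broker starts from the closest earlier entry and scans forward. The function names are hypothetical.

```python
import bisect


def find_segment(base_offsets, target_offset):
    """Return the base offset of the segment that contains target_offset.

    base_offsets must be sorted in ascending order.
    """
    i = bisect.bisect_right(base_offsets, target_offset) - 1
    if i < 0:
        raise LookupError(f"offset {target_offset} is older than every segment")
    return base_offsets[i]


def find_position(index_entries, target_offset):
    """Byte position of the closest index entry at or before target_offset.

    index_entries is a sorted list of (offset, byte_position) pairs.
    Returns 0 (the start of the file) when no entry qualifies.
    """
    offsets = [offset for offset, _ in index_entries]
    i = bisect.bisect_right(offsets, target_offset) - 1
    return index_entries[i][1] if i >= 0 else 0
```

Because the index can always be rebuilt by scanning the log and recording offsets and positions, losing it costs time but never data.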
Compaction
Normally, Kafka will store messages for a set amount of time and purge messages
older than the retention period. However, imagine a case where you use Kafka to
store shipping addresses for your customers. In that case, it makes more sense to
store the last address for each customer rather than data for just the last week or year.
This way, you don’t have to worry about old addresses and you still retain the address
for customers who haven’t moved in a while. Another use case can be an application
that uses Kafka to store its current state. Every time the state changes, the application
writes the new state into Kafka. When recovering from a crash, the application reads
those messages from Kafka to recover its latest state. In this case, it only cares about
the latest state before the crash, not all the changes that occurred while it was run‐
ning.
Kafka supports such use cases by allowing the retention policy on a topic to be either
delete, which deletes events older than the retention time, or compact, which only stores the
most recent value for each key in the topic. Obviously, setting the policy to compact
only makes sense on topics for which applications produce events that contain both a
key and a value. If the topic contains null keys, compaction will fail.
How Compaction Works
Each log is viewed as split into two portions (see Figure 5-7):
Clean
Messages that have been compacted before. This section contains only one value
for each key, which is the latest value at the time of the previous compaction.
Dirty
Messages that were written after the last compaction.
Figure 5-7. Partition with clean and dirty portions
If compaction is enabled when Kafka starts (using the awkwardly named
log.cleaner.enable configuration), each broker will start a compaction manager
thread and a number of compaction threads. These are responsible for performing
the compaction tasks. Each of these threads chooses the partition with the highest
ratio of dirty messages to total partition size and cleans this partition.
To compact a partition, the cleaner thread reads the dirty section of the partition and
creates an in-memory map. Each map entry is comprised of a 16-byte hash of a mes‐
sage key and the 8-byte offset of the previous message that had this same key. This
means each map entry only uses 24 bytes. If we look at a 1 GB segment and assume
that each message in the segment takes up 1 KB, the segment will contain 1 million
such messages and we will only need a 24 MB map to compact the segment (we may
need a lot less—if the keys repeat themselves, we will reuse the same hash entries
often and use less memory). This is quite efficient!
When configuring Kafka, the administrator configures how much memory compac‐
tion threads can use for this offset map. Even though each thread has its own map,
the configuration is for total memory across all threads. If you configured 1 GB for
the compaction offset map and you have five cleaner threads, each thread will get 200
MB for its own offset map. Kafka doesn’t require the entire dirty section of the parti‐
tion to fit into the size allocated for this map, but at least one full segment has to fit. If
it doesn’t, Kafka will log an error and the administrator will need to either allocate
more memory for the offset maps or use fewer cleaner threads. If only a few segments
fit, Kafka will start by compacting the oldest segments that fit into the map. The rest
will remain dirty and wait for the next compaction.
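The memory arithmetic in the last two paragraphs can be checked with a few lines of Python. The sketch uses decimal units (1 GB = 10^9 bytes) to match the round numbers in the text, and the function names are hypothetical.

```python
KB, MB, GB = 10**3, 10**6, 10**9

MAP_ENTRY_BYTES = 16 + 8  # 16-byte key hash plus 8-byte offset


def offset_map_bytes(segment_bytes, message_bytes):
    """Worst-case map size for one segment: one entry per message."""
    return (segment_bytes // message_bytes) * MAP_ENTRY_BYTES


def per_thread_map_bytes(total_map_bytes, cleaner_threads):
    """The configured total is divided evenly among the cleaner threads."""
    return total_map_bytes // cleaner_threads
```

A 1 GB segment of 1 KB messages needs at most 24 MB of map, and 1 GB of map memory split across five threads gives each thread 200 MB, enough for roughly 8.3 million distinct keys per pass.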
Once the cleaner thread builds the offset map, it will start reading off the clean segments,
starting with the oldest, and check their contents against the offset map. For
each message, it checks whether the key of the message exists in the offset map. If the key
does not exist in the map, the value of the message we’ve just read is still the latest and
we copy over the message to a replacement segment. If the key does exist in the map,
we omit the message because there is a message with an identical key but newer value
later in the partition. Once we’ve copied over all the messages that still contain the
latest value for their key, we swap the replacement segment for the original and move
on to the next segment. At the end of the process, we are left with one message per
key: the one with the latest value. See Figure 5-8.
Figure 5-8. Partition segment before and after compaction
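The compaction pass described above can be sketched in a few lines. This is a simplified model, not Kafka's implementation: it works on in-memory lists instead of segment files, and it uses the key itself as the map key where Kafka uses a 16-byte hash. The function names are illustrative.

```python
from typing import Dict, List, Tuple

Record = Tuple[int, str, str]  # (offset, key, value)


def build_offset_map(dirty: List[Record]) -> Dict[str, int]:
    """Map each key in the dirty section to the offset of its latest record."""
    offset_map: Dict[str, int] = {}
    for offset, key, _value in dirty:
        offset_map[key] = offset  # later records overwrite earlier ones
    return offset_map


def compact(clean: List[Record], dirty: List[Record]) -> List[Record]:
    """Return the partition contents after one compaction pass."""
    offset_map = build_offset_map(dirty)
    # A clean record survives only if no newer record has the same key.
    kept = [record for record in clean if record[1] not in offset_map]
    # From the dirty section, keep only the latest record for each key.
    kept += [record for record in dirty if offset_map[record[1]] == record[0]]
    return kept


clean = [(0, "a", "1"), (1, "b", "2")]
dirty = [(2, "a", "3"), (3, "c", "4"), (4, "a", "5")]
print(compact(clean, dirty))
```

In this example key "a" appears three times, and only the record at offset 4 survives, alongside the single records for "b" and "c".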
Deleted Events
If we always keep the latest message for each key, what do we do when we really want
to delete all messages for a specific key, such as if a user left our service and we are
legally obligated to remove all traces of that user from our system?
In order to delete a key from the system completely, not even saving the last message,
the application must produce a message that contains that key and a null value. When
the cleaner thread finds such a message, it will first do a normal compaction and
retain only the message with the null value. It will keep this special message (known
as a tombstone) around for a configurable amount of time. During this time, consum‐
ers will be able to see this message and know that the value is deleted. So if a con‐
sumer copies data from Kafka to a relational database, it will see the tombstone
message and know to delete the user from the database. After this set amount of time,
the cleaner thread will remove the tombstone message, and the key will be gone from
the partition in Kafka. It is important to give consumers enough time to see the
tombstone message, because if our consumer was down for a few hours and missed
the tombstone message, it will simply not see the key when consuming and therefore
not know that it was deleted from Kafka or to delete it from the database.
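A rough sketch of the tombstone life cycle follows. It assumes each record carries a timestamp and that the retention window is measured from that timestamp, which is a simplification; `tombstone_retention_ms` is a hypothetical parameter standing in for the configurable amount of time mentioned above.

```python
from typing import List, Optional, Tuple

# (offset, key, value, timestamp_ms); a value of None marks a tombstone.
Record = Tuple[int, str, Optional[str], int]


def compact_with_tombstones(records: List[Record], now_ms: int,
                            tombstone_retention_ms: int) -> List[Record]:
    """Keep the latest record per key; drop tombstones older than the retention."""
    latest = {}
    for record in records:  # records are assumed to be in offset order
        latest[record[1]] = record
    kept = []
    for offset, key, value, timestamp_ms in latest.values():
        expired = now_ms - timestamp_ms > tombstone_retention_ms
        if value is None and expired:
            continue  # the tombstone has aged out, so the key disappears
        kept.append((offset, key, value, timestamp_ms))
    return sorted(kept, key=lambda record: record[0])


log = [(0, "user-1", "alice", 0), (1, "user-2", "bob", 0), (2, "user-1", None, 1_000)]
print(compact_with_tombstones(log, now_ms=2_000, tombstone_retention_ms=5_000))
print(compact_with_tombstones(log, now_ms=10_000, tombstone_retention_ms=5_000))
```

The first call still returns the tombstone for "user-1", so a consumer reading at that point can propagate the delete. The second call, made after the retention window, no longer contains the key at all.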
When Are Topics Compacted?
In the same way that the delete policy never deletes the current active segments, the
compact policy never compacts the current segment. Messages are eligible for compaction
only on inactive segments.
In version 0.10.0 and older, Kafka will start compacting when 50% of the topic con‐
tains dirty records. The goal is not to compact too often (since compaction can
impact the read/write performance on a topic), but also not leave too many dirty
records around (since they consume disk space). Wasting 50% of the disk space used
by a topic on dirty records and then compacting them in one go seems like a reason‐
able trade-off, and it can be tuned by the administrator.
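The selection rule can be sketched as follows. This is an illustration only: the 0.5 threshold mirrors the 50% figure in the text, the helper names are made up, and treating a ratio exactly at the threshold as eligible is an assumption about the boundary case.

```python
def dirty_ratio(clean_bytes: int, dirty_bytes: int) -> float:
    """Fraction of the log that is dirty (0.0 for an empty log)."""
    total = clean_bytes + dirty_bytes
    return dirty_bytes / total if total else 0.0


def pick_partition_to_clean(partitions, threshold=0.5):
    """Return the name of the dirtiest eligible partition, or None.

    partitions maps a partition name to (clean_bytes, dirty_bytes).
    """
    ratios = {name: dirty_ratio(c, d) for name, (c, d) in partitions.items()}
    eligible = {name: r for name, r in ratios.items() if r >= threshold}
    if not eligible:
        return None
    return max(eligible, key=eligible.get)


sizes = {"p0": (800, 200), "p1": (400, 600), "p2": (300, 700)}
print(pick_partition_to_clean(sizes))
```

Lowering the threshold makes compaction run more often and waste less disk space; raising it does the opposite.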
In future versions, we are planning to add a grace period during which we guarantee
that messages will remain uncompacted. This will allow applications that need to see
every message that was written to the topic enough time to be sure they indeed saw
those messages even if they are lagging a bit.
Summary
There is obviously more to Kafka than we could cover in this chapter, but we hope
this gave you a taste of the kind of design decisions and optimizations we’ve made
when working on the project and perhaps explained some of the more obscure
behaviors and configurations you’ve run into while using Kafka.
If you are really interested in Kafka internals, there is no substitute for reading the
code. The Kafka developer mailing list (dev@kafka.apache.org) is a very friendly community
and there is always someone willing to answer questions regarding how Kafka
really works. And while you are reading the code, perhaps you can fix a bug or two—
open source projects always welcome contributions.
CHAPTER 6
Reliable Data Delivery
Reliable data delivery is one of the attributes of a system that cannot be left as an
afterthought. Like performance, it has to be designed into a system from its very first
whiteboard diagram. You cannot bolt on reliability after the fact. Moreover, reliability
is a property of a system—not of a single component—so even when we are talking
about the reliability guarantees of Apache Kafka, you will need to keep the entire sys‐
tem and its use cases in mind. When it comes to reliability, the systems that integrate
with Kafka are as important as Kafka itself. And because reliability is a system con‐
cern, it cannot be the responsibility of just one person. Everyone—Kafka administra‐
tors, Linux administrators, network and storage administrators, and the application
developers—must work together to build a reliable system.
Apache Kafka is very flexible about reliable data delivery. We understand that Kafka
has many use cases, from tracking clicks in a website to credit card payments. Some
of the use cases require utmost reliability while others prioritize speed and simplicity
over reliability. Kafka was written to be configurable enough and its client API flexi‐
ble enough to allow all kinds of reliability trade-offs.
Because of its flexibility, it is also easy to accidentally shoot yourself in the foot when
using Kafka—believing that your system is reliable when in fact it is not. In this chap‐
ter, we will start by talking about different kinds of reliability and what they mean in
the context of Apache Kafka. Then we will talk about Kafka’s replication mechanism
and how it contributes to the reliability of the system. We will then discuss Kafka’s
brokers and topics and how they should be configured for different use cases. Then
we will discuss the clients (producer and consumer) and how they should be used in
different reliability scenarios. Last, we will discuss the topic of validating the system
reliability, because it is not enough to believe a system is reliable—the assumption
must be thoroughly tested.
Reliability Guarantees
When we talk about reliability, we usually talk in terms of guarantees, which are the
behaviors a system is guaranteed to preserve under different circumstances.
Probably the best known reliability guarantee is ACID, which is the standard reliabil‐
ity guarantee that relational databases universally support. ACID stands for atomicity,
consistency, isolation, and durability. When a vendor explains that their database is
ACID-compliant, it means the database guarantees certain behaviors regarding
transactions.
Those guarantees are the reason people trust relational databases with their most crit‐
ical applications—they know exactly what the system promises and how it will behave
in different conditions. They understand the guarantees and can write safe applica‐
tions by relying on those guarantees.
Understanding the guarantees Kafka provides is critical for those seeking to build
reliable applications. This understanding allows the developers of the system to figure
out how it will behave under different failure conditions. So, what does Apache Kafka
guarantee?
• Kafka provides order guarantee of messages in a partition. If message B was written
  after message A, using the same producer in the same partition, then Kafka
  guarantees that the offset of message B will be higher than that of message A, and that
  consumers will read message B after message A.
• Produced messages are considered “committed” when they were written to the
  partition on all its in-sync replicas (but not necessarily flushed to disk). Producers
  can choose to receive acknowledgments of sent messages when the message
  was fully committed, when it was written to the leader, or when it was sent over
  the network.
• Messages that are committed will not be lost as long as at least one replica
  remains alive.
• Consumers can only read messages that are committed.
These basic guarantees can be used while building a reliable system, but in them‐
selves, don’t make the system fully reliable. There are trade-offs involved in building a
reliable system, and Kafka was built to allow administrators and developers to decide
how much reliability they need by providing configuration parameters that allow
controlling these trade-offs. The trade-offs usually involve how important it is to reli‐
ably and consistently store messages versus other important considerations such as
availability, high throughput, low latency, and hardware costs. We next review Kafka’s
replication mechanism, introduce terminology, and discuss how reliability is built
into Kafka. After that, we go over the configuration parameters we just mentioned.
Replication
Kafka’s replication mechanism, with its multiple replicas per partition, is at the core
of all of Kafka’s reliability guarantees. Having a message written in multiple replicas is
how Kafka provides durability of messages in the event of a crash.
We explained Kafka’s replication mechanism in depth in Chapter 5, but let’s recap the
highlights here.
Each Kafka topic is broken down into partitions, which are the basic data building
blocks. A partition is stored on a single disk. Kafka guarantees order of events within
a partition and a partition can be either online (available) or offline (unavailable).
Each partition can have multiple replicas, one of which is a designated leader. All
events are produced to and consumed from the leader replica. Other replicas just
need to stay in sync with the leader and replicate all the recent events on time. If the
leader becomes unavailable, one of the in-sync replicas becomes the new leader.
A replica is considered in-sync if it is the leader for a partition, or if it is a follower
that:
• Has an active session with Zookeeper, meaning it sent a heartbeat to Zookeeper
  in the last 6 seconds (configurable).
• Fetched messages from the leader in the last 10 seconds (configurable).
• Fetched the most recent messages from the leader in the last 10 seconds. That is,
  it isn’t enough that the follower is still getting messages from the leader; it must
  have almost no lag.
If a replica loses connection to Zookeeper, stops fetching new messages, or falls
behind and can’t catch up within 10 seconds, the replica is considered out-of-sync.
An out-of-sync replica gets back into sync when it connects to Zookeeper again and
catches up to the most recent message written to the leader. This usually happens
quickly after a temporary network glitch is healed but can take a while if the broker
the replica is stored on was down for a longer period of time.
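The three conditions for a follower can be written as a single predicate. This is a sketch of the rule as stated in the text, not the broker's code; the class and field names are hypothetical, and the 6-second and 10-second values are the configurable figures quoted above.

```python
from dataclasses import dataclass

ZK_SESSION_TIMEOUT_S = 6.0    # heartbeat window quoted in the text
REPLICA_LAG_TIMEOUT_S = 10.0  # fetch and catch-up window quoted in the text


@dataclass
class FollowerState:
    last_zk_heartbeat: float  # time of the last heartbeat sent to Zookeeper
    last_fetch: float         # time of the last fetch from the leader
    last_caught_up: float     # last time the follower held the newest message


def is_in_sync(follower: FollowerState, now: float) -> bool:
    """A follower is in sync only if all three conditions hold."""
    return (now - follower.last_zk_heartbeat <= ZK_SESSION_TIMEOUT_S
            and now - follower.last_fetch <= REPLICA_LAG_TIMEOUT_S
            and now - follower.last_caught_up <= REPLICA_LAG_TIMEOUT_S)


healthy = FollowerState(last_zk_heartbeat=100.0, last_fetch=100.0, last_caught_up=100.0)
lagging = FollowerState(last_zk_heartbeat=104.0, last_fetch=104.0, last_caught_up=90.0)
print(is_in_sync(healthy, now=105.0), is_in_sync(lagging, now=105.0))
```

The lagging follower is still fetching and still connected to Zookeeper, yet it fails the check because it has not caught up with the leader within the window.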
Out-of-Sync Replicas
Seeing one or more replicas rapidly flip between in-sync and out-of-sync
status is a sure sign that something is wrong with the cluster.
The cause is often a misconfiguration of Java’s garbage
collection on a broker. Misconfigured garbage collection can cause
the broker to pause for a few seconds, during which it will lose connectivity
to Zookeeper. When a broker loses connectivity to Zookeeper,
it is considered out-of-sync with the cluster, which causes
the flipping behavior.
An in-sync replica that is slightly behind can slow down producers and consumers—
since they wait for all the in-sync replicas to get the message before it is committed.
Once a replica falls out of sync, we no longer wait for it to get messages. It is still
behind, but now there is no performance impact. The catch is that with fewer in-sync
replicas, the effective replication factor of the partition is lower and therefore there is
a higher risk for downtime or data loss.
In the next section, we will look at what this means in practice.
Broker Configuration
There are three configuration parameters in the broker that change Kafka’s behavior
regarding reliable message storage. Like many broker configuration variables, these
can apply at the broker level, controlling configuration for all topics in the system,
and at the topic level, controlling behavior for a specific topic.
Being able to control reliability trade-offs at the topic level means that the same Kafka
cluster can be used to host reliable and nonreliable topics. For example, at a bank, the
administrator will probably want to set very reliable defaults for the entire cluster but
make an exception to the topic that stores customer complaints where some data loss
is acceptable.
Let’s look at these configuration parameters one by one and see how they affect reliability
of message storage in Kafka and the trade-offs involved.
Replication Factor
The topic-level configuration is replication.factor. At the broker level, you control
the default.replication.factor for automatically created topics.
Until this point, throughout the book, we always assumed that topics had a replication
factor of three, meaning that each partition is replicated three times on three different
brokers. This was a reasonable assumption for production clusters, but note that the
broker-level default.replication.factor itself defaults to 1, and this is a configuration that
users can modify. Even after a topic exists, you can choose to
add or remove replicas and thereby modify the replication factor.
A replication factor of N allows you to lose N-1 brokers while still being able to read
and write data to the topic reliably. So a higher replication factor leads to higher avail‐
ability, higher reliability, and fewer disasters. On the flip side, for a replication factor
of N, you will need at least N brokers and you will store N copies of the data, meaning
you will need N times as much disk space. We are basically trading availability for
hardware.
So how do you determine the right number of replicas for a topic? The answer is
based on how critical a topic is and how much you are willing to pay for higher avail‐
ability. It also depends a bit on how paranoid you are.
If you are totally OK with a specific topic being unavailable when a single broker is
restarted (which is part of the normal operations of a cluster), then a replication factor
of 1 may be enough. Don’t forget to make sure your management and users are
also OK with this trade-off—you are saving on disks or servers, but losing high avail‐
ability. A replication factor of 2 means you can lose one broker and still be OK, which
sounds like enough, but keep in mind that losing one broker can sometimes (mostly
on older versions of Kafka) send the cluster into an unstable state, forcing you to
restart another broker—the Kafka Controller. This means that with a replication fac‐
tor of 2, you may be forced to go into unavailability in order to recover from an
operational issue. This can be a tough choice.
For those reasons, we recommend a replication factor of 3 for any topic where availa‐
bility is an issue. In rare cases, this is considered not safe enough—we’ve seen banks
run critical topics with five replicas, just in case.
Placement of replicas is also very important. By default, Kafka will make sure each
replica for a partition is on a separate broker. However, in some cases, this is not safe
enough. If all replicas for a partition are placed on brokers that are on the same rack
and the top-of-rack switch misbehaves, you will lose availability of the partition
regardless of the replication factor. To protect against rack-level misfortune, we rec‐
ommend placing brokers in multiple racks and using the broker.rack broker config‐
uration parameter to configure the rack name for each broker. If rack names are
configured, Kafka will make sure replicas for a partition are spread across multiple
racks in order to guarantee even higher availability. In Chapter 5 we provided details
on how Kafka places replicas on brokers and racks, if you are interested in under‐
standing more.
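One way to reason about rack-level risk is to check how many racks a partition's replicas actually span. The sketch below is only such a check; it does not reproduce Kafka's placement algorithm, and the function names and the broker-to-rack mapping are invented for illustration.

```python
def racks_for_partition(replica_brokers, broker_rack):
    """Return the set of racks that host a replica of the partition."""
    return {broker_rack[broker] for broker in replica_brokers}


def survives_single_rack_failure(replica_brokers, broker_rack):
    """True if losing any one rack still leaves at least one replica."""
    return len(racks_for_partition(replica_brokers, broker_rack)) > 1


# Hypothetical cluster: brokers 1 and 2 share a rack, broker 3 is elsewhere.
broker_rack = {1: "rack-a", 2: "rack-a", 3: "rack-b"}
print(survives_single_rack_failure([1, 2], broker_rack))
print(survives_single_rack_failure([1, 3], broker_rack))
```

Both partitions have a replication factor of 2, but only the second one keeps a replica when the switch on rack-a fails.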
Unclean Leader Election
This configuration is only available at the broker (and in practice, cluster-wide) level.
The parameter name is unclean.leader.election.enable and by default it is set to
true.
As explained earlier, when the leader for a partition is no longer available, one of the
in-sync replicas will be chosen as the new leader. This leader election is “clean” in the
sense that it guarantees no loss of committed data—by definition, committed data
exists on all in-sync replicas.
But what do we do when no in-sync replica exists except for the leader that just
became unavailable?
This situation can happen in one of two scenarios:
• The partition had three replicas, and the two followers became unavailable (let’s
  say two brokers crashed). In this situation, as producers continue writing to the
  leader, all the messages are acknowledged and committed (since the leader is the
  one and only in-sync replica). Now let’s say that the leader becomes unavailable
  (oops, another broker crash). In this scenario, if one of the out-of-sync followers
  starts first, we have an out-of-sync replica as the only available replica for the
  partition.
• The partition had three replicas and, due to network issues, the two followers fell
  behind so that even though they are up and replicating, they are no longer in
  sync. The leader keeps accepting messages as the only in-sync replica. Now if the
  leader becomes unavailable, the two available replicas are no longer in-sync.
In both these scenarios, we need to make a difficult decision:
• If we don’t allow the out-of-sync replica to become the new leader, the partition
  will remain offline until we bring the old leader (and the last in-sync replica)
  back online. In some cases (e.g., memory chip needs replacement), this can take
  many hours.
• If we do allow the out-of-sync replica to become the new leader, we are going to
  lose all messages that were written to the old leader while that replica was out of
  sync and also cause some inconsistencies in consumers. Why? Imagine that while
  replicas 0 and 1 were not available, we wrote messages with offsets 100-200 to
  replica 2 (then the leader). Now replica 2 is unavailable and replica 0 is back
  online. Replica 0 only has messages 0-100 but not 100-200. If we allow replica 0
  to become the new leader, it will allow producers to write new messages and
  allow consumers to read them. So, now the new leader has completely new messages
  100-200. First, let’s note that some consumers may have read the old messages
  100-200, some consumers got the new 100-200, and some got a mix of
  both. This can lead to pretty bad consequences when looking at things like
  downstream reports. In addition, replica 2 will come back online and become a
  follower of the new leader. At that point, it will delete any messages it got that are
  ahead of the current leader. Those messages will not be available to any consumer
  in the future.
In summary, if we allow out-of-sync replicas to become leaders, we risk data loss and
data inconsistencies. If we don’t allow them to become leaders, we face lower availability
as we must wait for the original leader to become available before the partition
is back online.
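The data-loss scenario can be replayed with plain dictionaries. This is a toy model under simplifying assumptions: offsets use half-open ranges (0 to 99 and 100 to 199) rather than the overlapping ranges in the prose, each replica is a dict of offset to value, and the returning replica simply adopts the new leader's log.

```python
def simulate_unclean_election():
    """Replay the scenario and return (lost_messages, replica2_after_rejoin)."""
    # Replica 0 fell out of sync after offset 99; replica 2 kept leading.
    replica0 = {offset: f"old-{offset}" for offset in range(0, 100)}
    replica2 = {offset: f"old-{offset}" for offset in range(0, 200)}

    # Replica 2 crashes. Replica 0 is elected uncleanly and accepts new writes
    # at the same offsets that replica 2 had already used.
    for offset in range(100, 200):
        replica0[offset] = f"new-{offset}"

    # Replica 2 returns as a follower. Everything it holds that differs from
    # the new leader is discarded and replaced by the leader's data.
    lost = {o: v for o, v in replica2.items() if replica0.get(o) != v}
    replica2 = dict(replica0)
    return lost, replica2


lost, replica2 = simulate_unclean_election()
print(len(lost), "messages lost; offset 150 now holds", replica2[150])
```

A consumer that read offset 150 before the crash saw "old-150", while one that reads it afterward sees "new-150", which is the inconsistency the text warns about.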
Setting unclean.leader.election.enable to true means we allow out-of-sync replicas
to become leaders (known as unclean election), knowing that we will lose messages
when this occurs. If we set it to false, we choose to wait for the original leader to
come back online, resulting in lower availability. We typically see unclean leader election
disabled (configuration set to false) in systems where data quality and consistency
are critical. Banking systems are a good example (most banks would rather be
unable to process credit card payments for a few minutes or even hours than risk processing
a payment incorrectly). In systems where availability is more important, such
as real-time clickstream analysis, unclean leader election is often enabled.
Minimum In-Sync Replicas
Both the topic and the broker-level configuration are called min.insync.replicas.
As we’ve seen, there are cases where even though we configured a topic to have three
replicas, we may be left with a single in-sync replica. If this replica becomes unavail‐
able, we may have to choose between availability and consistency. This is never an
easy choice. Note that part of the problem is that, per Kafka reliability guarantees,
data is considered committed when it is written to all in-sync replicas, even when all
means just one replica and the data could be lost if that replica is unavailable.
If you would like to be sure that committed data is written to more than one replica,
you need to set the minimum number of in-sync replicas to a higher value. If a topic
has three replicas and you set min.insync.replicas to 2, then you can only write to
a partition in the topic if at least two out of the three replicas are in-sync.
When all three replicas are in-sync, everything proceeds normally. This is also true if
one of the replicas becomes unavailable. However, if two out of three replicas are not
available, the brokers will no longer accept produce requests. Instead, producers that
attempt to send data will receive NotEnoughReplicasException. Consumers can continue
reading existing data. In effect, with this configuration, a single in-sync replica
becomes read-only. This prevents the undesirable situation where data is produced
and consumed, only to disappear when unclean election occurs. In order to recover
from this read-only situation, we must make one of the two unavailable replicas
available again (maybe restart the broker) and wait for it to catch up and get in-sync.
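The effect of the setting can be modeled with two small functions. This is a sketch of the behavior described above rather than broker code; `NotEnoughReplicasError` is a local stand-in for the NotEnoughReplicasException named in the text, and the function names are made up.

```python
class NotEnoughReplicasError(Exception):
    """Local stand-in for the NotEnoughReplicasException named in the text."""


def produce(in_sync_replicas: int, min_insync_replicas: int) -> str:
    """Accept the write only if enough replicas are currently in sync."""
    if in_sync_replicas < min_insync_replicas:
        raise NotEnoughReplicasError(
            f"{in_sync_replicas} in-sync, {min_insync_replicas} required")
    return "committed"


def can_consume(in_sync_replicas: int) -> bool:
    """Existing data stays readable as long as some replica is available."""
    return in_sync_replicas >= 1


# Three replicas with min.insync.replicas set to 2.
for in_sync in (3, 2, 1):
    try:
        outcome = produce(in_sync, min_insync_replicas=2)
    except NotEnoughReplicasError as error:
        outcome = f"rejected ({error})"
    print(in_sync, outcome, "readable:", can_consume(in_sync))
```

With only one in-sync replica left, writes are rejected while reads continue, which is the read-only state the paragraph describes.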
Using Producers in a Reliable System
Even if we configure the brokers in the most reliable configuration possible, the system
as a whole can still accidentally lose data if we don’t configure the producers to be
reliable as well.
Here are two example scenarios to demonstrate this:
• We configured the brokers with three replicas, and unclean leader election is disabled.
  So we should never lose a single message that was committed to the Kafka
  cluster. However, we configured the producer to send messages with acks=1. We
  send a message from the producer and it is written to the leader, but not yet to
  the in-sync replicas. The leader sends back a response to the producer saying
  “Message was written successfully” and immediately crashes before the data is
  replicated to the other replicas. The other replicas are still considered in-sync
  (remember that it takes a while before we declare a replica out of sync) and one
  of them will become the leader. Since the message was not written to the replicas,
  it will be lost. But the producing application thinks it was written successfully.
  The system is consistent because no consumer saw the message (it was never
  committed because the replicas never got it), but from the producer perspective,
  a message was lost.
• We configured the brokers with three replicas, and unclean leader election is disabled.
  We learned from our mistakes and started producing messages with
  acks=all. Suppose that we are attempting to write a message to Kafka, but the
  leader for the partition we are writing to just crashed and a new one is still getting
  elected. Kafka will respond with “Leader not Available.” At this point, if the
  producer doesn’t handle the error correctly and doesn’t retry until the write is
  successful, the message may be lost. Once again, this is not a broker reliability
  issue because the broker never got the message; and it is not a consistency issue
  because the consumers never got the message either. But if producers don’t handle
  errors correctly, they may cause message loss.
So how do we avoid these tragic results? As the examples show, there are two impor‐
tant things that everyone who writes applications that produce to Kafka must pay
attention to:
• Use the correct acks configuration to match reliability requirements
• Handle errors correctly both in configuration and in code
We discussed producer modes in depth in Chapter 3, but let’s go over the important
points again.
Send Acknowledgments
Producers can choose between three different acknowledgment modes:
• acks=0 means that a message is considered to be written successfully to Kafka if
  the producer managed to send it over the network. You will still get errors if the
  object you are sending cannot be serialized or if the network card failed, but you
  won’t get any error if the partition is offline or if the entire Kafka cluster decided
  to take a long vacation. This means that even in the expected case of a clean
  leader election, your producer will lose messages because it won’t know that the
  leader is unavailable while a new leader is being elected. Running with acks=0 is
  very fast (which is why you see a lot of benchmarks with this configuration). You
  can get amazing throughput and utilize most of your bandwidth, but you are
  guaranteed to lose some messages if you choose this route.
• acks=1 means that the leader will send either an acknowledgment or an error the
  moment it got the message and wrote it to the partition data file (but not necessarily
  synced to disk). This means that under normal circumstances of leader
  election, your producer will get LeaderNotAvailableException while a leader is
  getting elected, and if the producer handles this error correctly (see next section),
  it will retry sending the message and the message will arrive safely to the new
  leader. You can lose data if the leader crashes and some messages that were successfully
  written to the leader and acknowledged were not replicated to the followers
  before the crash.
• acks=all means that the leader will wait until all in-sync replicas got the message
  before sending back an acknowledgment or an error. In conjunction with the
  min.insync.replicas configuration on the broker, this lets you control how
  many replicas get the message before it is acknowledged. This is the safest option:
  the producer won’t stop trying to send the message before it is fully committed.
  This is also the slowest option: the producer waits for all replicas to get all the
  messages before it can mark the message batch as “done” and carry on. The
  effects can be mitigated by using async mode for the producer and by sending
  larger batches, but this option will typically get you lower throughput.
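The difference between the three modes comes down to when the producer is told "success." The following sketch models only that decision; it is not producer code, the function names are invented, and the "survives a leader crash" rule is reduced to a single boolean for illustration.

```python
def producer_sees_success(acks: str, leader_wrote: bool,
                          followers_replicated: bool) -> bool:
    """Return True if the producer would treat the send as successful."""
    if acks == "0":
        return True  # success as soon as the message is on the network
    if acks == "1":
        return leader_wrote  # success once the leader has written it
    if acks == "all":
        return leader_wrote and followers_replicated  # success once committed
    raise ValueError(f"unknown acks value: {acks}")


def silently_lost(acks: str, leader_wrote: bool,
                  followers_replicated: bool) -> bool:
    """True if the producer saw success but a leader crash loses the message."""
    survives_leader_crash = leader_wrote and followers_replicated
    return (producer_sees_success(acks, leader_wrote, followers_replicated)
            and not survives_leader_crash)


# The leader wrote the message but crashed before the followers copied it.
for acks in ("0", "1", "all"):
    print(acks, silently_lost(acks, leader_wrote=True, followers_replicated=False))
```

Only acks=all avoids reporting success for a message that a leader crash would lose, which is why it is the safest and the slowest option.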
Configuring Producer Retries
There are two parts to handling errors in the producer: the errors that the producers
handle automatically for you and the errors that you as the developer using the pro‐
ducer library must handle.
The producer can handle retriable errors that are returned by the broker for you.
When the producer sends messages to a broker, the broker can return either a success
or an error code. Those error codes belong to two categories—errors that can be
resolved after retrying and errors that won’t be resolved. For example, if the broker
returns the error code LEADER_NOT_AVAILABLE, the producer can try sending the
message again, since a new leader may have been elected by then and the second attempt may succeed.
This means that LEADER_NOT_AVAILABLE is a retriable error. On the other hand, if a
broker returns an INVALID_CONFIG exception, trying the same message again will not
change the configuration. This is an example of a nonretriable error.
In general, if your goal is to never lose a message, your best approach is to configure
the producer to keep trying to send the messages when it encounters a retriable error.
Why? Because things like lack of leader or network connectivity issues often take a
few seconds to resolve—and if you just let the producer keep trying until it succeeds,
you don’t need to handle these issues yourself. I frequently get asked “how many
times should I configure the producer to retry?” and the answer really depends on
what you are planning on doing after the producer throws an exception that it retried
N times and gave up. If your answer is “I’ll catch the exception and retry some more,”
then you definitely need to set the number of retries higher and let the producer continue
trying. You want to stop retrying when the answer is either “I’ll just drop the
message; there’s no point to continue retrying” or “I’ll just write it somewhere else
and handle it later.” Note that Kafka’s cross-DC replication tool (MirrorMaker, which
we’ll discuss in Chapter 8) is configured by default to retry endlessly (i.e., retries =
MAX_INT) because, as a highly reliable replication tool, it should never just drop messages.
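The retry behavior that the producer provides can be pictured as a loop like the one below. This is a generic sketch and not the producer's implementation: the two exception classes, `send_with_retries`, and the fake `send` function are all made up for the example.

```python
import time


class RetriableError(Exception):
    """Stand-in for errors such as LEADER_NOT_AVAILABLE."""


class NonRetriableError(Exception):
    """Stand-in for errors such as INVALID_CONFIG."""


def send_with_retries(send, message, max_retries, backoff_s=0.0):
    """Call send(message), retrying only on RetriableError.

    Re-raises the last RetriableError once max_retries is exhausted and
    lets NonRetriableError propagate immediately.
    """
    attempt = 0
    while True:
        try:
            return send(message)
        except RetriableError:
            if attempt >= max_retries:
                raise
            attempt += 1
            time.sleep(backoff_s)


def make_flaky_send(failures):
    """Build a fake send() that fails `failures` times, then succeeds."""
    state = {"left": failures}

    def send(message):
        if state["left"] > 0:
            state["left"] -= 1
            raise RetriableError("LEADER_NOT_AVAILABLE")
        return f"acked:{message}"

    return send


print(send_with_retries(make_flaky_send(2), "hello", max_retries=5))
```

If the caller's reaction to the final exception would be to call the same function again, the retry limit is simply too low, which is the point made above.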
Note that retrying to send a failed message often includes a small risk that both mes‐
sages were successfully written to the broker, leading to duplicates. For example, if
network issues prevented the broker acknowledgment from reaching the producer,
but the message was successfully written and replicated, the producer will treat the
lack of acknowledgment as a temporary network issue and will retry sending the
message (since it can’t know that it was received). In that case, the broker will end up
having the same message twice. Retries and careful error handling can guarantee that
each message will be stored at least once, but in the current version of Apache Kafka
(0.10.0), we can’t guarantee it will be stored exactly once. Many real-world applica‐
tions add a unique identifier to each message to allow detecting duplicates and clean‐
ing them when consuming the messages. Other applications make the messages
idempotent, meaning that even if the same message is sent twice, it has no negative
impact on correctness. For example, the message “Account value is $110” is idempotent,
since sending it several times doesn’t change the result. The message “Add $10 to
the account” is not idempotent, since it changes the result every time you send it.
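Both techniques are easy to demonstrate. The sketch below assumes each message is a pair of a unique ID and a payload; the ID scheme and the function names are illustrative and not part of Kafka.

```python
def deduplicate(messages):
    """Drop messages whose unique ID has already been seen."""
    seen = set()
    unique = []
    for message_id, payload in messages:
        if message_id in seen:
            continue
        seen.add(message_id)
        unique.append((message_id, payload))
    return unique


def set_balance(balance, value):
    """Idempotent: applying it twice gives the same result as applying it once."""
    return value


def add_to_balance(balance, amount):
    """Not idempotent: every duplicate delivery changes the result again."""
    return balance + amount


# The producer retried message "m2", so the consumer receives it twice.
received = [("m1", 10), ("m2", 10), ("m2", 10)]

balance = 100
for _id, amount in received:
    balance = add_to_balance(balance, amount)
print("without dedup:", balance)

balance = 100
for _id, amount in deduplicate(received):
    balance = add_to_balance(balance, amount)
print("with dedup:", balance)
```

Without deduplication the retried message is applied twice. An application that sent "set the balance to 120" instead would not need the ID check at all.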
Additional Error Handling
Using the built-in producer retries is an easy way to correctly handle a large variety of
errors without loss of messages, but as a developer, you must still be able to handle
other types of errors. These include:
• Nonretriable broker errors such as errors regarding message size, authorization
  errors, etc.
• Errors that occur before the message was sent to the broker, for example,
  serialization errors
• Errors that occur when the producer exhausted all retry attempts or when the
  available memory used by the producer is filled to the limit due to using all of it
  to store messages while retrying
In Chapter 3 we discussed how to write error handlers for both sync and async
message-sending methods. The content of these error handlers is specific to the appli‐
cation and its goals—do you throw away “bad messages”? Log errors? Store these
messages in a directory on the local disk? Trigger a callback to another application?
These decisions are specific to your architecture. Just note that if all your error han‐
dler is doing is retrying to send the message, you are better off relying on the produc‐
er’s retry functionality.
Using Consumers in a Reliable System
Now that we have learned how to produce data while taking Kafka’s reliability guarantees
into account, it is time to see how to consume data.
As we saw in the first part of this chapter, data is only available to consumers after it
has been committed to Kafka—meaning it was written to all in-sync replicas. This
means that consumers get data that is guaranteed to be consistent. The only thing
consumers are left to do is make sure they keep track of which messages they’ve read
and which messages they haven’t. This is key to not losing messages while consuming
them.
When reading data from a partition, a consumer is fetching a batch of events, check‐
ing the last offset in the batch, and then requesting another batch of events starting
from the last offset received. This guarantees that a Kafka consumer will always get
new data in correct order without missing any events.
When a consumer stops, another consumer needs to know where to pick up the work
—what was the last offset that the previous consumer processed before it stopped?
The “other” consumer can even be the original one after a restart. It doesnt really
matter—some consumer is going to pick up consuming from that partition, and it
needs to know at which offset to start. This is why consumers need to “commit” their
offsets. For each partition it is consuming, the consumer stores its current location, so
it or another consumer will know where to continue after a restart. The main way
consumers can lose messages is by committing offsets for events they have read but
not yet completely processed. When another consumer picks up the work, it
will skip those events and they will never get processed. This is why paying careful
attention to when and how offsets get committed is critical.
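To make this failure mode concrete, here is a minimal simulation, assuming a single partition held in a list and a consumer that crashes right before it would process one particular event. The class CommitTimingSketch and its run method are hypothetical names, and nothing here calls the Kafka client. The committed value follows the convention that a committed offset is the position of the next event to read.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical simulation of commit timing; not Kafka client code. */
final class CommitTimingSketch {

    /**
     * Simulates a consumer that crashes after reading, but before processing,
     * the event at crashAt, followed by a replacement consumer that resumes
     * from the committed offset. Returns every event that got processed.
     */
    static List<String> run(List<String> events, int crashAt, boolean commitBeforeProcessing) {
        List<String> processed = new ArrayList<>();
        int committed = 0; // offset of the next event a restarted consumer would read

        // First consumer: reads events in order until it crashes.
        for (int offset = 0; offset < events.size(); offset++) {
            if (commitBeforeProcessing) {
                committed = offset + 1; // risky: the event has only been read so far
            }
            if (offset == crashAt) {
                break; // crash: this event was read but never processed
            }
            processed.add(events.get(offset));
            if (!commitBeforeProcessing) {
                committed = offset + 1; // safe: the event is fully processed
            }
        }

        // Replacement consumer: picks up from the committed offset.
        for (int offset = committed; offset < events.size(); offset++) {
            processed.add(events.get(offset));
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> events = List.of("a", "b", "c", "d");
        System.out.println("commit before processing: " + run(events, 2, true));
        System.out.println("commit after processing:  " + run(events, 2, false));
    }
}
```

With the early commit, the event at the crash position is skipped by the replacement consumer. With the late commit, the replacement consumer reads it again and nothing is lost.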
Committed Messages Versus Committed Offsets
This is different from a committed message, which, as discussed
previously, is a message that was written to all in-sync replicas and
is available to consumers. Committed offsets are offsets the
consumer sent to Kafka to acknowledge that it received and processed
all the messages in a partition up to this specific offset.
In Chapter 4 we discussed the consumer API in detail and covered the many methods
for committing offsets. Here we will cover some important considerations and
choices, but refer you back to Chapter 4 for details on using the APIs.
Important Consumer Conguration Properties for Reliable Processing
There are four consumer configuration properties that are important to understand
in order to configure your consumer for a desired reliability behavior.
The first is group.id, as explained in great detail in Chapter 4. The basic idea is that if
two consumers have the same group ID and subscribe to the same topic, each will be
assigned a subset of the partitions in the topic and will therefore only read a subset of
the messages individually (but all the messages will be read by the group as a whole).
If you need a consumer to see, on its own, every single message in the topics it is
subscribed to, it will need a unique group.id.
The second relevant configuration is auto.offset.reset. This parameter controls
what the consumer will do when no offsets were committed (e.g., when the consumer
first starts) or when the consumer asks for offsets that don’t exist in the broker
(Chapter 4 explains how this can happen). There are two main options here. If you choose
earliest, the consumer will start from the beginning of the partition whenever it
doesn’t have a valid offset. This can lead to the consumer processing a lot of messages
twice, but it minimizes data loss. If you choose latest, the consumer
will start at the end of the partition. This minimizes duplicate processing by the
consumer but almost certainly leads to some messages getting missed by the consumer.
The third relevant configuration is enable.auto.commit. This is a big decision: are
you going to let the consumer commit offsets for you on a schedule, or are you
planning on committing offsets manually in your code? The main benefit of
automatic offset commits is that it’s one less thing to worry about when implementing
your consumers. If you do all the processing of consumed records within the
consumer poll loop, then the automatic offset commit guarantees you will never commit
an offset that you didn’t process. (If you are not sure what the consumer poll loop
is, refer back to Chapter 4.) The main drawback of automatic offset commits is that
you have no control over the number of duplicate records you may need to process
(because your consumer stopped after processing some records but before the
automated commit kicked in). If you do anything fancy like pass records to another
thread to process in the background, the automatic commit may commit offsets for
records the consumer has read but perhaps did not process yet.
The fourth relevant configuration is tied to the third, and is auto.commit.interval.ms.
If you choose to commit offsets automatically, this configuration
lets you configure how frequently they will be committed. The default is every five
seconds. In general, committing more frequently adds some overhead but reduces the
number of duplicates that can occur when a consumer stops.
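As a rough illustration, the four properties discussed above could be collected as follows. The property names are the ones named in this section. The chosen values, the class ReliableConsumerConfigSketch, and the build method are assumptions for the sake of the example: they show one possible combination (manual commits, start from the earliest offset), not a recommendation. Connection and deserializer settings, which a real consumer also needs, are left out.

```java
import java.util.Properties;

/** Hypothetical helper that gathers the four reliability-related consumer settings. */
final class ReliableConsumerConfigSketch {

    static Properties build(String groupId) {
        Properties props = new Properties();
        // Consumers sharing a group.id split the partitions between them;
        // use a unique value if this consumer must see every message itself.
        props.put("group.id", groupId);
        // With no valid committed offset, start from the beginning of the
        // partition: more duplicates, less data loss.
        props.put("auto.offset.reset", "earliest");
        // Assumption for this sketch: offsets are committed manually in code.
        props.put("enable.auto.commit", "false");
        // Only relevant when enable.auto.commit is true; shown here at the
        // five-second default mentioned in the text.
        props.put("auto.commit.interval.ms", "5000");
        return props;
    }

    public static void main(String[] args) {
        build("example-group").forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```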
Explicitly Committing Osets in Consumers
If you go with automatic offset commits, you don’t need to worry about explicitly
committing offsets. But you do need to think about how you will commit offsets if
you decide you need more control over the timing of offset commits—either in order
to minimize duplicates or because you are doing event processing outside the main
consumer poll loop.
We will not go over the mechanics and APIs involved in committing offsets here,
since they were covered in great depth in Chapter 4. Instead, we will review important
considerations when developing a consumer to handle data reliably. We’ll start
with the simple and perhaps obvious points and move on to more complex patterns.
Always commit osets after events were processed
If you do all the processing within the poll loop and don’t maintain state between poll
loops (e.g., for aggregation), this should be easy. You can use the auto-commit
configuration or commit offsets at the end of the poll loop.
Commit frequency is a trade-off between performance and number of duplicates in the event of a crash
Even in the simplest case where you do all the processing within the poll loop and
don’t maintain state between poll loops, you can choose to commit multiple times
within a loop (perhaps even after every event) or choose to only commit every several
loops. Committing has some performance overhead (similar to producing with
acks=all), so it all depends on the trade-offs that work for you.
Make sure you know exactly what offsets you are committing
A common pitfall when committing in the middle of the poll loop is accidentally
committing the last offset read when polling and not the last offset processed.
Remember that it is critical to always commit offsets for messages after they were
processed—committing offsets for messages read but not processed can lead to the
consumer missing messages. Chapter 4 has examples that show how to do just that.
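One way to keep the committed position tied to what was actually processed is to track it explicitly. The sketch below is a hypothetical helper, not part of the Kafka client: the class ProcessedOffsetTracker, its markProcessed method, and the use of a plain integer for the partition are assumptions for illustration. It records the last processed offset plus one per partition (the position of the next message to read) and hands back a snapshot to commit once every commitEvery records, which is also where the trade-off between commit overhead and duplicates is tuned. In a real consumer the snapshot would be converted to a map of TopicPartition to OffsetAndMetadata and passed to commitSync or commitAsync.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical helper: tracks processed offsets and decides when a commit is due. */
final class ProcessedOffsetTracker {

    // partition number -> offset to commit (last processed offset + 1)
    private final Map<Integer, Long> nextOffsets = new HashMap<>();
    private final int commitEvery;
    private int sinceLastCommit = 0;

    ProcessedOffsetTracker(int commitEvery) {
        if (commitEvery < 1) {
            throw new IllegalArgumentException("commitEvery must be at least 1");
        }
        this.commitEvery = commitEvery;
    }

    /**
     * Call only after the record has been fully processed.
     * Returns a snapshot of offsets to commit when a commit is due,
     * or an empty map when it is not.
     */
    Map<Integer, Long> markProcessed(int partition, long offset) {
        nextOffsets.put(partition, offset + 1);
        sinceLastCommit++;
        if (sinceLastCommit < commitEvery) {
            return Map.of();
        }
        sinceLastCommit = 0;
        return new HashMap<>(nextOffsets); // copy, so later updates do not leak into the commit
    }

    public static void main(String[] args) {
        ProcessedOffsetTracker tracker = new ProcessedOffsetTracker(2);
        System.out.println(tracker.markProcessed(0, 10L)); // not due yet
        System.out.println(tracker.markProcessed(1, 7L));  // due: snapshot for both partitions
    }
}
```

A smaller commitEvery means fewer records to reprocess after a crash at the cost of more commit requests; a larger value does the opposite.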
Rebalances
When designing your application, remember that consumer rebalances will happen
and you need to handle them properly. Chapter 4 contains a few examples, but the
bigger picture is that this usually involves committing offsets before partitions are
revoked and cleaning up any state you maintain when you are assigned new partitions.
Consumers may need to retry
In some cases, after calling poll and processing records, some re