Amazon Kinesis Data Analytics Developer Guide 2.Dev

User Manual:

Open the PDF directly: View PDF PDF.
Page Count: 296

DownloadAmazon Kinesis Data Analytics - Developer Guide 2.Dev
Open PDF In BrowserView PDF
Amazon Kinesis Data Analytics
Developer Guide

Amazon Kinesis Data Analytics Developer Guide

Amazon Kinesis Data Analytics: Developer Guide

Copyright © 2019 Amazon Web Services, Inc. and/or its affiliates. All rights reserved.
Amazon's trademarks and trade dress may not be used in connection with any product or service that is not Amazon's, in any manner
that is likely to cause confusion among customers, or in any manner that disparages or discredits Amazon. All other trademarks not
owned by Amazon are the property of their respective owners, who may or may not be affiliated with, connected to, or sponsored by
Amazon.

Amazon Kinesis Data Analytics Developer Guide

Table of Contents
What Is Amazon Kinesis Data Analytics? ............................................................................................... 1
When Should I Use Amazon Kinesis Data Analytics? ........................................................................ 1
Are You a First-Time User of Amazon Kinesis Data Analytics? ........................................................... 1
How It Works .................................................................................................................................... 3
Input ........................................................................................................................................ 5
Configuring a Streaming Source ........................................................................................... 5
Configuring a Reference Source ........................................................................................... 7
Working with JSONPath ...................................................................................................... 9
Mapping Streaming Source Elements to SQL Input Columns .................................................. 12
Using the Schema Discovery Feature on Streaming Data ....................................................... 16
Using the Schema Discovery Feature on Static Data .............................................................. 18
Preprocessing Data Using a Lambda Function ...................................................................... 21
Parallelizing Input Streams for Increased Throughput ........................................................... 27
Application Code ...................................................................................................................... 30
Output .................................................................................................................................... 32
Creating an Output Using the AWS CLI ............................................................................... 32
Using a Lambda Function as Output ................................................................................... 33
Application Output Delivery Model ..................................................................................... 39
Error Handling ......................................................................................................................... 40
Reporting Errors Using an In-Application Error Stream .......................................................... 40
Granting Permissions ................................................................................................................ 41
Trust Policy ..................................................................................................................... 41
Permissions Policy ............................................................................................................ 41
Auto Scaling Applications .......................................................................................................... 43
Getting Started ................................................................................................................................ 45
Step 1: Set Up an Account ........................................................................................................ 45
Sign Up for AWS .............................................................................................................. 45
Create an IAM User .......................................................................................................... 46
Next Step ........................................................................................................................ 46
Step 2: Set Up the AWS CLI ....................................................................................................... 46
Next Step ........................................................................................................................ 47
Step 3: Create Your Starter Analytics Application ........................................................................ 47
Step 3.1: Create an Application .......................................................................................... 49
Step 3.2: Configure Input .................................................................................................. 50
Step 3.3: Add Real-Time Analytics (Add Application Code) ..................................................... 52
Step 3.4: (Optional) Update the Application Code ................................................................. 54
Step 4 (Optional) Edit the Schema and SQL Code Using the Console ............................................... 56
Working with the Schema Editor ........................................................................................ 56
Working with the SQL Editor ............................................................................................. 63
Streaming SQL Concepts ................................................................................................................... 66
In-Application Streams and Pumps ............................................................................................. 66
Timestamps and the ROWTIME Column ...................................................................................... 67
Understanding Various Times in Streaming Analytics ............................................................ 67
Continuous Queries .................................................................................................................. 69
Windowed Queries ................................................................................................................... 70
Stagger Windows ............................................................................................................. 70
Tumbling Windows ........................................................................................................... 75
Sliding Windows ............................................................................................................... 76
Stream Joins ............................................................................................................................ 80
Example 1: Report Orders Where There Are Trades Within One Minute of the Order Being
Placed ............................................................................................................................. 80
Examples ......................................................................................................................................... 82
Transforming Data .................................................................................................................... 82
Preprocessing Streams with Lambda ................................................................................... 82

iii

Amazon Kinesis Data Analytics Developer Guide

Transforming String Values ................................................................................................ 82
Transforming DateTime Values ........................................................................................... 96
Transforming Multiple Data Types ...................................................................................... 99
Windows and Aggregation ....................................................................................................... 104
Stagger Window ............................................................................................................. 104
Tumbling Window Using ROWTIME ................................................................................... 107
Tumbling Window Using an Event Timestamp .................................................................... 109
Most Frequently Occurring Values (TOP_K_ITEMS_TUMBLING) .............................................. 112
Aggregating Partial Results .............................................................................................. 114
Joins ..................................................................................................................................... 116
Example: Add Reference Data Source ................................................................................ 116
Machine Learning ................................................................................................................... 119
Detecting Anomalies ....................................................................................................... 119
Example: Detect Anomalies and Get an Explanation ............................................................ 125
Example: Detect Hotspots ................................................................................................ 129
Alerts and Errors .................................................................................................................... 139
Simple Alerts ................................................................................................................. 139
Throttled Alerts .............................................................................................................. 140
In-Application Error Stream ............................................................................................. 141
Solution Accelerators .............................................................................................................. 142
Real-Time Insights on AWS Account Activity ...................................................................... 142
Real-Time IoT Device Monitoring with Kinesis Data Analytics ................................................ 143
Real-Time Web Analytics with Kinesis Data Analytics ........................................................... 143
AWS Connected Vehicle Solution ...................................................................................... 143
Monitoring ..................................................................................................................................... 144
Monitoring Tools .................................................................................................................... 144
Automated Tools ............................................................................................................ 145
Manual Tools ................................................................................................................. 145
Monitoring with Amazon CloudWatch ....................................................................................... 145
Metrics and Dimensions ................................................................................................... 146
Viewing Metrics and Dimensions ...................................................................................... 147
Alarms .......................................................................................................................... 148
Logs .............................................................................................................................. 149
Limits ............................................................................................................................................ 154
Best Practices ................................................................................................................................. 155
Managing Applications ............................................................................................................ 155
Defining Input Schema ............................................................................................................ 156
Connecting to Outputs ............................................................................................................ 157
Authoring Application Code ..................................................................................................... 157
Testing Applications ................................................................................................................ 157
Setting up a Test Application ........................................................................................... 157
Testing Schema Changes ................................................................................................. 158
Testing Code Changes ..................................................................................................... 158
Troubleshooting ............................................................................................................................. 159
Unable to Run SQL Code ......................................................................................................... 159
Unable to Detect or Discover My Schema .................................................................................. 159
Reference Data is Out of Date .................................................................................................. 160
Application Not Writing to Destination ...................................................................................... 160
Important Application Health Parameters to Monitor .................................................................. 160
Invalid Code Errors When Running an Application ....................................................................... 161
Application is Writing Errors to the Error Stream ........................................................................ 161
Insufficient Throughput or High MillisBehindLatest ..................................................................... 161
Authentication and Access Control .................................................................................................... 163
Authentication ....................................................................................................................... 163
Access Control ........................................................................................................................ 164
Overview of Managing Access .................................................................................................. 164
Amazon Kinesis Data Analytics Resources and Operations .................................................... 165

iv

Amazon Kinesis Data Analytics Developer Guide

Understanding Resource Ownership ..................................................................................
Managing Access to Resources .........................................................................................
Specifying Policy Elements: Actions, Effects, and Principals ..................................................
Specifying Conditions in a Policy ......................................................................................
Using Identity-Based Policies (IAM Policies) ................................................................................
Permissions Required to Use the Amazon Kinesis Data Analytics Console ................................
AWS Managed (Predefined) Policies for Amazon Kinesis Data Analytics ..................................
Customer Managed Policy Examples .................................................................................
Amazon Kinesis Data Analytics API Permissions Reference ...........................................................
SQL Reference ...............................................................................................................................
API Reference .................................................................................................................................
Actions ..................................................................................................................................
AddApplicationCloudWatchLoggingOption .........................................................................
AddApplicationInput .......................................................................................................
AddApplicationInputProcessingConfiguration .....................................................................
AddApplicationOutput .....................................................................................................
AddApplicationReferenceDataSource .................................................................................
CreateApplication ...........................................................................................................
DeleteApplication ...........................................................................................................
DeleteApplicationCloudWatchLoggingOption .....................................................................
DeleteApplicationInputProcessingConfiguration ..................................................................
DeleteApplicationOutput .................................................................................................
DeleteApplicationReferenceDataSource ..............................................................................
DescribeApplication ........................................................................................................
DiscoverInputSchema ......................................................................................................
ListApplications ..............................................................................................................
StartApplication .............................................................................................................
StopApplication ..............................................................................................................
UpdateApplication ..........................................................................................................
Data Types ............................................................................................................................
ApplicationDetail ............................................................................................................
ApplicationSummary .......................................................................................................
ApplicationUpdate ..........................................................................................................
CloudWatchLoggingOption ..............................................................................................
CloudWatchLoggingOptionDescription ..............................................................................
CloudWatchLoggingOptionUpdate ....................................................................................
CSVMappingParameters ...................................................................................................
DestinationSchema .........................................................................................................
Input .............................................................................................................................
InputConfiguration ..........................................................................................................
InputDescription .............................................................................................................
InputLambdaProcessor ....................................................................................................
InputLambdaProcessorDescription ....................................................................................
InputLambdaProcessorUpdate ..........................................................................................
InputParallelism .............................................................................................................
InputParallelismUpdate ...................................................................................................
InputProcessingConfiguration ...........................................................................................
InputProcessingConfigurationDescription ...........................................................................
InputProcessingConfigurationUpdate .................................................................................
InputSchemaUpdate ........................................................................................................
InputStartingPositionConfiguration ...................................................................................
InputUpdate ..................................................................................................................
JSONMappingParameters ................................................................................................
KinesisFirehoseInput ........................................................................................................
KinesisFirehoseInputDescription ........................................................................................
KinesisFirehoseInputUpdate .............................................................................................
KinesisFirehoseOutput .....................................................................................................

v

165
165
167
167
168
168
169
170
173
175
176
176
177
179
182
185
188
191
196
198
200
202
204
206
210
214
216
218
220
223
225
228
229
230
231
232
233
234
235
237
238
240
241
242
243
244
245
246
247
248
249
250
252
253
254
255
256

Amazon Kinesis Data Analytics Developer Guide

KinesisFirehoseOutputDescription .....................................................................................
KinesisFirehoseOutputUpdate ...........................................................................................
KinesisStreamsInput ........................................................................................................
KinesisStreamsInputDescription ........................................................................................
KinesisStreamsInputUpdate ..............................................................................................
KinesisStreamsOutput .....................................................................................................
KinesisStreamsOutputDescription .....................................................................................
KinesisStreamsOutputUpdate ...........................................................................................
LambdaOutput ...............................................................................................................
LambdaOutputDescription ...............................................................................................
LambdaOutputUpdate .....................................................................................................
MappingParameters ........................................................................................................
Output ..........................................................................................................................
OutputDescription ..........................................................................................................
OutputUpdate ................................................................................................................
RecordColumn ................................................................................................................
RecordFormat .................................................................................................................
ReferenceDataSource ......................................................................................................
ReferenceDataSourceDescription .......................................................................................
ReferenceDataSourceUpdate ............................................................................................
S3Configuration .............................................................................................................
S3ReferenceDataSource ...................................................................................................
S3ReferenceDataSourceDescription ...................................................................................
S3ReferenceDataSourceUpdate .........................................................................................
SourceSchema ................................................................................................................
Document History ..........................................................................................................................
AWS Glossary .................................................................................................................................

vi

257
258
259
260
261
262
263
264
265
266
267
268
269
271
273
275
276
277
278
280
282
283
284
285
286
287
290

Amazon Kinesis Data Analytics Developer Guide
When Should I Use Amazon Kinesis Data Analytics?

What Is Amazon Kinesis Data
Analytics?
With Amazon Kinesis Data Analytics, you can process and analyze streaming data using standard SQL.
The service enables you to quickly author and run powerful SQL code against streaming sources to
perform time series analytics, feed real-time dashboards, and create real-time metrics.
To get started with Kinesis Data Analytics, you create a Kinesis data analytics application that
continuously reads and processes streaming data. The service supports ingesting data from Amazon
Kinesis Data Streams and Amazon Kinesis Data Firehose streaming sources. Then, you author your SQL
code using the interactive editor and test it with live streaming data. You can also configure destinations
where you want Kinesis Data Analytics to send the results.
Kinesis Data Analytics supports Amazon Kinesis Data Firehose (Amazon S3, Amazon Redshift, and
Amazon Elasticsearch Service), AWS Lambda, and Amazon Kinesis Data Streams as destinations.

When Should I Use Amazon Kinesis Data Analytics?
Amazon Kinesis Data Analytics enables you to quickly author SQL code that continuously reads,
processes, and stores data in near real time. Using standard SQL queries on the streaming data, you can
construct applications that transform and provide insights into your data. Following are some of example
scenarios for using Kinesis Data Analytics:
• Generate time-series analytics – You can calculate metrics over time windows, and then stream values
to Amazon S3 or Amazon Redshift through a Kinesis data delivery stream.
• Feed real-time dashboards – You can send aggregated and processed streaming data results
downstream to feed real-time dashboards.
• Create real-time metrics – You can create custom metrics and triggers for use in real-time monitoring,
notifications, and alarms.
For information about the SQL language elements that are supported by Kinesis Data Analytics, see
Amazon Kinesis Data Analytics SQL Reference.

Are You a First-Time User of Amazon Kinesis Data
Analytics?
If you are a first-time user of Amazon Kinesis Data Analytics, we recommend that you read the following
sections in order:
1. Read the How It Works section of this guide. This section introduces various Kinesis Data Analytics
components that you work with to create an end-to-end experience. For more information, see
Amazon Kinesis Data Analytics: How It Works (p. 3).
2. Try the Getting Started exercises. For more information, see Getting Started with Amazon Kinesis
Data Analytics (p. 45).
3. Explore the streaming SQL concepts. For more information, see Streaming SQL Concepts (p. 66).

1

Amazon Kinesis Data Analytics Developer Guide
Are You a First-Time User of
Amazon Kinesis Data Analytics?

4. Try additional examples. For more information, see Example Applications (p. 82).

2

Amazon Kinesis Data Analytics Developer Guide

Amazon Kinesis Data Analytics: How
It Works
An application is the primary resource in Amazon Kinesis Data Analytics that you can create in your
account. You can create and manage applications using the AWS Management Console or the Kinesis
Data Analytics API. Kinesis Data Analytics provides API operations to manage applications. For a list of
API operations, see Actions (p. 176).
Kinesis Data Analytics applications continuously read and process streaming data in real time. You write
application code using SQL to process the incoming streaming data and produce output. Then, Kinesis
Data Analytics writes the output to a configured destination. The following diagram illustrates a typical
application architecture.

Each application has a name, description, version ID, and status. Amazon Kinesis Data Analytics assigns
a version ID when you first create an application. This version ID is updated when you update any
application configuration. For example, if you add an input configuration, add or delete a reference
data source, add or delete an output configuration, or update application code, Kinesis Data Analytics
updates the current application version ID. Kinesis Data Analytics also maintains timestamps for when an
application was created and last updated.
In addition to these basic properties, each application consists of the following:
• Input – The streaming source for your application. You can select either a Kinesis data stream or
a Kinesis Data Firehose data delivery stream as the streaming source. In the input configuration,
you map the streaming source to an in-application input stream. The in-application stream is like a
continuously updating table upon which you can perform the SELECT and INSERT SQL operations.
In your application code, you can create additional in-application streams to store intermediate query
results.

3

Amazon Kinesis Data Analytics Developer Guide

You can optionally partition a single streaming source in multiple in-application input streams to
improve the throughput. For more information, see Limits (p. 154) and Configuring Application
Input (p. 5).

Amazon Kinesis Data Analytics provides a timestamp column in each application stream called
Timestamps and the ROWTIME Column (p. 67). You can use this column in time-based windowed
queries. For more information, see Windowed Queries (p. 70).

You can optionally configure a reference data source to enrich your input data stream within the
application. It results in an in-application reference table. You must store your reference data as
an object in your S3 bucket. When the application starts, Amazon Kinesis Data Analytics reads
the Amazon S3 object and creates an in-application table. For more information, see Configuring
Application Input (p. 5).

• Application code – A series of SQL statements that process input and produce output. You can write
SQL statements against in-application streams and reference tables. You can also write JOIN queries to
combine data from both of these sources.

For information about the SQL language elements that are supported by Kinesis Data Analytics, see
Amazon Kinesis Data Analytics SQL Reference.

In its simplest form, application code can be a single SQL statement that selects from a streaming
input and inserts results into a streaming output. It can also be a series of SQL statements where
output of one feeds into the input of the next SQL statement. Further, you can write application code
to split an input stream into multiple streams. You can then apply additional queries to process these
streams. For more information, see Application Code (p. 30).

• Output – In application code, query results go to in-application streams. In your application code,
you can create one or more in-application streams to hold intermediate results. You can then
optionally configure the application output to persist data in the in-application streams that hold
your application output (also referred to as in-application output streams) to external destinations.
External destinations can be a Kinesis Data Firehose delivery stream or a Kinesis data stream. Note the
following about these destinations:
• You can configure a Kinesis Data Firehose delivery stream to write results to Amazon S3, Amazon
Redshift, or Amazon Elasticsearch Service (Amazon ES).

• You can also write application output to a custom destination instead of Amazon S3 or Amazon
Redshift. To do that, you specify a Kinesis data stream as the destination in your output
configuration. Then, you configure AWS Lambda to poll the stream and invoke your Lambda
function. Your Lambda function code receives stream data as input. In your Lambda function code,
you can write the incoming data to your custom destination. For more information, see Using AWS
Lambda with Amazon Kinesis Data Analytics.
For more information, see Configuring Application Output (p. 32).

4

Amazon Kinesis Data Analytics Developer Guide
Input

In addition, note the following:
• Amazon Kinesis Data Analytics needs permissions to read records from a streaming source and write
application output to the external destinations. You use IAM roles to grant these permissions.

• Kinesis Data Analytics automatically provides an in-application error stream for each application. If
your application has issues while processing certain records (for example, because of a type mismatch
or late arrival), that record is written to the error stream. You can configure application output to direct
Kinesis Data Analytics to persist the error stream data to an external destination for further evaluation.
For more information, see Error Handling (p. 40).

• Amazon Kinesis Data Analytics ensures that your application output records are written to the
configured destination. It uses an "at least once" processing and delivery model, even if you experience
an application interruption. For more information, see Delivery Model for Persisting Application
Output to an External Destination (p. 39).
Topics
• Configuring Application Input (p. 5)
• Application Code (p. 30)
• Configuring Application Output (p. 32)
• Error Handling (p. 40)
• Granting Amazon Kinesis Data Analytics Permissions to Access Streaming and Reference Sources
(Creating an IAM Role) (p. 41)
• Automatically Scaling Applications to Increase Throughput (p. 43)

Configuring Application Input
Your Amazon Kinesis Data Analytics application can receive input from a single streaming source and,
optionally, use one reference data source. For more information, see Amazon Kinesis Data Analytics: How
It Works (p. 3). The sections in this topic describe the application input sources.
Topics
• Configuring a Streaming Source (p. 5)
• Configuring a Reference Source (p. 7)
• Working with JSONPath (p. 9)
• Mapping Streaming Source Elements to SQL Input Columns (p. 12)
• Using the Schema Discovery Feature on Streaming Data (p. 16)
• Using the Schema Discovery Feature on Static Data (p. 18)
• Preprocessing Data Using a Lambda Function (p. 21)
• Parallelizing Input Streams for Increased Throughput (p. 27)

Configuring a Streaming Source
At the time that you create an application, you specify a streaming source. You can also modify an input
after you create the application. Amazon Kinesis Data Analytics supports the following streaming sources
for your application:
5

Amazon Kinesis Data Analytics Developer Guide
Configuring a Streaming Source

• A Kinesis data stream
• A Kinesis Data Firehose delivery stream

Note

If the Kinesis data stream is encrypted, Kinesis Data Analytics accesses the data in the encrypted
stream seamlessly with no further configuration needed. Kinesis Data Analytics does not store
unencrypted data read from Kinesis Data Streams. For more information, see What Is ServerSide Encryption For Kinesis Data Streams?.
Kinesis Data Analytics continuously polls the streaming source for new data and ingests it in inapplication streams according to the input configuration. Your application code can query the inapplication stream. As part of input configuration you provide the following:
• Streaming source – You provide the Amazon Resource Name (ARN) of the stream and an IAM role that
Kinesis Data Analytics can assume to access the stream on your behalf.
• In-application stream name prefix – When you start the application, Kinesis Data Analytics creates
the specified in-application stream. In your application code, you access the in-application stream
using this name.
You can optionally map a streaming source to multiple in-application streams. For more information,
see Limits (p. 154). In this case, Amazon Kinesis Data Analytics creates the specified number of inapplication streams with names as follows: prefix_001, prefix_002, and prefix_003. By default,
Kinesis Data Analytics maps the streaming source to one in-application stream named prefix_001.
There is a limit on the rate that you can insert rows in an in-application stream. Therefore, Kinesis
Data Analytics supports multiple such in-application streams so that you can bring records into your
application at a much faster rate. If you find that your application is not keeping up with the data in
the streaming source, you can add units of parallelism to improve performance.
• Mapping schema – You describe the record format (JSON, CSV) on the streaming source. You also
describe how each record on the stream maps to columns in the in-application stream that is created.
This is where you provide column names and data types.

Note

Kinesis Data Analytics adds quotation marks around the identifiers (stream name and column
names) when creating the input in-application stream. When querying this stream and the
columns, you must specify them in quotation marks using the same casing (matching lowercase
and uppercase letters exactly). For more information about identifiers, see Identifiers in the
Amazon Kinesis Data Analytics SQL Reference.
You can create an application and configure inputs in the Amazon Kinesis Data Analytics console. The
console then makes the necessary API calls. You can configure application input when you create a
new application API or add input configuration to an existing application. For more information, see
CreateApplication (p. 191) and AddApplicationInput (p. 179). The following is the input configuration
part of the Createapplication API request body:
"Inputs": [
{

"InputSchema": {
"RecordColumns": [
{
"Mapping": "string",
"Name": "string",
"SqlType": "string"
}
],
"RecordEncoding": "string",
"RecordFormat": {

6

Amazon Kinesis Data Analytics Developer Guide
Configuring a Reference Source
"MappingParameters": {
"CSVMappingParameters": {
"RecordColumnDelimiter": "string",
"RecordRowDelimiter": "string"
},
"JSONMappingParameters": {
"RecordRowPath": "string"
}
},
"RecordFormatType": "string"

]

}

}
},
"KinesisFirehoseInput": {
"ResourceARN": "string",
"RoleARN": "string"
},
"KinesisStreamsInput": {
"ResourceARN": "string",
"RoleARN": "string"
},
"Name": "string"

Configuring a Reference Source
You can also optionally add a reference data source to an existing application to enrich the data coming
in from streaming sources. You must store reference data as an object in your Amazon S3 bucket. When
the application starts, Amazon Kinesis Data Analytics reads the Amazon S3 object and creates an inapplication reference table. Your application code can then join it with an in-application stream.
You store reference data in the Amazon S3 object using supported formats (CSV, JSON). For example,
suppose that your application performs analytics on stock orders. Assume the following record format
on the streaming source:
Ticker, SalePrice, OrderId
AMZN
XYZ
...

$700
$250

1003
1004

In this case, you might then consider maintaining a reference data source to provide details for each
stock ticker, such as company name.
Ticker, Company
AMZN, Amazon
XYZ, SomeCompany
...

You can add an application reference data source either with the API or with the console. Amazon Kinesis
Data Analytics provides the following API actions to manage reference data sources:
• AddApplicationReferenceDataSource (p. 188)
• UpdateApplication (p. 220)
For information about adding reference data using the console, see Example: Adding Reference Data to a
Kinesis Data Analytics Application (p. 116).
Note the following:

7

Amazon Kinesis Data Analytics Developer Guide
Configuring a Reference Source

• If the application is running, Kinesis Data Analytics creates an in-application reference table, and then
loads the reference data immediately.
• If the application is not running (for example, it's in the ready state), Kinesis Data Analytics saves only
the updated input configuration. When the application starts running, Kinesis Data Analytics loads the
reference data in your application as a table.
Suppose that you want to refresh the data after Kinesis Data Analytics creates the in-application
reference table. Perhaps you updated the Amazon S3 object, or you want to use a different Amazon
S3 object. In this case, you can either explicitly call UpdateApplication (p. 220), or choose Actions,
Synchronize reference data table in the console. Kinesis Data Analytics does not refresh the inapplication reference table automatically.
There is a limit on the size of the Amazon S3 object that you can create as a reference data source. For
more information, see Limits (p. 154). If the object size exceeds the limit, Kinesis Data Analytics can't
load the data. The application state appears as running, but the data is not being read.
When you add a reference data source, you provide the following information:
• S3 bucket and object key name – In addition to the bucket name and object key, you also provide an
IAM role that Kinesis Data Analytics can assume to read the object on your behalf.
• In-application reference table name – Kinesis Data Analytics creates this in-application table and
populates it by reading the Amazon S3 object. This is the table name you specify in your application
code.
• Mapping schema – You describe the record format (JSON, CSV), encoding of data stored in the
Amazon S3 object. You also describe how each data element maps to columns in the in-application
reference table.
The following shows the request body in the AddApplicationReferenceDataSource API request.
{

"applicationName": "string",
"CurrentapplicationVersionId": number,
"ReferenceDataSource": {
"ReferenceSchema": {
"RecordColumns": [
{
"IsDropped": boolean,
"Mapping": "string",
"Name": "string",
"SqlType": "string"
}
],
"RecordEncoding": "string",
"RecordFormat": {
"MappingParameters": {
"CSVMappingParameters": {
"RecordColumnDelimiter": "string",
"RecordRowDelimiter": "string"
},
"JSONMappingParameters": {
"RecordRowPath": "string"
}
},
"RecordFormatType": "string"
}
},
"S3ReferenceDataSource": {
"BucketARN": "string",
"FileKey": "string",

8

Amazon Kinesis Data Analytics Developer Guide
Working with JSONPath

}

}

"ReferenceRoleARN": "string"
},
"TableName": "string"

Working with JSONPath
JSONPath is a standardized way to query elements of a JSON object. JSONPath uses path expressions to
navigate elements, nested elements, and arrays in a JSON document. For more information about JSON,
see Introducing JSON.

Accessing JSON Elements with JSONPath
Following, you can find how to use JSONPath expressions to access various elements in JSON-formatted
data. For the examples in this section, assume that the source stream contains a JSON record in the
following format.
{

}

"customerName":"John Doe",
"address":
{
"streetAddress":
[
"number":"123",
"street":"AnyStreet"
],
"city":"Anytown"
}
"orders":
[
{ "orderId":"23284", "itemName":"Widget", "itemPrice":"33.99" },
{ "orderId":"63122", "itemName":"Gadget", "itemPrice":"22.50" },
{ "orderId":"77284", "itemName":"Sprocket", "itemPrice":"12.00" }
]

Accessing JSON Elements
To query an element in JSON data using JSONPath, use the following syntax. Here, $ represents the root
of the data hierarchy and elementName is the name of the element node to query.
$.elementName

The following expression queries the customerName element in the preceding JSON example.
$.customerName

The preceding expression returns the following from the preceding JSON record.
John Doe

Note

Path expressions are case sensitive. The expression $.Name returns null from the preceding
JSON example.

9

Amazon Kinesis Data Analytics Developer Guide
Working with JSONPath

Note

If no element appears at the location where the path expression specifies, the expression returns
null. The following expression returns null from the preceding JSON example, because there
is no matching element.
$.customerId

Accessing Nested JSON Elements
To query a nested JSON element, use the following syntax.
$.parentElement.element

The following expression queries the city element in the preceding JSON example.
$.address.city

The preceding expression returns the following from the preceding JSON record.
Anytown

You can query further levels of subelements using the following syntax.
$.parentElement.element.subElement

The following expression queries the street element in the preceding JSON example.
$.address.streetAddress.street

The preceding expression returns the following from the preceding JSON record.
AnyStreet

Accessing Arrays
Arrays are queried using an array index expression inside square brackets ([]). Currently, the only index
expression supported is 0:, meaning that all the elements in the array are returned.
The format of the data returned depends on whether the array index expression is the last expression in
the path:
• When the array index is the last expression in the path expression, all of the contents of the array are
returned as a single field in a single data row.
• When there is a nested expression after the array index expression, the array is "flattened." In other
words, each element in the array is returned as a separate data row.
To query the entire contents of an array as a single row, use the following syntax.
$.arrayObject[0:]

The following expression queries the entire contents of the orders element in the preceding JSON
example. It returns the array contents in a single column in a single row.

10

Amazon Kinesis Data Analytics Developer Guide
Working with JSONPath

$.orders[0:]

The preceding expression returns the following from the preceding JSON record.
[{"orderId":"23284","itemName":"Widget","itemPrice":"33.99"},
{"orderId":"61322","itemName":"Gadget","itemPrice":"22.50"},
{"orderId":"77284","itemName":"Sprocket","itemPrice":"12.00"}]

To query the individual elements in an array as separate rows, use the following syntax.
$.arrayObject[0:].element

The following expression queries the orderId elements in the preceding JSON example, and returns
each array element as a separate row.
$.orders[0:].orderId

The preceding expression returns the following from the preceding JSON record, with each data item
returned as a separate row.
23284
63122
77284

Note

If expressions that query nonarray elements are included in a schema that queries individual
array elements, the nonarray elements are repeated for each element in the array. For example,
suppose that a schema for the preceding JSON example includes the following expressions:
• $.customerName
• $.orders[0:].orderId
In this case, the returned data rows from the sample input stream element resemble the
following, with the name element repeated for every orderId element.
John Doe

23284

John Doe

63122

John Doe

77284

Note

The following limitations apply to array expressions in Amazon Kinesis Data Analytics:
• Only one level of dereferencing is supported in an array expression. The following expression
format is not supported.
$.arrayObject[0:].element[0:].subElement

11

Amazon Kinesis Data Analytics Developer Guide
Mapping Streaming Source Elements to SQL Input Columns

• Only one array can be flattened in a schema. Multiple arrays can be referenced—returned as
one row containing all of the elements in the array. However, only one array can have each of
its elements returned as individual rows.
A schema containing elements in the following format is valid. This format returns the
contents of the second array as a single column, repeated for every element in the first array.
$.arrayObjectOne[0:].element
$.arrayObjectTwo[0:]

A schema containing elements in the following format is not valid.
$.arrayObjectOne[0:].element
$.arrayObjectTwo[0:].element

Other Considerations
Additional considerations for working with JSONPath are as follows:
• If no arrays are accessed by an individual element in the JSONPath expression, then a single row is
created for each JSON record processed. Every JSONPath expression corresponds to a single column.
• When an array is flattened, any missing elements result in a null value being created in the inapplication stream.
• An array is always flattened to at least one row. If no values would be returned (that is, the array is
empty or none of its elements are queried), a single row with all null values is returned.
The following expression returns records with null values from the preceding JSON example, because
there is no matching element at the specified path.
$.orders[0:].itemId

The preceding expression returns the following from the preceding JSON example record.

null
null
null

Related Topics
• Introducing JSON

Mapping Streaming Source Elements to SQL Input
Columns
With Amazon Kinesis Data Analytics, you can process and analyze streaming data in either JSON or CSV
formats using standard SQL.

12

Amazon Kinesis Data Analytics Developer Guide
Mapping Streaming Source Elements to SQL Input Columns

• To process and analyze streaming CSV data, you assign column names and data types for the columns
of the input stream. Your application imports one column from the input stream per column definition,
in order.
You don't have to include all of the columns in the application input stream, but you cannot skip
columns from the source stream. For example, you can import the first three columns from an input
stream containing five elements, but you cannot import only columns 1, 2, and 4.
• To process and analyze streaming JSON data, you use JSONPath expressions to map JSON elements
from a streaming source to SQL columns in an input stream. For more information about using
JSONPath with Amazon Kinesis Data Analytics, see Working with JSONPath (p. 9). The columns in
the SQL table have data types that are mapped from JSON types. For supported data types, see Data
Types. For details about converting JSON data to SQL data, see Mapping JSON Data Types to SQL Data
Types (p. 15).
For more information about how to configure input steams, see Configuring Application Input (p. 5).

Mapping JSON Data to SQL Columns
You can map JSON elements to input columns using the AWS Management Console or the Kinesis Data
Analytics API.
• To map elements to columns using the console, see Working with the Schema Editor (p. 56).
• To map elements to columns using the Kinesis Data Analytics API, see the following section.
To map JSON elements to columns in the in-application input stream, you need a schema with the
following information for each column:
• Source Expression: The JSONPath expression that identifies the location of the data for the column.
• Column Name: The name that your SQL queries use to reference the data.
• Data Type: The SQL data type for the column.

Using the API
To map elements from a streaming source to input columns, you can use the Kinesis Data Analytics API
CreateApplication (p. 191) action. To create the in-application stream, specify a schema to transform
your data into a schematized version used in SQL. The CreateApplication (p. 191) action configures
your application to receive input from a single streaming source. To map JSON elements or CSV
columns to SQL columns, you create a RecordColumn (p. 275) object in the SourceSchema (p. 286)
RecordColumns array. The RecordColumn (p. 275) object has the following schema:

{

}

"Mapping": "String",
"Name": "String",
"SqlType": "String"

The fields in the RecordColumn (p. 275) object have the following values:
• Mapping: The JSONPath expression that identifies the location of the data in the input stream record.
This value is not present for an input schema for a source stream in CSV format.
• Name: The column name in the in-application SQL data stream.
13

Amazon Kinesis Data Analytics Developer Guide
Mapping Streaming Source Elements to SQL Input Columns

• SqlType: The data type of the data in the in-application SQL data stream.

JSON Input Schema Example
The following example demonstrates the format of the InputSchema value for a JSON schema.
"InputSchema": {
"RecordColumns": [
{
"SqlType": "VARCHAR(4)",
"Name": "TICKER_SYMBOL",
"Mapping": "$.TICKER_SYMBOL"
},
{
"SqlType": "VARCHAR(16)",
"Name": "SECTOR",
"Mapping": "$.SECTOR"
},
{
"SqlType": "TINYINT",
"Name": "CHANGE",
"Mapping": "$.CHANGE"
},
{
"SqlType": "DECIMAL(5,2)",
"Name": "PRICE",
"Mapping": "$.PRICE"
}
],
"RecordFormat": {
"MappingParameters": {
"JSONMappingParameters": {
"RecordRowPath": "$"
}
},
"RecordFormatType": "JSON"
},
"RecordEncoding": "UTF-8"
}

CSV Input Schema Example
The following example demonstrates the format of the InputSchema value for a schema in commaseparated value (CSV) format.
"InputSchema": {
"RecordColumns": [
{
"SqlType": "VARCHAR(16)",
"Name": "LastName"
},
{
"SqlType": "VARCHAR(16)",
"Name": "FirstName"
},
{
"SqlType": "INTEGER",
"Name": "CustomerId"
}
],
"RecordFormat": {

14

Amazon Kinesis Data Analytics Developer Guide
Mapping Streaming Source Elements to SQL Input Columns
"MappingParameters": {
"CSVMappingParameters": {
"RecordColumnDelimiter": ",",
"RecordRowDelimiter": "\n"
}
},
"RecordFormatType": "CSV"

}

},
"RecordEncoding": "UTF-8"

Mapping JSON Data Types to SQL Data Types
JSON data types are converted to corresponding SQL data types according to the application's input
schema. For information about supported SQL data types, see Data Types. Amazon Kinesis Data
Analytics converts JSON data types to SQL data types according to the following rules.

Null Literal
A null literal in the JSON input stream (that is, "City":null) converts to a SQL null regardless of
destination data type.

Boolean Literal
A Boolean literal in the JSON input stream (that is, "Contacted":true) converts to SQL data as
follows:
• Numeric (DECIMAL, INT, and so on): true converts to 1; false converts to 0.
• Binary (BINARY or VARBINARY):
• true: Result has lowest bit set and remaining bits cleared.
• false: Result has all bits cleared.
Conversion to VARBINARY results in a value 1 byte in length.
• BOOLEAN: Converts to the corresponding SQL BOOLEAN value.
• Character (CHAR or VARCHAR): Converts to the corresponding string value (true or false). The value
is truncated to fit the length of the field.
• Datetime (DATE, TIME, or TIMESTAMP): Conversion fails and a coercion error is written to the error
stream.

Number
A number literal in the JSON input stream (that is, "CustomerId":67321) converts to SQL data as
follows:
• Numeric (DECIMAL, INT, and so on): Converts directly. If the converted value exceeds the size or
precision of the target data type (that is, converting 123.4 to INT), conversion fails and a coercion
error is written to the error stream.
• Binary (BINARY or VARBINARY): Conversion fails and a coercion error is written to the error stream.
• BOOLEAN:
• 0: Converts to false.
• All other numbers: Converts to true.
• Character (CHAR or VARCHAR): Converts to a string representation of the number.
15

Amazon Kinesis Data Analytics Developer Guide
Using the Schema Discovery Feature on Streaming Data

• Datetime (DATE, TIME, or TIMESTAMP): Conversion fails and a coercion error is written to the error
stream.

String
A string value in the JSON input stream (that is, "CustomerName":"John Doe") converts to SQL data
as follows:
• Numeric (DECIMAL, INT, and so on): Amazon Kinesis Data Analytics attempts to convert the value to
the target data type. If the value cannot be converted, conversion fails and a coercion error is written
to the error stream.
• Binary (BINARY or VARBINARY): If the source string is a valid binary literal (that is, X'3F67A23A', with
an even number of f), the value is converted to the target data type. Otherwise, conversion fails and a
coercion error is written to the error stream.
• BOOLEAN: If the source string is "true", converts to true. This comparison is case-insensitive.
Otherwise, converts to false.
• Character (CHAR or VARCHAR): Converts to the string value in the input. If the value is longer than the
target data type, it is truncated and no error is written to the error stream.
• Datetime (DATE, TIME, or TIMESTAMP): If the source string is in a format that can be converted to the
target value, the value is converted. Otherwise, conversion fails and a coercion error is written to the
error stream.
Valid datetime formats include:
• "1992-02-14"
• "1992-02-14 18:35:44.0"

Array or Object
An array or object in the JSON input stream converts to SQL data as follows:
• Character (CHAR or VARCHAR): Converts to the source text of the array or object. See Accessing
Arrays (p. 10).
• All other data types: Conversion fails and a coercion error is written to the error stream.
For an example of a JSON array, see Working with JSONPath (p. 9).

Related Topics
• Configuring Application Input (p. 5)
• Data Types
• Working with the Schema Editor (p. 56)
• CreateApplication (p. 191)
• RecordColumn (p. 275)
• SourceSchema (p. 286)

Using the Schema Discovery Feature on Streaming
Data
Providing an input schema that describes how records on the streaming input map to an in-application
stream can be cumbersome and error prone. You can use the DiscoverInputSchema (p. 210) API (called

16

Amazon Kinesis Data Analytics Developer Guide
Using the Schema Discovery Feature on Streaming Data

the discovery API) to infer a schema. Using random samples of records on the streaming source, the API
can infer a schema (that is, column names, data types, and position of the data element in the incoming
data).

Note

To use the Discovery API to generate a schema from a file stored in Amazon S3, see Using the
Schema Discovery Feature on Static Data (p. 18).
The console uses the Discovery API to generate a schema for a specified streaming source. Using the
console, you can also update the schema, including adding or removing columns, changing column
names or data types, and so on. However, make changes carefully to ensure that you do not create an
invalid schema.
After you finalize a schema for your in-application stream, there are functions you can use to manipulate
string and datetime values. You can use these functions in your application code when working with
rows in the resulting in-application stream. For more information, see Example: Transforming DateTime
Values (p. 96).

Column Naming During Schema Discovery
During schema discovery, Amazon Kinesis Data Analytics tries to retain as much of the original column
name as possible from the streaming input source, except in the following cases:
• The source stream column name is a reserved SQL keyword, such as TIMESTAMP, USER, VALUES, or
YEAR.
• The source stream column name contains unsupported characters. Only letters, numbers, and the
underscore character ( _ ) are supported.
• The source stream column name begins with a number.
• The source stream column name is longer than 100 characters.
If a column is renamed, the renamed schema column name begins with COL_. In some cases, none of
the original column name can be retained—for example, if the entire name is unsupported characters.
In such a case, the column is named COL_#, with # being a number indicating the column's place in the
column order.
After discovery completes, you can update the schema using the console to add or remove columns, or
change column names, data types, or data size.

Examples of Discovery-Suggested Column Names
Source Stream Column Name

Discovery-Suggested Column Name

USER

COL_USER

USER@DOMAIN

COL_USERDOMAIN

@@

COL_0

Schema Discovery Issues
What happens if Kinesis Data Analytics does not infer a schema for a given streaming source?
Kinesis Data Analytics infers your schema for common formats, such as CSV and JSON, which are UTF-8
encoded. Kinesis Data Analytics supports any UTF-8 encoded records (including raw text like application

17

Amazon Kinesis Data Analytics Developer Guide
Using the Schema Discovery Feature on Static Data

logs and records) with a custom column and row delimiter. If Kinesis Data Analytics doesn't infer a
schema, you can define a schema manually using the schema editor on the console (or using the API).
If your data does not follow a pattern (which you can specify using the schema editor), you can define
a schema as a single column of type VARCHAR(N), where N is the largest number of characters you
expect your record to include. From there, you can use string and date-time manipulation to structure
your data after it is in an in-application stream. For examples, see Example: Transforming DateTime
Values (p. 96).

Using the Schema Discovery Feature on Static Data
The schema discovery feature can generate a schema from either the data in a stream or data in a static
file that is stored in an Amazon S3 bucket. Suppose that you want to generate a schema for a Kinesis
Data Analytics application for reference purposes or when live streaming data isn't available. You can use
the schema discovery feature on a static file that contains a sample of the data in the expected format of
your streaming or reference data. Kinesis Data Analytics can run schema discovery on sample data from a
JSON or CSV file that's stored in an Amazon S3 bucket. Using schema discovery on a data file uses either
the console, or the DiscoverInputSchema (p. 210) API with the S3Configuration parameter specified.

Running Schema Discovery Using the Console
To run discovery on a static file using the console, do the following:
1.

Add a reference data object to an S3 bucket.

2.

Choose Connect reference data in the application's main page in the Kinesis Data Analytics console.

3.

Provide the bucket, path and IAM role data for accessing the Amazon S3 object containing the
reference data.

4.

Choose Discover schema.

For more information on how to add reference data and discover schema in the console, see Example:
Adding Reference Data to a Kinesis Data Analytics Application (p. 116).

Running Schema Discovery Using the API
To run discovery on a static file using the API, you provide the API with an S3Configuration structure
with the following information:
• BucketARN: The Amazon Resource Name (ARN) of the Amazon S3 bucket that contains the file.
For the format of an Amazon S3 bucket ARN, see Amazon Resource Names (ARNs) and AWS Service
Namespaces: Amazon Simple Storage Service (Amazon S3).
• RoleARN: The ARN of an IAM role with the AmazonS3ReadOnlyAccess policy. For information about
how to add a policy to a role, see Modifying a Role.
• FileKey: The file name of the object.

To generate a schema from an Amazon S3 object using the DiscoverInputSchema API
1.

Make sure that you have the AWS CLI set up. For more information, see Step 2: Set Up the AWS
Command Line Interface (AWS CLI) (p. 46) in the Getting Started section.

2.

Create a file named data.csv with the following contents:
year,month,state,producer_type,energy_source,units,consumption
2001,1,AK,TotalElectricPowerIndustry,Coal,ShortTons,47615

18

Amazon Kinesis Data Analytics Developer Guide
Using the Schema Discovery Feature on Static Data
2001,1,AK,ElectricGeneratorsElectricUtilities,Coal,ShortTons,16535
2001,1,AK,CombinedHeatandPowerElectricPower,Coal,ShortTons,22890
2001,1,AL,TotalElectricPowerIndustry,Coal,ShortTons,3020601
2001,1,AL,ElectricGeneratorsElectricUtilities,Coal,ShortTons,2987681

3.

Sign in to the Amazon S3 console at https://console.aws.amazon.com/s3/.

4.

Create an Amazon S3 bucket and upload the data.csv file you created. Note the ARN of the
created bucket. For information about creating an Amazon S3 bucket and uploading a file, see
Getting Started with Amazon Simple Storage Service.

5.

Open the IAM console at https://console.aws.amazon.com/iam/. Create a role with the
AmazonS3ReadOnlyAccess policy. Note the ARN of the new role. For information about creating a
role, see Creating a Role to Delegate Permissions to an AWS Service. For information about how to
add a policy to a role, see Modifying a Role.

6.

Run the following DiscoverInputSchema command in the AWS CLI, substituting the ARNs for
your Amazon S3 bucket and IAM role:
$aws kinesisanalytics discover-input-schema --s3-configuration '{ "RoleARN":
"arn:aws:iam::123456789012:role/service-role/your-IAM-role", "BucketARN":
"arn:aws:s3:::your-bucket-name", "FileKey": "data.csv" }'

7.

The response looks similar to the following:
{

"InputSchema": {
"RecordEncoding": "UTF-8",
"RecordColumns": [
{
"SqlType": "INTEGER",
"Name": "COL_year"
},
{
"SqlType": "INTEGER",
"Name": "COL_month"
},
{
"SqlType": "VARCHAR(4)",
"Name": "state"
},
{
"SqlType": "VARCHAR(64)",
"Name": "producer_type"
},
{
"SqlType": "VARCHAR(4)",
"Name": "energy_source"
},
{
"SqlType": "VARCHAR(16)",
"Name": "units"
},
{
"SqlType": "INTEGER",
"Name": "consumption"
}
],
"RecordFormat": {
"RecordFormatType": "CSV",
"MappingParameters": {
"CSVMappingParameters": {
"RecordRowDelimiter": "\r\n",
"RecordColumnDelimiter": ","
}

19

Amazon Kinesis Data Analytics Developer Guide
Using the Schema Discovery Feature on Static Data

}

}

},
"RawInputRecords": [
"year,month,state,producer_type,energy_source,units,consumption
\r\n2001,1,AK,TotalElectricPowerIndustry,Coal,ShortTons,47615\r
\n2001,1,AK,ElectricGeneratorsElectricUtilities,Coal,ShortTons,16535\r
\n2001,1,AK,CombinedHeatandPowerElectricPower,Coal,ShortTons,22890\r
\n2001,1,AL,TotalElectricPowerIndustry,Coal,ShortTons,3020601\r
\n2001,1,AL,ElectricGeneratorsElectricUtilities,Coal,ShortTons,2987681"
],
"ParsedInputRecords": [
[
null,
null,
"state",
"producer_type",
"energy_source",
"units",
null
],
[
"2001",
"1",
"AK",
"TotalElectricPowerIndustry",
"Coal",
"ShortTons",
"47615"
],
[
"2001",
"1",
"AK",
"ElectricGeneratorsElectricUtilities",
"Coal",
"ShortTons",
"16535"
],
[
"2001",
"1",
"AK",
"CombinedHeatandPowerElectricPower",
"Coal",
"ShortTons",
"22890"
],
[
"2001",
"1",
"AL",
"TotalElectricPowerIndustry",
"Coal",
"ShortTons",
"3020601"
],
[
"2001",
"1",
"AL",
"ElectricGeneratorsElectricUtilities",
"Coal",
"ShortTons",
"2987681"
]

20

Amazon Kinesis Data Analytics Developer Guide
Preprocessing Data Using a Lambda Function
]

}

Preprocessing Data Using a Lambda Function
If the data in your stream needs format conversion, transformation, enrichment, or filtering, you can
preprocess the data using an AWS Lambda function. You can do this before your application SQL code
executes or before your application creates a schema from your data stream.
Using a Lambda function for preprocessing records is useful in the following scenarios:
• Transforming records from other formats (such as KPL or GZIP) into formats that Kinesis Data Analytics
can analyze. Kinesis Data Analytics currently supports JSON or CSV data formats.
• Expanding data into a format that is more accessible for operations such as aggregation or anomaly
detection. For instance, if several data values are stored together in a string, you can expand the data
into separate columns.
• Data enrichment with other AWS services, such as extrapolation or error correction.
• Applying complex string transformation to record fields.
• Data filtering for cleaning up the data.

Using a Lambda Function for Preprocessing Records
When creating your Kinesis Data Analytics application, you enable Lambda preprocessing in the Connect
to a Source page.

To use a Lambda function to preprocess records in a Kinesis Data Analytics application
1.

Sign in to the AWS Management Console and open the Kinesis Data Analytics console at https://
console.aws.amazon.com/kinesisanalytics.

2.

On the Connect to a Source page for your application, choose Enabled in the Record preprocessing with AWS Lambda section.

3.

To use a Lambda function that you have already created, choose the function in the Lambda
function drop-down list.

4.

To create a new Lambda function from one of the Lambda preprocessing templates, choose the
template from the drop-down list. Then choose View