Amazon Kinesis Data Analytics
Developer Guide
Amazon Kinesis Data Analytics: Developer Guide
Copyright © 2019 Amazon Web Services, Inc. and/or its affiliates. All rights reserved.
Amazon's trademarks and trade dress may not be used in connection with any product or service that is not Amazon's, in any manner
that is likely to cause confusion among customers, or in any manner that disparages or discredits Amazon. All other trademarks not
owned by Amazon are the property of their respective owners, who may or may not be affiliated with, connected to, or sponsored by
Amazon.
Table of Contents
What Is Amazon Kinesis Data Analytics? ............................................................................................... 1
When Should I Use Amazon Kinesis Data Analytics? ........................................................................ 1
Are You a First-Time User of Amazon Kinesis Data Analytics? ........................................................... 1
How It Works .................................................................................................................................... 3
Input ........................................................................................................................................ 5
Configuring a Streaming Source ........................................................................................... 5
Configuring a Reference Source ........................................................................................... 7
Working with JSONPath ...................................................................................................... 9
Mapping Streaming Source Elements to SQL Input Columns .................................................. 12
Using the Schema Discovery Feature on Streaming Data ....................................................... 16
Using the Schema Discovery Feature on Static Data .............................................................. 18
Preprocessing Data Using a Lambda Function ...................................................................... 21
Parallelizing Input Streams for Increased Throughput ........................................................... 27
Application Code ...................................................................................................................... 30
Output .................................................................................................................................... 32
Creating an Output Using the AWS CLI ............................................................................... 32
Using a Lambda Function as Output ................................................................................... 33
Application Output Delivery Model ..................................................................................... 39
Error Handling ......................................................................................................................... 40
Reporting Errors Using an In-Application Error Stream .......................................................... 40
Granting Permissions ................................................................................................................ 41
Trust Policy ..................................................................................................................... 41
Permissions Policy ............................................................................................................ 41
Auto Scaling Applications .......................................................................................................... 43
Getting Started ................................................................................................................................ 45
Step 1: Set Up an Account ........................................................................................................ 45
Sign Up for AWS .............................................................................................................. 45
Create an IAM User .......................................................................................................... 46
Next Step ........................................................................................................................ 46
Step 2: Set Up the AWS CLI ....................................................................................................... 46
Next Step ........................................................................................................................ 47
Step 3: Create Your Starter Analytics Application ........................................................................ 47
Step 3.1: Create an Application .......................................................................................... 49
Step 3.2: Configure Input .................................................................................................. 50
Step 3.3: Add Real-Time Analytics (Add Application Code) ..................................................... 52
Step 3.4: (Optional) Update the Application Code ................................................................. 54
Step 4 (Optional) Edit the Schema and SQL Code Using the Console ............................................... 56
Working with the Schema Editor ........................................................................................ 56
Working with the SQL Editor ............................................................................................. 63
Streaming SQL Concepts ................................................................................................................... 66
In-Application Streams and Pumps ............................................................................................. 66
Timestamps and the ROWTIME Column ...................................................................................... 67
Understanding Various Times in Streaming Analytics ............................................................ 67
Continuous Queries .................................................................................................................. 69
Windowed Queries ................................................................................................................... 70
Stagger Windows ............................................................................................................. 70
Tumbling Windows ........................................................................................................... 75
Sliding Windows ............................................................................................................... 76
Stream Joins ............................................................................................................................ 80
Example 1: Report Orders Where There Are Trades Within One Minute of the Order Being
Placed ............................................................................................................................. 80
Examples ......................................................................................................................................... 82
Transforming Data .................................................................................................................... 82
Preprocessing Streams with Lambda ................................................................................... 82
Transforming String Values ................................................................................................ 82
Transforming DateTime Values ........................................................................................... 96
Transforming Multiple Data Types ...................................................................................... 99
Windows and Aggregation ....................................................................................................... 104
Stagger Window ............................................................................................................. 104
Tumbling Window Using ROWTIME ................................................................................... 107
Tumbling Window Using an Event Timestamp .................................................................... 109
Most Frequently Occurring Values (TOP_K_ITEMS_TUMBLING) .............................................. 112
Aggregating Partial Results .............................................................................................. 114
Joins ..................................................................................................................................... 116
Example: Add Reference Data Source ................................................................................ 116
Machine Learning ................................................................................................................... 119
Detecting Anomalies ....................................................................................................... 119
Example: Detect Anomalies and Get an Explanation ............................................................ 125
Example: Detect Hotspots ................................................................................................ 129
Alerts and Errors .................................................................................................................... 139
Simple Alerts ................................................................................................................. 139
Throttled Alerts .............................................................................................................. 140
In-Application Error Stream ............................................................................................. 141
Solution Accelerators .............................................................................................................. 142
Real-Time Insights on AWS Account Activity ...................................................................... 142
Real-Time IoT Device Monitoring with Kinesis Data Analytics ................................................ 143
Real-Time Web Analytics with Kinesis Data Analytics ........................................................... 143
AWS Connected Vehicle Solution ...................................................................................... 143
Monitoring ..................................................................................................................................... 144
Monitoring Tools .................................................................................................................... 144
Automated Tools ............................................................................................................ 145
Manual Tools ................................................................................................................. 145
Monitoring with Amazon CloudWatch ....................................................................................... 145
Metrics and Dimensions ................................................................................................... 146
Viewing Metrics and Dimensions ...................................................................................... 147
Alarms .......................................................................................................................... 148
Logs .............................................................................................................................. 149
Limits ............................................................................................................................................ 154
Best Practices ................................................................................................................................. 155
Managing Applications ............................................................................................................ 155
Defining Input Schema ............................................................................................................ 156
Connecting to Outputs ............................................................................................................ 157
Authoring Application Code ..................................................................................................... 157
Testing Applications ................................................................................................................ 157
Setting up a Test Application ........................................................................................... 157
Testing Schema Changes ................................................................................................. 158
Testing Code Changes ..................................................................................................... 158
Troubleshooting ............................................................................................................................. 159
Unable to Run SQL Code ......................................................................................................... 159
Unable to Detect or Discover My Schema .................................................................................. 159
Reference Data is Out of Date .................................................................................................. 160
Application Not Writing to Destination ...................................................................................... 160
Important Application Health Parameters to Monitor .................................................................. 160
Invalid Code Errors When Running an Application ....................................................................... 161
Application is Writing Errors to the Error Stream ........................................................................ 161
Insufficient Throughput or High MillisBehindLatest ..................................................................... 161
Authentication and Access Control .................................................................................................... 163
Authentication ....................................................................................................................... 163
Access Control ........................................................................................................................ 164
Overview of Managing Access .................................................................................................. 164
Amazon Kinesis Data Analytics Resources and Operations .................................................... 165
Understanding Resource Ownership .................................................................................. 165
Managing Access to Resources ......................................................................................... 165
Specifying Policy Elements: Actions, Effects, and Principals .................................................. 167
Specifying Conditions in a Policy ...................................................................................... 167
Using Identity-Based Policies (IAM Policies) ................................................................................ 168
Permissions Required to Use the Amazon Kinesis Data Analytics Console ................................ 168
AWS Managed (Predefined) Policies for Amazon Kinesis Data Analytics .................................. 169
Customer Managed Policy Examples ................................................................................. 170
Amazon Kinesis Data Analytics API Permissions Reference ........................................................... 173
SQL Reference ............................................................................................................................... 175
API Reference ................................................................................................................................. 176
Actions .................................................................................................................................. 176
AddApplicationCloudWatchLoggingOption ......................................................................... 177
AddApplicationInput ....................................................................................................... 179
AddApplicationInputProcessingConfiguration ..................................................................... 182
AddApplicationOutput ..................................................................................................... 185
AddApplicationReferenceDataSource ................................................................................. 188
CreateApplication ........................................................................................................... 191
DeleteApplication ........................................................................................................... 196
DeleteApplicationCloudWatchLoggingOption ..................................................................... 198
DeleteApplicationInputProcessingConfiguration .................................................................. 200
DeleteApplicationOutput ................................................................................................. 202
DeleteApplicationReferenceDataSource .............................................................................. 204
DescribeApplication ........................................................................................................ 206
DiscoverInputSchema ...................................................................................................... 210
ListApplications .............................................................................................................. 214
StartApplication ............................................................................................................. 216
StopApplication .............................................................................................................. 218
UpdateApplication .......................................................................................................... 220
Data Types ............................................................................................................................ 223
ApplicationDetail ............................................................................................................ 225
ApplicationSummary ....................................................................................................... 228
ApplicationUpdate .......................................................................................................... 229
CloudWatchLoggingOption .............................................................................................. 230
CloudWatchLoggingOptionDescription .............................................................................. 231
CloudWatchLoggingOptionUpdate .................................................................................... 232
CSVMappingParameters ................................................................................................... 233
DestinationSchema ......................................................................................................... 234
Input ............................................................................................................................. 235
InputConfiguration .......................................................................................................... 237
InputDescription ............................................................................................................. 238
InputLambdaProcessor .................................................................................................... 240
InputLambdaProcessorDescription .................................................................................... 241
InputLambdaProcessorUpdate .......................................................................................... 242
InputParallelism ............................................................................................................. 243
InputParallelismUpdate ................................................................................................... 244
InputProcessingConfiguration ........................................................................................... 245
InputProcessingConfigurationDescription ........................................................................... 246
InputProcessingConfigurationUpdate ................................................................................. 247
InputSchemaUpdate ........................................................................................................ 248
InputStartingPositionConfiguration ................................................................................... 249
InputUpdate .................................................................................................................. 250
JSONMappingParameters ................................................................................................ 252
KinesisFirehoseInput ........................................................................................................ 253
KinesisFirehoseInputDescription ........................................................................................ 254
KinesisFirehoseInputUpdate ............................................................................................. 255
KinesisFirehoseOutput ..................................................................................................... 256
KinesisFirehoseOutputDescription ..................................................................................... 257
KinesisFirehoseOutputUpdate ........................................................................................... 258
KinesisStreamsInput ........................................................................................................ 259
KinesisStreamsInputDescription ........................................................................................ 260
KinesisStreamsInputUpdate .............................................................................................. 261
KinesisStreamsOutput ..................................................................................................... 262
KinesisStreamsOutputDescription ..................................................................................... 263
KinesisStreamsOutputUpdate ........................................................................................... 264
LambdaOutput ............................................................................................................... 265
LambdaOutputDescription ............................................................................................... 266
LambdaOutputUpdate ..................................................................................................... 267
MappingParameters ........................................................................................................ 268
Output .......................................................................................................................... 269
OutputDescription .......................................................................................................... 271
OutputUpdate ................................................................................................................ 273
RecordColumn ................................................................................................................ 275
RecordFormat ................................................................................................................. 276
ReferenceDataSource ...................................................................................................... 277
ReferenceDataSourceDescription ....................................................................................... 278
ReferenceDataSourceUpdate ............................................................................................ 280
S3Configuration ............................................................................................................. 282
S3ReferenceDataSource ................................................................................................... 283
S3ReferenceDataSourceDescription ................................................................................... 284
S3ReferenceDataSourceUpdate ......................................................................................... 285
SourceSchema ................................................................................................................ 286
Document History .......................................................................................................................... 287
AWS Glossary ................................................................................................................................. 290
What Is Amazon Kinesis Data
Analytics?
With Amazon Kinesis Data Analytics, you can process and analyze streaming data using standard SQL.
The service enables you to quickly author and run powerful SQL code against streaming sources to
perform time series analytics, feed real-time dashboards, and create real-time metrics.
To get started with Kinesis Data Analytics, you create a Kinesis data analytics application that
continuously reads and processes streaming data. The service supports ingesting data from Amazon
Kinesis Data Streams and Amazon Kinesis Data Firehose streaming sources. Then, you author your SQL
code using the interactive editor and test it with live streaming data. You can also configure destinations
where you want Kinesis Data Analytics to send the results.
Kinesis Data Analytics supports Amazon Kinesis Data Firehose (Amazon S3, Amazon Redshift, and
Amazon Elasticsearch Service), AWS Lambda, and Amazon Kinesis Data Streams as destinations.
When Should I Use Amazon Kinesis Data Analytics?
Amazon Kinesis Data Analytics enables you to quickly author SQL code that continuously reads,
processes, and stores data in near real time. Using standard SQL queries on the streaming data, you can
construct applications that transform and provide insights into your data. Following are some example
scenarios for using Kinesis Data Analytics:
Generate time-series analytics – You can calculate metrics over time windows, and then stream values
to Amazon S3 or Amazon Redshift through a Kinesis data delivery stream.
Feed real-time dashboards – You can send aggregated and processed streaming data results
downstream to feed real-time dashboards.
Create real-time metrics – You can create custom metrics and triggers for use in real-time monitoring,
notifications, and alarms.
For information about the SQL language elements that are supported by Kinesis Data Analytics, see
Amazon Kinesis Data Analytics SQL Reference.
Are You a First-Time User of Amazon Kinesis Data
Analytics?
If you are a first-time user of Amazon Kinesis Data Analytics, we recommend that you read the following
sections in order:
1. Read the How It Works section of this guide. This section introduces various Kinesis Data Analytics
components that you work with to create an end-to-end experience. For more information, see
Amazon Kinesis Data Analytics: How It Works (p. 3).
2. Try the Getting Started exercises. For more information, see Getting Started with Amazon Kinesis
Data Analytics (p. 45).
3. Explore the streaming SQL concepts. For more information, see Streaming SQL Concepts (p. 66).
4. Try additional examples. For more information, see Example Applications (p. 82).
Amazon Kinesis Data Analytics: How
It Works
An application is the primary resource in Amazon Kinesis Data Analytics that you can create in your
account. You can create and manage applications using the AWS Management Console or the Kinesis
Data Analytics API. Kinesis Data Analytics provides API operations to manage applications. For a list of
API operations, see Actions (p. 176).
Kinesis Data Analytics applications continuously read and process streaming data in real time. You write
application code using SQL to process the incoming streaming data and produce output. Then, Kinesis
Data Analytics writes the output to a configured destination. The following diagram illustrates a typical
application architecture.
Each application has a name, description, version ID, and status. Amazon Kinesis Data Analytics assigns
a version ID when you first create an application. This version ID is updated when you update any
application configuration. For example, if you add an input configuration, add or delete a reference
data source, add or delete an output configuration, or update application code, Kinesis Data Analytics
updates the current application version ID. Kinesis Data Analytics also maintains timestamps for when an
application was created and last updated.
In addition to these basic properties, each application consists of the following:
Input – The streaming source for your application. You can select either a Kinesis data stream or
a Kinesis Data Firehose data delivery stream as the streaming source. In the input configuration,
you map the streaming source to an in-application input stream. The in-application stream is like a
continuously updating table upon which you can perform the SELECT and INSERT SQL operations.
In your application code, you can create additional in-application streams to store intermediate query
results.
You can optionally partition a single streaming source into multiple in-application input streams to
improve throughput. For more information, see Limits (p. 154) and Configuring Application
Input (p. 5).
Amazon Kinesis Data Analytics provides a timestamp column, ROWTIME, in each in-application stream.
You can use this column in time-based windowed queries. For more information, see Timestamps and the
ROWTIME Column (p. 67) and Windowed Queries (p. 70).
You can optionally configure a reference data source to enrich your input data stream within the
application. It results in an in-application reference table. You must store your reference data as
an object in your S3 bucket. When the application starts, Amazon Kinesis Data Analytics reads
the Amazon S3 object and creates an in-application table. For more information, see Configuring
Application Input (p. 5).
Application code – A series of SQL statements that process input and produce output. You can write
SQL statements against in-application streams and reference tables. You can also write JOIN queries to
combine data from both of these sources.
For information about the SQL language elements that are supported by Kinesis Data Analytics, see
Amazon Kinesis Data Analytics SQL Reference.
In its simplest form, application code can be a single SQL statement that selects from a streaming
input and inserts results into a streaming output. It can also be a series of SQL statements where
the output of one statement feeds into the input of the next SQL statement. Further, you can write
application code to split an input stream into multiple streams. You can then apply additional queries
to process these streams. For more information, see Application Code (p. 30). A brief SQL sketch
appears after the Output description that follows.
Output – In application code, query results go to in-application streams. In your application code,
you can create one or more in-application streams to hold intermediate results. You can then
optionally configure the application output to persist data in the in-application streams that hold
your application output (also referred to as in-application output streams) to external destinations.
External destinations can be a Kinesis Data Firehose delivery stream or a Kinesis data stream. Note the
following about these destinations:
You can configure a Kinesis Data Firehose delivery stream to write results to Amazon S3, Amazon
Redshift, or Amazon Elasticsearch Service (Amazon ES).
You can also write application output to a custom destination instead of Amazon S3 or Amazon
Redshift. To do that, you specify a Kinesis data stream as the destination in your output
configuration. Then, you configure AWS Lambda to poll the stream and invoke your Lambda
function. Your Lambda function code receives stream data as input. In your Lambda function code,
you can write the incoming data to your custom destination. For more information, see Using AWS
Lambda with Amazon Kinesis Data Analytics.
For more information, see Configuring Application Output (p. 32).
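The following is a minimal sketch of application code of this kind. It assumes an in-application input
stream named "SOURCE_SQL_STREAM_001" (the console's default) and a hypothetical stock-ticker input
schema with a TICKER_SYMBOL column; adjust the names to match your own input configuration.

-- Create an in-application stream to hold the query results.
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    "TICKER_SYMBOL" VARCHAR(4),
    "TRADE_COUNT"   INTEGER);

-- A pump continuously inserts the results of the SELECT into that stream.
-- The GROUP BY on ROWTIME produces one-minute tumbling windows.
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
    INSERT INTO "DESTINATION_SQL_STREAM"
    SELECT STREAM "TICKER_SYMBOL", COUNT(*) AS "TRADE_COUNT"
    FROM "SOURCE_SQL_STREAM_001"
    GROUP BY "TICKER_SYMBOL",
             FLOOR("SOURCE_SQL_STREAM_001".ROWTIME TO MINUTE);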
In addition, note the following:
Amazon Kinesis Data Analytics needs permissions to read records from a streaming source and write
application output to the external destinations. You use IAM roles to grant these permissions.
Kinesis Data Analytics automatically provides an in-application error stream for each application. If
your application has issues while processing certain records (for example, because of a type mismatch
or late arrival), those records are written to the error stream. You can configure application output to
direct Kinesis Data Analytics to persist the error stream data to an external destination for further
evaluation. For more information, see Error Handling (p. 40). A brief SQL sketch follows these notes.
Amazon Kinesis Data Analytics ensures that your application output records are written to the
configured destination. It uses an "at least once" processing and delivery model, even if you experience
an application interruption. For more information, see Delivery Model for Persisting Application
Output to an External Destination (p. 39).
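The following is a minimal sketch of the error stream pattern mentioned in these notes. The output
stream name and the error stream columns selected here (ERROR_TIME, ERROR_NAME, DATA_ROW) are
illustrative assumptions; check the in-application error stream schema for your application before
relying on specific columns.

-- Create an in-application stream that you can configure as an application
-- output, so that the error rows are delivered to an external destination.
CREATE OR REPLACE STREAM "ERROR_OUTPUT_STREAM" (
    "ERROR_TIME" TIMESTAMP,
    "ERROR_NAME" VARCHAR(64),
    "DATA_ROW"   VARCHAR(1024));

-- Pump rows from the built-in error stream into the output stream.
CREATE OR REPLACE PUMP "ERROR_PUMP" AS
    INSERT INTO "ERROR_OUTPUT_STREAM"
    SELECT STREAM "ERROR_TIME", "ERROR_NAME", "DATA_ROW"
    FROM "error_stream";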
Topics
Configuring Application Input (p. 5)
Application Code (p. 30)
Configuring Application Output (p. 32)
Error Handling (p. 40)
Granting Amazon Kinesis Data Analytics Permissions to Access Streaming and Reference Sources
(Creating an IAM Role) (p. 41)
Automatically Scaling Applications to Increase Throughput (p. 43)
Configuring Application Input
Your Amazon Kinesis Data Analytics application can receive input from a single streaming source and,
optionally, use one reference data source. For more information, see Amazon Kinesis Data Analytics: How
It Works (p. 3). The sections in this topic describe the application input sources.
Topics
Configuring a Streaming Source (p. 5)
Configuring a Reference Source (p. 7)
Working with JSONPath (p. 9)
Mapping Streaming Source Elements to SQL Input Columns (p. 12)
Using the Schema Discovery Feature on Streaming Data (p. 16)
Using the Schema Discovery Feature on Static Data (p. 18)
Preprocessing Data Using a Lambda Function (p. 21)
Parallelizing Input Streams for Increased Throughput (p. 27)
Configuring a Streaming Source
At the time that you create an application, you specify a streaming source. You can also modify an input
after you create the application. Amazon Kinesis Data Analytics supports the following streaming sources
for your application:
A Kinesis data stream
A Kinesis Data Firehose delivery stream
Note
If the Kinesis data stream is encrypted, Kinesis Data Analytics accesses the data in the encrypted
stream seamlessly with no further configuration needed. Kinesis Data Analytics does not store
unencrypted data read from Kinesis Data Streams. For more information, see What Is Server-
Side Encryption For Kinesis Data Streams?.
Kinesis Data Analytics continuously polls the streaming source for new data and ingests it into in-
application streams according to the input configuration. Your application code can query the in-
application stream. As part of the input configuration, you provide the following:
Streaming source – You provide the Amazon Resource Name (ARN) of the stream and an IAM role that
Kinesis Data Analytics can assume to access the stream on your behalf.
In-application stream name prefix – When you start the application, Kinesis Data Analytics creates
the specified in-application stream. In your application code, you access the in-application stream
using this name.
You can optionally map a streaming source to multiple in-application streams. For more information,
see Limits (p. 154). In this case, Amazon Kinesis Data Analytics creates the specified number of in-
application streams with names as follows: prefix_001, prefix_002, and prefix_003. By default,
Kinesis Data Analytics maps the streaming source to one in-application stream named prefix_001.
There is a limit on the rate that you can insert rows in an in-application stream. Therefore, Kinesis
Data Analytics supports multiple such in-application streams so that you can bring records into your
application at a much faster rate. If you find that your application is not keeping up with the data in
the streaming source, you can add units of parallelism to improve performance.
Mapping schema – You describe the record format (JSON, CSV) on the streaming source. You also
describe how each record on the stream maps to columns in the in-application stream that is created.
This is where you provide column names and data types.
Note
Kinesis Data Analytics adds quotation marks around the identifiers (stream name and column
names) when creating the input in-application stream. When querying this stream and the
columns, you must specify them in quotation marks using the same casing (matching lowercase
and uppercase letters exactly). For more information about identifiers, see Identifiers in the
Amazon Kinesis Data Analytics SQL Reference.
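The following fragment illustrates the quoting rule from the preceding note. It assumes an in-application
input stream created as "SOURCE_SQL_STREAM_001" with a column named "TICKER_SYMBOL" (hypothetical
names); both identifiers are written in double quotation marks with the exact casing used when the
stream was created.

SELECT STREAM "TICKER_SYMBOL"
FROM "SOURCE_SQL_STREAM_001";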
You can create an application and configure inputs in the Amazon Kinesis Data Analytics console. The
console then makes the necessary API calls. You can configure application input when you create a
new application or when you add an input configuration to an existing application. For more information,
see CreateApplication (p. 191) and AddApplicationInput (p. 179). The following is the input configuration
part of the CreateApplication API request body:
"Inputs": [
{
"InputSchema": {
"RecordColumns": [
{
"Mapping": "string",
"Name": "string",
"SqlType": "string"
}
],
"RecordEncoding": "string",
"RecordFormat": {
6
Amazon Kinesis Data Analytics Developer Guide
Configuring a Reference Source
"MappingParameters": {
"CSVMappingParameters": {
"RecordColumnDelimiter": "string",
"RecordRowDelimiter": "string"
},
"JSONMappingParameters": {
"RecordRowPath": "string"
}
},
"RecordFormatType": "string"
}
},
"KinesisFirehoseInput": {
"ResourceARN": "string",
"RoleARN": "string"
},
"KinesisStreamsInput": {
"ResourceARN": "string",
"RoleARN": "string"
},
"Name": "string"
}
]
Configuring a Reference Source
You can also optionally add a reference data source to an existing application to enrich the data coming
in from streaming sources. You must store reference data as an object in your Amazon S3 bucket. When
the application starts, Amazon Kinesis Data Analytics reads the Amazon S3 object and creates an in-
application reference table. Your application code can then join it with an in-application stream.
You store reference data in the Amazon S3 object using supported formats (CSV, JSON). For example,
suppose that your application performs analytics on stock orders. Assume the following record format
on the streaming source:
Ticker, SalePrice, OrderId
AMZN $700 1003
XYZ $250 1004
...
In this case, you might then consider maintaining a reference data source to provide details for each
stock ticker, such as company name.
Ticker, Company
AMZN, Amazon
XYZ, SomeCompany
...
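Continuing this example, the following sketch shows how application code might join the streaming
orders with the in-application reference table. The stream name ("SOURCE_SQL_STREAM_001"), the
reference table name ("COMPANY_REF"), and the column names are assumptions; they depend on your input
schema, your reference mapping schema, and the in-application table name that you configure.

-- In-application stream to hold the enriched rows.
CREATE OR REPLACE STREAM "ENRICHED_ORDERS" (
    "TICKER"    VARCHAR(4),
    "COMPANY"   VARCHAR(64),
    "SALEPRICE" VARCHAR(16));

-- Join each streaming order with the matching reference row.
CREATE OR REPLACE PUMP "ENRICH_PUMP" AS
    INSERT INTO "ENRICHED_ORDERS"
    SELECT STREAM o."TICKER", r."COMPANY", o."SALEPRICE"
    FROM "SOURCE_SQL_STREAM_001" AS o
    JOIN "COMPANY_REF" AS r
      ON o."TICKER" = r."TICKER";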
You can add an application reference data source either with the API or with the console. Amazon Kinesis
Data Analytics provides the following API actions to manage reference data sources:
AddApplicationReferenceDataSource (p. 188)
UpdateApplication (p. 220)
For information about adding reference data using the console, see Example: Adding Reference Data to a
Kinesis Data Analytics Application (p. 116).
Note the following:
If the application is running, Kinesis Data Analytics creates an in-application reference table, and then
loads the reference data immediately.
If the application is not running (for example, it's in the ready state), Kinesis Data Analytics saves only
the updated input configuration. When the application starts running, Kinesis Data Analytics loads the
reference data in your application as a table.
Suppose that you want to refresh the data after Kinesis Data Analytics creates the in-application
reference table. Perhaps you updated the Amazon S3 object, or you want to use a different Amazon
S3 object. In this case, you can either explicitly call UpdateApplication (p. 220), or choose Actions,
Synchronize reference data table in the console. Kinesis Data Analytics does not refresh the in-
application reference table automatically.
There is a limit on the size of the Amazon S3 object that you can create as a reference data source. For
more information, see Limits (p. 154). If the object size exceeds the limit, Kinesis Data Analytics can't
load the data. The application state appears as running, but the data is not being read.
When you add a reference data source, you provide the following information:
S3 bucket and object key name – In addition to the bucket name and object key, you also provide an
IAM role that Kinesis Data Analytics can assume to read the object on your behalf.
In-application reference table name – Kinesis Data Analytics creates this in-application table and
populates it by reading the Amazon S3 object. This is the table name you specify in your application
code.
Mapping schema – You describe the record format (JSON, CSV) and the encoding of the data stored in the
Amazon S3 object. You also describe how each data element maps to columns in the in-application
reference table.
The following shows the request body in the AddApplicationReferenceDataSource API request.
{
    "ApplicationName": "string",
    "CurrentApplicationVersionId": number,
    "ReferenceDataSource": {
        "ReferenceSchema": {
            "RecordColumns": [
                {
                    "IsDropped": boolean,
                    "Mapping": "string",
                    "Name": "string",
                    "SqlType": "string"
                }
            ],
            "RecordEncoding": "string",
            "RecordFormat": {
                "MappingParameters": {
                    "CSVMappingParameters": {
                        "RecordColumnDelimiter": "string",
                        "RecordRowDelimiter": "string"
                    },
                    "JSONMappingParameters": {
                        "RecordRowPath": "string"
                    }
                },
                "RecordFormatType": "string"
            }
        },
        "S3ReferenceDataSource": {
            "BucketARN": "string",
            "FileKey": "string",
            "ReferenceRoleARN": "string"
        },
        "TableName": "string"
    }
}
Working with JSONPath
JSONPath is a standardized way to query elements of a JSON object. JSONPath uses path expressions to
navigate elements, nested elements, and arrays in a JSON document. For more information about JSON,
see Introducing JSON.
Accessing JSON Elements with JSONPath
Following, you can find how to use JSONPath expressions to access various elements in JSON-formatted
data. For the examples in this section, assume that the source stream contains a JSON record in the
following format.
{
    "customerName":"John Doe",
    "address":
    {
        "streetAddress":
        {
            "number":"123",
            "street":"AnyStreet"
        },
        "city":"Anytown"
    },
    "orders":
    [
        { "orderId":"23284", "itemName":"Widget", "itemPrice":"33.99" },
        { "orderId":"63122", "itemName":"Gadget", "itemPrice":"22.50" },
        { "orderId":"77284", "itemName":"Sprocket", "itemPrice":"12.00" }
    ]
}
Accessing JSON Elements
To query an element in JSON data using JSONPath, use the following syntax. Here, $ represents the root
of the data hierarchy and elementName is the name of the element node to query.
$.elementName
The following expression queries the customerName element in the preceding JSON example.
$.customerName
The preceding expression returns the following from the preceding JSON record.
John Doe
Note
Path expressions are case sensitive. The expression $.Name returns null from the preceding
JSON example.
Note
If no element appears at the location that the path expression specifies, the expression returns
null. The following expression returns null from the preceding JSON example, because there
is no matching element.
$.customerId
Accessing Nested JSON Elements
To query a nested JSON element, use the following syntax.
$.parentElement.element
The following expression queries the city element in the preceding JSON example.
$.address.city
The preceding expression returns the following from the preceding JSON record.
Anytown
You can query further levels of subelements using the following syntax.
$.parentElement.element.subElement
The following expression queries the street element in the preceding JSON example.
$.address.streetAddress.street
The preceding expression returns the following from the preceding JSON record.
AnyStreet
Accessing Arrays
Arrays are queried using an array index expression inside square brackets ([]). Currently, the only index
expression supported is 0:, meaning that all the elements in the array are returned.
The format of the data returned depends on whether the array index expression is the last expression in
the path:
When the array index is the last expression in the path expression, all of the contents of the array are
returned as a single field in a single data row.
When there is a nested expression after the array index expression, the array is "flattened." In other
words, each element in the array is returned as a separate data row.
To query the entire contents of an array as a single row, use the following syntax.
$.arrayObject[0:]
The following expression queries the entire contents of the orders element in the preceding JSON
example. It returns the array contents in a single column in a single row.
$.orders[0:]
The preceding expression returns the following from the preceding JSON record.
[{"orderId":"23284","itemName":"Widget","itemPrice":"33.99"},
{"orderId":"61322","itemName":"Gadget","itemPrice":"22.50"},
{"orderId":"77284","itemName":"Sprocket","itemPrice":"12.00"}]
To query the individual elements in an array as separate rows, use the following syntax.
$.arrayObject[0:].element
The following expression queries the orderId elements in the preceding JSON example, and returns
each array element as a separate row.
$.orders[0:].orderId
The preceding expression returns the following from the preceding JSON record, with each data item
returned as a separate row.
23284
63122
77284
Note
If expressions that query nonarray elements are included in a schema that queries individual
array elements, the nonarray elements are repeated for each element in the array. For example,
suppose that a schema for the preceding JSON example includes the following expressions:
• $.customerName
• $.orders[0:].orderId
In this case, the returned data rows from the sample input stream element resemble the
following, with the name element repeated for every orderId element.
John Doe 23284
John Doe 63122
John Doe 77284
Note
The following limitations apply to array expressions in Amazon Kinesis Data Analytics:
Only one level of dereferencing is supported in an array expression. The following expression
format is not supported.
$.arrayObject[0:].element[0:].subElement
Only one array can be flattened in a schema. Multiple arrays can be referenced—returned as
one row containing all of the elements in the array. However, only one array can have each of
its elements returned as individual rows.
A schema containing elements in the following format is valid. This format returns the
contents of the second array as a single column, repeated for every element in the first array.
$.arrayObjectOne[0:].element
$.arrayObjectTwo[0:]
A schema containing elements in the following format is not valid.
$.arrayObjectOne[0:].element
$.arrayObjectTwo[0:].element
Other Considerations
Additional considerations for working with JSONPath are as follows:
If no arrays are accessed by an individual element in the JSONPath expression, then a single row is
created for each JSON record processed. Every JSONPath expression corresponds to a single column.
When an array is flattened, any missing elements result in a null value being created in the in-
application stream.
An array is always flattened to at least one row. If no values would be returned (that is, the array is
empty or none of its elements are queried), a single row with all null values is returned.
The following expression returns records with null values from the preceding JSON example, because
there is no matching element at the specified path.
$.orders[0:].itemId
The preceding expression returns the following from the preceding JSON example record.
null
null
null
Related Topics
Introducing JSON
Mapping Streaming Source Elements to SQL Input
Columns
With Amazon Kinesis Data Analytics, you can process and analyze streaming data in either JSON or CSV
formats using standard SQL.
To process and analyze streaming CSV data, you assign column names and data types for the columns
of the input stream. Your application imports one column from the input stream per column definition,
in order.
You don't have to include all of the columns in the application input stream, but you cannot skip
columns from the source stream. For example, you can import the first three columns from an input
stream containing five elements, but you cannot import only columns 1, 2, and 4.
To process and analyze streaming JSON data, you use JSONPath expressions to map JSON elements
from a streaming source to SQL columns in an input stream. For more information about using
JSONPath with Amazon Kinesis Data Analytics, see Working with JSONPath (p. 9). The columns in
the SQL table have data types that are mapped from JSON types. For supported data types, see Data
Types. For details about converting JSON data to SQL data, see Mapping JSON Data Types to SQL Data
Types (p. 15).
For more information about how to configure input streams, see Configuring Application Input (p. 5).
Mapping JSON Data to SQL Columns
You can map JSON elements to input columns using the AWS Management Console or the Kinesis Data
Analytics API.
To map elements to columns using the console, see Working with the Schema Editor (p. 56).
To map elements to columns using the Kinesis Data Analytics API, see the following section.
To map JSON elements to columns in the in-application input stream, you need a schema with the
following information for each column:
Source Expression: The JSONPath expression that identifies the location of the data for the column.
Column Name: The name that your SQL queries use to reference the data.
Data Type: The SQL data type for the column.
Using the API
To map elements from a streaming source to input columns, you can use the Kinesis Data Analytics API
CreateApplication (p. 191) action. To create the in-application stream, specify a schema to transform
your data into a schematized version used in SQL. The CreateApplication (p. 191) action configures
your application to receive input from a single streaming source. To map JSON elements or CSV
columns to SQL columns, you create a RecordColumn (p. 275) object in the SourceSchema (p. 286)
RecordColumns array. The RecordColumn (p. 275) object has the following schema:
{
"Mapping": "String",
"Name": "String",
"SqlType": "String"
}
The fields in the RecordColumn (p. 275) object have the following values:
Mapping: The JSONPath expression that identifies the location of the data in the input stream record.
This value is not present for an input schema for a source stream in CSV format.
Name: The column name in the in-application SQL data stream.
SqlType: The data type of the data in the in-application SQL data stream.
JSON Input Schema Example
The following example demonstrates the format of the InputSchema value for a JSON schema.
"InputSchema": {
"RecordColumns": [
{
"SqlType": "VARCHAR(4)",
"Name": "TICKER_SYMBOL",
"Mapping": "$.TICKER_SYMBOL"
},
{
"SqlType": "VARCHAR(16)",
"Name": "SECTOR",
"Mapping": "$.SECTOR"
},
{
"SqlType": "TINYINT",
"Name": "CHANGE",
"Mapping": "$.CHANGE"
},
{
"SqlType": "DECIMAL(5,2)",
"Name": "PRICE",
"Mapping": "$.PRICE"
}
],
"RecordFormat": {
"MappingParameters": {
"JSONMappingParameters": {
"RecordRowPath": "$"
}
},
"RecordFormatType": "JSON"
},
"RecordEncoding": "UTF-8"
}
CSV Input Schema Example
The following example demonstrates the format of the InputSchema value for a schema in comma-
separated value (CSV) format.
"InputSchema": {
"RecordColumns": [
{
"SqlType": "VARCHAR(16)",
"Name": "LastName"
},
{
"SqlType": "VARCHAR(16)",
"Name": "FirstName"
},
{
"SqlType": "INTEGER",
"Name": "CustomerId"
}
],
"RecordFormat": {
14
Amazon Kinesis Data Analytics Developer Guide
Mapping Streaming Source Elements to SQL Input Columns
"MappingParameters": {
"CSVMappingParameters": {
"RecordColumnDelimiter": ",",
"RecordRowDelimiter": "\n"
}
},
"RecordFormatType": "CSV"
},
"RecordEncoding": "UTF-8"
}
Mapping JSON Data Types to SQL Data Types
JSON data types are converted to corresponding SQL data types according to the application's input
schema. For information about supported SQL data types, see Data Types. Amazon Kinesis Data
Analytics converts JSON data types to SQL data types according to the following rules.
Null Literal
A null literal in the JSON input stream (that is, "City":null) converts to a SQL null regardless of
destination data type.
Boolean Literal
A Boolean literal in the JSON input stream (that is, "Contacted":true) converts to SQL data as
follows:
Numeric (DECIMAL, INT, and so on): true converts to 1; false converts to 0.
Binary (BINARY or VARBINARY):
true: Result has lowest bit set and remaining bits cleared.
false: Result has all bits cleared.
Conversion to VARBINARY results in a value 1 byte in length.
BOOLEAN: Converts to the corresponding SQL BOOLEAN value.
Character (CHAR or VARCHAR): Converts to the corresponding string value (true or false). The value
is truncated to fit the length of the field.
Datetime (DATE, TIME, or TIMESTAMP): Conversion fails and a coercion error is written to the error
stream.
Number
A number literal in the JSON input stream (that is, "CustomerId":67321) converts to SQL data as
follows:
Numeric (DECIMAL, INT, and so on): Converts directly. If the converted value exceeds the size or
precision of the target data type (that is, converting 123.4 to INT), conversion fails and a coercion
error is written to the error stream.
Binary (BINARY or VARBINARY): Conversion fails and a coercion error is written to the error stream.
BOOLEAN:
0: Converts to false.
All other numbers: Converts to true.
Character (CHAR or VARCHAR): Converts to a string representation of the number.
Datetime (DATE, TIME, or TIMESTAMP): Conversion fails and a coercion error is written to the error
stream.
String
A string value in the JSON input stream (that is, "CustomerName":"John Doe") converts to SQL data
as follows:
Numeric (DECIMAL, INT, and so on): Amazon Kinesis Data Analytics attempts to convert the value to
the target data type. If the value cannot be converted, conversion fails and a coercion error is written
to the error stream.
Binary (BINARY or VARBINARY): If the source string is a valid binary literal (that is, X'3F67A23A', with
an even number of hexadecimal digits), the value is converted to the target data type. Otherwise, conversion
fails and a coercion error is written to the error stream.
BOOLEAN: If the source string is "true", converts to true. This comparison is case-insensitive.
Otherwise, converts to false.
Character (CHAR or VARCHAR): Converts to the string value in the input. If the value is longer than the
target data type, it is truncated and no error is written to the error stream.
Datetime (DATE, TIME, or TIMESTAMP): If the source string is in a format that can be converted to the
target value, the value is converted. Otherwise, conversion fails and a coercion error is written to the
error stream.
Valid datetime formats include:
• "1992-02-14"
"1992-02-14 18:35:44.0"
Array or Object
An array or object in the JSON input stream converts to SQL data as follows:
Character (CHAR or VARCHAR): Converts to the source text of the array or object. See Accessing
Arrays (p. 10).
All other data types: Conversion fails and a coercion error is written to the error stream.
For an example of a JSON array, see Working with JSONPath (p. 9).
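As a brief illustration of these rules, consider the following hypothetical input record and the schema columns it might map to: CUSTOMER_ID maps to an INTEGER, CONTACTED (true) maps to a BOOLEAN, and the null CITY value arrives as a SQL null in the VARCHAR column. The field names and column sizes here are illustrative only.
{ "CUSTOMER_ID": 67321, "CONTACTED": true, "CITY": null }
"RecordColumns": [
    { "SqlType": "INTEGER",     "Name": "CUSTOMER_ID", "Mapping": "$.CUSTOMER_ID" },
    { "SqlType": "BOOLEAN",     "Name": "CONTACTED",   "Mapping": "$.CONTACTED" },
    { "SqlType": "VARCHAR(32)", "Name": "CITY",        "Mapping": "$.CITY" }
]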
Related Topics
Configuring Application Input (p. 5)
Data Types
Working with the Schema Editor (p. 56)
CreateApplication (p. 191)
RecordColumn (p. 275)
SourceSchema (p. 286)
Using the Schema Discovery Feature on Streaming
Data
Providing an input schema that describes how records on the streaming input map to an in-application
stream can be cumbersome and error prone. You can use the DiscoverInputSchema (p. 210) API (called
the discovery API) to infer a schema. Using random samples of records on the streaming source, the API
can infer a schema (that is, column names, data types, and position of the data element in the incoming
data).
Note
To use the Discovery API to generate a schema from a file stored in Amazon S3, see Using the
Schema Discovery Feature on Static Data (p. 18).
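You can also call the discovery API directly from the AWS CLI against a streaming source. The following is a minimal sketch; the stream ARN, role ARN, and starting position are placeholders that you replace with your own values:
$ aws kinesisanalytics discover-input-schema \
    --resource-arn arn:aws:kinesis:us-east-1:123456789012:stream/your-stream-name \
    --role-arn arn:aws:iam::123456789012:role/service-role/your-IAM-role \
    --input-starting-position-configuration '{ "InputStartingPosition": "NOW" }'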
The console uses the Discovery API to generate a schema for a specified streaming source. Using the
console, you can also update the schema, including adding or removing columns, changing column
names or data types, and so on. However, make changes carefully to ensure that you do not create an
invalid schema.
After you finalize a schema for your in-application stream, there are functions you can use to manipulate
string and datetime values. You can use these functions in your application code when working with
rows in the resulting in-application stream. For more information, see Example: Transforming DateTime
Values (p. 96).
Column Naming During Schema Discovery
During schema discovery, Amazon Kinesis Data Analytics tries to retain as much of the original column
name as possible from the streaming input source, except in the following cases:
The source stream column name is a reserved SQL keyword, such as TIMESTAMP, USER, VALUES, or
YEAR.
The source stream column name contains unsupported characters. Only letters, numbers, and the
underscore character ( _ ) are supported.
The source stream column name begins with a number.
The source stream column name is longer than 100 characters.
If a column is renamed, the renamed schema column name begins with COL_. In some cases, none of
the original column name can be retained (for example, if the entire name consists of unsupported
characters). In such a case, the column is named COL_#, where # is a number indicating the column's
place in the column order.
After discovery completes, you can update the schema using the console to add or remove columns, or
change column names, data types, or data size.
Examples of Discovery-Suggested Column Names
Source Stream Column Name      Discovery-Suggested Column Name
USER                           COL_USER
USER@DOMAIN                    COL_USERDOMAIN
@@                             COL_0
Schema Discovery Issues
What happens if Kinesis Data Analytics does not infer a schema for a given streaming source?
Kinesis Data Analytics infers your schema for common formats, such as CSV and JSON, which are UTF-8
encoded. Kinesis Data Analytics supports any UTF-8 encoded records (including raw text like application
logs and records) with a custom column and row delimiter. If Kinesis Data Analytics doesn't infer a
schema, you can define a schema manually using the schema editor on the console (or using the API).
If your data does not follow a pattern (which you can specify using the schema editor), you can define
a schema as a single column of type VARCHAR(N), where N is the largest number of characters you
expect your record to include. From there, you can use string and date-time manipulation to structure
your data after it is in an in-application stream. For examples, see Example: Transforming DateTime
Values (p. 96).
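As a sketch, a single-column schema of this kind could look like the following InputSchema fragment. The column name, VARCHAR length, and row delimiter are assumptions you adjust to your data, and the column delimiter should be a character that does not occur in your records (a tab is used here):
"InputSchema": {
    "RecordColumns": [
        {
            "SqlType": "VARCHAR(1024)",
            "Name": "RAW_DATA"
        }
    ],
    "RecordFormat": {
        "MappingParameters": {
            "CSVMappingParameters": {
                "RecordColumnDelimiter": "\t",
                "RecordRowDelimiter": "\n"
            }
        },
        "RecordFormatType": "CSV"
    },
    "RecordEncoding": "UTF-8"
}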
Using the Schema Discovery Feature on Static Data
The schema discovery feature can generate a schema from either the data in a stream or data in a static
file that is stored in an Amazon S3 bucket. Suppose that you want to generate a schema for a Kinesis
Data Analytics application for reference purposes or when live streaming data isn't available. You can use
the schema discovery feature on a static file that contains a sample of the data in the expected format of
your streaming or reference data. Kinesis Data Analytics can run schema discovery on sample data from a
JSON or CSV file that's stored in an Amazon S3 bucket. To use schema discovery on a data file, use either
the console or the DiscoverInputSchema (p. 210) API with the S3Configuration parameter specified.
Running Schema Discovery Using the Console
To run discovery on a static file using the console, do the following:
1. Add a reference data object to an S3 bucket.
2. Choose Connect reference data in the application's main page in the Kinesis Data Analytics console.
3. Provide the bucket, path, and IAM role for accessing the Amazon S3 object that contains the
reference data.
4. Choose Discover schema.
For more information on how to add reference data and discover schema in the console, see Example:
Adding Reference Data to a Kinesis Data Analytics Application (p. 116).
Running Schema Discovery Using the API
To run discovery on a static file using the API, you provide the API with an S3Configuration structure
with the following information:
BucketARN: The Amazon Resource Name (ARN) of the Amazon S3 bucket that contains the file.
For the format of an Amazon S3 bucket ARN, see Amazon Resource Names (ARNs) and AWS Service
Namespaces: Amazon Simple Storage Service (Amazon S3).
RoleARN: The ARN of an IAM role with the AmazonS3ReadOnlyAccess policy. For information about
how to add a policy to a role, see Modifying a Role.
FileKey: The file name of the object.
To generate a schema from an Amazon S3 object using the DiscoverInputSchema API
1. Make sure that you have the AWS CLI set up. For more information, see Step 2: Set Up the AWS
Command Line Interface (AWS CLI) (p. 46) in the Getting Started section.
2. Create a file named data.csv with the following contents:
year,month,state,producer_type,energy_source,units,consumption
2001,1,AK,TotalElectricPowerIndustry,Coal,ShortTons,47615
2001,1,AK,ElectricGeneratorsElectricUtilities,Coal,ShortTons,16535
2001,1,AK,CombinedHeatandPowerElectricPower,Coal,ShortTons,22890
2001,1,AL,TotalElectricPowerIndustry,Coal,ShortTons,3020601
2001,1,AL,ElectricGeneratorsElectricUtilities,Coal,ShortTons,2987681
3. Sign in to the Amazon S3 console at https://console.aws.amazon.com/s3/.
4. Create an Amazon S3 bucket and upload the data.csv file you created. Note the ARN of the
created bucket. For information about creating an Amazon S3 bucket and uploading a file, see
Getting Started with Amazon Simple Storage Service.
5. Open the IAM console at https://console.aws.amazon.com/iam/. Create a role with the
AmazonS3ReadOnlyAccess policy. Note the ARN of the new role. For information about creating a
role, see Creating a Role to Delegate Permissions to an AWS Service. For information about how to
add a policy to a role, see Modifying a Role.
6. Run the following DiscoverInputSchema command in the AWS CLI, substituting the ARNs for
your Amazon S3 bucket and IAM role:
$ aws kinesisanalytics discover-input-schema --s3-configuration '{ "RoleARN":
"arn:aws:iam::123456789012:role/service-role/your-IAM-role", "BucketARN":
"arn:aws:s3:::your-bucket-name", "FileKey": "data.csv" }'
7. The response looks similar to the following:
{
"InputSchema": {
"RecordEncoding": "UTF-8",
"RecordColumns": [
{
"SqlType": "INTEGER",
"Name": "COL_year"
},
{
"SqlType": "INTEGER",
"Name": "COL_month"
},
{
"SqlType": "VARCHAR(4)",
"Name": "state"
},
{
"SqlType": "VARCHAR(64)",
"Name": "producer_type"
},
{
"SqlType": "VARCHAR(4)",
"Name": "energy_source"
},
{
"SqlType": "VARCHAR(16)",
"Name": "units"
},
{
"SqlType": "INTEGER",
"Name": "consumption"
}
],
"RecordFormat": {
"RecordFormatType": "CSV",
"MappingParameters": {
"CSVMappingParameters": {
"RecordRowDelimiter": "\r\n",
"RecordColumnDelimiter": ","
}
}
}
},
"RawInputRecords": [
"year,month,state,producer_type,energy_source,units,consumption
\r\n2001,1,AK,TotalElectricPowerIndustry,Coal,ShortTons,47615\r
\n2001,1,AK,ElectricGeneratorsElectricUtilities,Coal,ShortTons,16535\r
\n2001,1,AK,CombinedHeatandPowerElectricPower,Coal,ShortTons,22890\r
\n2001,1,AL,TotalElectricPowerIndustry,Coal,ShortTons,3020601\r
\n2001,1,AL,ElectricGeneratorsElectricUtilities,Coal,ShortTons,2987681"
],
"ParsedInputRecords": [
[
null,
null,
"state",
"producer_type",
"energy_source",
"units",
null
],
[
"2001",
"1",
"AK",
"TotalElectricPowerIndustry",
"Coal",
"ShortTons",
"47615"
],
[
"2001",
"1",
"AK",
"ElectricGeneratorsElectricUtilities",
"Coal",
"ShortTons",
"16535"
],
[
"2001",
"1",
"AK",
"CombinedHeatandPowerElectricPower",
"Coal",
"ShortTons",
"22890"
],
[
"2001",
"1",
"AL",
"TotalElectricPowerIndustry",
"Coal",
"ShortTons",
"3020601"
],
[
"2001",
"1",
"AL",
"ElectricGeneratorsElectricUtilities",
"Coal",
"ShortTons",
"2987681"
]
]
}
Preprocessing Data Using a Lambda Function
If the data in your stream needs format conversion, transformation, enrichment, or filtering, you can
preprocess the data using an AWS Lambda function. You can do this before your application SQL code
executes or before your application creates a schema from your data stream.
Using a Lambda function for preprocessing records is useful in the following scenarios:
Transforming records from other formats (such as KPL or GZIP) into formats that Kinesis Data Analytics
can analyze. Kinesis Data Analytics currently supports JSON or CSV data formats.
Expanding data into a format that is more accessible for operations such as aggregation or anomaly
detection. For instance, if several data values are stored together in a string, you can expand the data
into separate columns.
Data enrichment with other AWS services, such as extrapolation or error correction.
Applying complex string transformation to record fields.
Data filtering for cleaning up the data.
Using a Lambda Function for Preprocessing Records
When creating your Kinesis Data Analytics application, you enable Lambda preprocessing in the Connect
to a Source page.
To use a Lambda function to preprocess records in a Kinesis Data Analytics application
1. Sign in to the AWS Management Console and open the Kinesis Data Analytics console at https://
console.aws.amazon.com/kinesisanalytics.
2. On the Connect to a Source page for your application, choose Enabled in the Record pre-
processing with AWS Lambda section.
3. To use a Lambda function that you have already created, choose the function in the Lambda
function drop-down list.
4. To create a new Lambda function from one of the Lambda preprocessing templates, choose the
template from the drop-down list. Then choose View <template name> in Lambda to edit the
function.
5. To create a new Lambda function, choose Create new. For information about creating a Lambda
function, see Create a HelloWorld Lambda Function and Explore the Console in the AWS Lambda
Developer Guide.
6. Choose the version of the Lambda function to use. To use the latest version, choose $LATEST.
When you choose or create a Lambda function for record preprocessing, the records are preprocessed
before your application SQL code executes or your application generates a schema from the records.
Lambda Preprocessing Permissions
To use Lambda preprocessing, the application's IAM role requires the following permissions policy:
{
"Sid": "UseLambdaFunction",
"Effect": "Allow",
"Action": [
"lambda:InvokeFunction",
"lambda:GetFunctionConfiguration"
],
"Resource": "<FunctionARN>"
}
For more information about adding permissions policies, see Authentication and Access Control for
Amazon Kinesis Data Analytics (p. 163).
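One way to attach this statement to the application's role is with the AWS CLI. In the following sketch, the role name and the file lambda-preprocess-policy.json (which wraps the statement above in a standard policy document with Version and Statement fields) are placeholders:
$ aws iam put-role-policy \
    --role-name kinesis-analytics-YourApplicationName \
    --policy-name UseLambdaFunction \
    --policy-document file://lambda-preprocess-policy.json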
Lambda Preprocessing Metrics
You can use Amazon CloudWatch to monitor the number of Lambda invocations, bytes processed,
successes and failures, and so on. For information about CloudWatch metrics that are emitted by Kinesis
Data Analytics Lambda preprocessing, see Amazon Kinesis Analytics Metrics.
Using AWS Lambda with the Kinesis Producer Library
The Kinesis Producer Library (KPL) aggregates small user-formatted records into larger records up to 1
MB to make better use of Amazon Kinesis Data Streams throughput. The Kinesis Client Library (KCL) for
Java supports deaggregating these records. However, you must use a special module to deaggregate the
records when you use AWS Lambda as the consumer of your streams.
To get the necessary project code and instructions, see the Kinesis Producer Library Deaggregation
Modules for AWS Lambda on GitHub. You can use the components in this project to process KPL
serialized data within AWS Lambda in Java, Node.js, and Python. You can also use these components as
part of a multi-lang KCL application.
Data Preprocessing Event Input Data Model/Record Response
Model
To preprocess records, your Lambda function must be compliant with the required event input data and
record response models.
Event Input Data Model
Kinesis Data Analytics continuously reads data from your Kinesis data stream or Kinesis Data Firehose
delivery stream. For each batch of records it retrieves, the service manages how each batch gets passed
to your Lambda function. Your function receives a list of records as input. Within your function, you
iterate through the list and apply your business logic to accomplish your preprocessing requirements
(such as data format conversion or enrichment).
The input model to your preprocessing function varies slightly, depending on whether the data was
received from a Kinesis data stream or a Kinesis Data Firehose delivery stream.
If the source is a Kinesis Data Firehose delivery stream, the event input data model is as follows:
Kinesis Data Firehose Request Data Model
invocationId: The Lambda invocation ID (random GUID).
applicationArn: The Kinesis Data Analytics application Amazon Resource Name (ARN).
streamArn: The delivery stream ARN.
records: A list of records, each with the following fields:
    recordId: The record ID (random GUID).
    kinesisFirehoseRecordMetadata: Metadata for the record, containing:
        approximateArrivalTimestamp: The approximate arrival time of the delivery stream record.
    data: The Base64-encoded source record payload.
If the source is a Kinesis data stream, the event input data model is as follows:
Kinesis Streams Request Data Model
invocationId: The Lambda invocation ID (random GUID).
applicationArn: The Kinesis Data Analytics application ARN.
streamArn: The source stream ARN.
records: A list of records, each with the following fields:
    recordId: The record ID, based on the Kinesis record sequence number.
    kinesisStreamRecordMetadata: Metadata for the record, containing:
        sequenceNumber: The sequence number from the Kinesis stream record.
        partitionKey: The partition key from the Kinesis stream record.
        shardId: The shard ID from the Kinesis stream record.
        approximateArrivalTimestamp: The approximate arrival time of the stream record.
    data: The Base64-encoded source record payload.
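Putting these fields together, the event that a preprocessing function receives for a Kinesis data stream source might look like the following sketch; the ARNs, record ID, timestamp, and Base64 payload are illustrative values only:
{
    "invocationId": "invocationIdExample",
    "applicationArn": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/your-application",
    "streamArn": "arn:aws:kinesis:us-east-1:123456789012:stream/your-stream",
    "records": [
        {
            "recordId": "49572672223665514422805246926656954630972486059535892482",
            "kinesisStreamRecordMetadata": {
                "sequenceNumber": "49572672223665514422805246926656954630972486059535892482",
                "partitionKey": "partitionKey-01",
                "shardId": "shardId-000000000001",
                "approximateArrivalTimestamp": 1520280173000
            },
            "data": "eyJUSUNLRVJfU1lNQk9MIjoiQU1aTiIsIlBSSUNFIjo3ODQuNDh9"
        }
    ]
}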
Record Response Model
All records that are sent to your Lambda preprocessing function (with record IDs) must be returned to
Kinesis Data Analytics. They must contain the following parameters, or Kinesis Data Analytics rejects
them and treats the invocation as a data preprocessing failure. The data payload part of the record can
be transformed to accomplish preprocessing requirements.
Response Data Model
records: A list of returned records, each with the following fields:

recordId: The record ID is passed from Kinesis Data Analytics to Lambda during the invocation. The
transformed record must contain the same record ID. Any mismatch between the ID of the original
record and the ID of the transformed record is treated as a data preprocessing failure.

result: The status of the data transformation of the record. The possible values are:
    Ok: The record was transformed successfully. Kinesis Data Analytics ingests the record for SQL
    processing.
    Dropped: The record was dropped intentionally by your processing logic. Kinesis Data Analytics
    drops the record from SQL processing. The data payload field is optional for a Dropped record.
    ProcessingFailed: The record could not be transformed. Kinesis Data Analytics considers it
    unsuccessfully processed by your Lambda function and writes an error to the error stream. For
    more information about the error stream, see Error Handling (p. 40). The data payload field is
    optional for a ProcessingFailed record.

data: The transformed data payload, after base64-encoding. Each data payload can contain multiple
JSON documents if the application ingestion data format is JSON, or multiple CSV rows (with a row
delimiter specified in each row) if the application ingestion data format is CSV. The Kinesis Data
Analytics service successfully parses and processes data with either multiple JSON documents or CSV
rows within the same data payload.
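A conforming response for a single successfully transformed record might look like the following sketch; the record ID must match the ID received in the event, and data is the Base64-encoded transformed payload (the values shown are illustrative):
{
    "records": [
        {
            "recordId": "49572672223665514422805246926656954630972486059535892482",
            "result": "Ok",
            "data": "eyJUSUNLRVJfU1lNQk9MIjoiQU1aTiIsIlBSSUNFIjo3ODQuNDh9"
        }
    ]
}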
Common Data Preprocessing Failures
The following are common reasons why preprocessing can fail.
Not all records (with record IDs) in a batch that are sent to the Lambda function are returned to the
Kinesis Data Analytics service.
The response is missing either the record ID, status, or data payload field. The data payload field is
optional for a Dropped or ProcessingFailed record.
The Lambda function timeouts are not sufficient to preprocess the data.
The Lambda function response exceeds the response limits imposed by the AWS Lambda service.
For data preprocessing failures, Kinesis Data Analytics continues to retry Lambda invocations on the
same set of records until successful. You can monitor the following CloudWatch metrics to gain insight
into failures.
Kinesis Data Analytics application MillisBehindLatest: Indicates how far behind an application is
reading from the streaming source.
Kinesis Data Analytics application InputPreprocessing CloudWatch metrics: Indicates the number
of successes and failures, among other statistics. For more information, see Amazon Kinesis Analytics
Metrics.
AWS Lambda function CloudWatch metrics and logs.
Creating Lambda Functions for Preprocessing
Your Amazon Kinesis Data Analytics application can use Lambda functions for preprocessing records as
they are ingested into the application. Kinesis Data Analytics provides the following templates on the
console to use as a starting point for preprocessing your data.
Topics
Creating a Preprocessing Lambda Function in Node.js (p. 25)
Creating a Preprocessing Lambda Function in Python (p. 25)
Creating a Preprocessing Lambda Function in Java (p. 26)
Creating a Preprocessing Lambda Function in .NET (p. 26)
Creating a Preprocessing Lambda Function in Node.js
The following templates for creating preprocessing Lambda functions in Node.js are available on the
Kinesis Data Analytics console:

General Kinesis Data Analytics Input Processing (Node.js 6.10): A Kinesis Data Analytics record
preprocessor that receives JSON or CSV records as input and then returns them with a processing
status. Use this processor as a starting point for custom transformation logic.

Compressed Input Processing (Node.js 6.10): A Kinesis Data Analytics record processor that receives
compressed (GZIP or Deflate compressed) JSON or CSV records as input and returns decompressed
records with a processing status.
Creating a Preprocessing Lambda Function in Python
The following templates for creating preprocessing Lambda functions in Python are available on the
console:

General Kinesis Analytics Input Processing (Python 2.7): A Kinesis Data Analytics record preprocessor
that receives JSON or CSV records as input and then returns them with a processing status. Use this
processor as a starting point for custom transformation logic.

KPL Input Processing (Python 2.7): A Kinesis Data Analytics record processor that receives Kinesis
Producer Library (KPL) aggregates of JSON or CSV records as input and returns disaggregated records
with a processing status.
Creating a Preprocessing Lambda Function in Java
To create a Lambda function in Java for preprocessing records, use the Java events classes.
The following code demonstrates a sample Lambda function that preprocesses records using Java:
public class LambdaFunctionHandler implements
        RequestHandler<KinesisAnalyticsStreamsInputPreprocessingEvent, KinesisAnalyticsInputPreprocessingResponse> {

    @Override
    public KinesisAnalyticsInputPreprocessingResponse handleRequest(
            KinesisAnalyticsStreamsInputPreprocessingEvent event, Context context) {
        context.getLogger().log("InvocationId is : " + event.invocationId);
        context.getLogger().log("StreamArn is : " + event.streamArn);
        context.getLogger().log("ApplicationArn is : " + event.applicationArn);

        List<KinesisAnalyticsInputPreprocessingResponse.Record> records =
                new ArrayList<KinesisAnalyticsInputPreprocessingResponse.Record>();
        KinesisAnalyticsInputPreprocessingResponse response =
                new KinesisAnalyticsInputPreprocessingResponse(records);

        event.records.stream().forEach(record -> {
            context.getLogger().log("recordId is : " + record.recordId);
            context.getLogger().log("record aat is :" +
                    record.kinesisStreamRecordMetadata.approximateArrivalTimestamp);
            // Add your record.data pre-processing logic here.
            // response.records.add(new Record(record.recordId,
            //         KinesisAnalyticsInputPreprocessingResult.Ok, <preprocessedrecordData>));
        });
        return response;
    }
}
Creating a Preprocessing Lambda Function in .NET
To create a Lambda function in .NET for preprocessing records, use the .NET events classes.
The following code demonstrates a sample Lambda function that preprocesses records using C#:
public class Function
{
    public KinesisAnalyticsInputPreprocessingResponse FunctionHandler(KinesisAnalyticsStreamsInputPreprocessingEvent evnt, ILambdaContext context)
    {
        context.Logger.LogLine($"InvocationId: {evnt.InvocationId}");
        context.Logger.LogLine($"StreamArn: {evnt.StreamArn}");
        context.Logger.LogLine($"ApplicationArn: {evnt.ApplicationArn}");

        var response = new KinesisAnalyticsInputPreprocessingResponse
        {
            Records = new List<KinesisAnalyticsInputPreprocessingResponse.Record>()
        };

        foreach (var record in evnt.Records)
        {
            context.Logger.LogLine($"\tRecordId: {record.RecordId}");
            context.Logger.LogLine($"\tShardId: {record.RecordMetadata.ShardId}");
            context.Logger.LogLine($"\tPartitionKey: {record.RecordMetadata.PartitionKey}");
            context.Logger.LogLine($"\tRecord ApproximateArrivalTime: {record.RecordMetadata.ApproximateArrivalTimestamp}");
            context.Logger.LogLine($"\tData: {record.DecodeData()}");

            // Add your record preprocessing logic here.
            var preprocessedRecord = new KinesisAnalyticsInputPreprocessingResponse.Record
            {
                RecordId = record.RecordId,
                Result = KinesisAnalyticsInputPreprocessingResponse.OK
            };
            preprocessedRecord.EncodeData(record.DecodeData().ToUpperInvariant());
            response.Records.Add(preprocessedRecord);
        }
        return response;
    }
}
For more information about creating Lambda functions for preprocessing and destinations in .NET, see
Amazon.Lambda.KinesisAnalyticsEvents.
Parallelizing Input Streams for Increased Throughput
Amazon Kinesis Data Analytics applications can support multiple in-application input streams, to scale
an application beyond the throughput of a single in-application input stream. For more information on
in-application input streams, see Amazon Kinesis Data Analytics: How It Works (p. 3).
In almost all cases, Amazon Kinesis Data Analytics scales your application to handle the capacity of the
Kinesis streams or Kinesis Data Firehose source streams that feed into your application. However, if your
source stream's throughput exceeds the throughput of a single in-application input stream, you can
explicitly increase the number of in-application input streams that your application uses. You do so with
the InputParallelism parameter.
When the InputParallelism parameter is greater than one, Amazon Kinesis Data Analytics evenly
splits the partitions of your source stream among the in-application streams. For instance, if your source
stream has 50 shards, and you set InputParallelism to 2, each in-application input stream receives
the input from 25 source stream shards.
When you increase the number of in-application streams, your application must access the data in each
stream explicitly. For information about accessing multiple in-application streams in your code, see
Accessing Separate In-Application Streams in Your Amazon Kinesis Data Analytics Application (p. 29).
Although Kinesis data stream shards and Kinesis Data Firehose delivery streams are both divided among
in-application streams in the same way, they differ in the way they appear to your application:
The records from a Kinesis data stream include a shard_id field that can be used to identify the source
shard for the record.
The records from a Kinesis Data Firehose delivery stream don't include a field that identifies the record's
source shard or partition. This is because Kinesis Data Firehose abstracts this information away from
your application.
Evaluating Whether to Increase Your Number of In-Application
Input Streams
In most cases, a single in-application input stream can handle the throughput of a single source stream,
depending on the complexity and data size of the input streams. To determine if you need to increase
the number of in-application input streams, you can monitor the MillisBehindLatest metric in
Amazon CloudWatch. If the MillisBehindLatest metric has either of the following characteristics,
you should increase your application's InputParallelism setting:
The MillisBehindLatest metric is gradually increasing, indicating that your application is falling
behind the latest data in the stream.
The MillisBehindLatest metric is consistently above 1000 (one second).
You don't need to increase your application's InputParallelism setting if the following are true:
The MillisBehindLatest metric is gradually decreasing, indicating that your application is catching
up to the latest data in the stream.
The MillisBehindLatest metric is below 1000 (one second).
For more information on using CloudWatch, see the CloudWatch User Guide.
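As a sketch, you could create a CloudWatch alarm on this metric from the AWS CLI. The alarm name, threshold, and periods below are illustrative, and you might also need to add --dimensions to scope the alarm to your application's input; check the metric's dimensions in the CloudWatch console before relying on this command:
$ aws cloudwatch put-metric-alarm \
    --alarm-name KinesisAnalyticsFallingBehind \
    --namespace AWS/KinesisAnalytics \
    --metric-name MillisBehindLatest \
    --statistic Average \
    --period 300 \
    --evaluation-periods 3 \
    --threshold 1000 \
    --comparison-operator GreaterThanThreshold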
Implementing Multiple In-Application Input Streams
You can set the number of in-application input streams when an application is created
using CreateApplication (p. 191). You can also set this number after an application is created using
UpdateApplication (p. 220).
Note
You can only set the InputParallelism setting using the Amazon Kinesis Data Analytics
API or the AWS CLI. You cannot set this setting using the AWS Management Console. For
information on setting up the AWS CLI, see Step 2: Set Up the AWS Command Line Interface
(AWS CLI) (p. 46).
Setting a New Application's Input Stream Count
The following example demonstrates how to use the CreateApplication API action to set a new
application's input stream count to 2.
For more information about CreateApplication, see CreateApplication (p. 191).
{
    "ApplicationCode": "<The SQL code the new application will run on the input stream>",
    "ApplicationDescription": "<A friendly description for the new application>",
    "ApplicationName": "<The name for the new application>",
    "Inputs": [
        {
            "InputId": "ID for the new input stream",
            "InputParallelism": {
                "Count": 2
            }
        }
    ],
    "Outputs": [ ... ]
}
Setting an Existing Application's Input Stream Count
The following example demonstrates how to use the UpdateApplication API action to set an existing
application's input stream count to 2.
For more information about UpdateApplication, see UpdateApplication (p. 220).
{
    "InputUpdates": [
        {
            "InputId": "yourInputId",
            "InputParallelismUpdate": {
                "CountUpdate": 2
            }
        }
    ]
}
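The fragment above is the InputUpdates portion of the request. A minimal sketch of issuing the same update from the AWS CLI might look like the following; the application name, version ID, and input ID are placeholders that you replace with values returned by describe-application:
$ aws kinesisanalytics update-application \
    --application-name yourApplicationName \
    --current-application-version-id 5 \
    --application-update '{
        "InputUpdates": [
            {
                "InputId": "yourInputId",
                "InputParallelismUpdate": { "CountUpdate": 2 }
            }
        ]
    }'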
Accessing Separate In-Application Streams in Your Amazon
Kinesis Data Analytics Application
To use multiple in-application input streams in your application, you must explicitly select from the
different streams. The following code example demonstrates how to query multiple input streams in the
application created in the Getting Started tutorial.
In the following example, each source stream is first aggregated using COUNT before being combined
into a single in-application stream called in_application_stream_001. Aggregating the source
streams beforehand helps make sure that the combined in-application stream can handle the traffic from
multiple streams without being overloaded.
Note
To run this example and get results from both in-application input streams, update both
the number of shards in your source stream and the InputParallelism parameter in your
application.
CREATE OR REPLACE STREAM in_application_stream_001 (
ticker VARCHAR(64),
ticker_count INTEGER
);
CREATE OR REPLACE PUMP pump001 AS
INSERT INTO in_application_stream_001
SELECT STREAM ticker_symbol, COUNT(ticker_symbol)
FROM source_sql_stream_001
GROUP BY STEP(source_sql_stream_001.rowtime BY INTERVAL '60' SECOND),
ticker_symbol;
CREATE OR REPLACE PUMP pump002 AS
INSERT INTO in_application_stream_001
SELECT STREAM ticker_symbol, COUNT(ticker_symbol)
FROM source_sql_stream_002
GROUP BY STEP(source_sql_stream_002.rowtime BY INTERVAL '60' SECOND),
ticker_symbol;
The preceding code example produces windowed ticker counts from both source streams in in_application_stream_001.
Additional Considerations
When using multiple input streams, be aware of the following:
The maximum number of in-application input streams is 64.
The shards of the application's source stream are distributed evenly among the in-application input
streams.
The performance gains from adding in-application streams don't scale linearly. That is, doubling
the number of in-application streams doesn't double throughput. With a typical row size, each in-
application stream can achieve throughput of about 5,000 to 15,000 rows per second. By increasing
the in-application stream count to 10, you can achieve a throughput of 20,000 to 30,000 rows per
second. Throughput speed is dependent on the count, data types, and data size of the fields in the
input stream.
Some aggregate functions (such as AVG) can produce unexpected results when applied to input
streams partitioned into different shards. Because you need to run the aggregate operation on
individual shards before combining them into an aggregate stream, the results might be weighted
toward whichever stream contains more records.
If your application continues to experience poor performance (reflected by a high
MillisBehindLatest metric) after you increase your number of input streams, you might have
reached your limit of Kinesis Processing Units (KPUs). For more information, see Automatically Scaling
Applications to Increase Throughput (p. 43).
Application Code
Application code is a series of SQL statements that process input and produce output. These SQL
statements operate on in-application streams and reference tables. For more information, see Amazon
Kinesis Data Analytics: How It Works (p. 3).
For information about the SQL language elements that are supported by Kinesis Data Analytics, see
Amazon Kinesis Data Analytics SQL Reference.
In relational databases, you work with tables, using INSERT statements to add records and the SELECT
statement to query the data. In Amazon Kinesis Data Analytics, you work with streams. You can write
a SQL statement to query these streams. The results of querying one in-application stream are always
sent to another in-application stream. When performing complex analytics, you might create several
in-application streams to hold the results of intermediate analytics. And then finally, you configure
application output to persist results of the final analytics (from one or more in-application streams) to
external destinations. In summary, the following is a typical pattern for writing application code:
The SELECT statement is always used in the context of an INSERT statement. That is, when you select
rows, you insert results into another in-application stream.
The INSERT statement is always used in the context of a pump. That is, you use pumps to write to an
in-application stream.
The following example application code reads records from one in-application stream
(SOURCE_SQL_STREAM_001) and writes them to another in-application stream
(DESTINATION_SQL_STREAM). You can insert records into in-application streams using pumps, as shown
following:
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ticker_symbol VARCHAR(4),
change DOUBLE,
price DOUBLE);
-- Create a pump and insert into output stream.
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM ticker_symbol, change, price
FROM "SOURCE_SQL_STREAM_001";
Note
The identifiers that you specify for stream names and column names follow standard SQL
conventions. For example, if you put quotation marks around an identifier, it makes the
identifier case sensitive. If you don't, the identifier defaults to uppercase. For more information
about identifiers, see Identifiers in the Amazon Kinesis Data Analytics SQL Reference.
Your application code can consist of many SQL statements. For example:
You can write SQL queries in a sequential manner where the result of one SQL statement feeds into
the next SQL statement.
You can also write SQL queries that run independent of each other. For example, you can write
two SQL statements that query the same in-application stream, but send output into different in-
applications streams. You can then query the newly created in-application streams independently.
You can create in-application streams to save intermediate results. You insert data in in-application
streams using pumps. For more information, see In-Application Streams and Pumps (p. 66).
If you add an in-application reference table, you can write SQL to join data in in-application streams and
reference tables. For more information, see Example: Adding Reference Data to a Kinesis Data Analytics
Application (p. 116).
Amazon Kinesis Data Analytics writes data from specific in-application streams to the external
destination according to the application's output configuration. Make sure that your application code
writes to the in-application streams specified in the output configuration.
For more information, see the following topics:
Streaming SQL Concepts (p. 66)
Amazon Kinesis Data Analytics SQL Reference
Configuring Application Output
In your application code, you write the output of SQL statements to one or more in-application streams.
You can optionally add an output configuration to your application to persist everything written to an
in-application stream to an external destination such as an Amazon Kinesis data stream, a Kinesis Data
Firehose delivery stream, or an AWS Lambda function.
There is a limit on the number of external destinations you can use to persist an application output. For
more information, see Limits (p. 154).
Note
We recommend that you use one external destination to persist in-application error stream data
so that you can investigate the errors.
In each of these output configurations, you provide the following:
In-application stream name – The stream that you want to persist to an external destination.
Kinesis Data Analytics looks for the in-application stream that you specified in the output
configuration. (The stream name is case sensitive and must match exactly.) Make sure that your
application code creates this in-application stream.
External destination – You can persist data to a Kinesis data stream, a Kinesis Data Firehose delivery
stream, or a Lambda function. You provide the Amazon Resource Name (ARN) of the stream or
function. You also provide an IAM role that Kinesis Data Analytics can assume to write to the stream or
function on your behalf. You describe the record format (JSON, CSV) to Kinesis Data Analytics to use
when writing to the external destination.
If Kinesis Data Analytics can't write to the streaming or Lambda destination, the service continues to
try indefinitely. This creates back pressure, causing your application to fall behind. If this issue is not
resolved, your application eventually stops processing new data. You can monitor Kinesis Data Analytics
Metrics and set alarms for failures. For more information about metrics and alarms, see Using Amazon
CloudWatch Metrics and Creating Amazon CloudWatch Alarms.
You can configure the application output using the AWS Management Console. The console makes the
API call to save the configuration.
Creating an Output Using the AWS CLI
This section describes how to create the Outputs section of the request body for a
CreateApplication or AddApplicationOutput operation.
Creating a Kinesis Stream Output
The following JSON fragment shows the Outputs section in the CreateApplication request body for
creating an Amazon Kinesis data stream destination.
"Outputs": [
{
"DestinationSchema": {
"RecordFormatType": "string"
},
"KinesisStreamsOutput": {
"ResourceARN": "string",
"RoleARN": "string"
},
"Name": "string"
}
]
Creating a Kinesis Data Firehose Delivery Stream Output
The following JSON fragment shows the Outputs section in the CreateApplication request body for
creating an Amazon Kinesis Data Firehose delivery stream destination.
"Outputs": [
{
"DestinationSchema": {
"RecordFormatType": "string"
},
"KinesisFirehoseOutput": {
"ResourceARN": "string",
"RoleARN": "string"
},
"Name": "string"
}
]
Creating a Lambda Function Output
The following JSON fragment shows the Outputs section in the CreateApplication request body for
creating an AWS Lambda function destination.
"Outputs": [
{
"DestinationSchema": {
"RecordFormatType": "string"
},
"LambdaOutput": {
"ResourceARN": "string",
"RoleARN": "string"
},
"Name": "string"
}
]
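To send one of these Outputs fragments to the service from the AWS CLI, you can wrap it in an AddApplicationOutput call. The following is a minimal sketch for a Lambda destination; the application name, version ID, stream name, and ARNs are placeholders:
$ aws kinesisanalytics add-application-output \
    --application-name yourApplicationName \
    --current-application-version-id 3 \
    --output '{
        "Name": "DESTINATION_SQL_STREAM",
        "DestinationSchema": { "RecordFormatType": "JSON" },
        "LambdaOutput": {
            "ResourceARN": "arn:aws:lambda:us-east-1:123456789012:function:your-function",
            "RoleARN": "arn:aws:iam::123456789012:role/service-role/your-IAM-role"
        }
    }'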
Using a Lambda Function as Output
Using AWS Lambda as a destination allows you to more easily perform post-processing of your SQL
results before sending them to a final destination. Common post-processing tasks include the following:
Aggregating multiple rows into a single record
Combining current results with past results to address late-arriving data
Delivering to different destinations based on the type of information
Record format translation (such as translating to Protobuf)
String manipulation or transformation
Data enrichment after analytical processing
Custom processing for geospatial use cases
Data encryption
Lambda functions can deliver analytic information to a variety of AWS services and other destinations,
including the following:
Amazon Simple Storage Service (Amazon S3)
Custom APIs
Amazon DynamoDB
Amazon Aurora
Amazon Redshift
Amazon Simple Notification Service (Amazon SNS)
Amazon Simple Queue Service (Amazon SQS)
Amazon CloudWatch
For more information about creating Lambda applications, see Getting Started with AWS Lambda.
Topics
Lambda as Output Permissions (p. 34)
Lambda as Output Metrics (p. 34)
Lambda as Output Event Input Data Model and Record Response Model (p. 34)
Lambda Output Invocation Frequency (p. 36)
Adding a Lambda Function for Use as an Output (p. 36)
Common Lambda as Output Failures (p. 37)
Creating Lambda Functions for Application Destinations (p. 37)
Lambda as Output Permissions
To use Lambda as output, the application’s Lambda output IAM role requires the following permissions
policy:
{
"Sid": "UseLambdaFunction",
"Effect": "Allow",
"Action": [
"lambda:InvokeFunction",
"lambda:GetFunctionConfiguration"
],
"Resource": "FunctionARN"
}
Lambda as Output Metrics
You use Amazon CloudWatch to monitor the number of bytes sent, successes and failures, and so on.
For information about CloudWatch metrics that are emitted by Kinesis Data Analytics using Lambda as
output, see Amazon Kinesis Analytics Metrics.
Lambda as Output Event Input Data Model and Record
Response Model
To send Kinesis Data Analytics output records, your Lambda function must be compliant with the
required event input data and record response models.
Event Input Data Model
Kinesis Data Analytics continuously sends the output records from the application to the Lambda as an
output function with the following request model. Within your function, you iterate through the list and
apply your business logic to accomplish your output requirements (such as data transformation before
sending to a final destination).
invocationId: The Lambda invocation ID (random GUID).
applicationArn: The Kinesis Data Analytics application Amazon Resource Name (ARN).
records: A list of output records, each with the following fields:
    recordId: The record ID (random GUID).
    lambdaDeliveryRecordMetadata: Metadata for the record, containing:
        retryHint: The number of delivery retries.
    data: The Base64-encoded output record payload.
Note
The retryHint is a value that increases for every delivery failure. This value is not durably
persisted, and resets if the application is disrupted.
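Putting these fields together, the event that a Lambda as output function receives might look like the following sketch; the ARNs, record ID, and Base64 payload are illustrative values only:
{
    "invocationId": "invocationIdExample",
    "applicationArn": "arn:aws:kinesisanalytics:us-east-1:123456789012:application/your-application",
    "records": [
        {
            "recordId": "49572672223665514422805246926656954630972486059535892482",
            "lambdaDeliveryRecordMetadata": {
                "retryHint": 0
            },
            "data": "eyJUSUNLRVJfU1lNQk9MIjoiQU1aTiIsIlRJQ0tFUl9DT1VOVCI6MTJ9"
        }
    ]
}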
Record Response Model
Each record sent to your Lambda as an output function (with record IDs) must be acknowledged with
either Ok or DeliveryFailed, and it must contain the following parameters. Otherwise, Kinesis Data
Analytics treats the record as a delivery failure.

records: A list of acknowledged records, each with the following fields:

recordId: The record ID is passed from Kinesis Data Analytics to Lambda during the invocation. Any
mismatch between the ID of the original record and the ID of the acknowledged record is treated as a
delivery failure.

result: The status of the delivery of the record. The following are possible values:
    Ok: The record was transformed successfully and sent to the final destination.
    DeliveryFailed: The record was not delivered successfully to the final destination by the Lambda
    as output function. Kinesis Data Analytics continuously retries sending the delivery failed
    records to the Lambda as output function.
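A conforming acknowledgment for a single delivered record might look like the following sketch; the record ID must match the ID received in the event (the value shown is illustrative):
{
    "records": [
        {
            "recordId": "49572672223665514422805246926656954630972486059535892482",
            "result": "Ok"
        }
    ]
}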
Lambda Output Invocation Frequency
A Kinesis Data Analytics application buffers the output records and invokes the AWS Lambda destination
function frequently.
If records are emitted to the destination in-application stream within the data analytics application
as a tumbling window, the AWS Lambda destination function is invoked per tumbling window trigger.
For example, if a tumbling window of 60 seconds is used to emit the records to the destination in-
application stream, the Lambda function is invoked once every 60 seconds.
If records are emitted to the destination in-application stream within the application as a continuous
query or a sliding window, the Lambda destination function is invoked about once per second.
Note
Per-Lambda function invoke request payload size limits apply. Exceeding those limits results in
output records being split and sent across multiple Lambda function calls.
Adding a Lambda Function for Use as an Output
The following procedure demonstrates how to add a Lambda function as an output for a Kinesis Data
Analytics application.
1. Sign in to the AWS Management Console and open the Kinesis Data Analytics console at https://
console.aws.amazon.com/kinesisanalytics.
2. Choose the application in the list, and then choose Application details.
3. In the Destination section, choose Connect new destination.
4. For the Destination item, choose AWS Lambda function.
5. In the Deliver records to AWS Lambda section, either choose an existing Lambda function or choose
Create new.
6. If you are creating a new Lambda function, do the following:
a. Choose one of the templates provided. For more information, see Creating Lambda Functions for
Application Destinations (p. 37).
b. The Create Function page opens in a new browser tab. In the Name box, give the function a
meaningful name (for example, myLambdaFunction).
c. Update the template with post-processing functionality for your application. For information
about creating a Lambda function, see Getting Started in the AWS Lambda Developer Guide.
d. On the Kinesis Data Analytics console, in the Lambda function list, choose the Lambda function
that you just created.
7. In the In-application stream section, choose Choose an existing in-application stream. For In-
application stream name, choose your application's output stream. The results from the selected
output stream are sent to the Lambda output function.
8. Leave the rest of the form with the default values, and choose Save and continue.
Your application now sends records from the in-application stream to your Lambda function. You
can see the results of the default template in the Amazon CloudWatch console. Monitor the
AWS/KinesisAnalytics/LambdaDelivery.OkRecords metric to see the number of records being
delivered to the Lambda function.
Common Lambda as Output Failures
The following are common reasons why delivery to a Lambda function can fail.
Not all records (with record IDs) in a batch that are sent to the Lambda function are returned to the
Kinesis Data Analytics service.
The response is missing either the record ID or the status field.
The Lambda function timeouts are not sufficient to accomplish the business logic within the Lambda
function.
The business logic within the Lambda function does not catch all the errors, resulting in a timeout and
backpressure due to unhandled exceptions. These are often referred to as "poison pill" messages.
For data delivery failures, Kinesis Data Analytics continues to retry Lambda invocations on the same
set of records until successful. To gain insight into failures, you can monitor the following CloudWatch
metrics:
Kinesis Data Analytics application Lambda as Output CloudWatch metrics: Indicates the number of
successes and failures, among other statistics. For more information, see Amazon Kinesis Analytics
Metrics.
AWS Lambda function CloudWatch metrics and logs.
Creating Lambda Functions for Application Destinations
Your Kinesis Data Analytics application can use AWS Lambda functions as an output. Kinesis Data
Analytics provides templates for creating Lambda functions to use as a destination for your applications.
Use these templates as a starting point for post-processing output from your application.
Topics
Creating a Lambda Function Destination in Node.js (p. 37)
Creating a Lambda Function Destination in Python (p. 37)
Creating a Lambda Function Destination in Java (p. 38)
Creating a Lambda Function Destination in .NET (p. 38)
Creating a Lambda Function Destination in Node.js
The following template for creating a destination Lambda function in Node.js is available on the console:
kinesis-analytics-output (Node.js 6.10): Deliver output records from a Kinesis Data Analytics application
to a custom destination.
Creating a Lambda Function Destination in Python
The following templates for creating a destination Lambda function in Python are available on the
console:
kinesis-analytics-output-sns (Python 2.7): Deliver output records from a Kinesis Data Analytics
application to Amazon SNS.

kinesis-analytics-output-ddb (Python 2.7): Deliver output records from a Kinesis Data Analytics
application to Amazon DynamoDB.
Creating a Lambda Function Destination in Java
To create a destination Lambda function in Java, use the Java events classes.
The following code demonstrates a sample destination Lambda function using Java:
public class LambdaFunctionHandler
        implements RequestHandler<KinesisAnalyticsOutputDeliveryEvent, KinesisAnalyticsOutputDeliveryResponse> {

    @Override
    public KinesisAnalyticsOutputDeliveryResponse handleRequest(KinesisAnalyticsOutputDeliveryEvent event,
            Context context) {
        context.getLogger().log("InvocationId is : " + event.invocationId);
        context.getLogger().log("ApplicationArn is : " + event.applicationArn);

        List<KinesisAnalyticsOutputDeliveryResponse.Record> records =
                new ArrayList<KinesisAnalyticsOutputDeliveryResponse.Record>();
        KinesisAnalyticsOutputDeliveryResponse response =
                new KinesisAnalyticsOutputDeliveryResponse(records);

        event.records.stream().forEach(record -> {
            context.getLogger().log("recordId is : " + record.recordId);
            context.getLogger().log("record retryHint is :" +
                    record.lambdaDeliveryRecordMetadata.retryHint);
            // Add logic here to transform and send the record to the final destination of your choice.
            response.records.add(new Record(record.recordId, KinesisAnalyticsOutputDeliveryResult.Ok));
        });
        return response;
    }
}
Creating a Lambda Function Destination in .NET
To create a destination Lambda function in .NET, use the .NET events classes.
The following code demonstrates a sample destination Lambda function using C#:
public class Function
{
    public KinesisAnalyticsOutputDeliveryResponse FunctionHandler(KinesisAnalyticsOutputDeliveryEvent evnt, ILambdaContext context)
    {
        context.Logger.LogLine($"InvocationId: {evnt.InvocationId}");
        context.Logger.LogLine($"ApplicationArn: {evnt.ApplicationArn}");

        var response = new KinesisAnalyticsOutputDeliveryResponse
        {
            Records = new List<KinesisAnalyticsOutputDeliveryResponse.Record>()
        };

        foreach (var record in evnt.Records)
        {
            context.Logger.LogLine($"\tRecordId: {record.RecordId}");
            context.Logger.LogLine($"\tRetryHint: {record.RecordMetadata.RetryHint}");
            context.Logger.LogLine($"\tData: {record.DecodeData()}");

            // Add logic here to send the record to the final destination of your choice.
            var deliveredRecord = new KinesisAnalyticsOutputDeliveryResponse.Record
            {
                RecordId = record.RecordId,
                Result = KinesisAnalyticsOutputDeliveryResponse.OK
            };
            response.Records.Add(deliveredRecord);
        }
        return response;
    }
}
For more information about creating Lambda functions for pre-processing and destinations in .NET, see
Amazon.Lambda.KinesisAnalyticsEvents.
Delivery Model for Persisting Application Output to
an External Destination
Amazon Kinesis Data Analytics uses an "at least once" delivery model for application output to
the configured destinations. When an application is running, Kinesis Data Analytics takes internal
checkpoints. These checkpoints are points in time when output records have been delivered to
the destinations without data loss. The service uses the checkpoints as needed to ensure that your
application output is delivered at least once to the configured destinations.
In a normal situation, your application processes incoming data continuously. Kinesis Data Analytics
writes the output to the configured destinations, such as a Kinesis data stream or a Kinesis Data Firehose
delivery stream. However, your application can be interrupted occasionally, for example:
You choose to stop your application and restart it later.
You delete the IAM role that Kinesis Data Analytics needs to write your application output to the
configured destination. Without the IAM role, Kinesis Data Analytics doesn't have any permissions to
write to the external destination on your behalf.
A network outage or other internal service failure causes your application to stop running
momentarily.
When your application restarts, Kinesis Data Analytics ensures that it continues to process and write
output from a point before or equal to when the failure occurred. This helps ensure that it doesn't miss
delivering any application output to the configured destinations.
Suppose that you configured multiple destinations from the same in-application stream. After the
application recovers from failure, Kinesis Data Analytics resumes persisting output to the configured
destinations from the last record that was delivered to the slowest destination. This might result in
the same output record delivered more than once to other destinations. In this case, you must handle
potential duplications in the destination externally.
Error Handling
Amazon Kinesis Data Analytics returns API or SQL errors directly to you. For more information about API
operations, see Actions (p. 176). For more information about handling SQL errors, see Amazon Kinesis
Data Analytics SQL Reference.
Amazon Kinesis Data Analytics reports runtime errors using an in-application error stream called
error_stream.
Reporting Errors Using an In-Application Error
Stream
Amazon Kinesis Data Analytics reports runtime errors to the in-application error stream called
error_stream. The following are examples of errors that might occur:
A record read from the streaming source does not conform to the input schema.
Your application code specifies division by zero.
The rows are out of order (for example, a record appears on the stream with a user-modified ROWTIME
value that causes the record to go out of order).
The data in the source stream can't be converted to the data type specified in the schema (Coercion
error). For information about what data types can be converted, see Mapping JSON Data Types to SQL
Data Types (p. 15).
We recommend that you handle these errors programmatically in your SQL code or persist the data
on the error stream to an external destination. This requires that you add an output configuration (see
Configuring Application Output (p. 32)) to your application. For an example of how the in-application
error stream works, see Example: Exploring the In-Application Error Stream (p. 141).
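As a sketch, an output configuration that persists error_stream to a Kinesis Data Firehose delivery stream might look like the following Outputs fragment; the ARNs are placeholders:
"Outputs": [
    {
        "Name": "error_stream",
        "DestinationSchema": { "RecordFormatType": "JSON" },
        "KinesisFirehoseOutput": {
            "ResourceARN": "arn:aws:firehose:us-east-1:123456789012:deliverystream/your-error-delivery-stream",
            "RoleARN": "arn:aws:iam::123456789012:role/service-role/your-IAM-role"
        }
    }
]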
Note
Your Kinesis data analytics application can't access or modify the error stream programmatically
because the error stream is created using the system account. You must use the error output to
determine what errors your application might encounter. You then write your application's SQL
code to handle anticipated error conditions.
Error Stream Schema
The error stream has the following schema:
ERROR_TIME (TIMESTAMP): The time when the error occurred.
ERROR_LEVEL (VARCHAR(10))
ERROR_NAME (VARCHAR(32))
MESSAGE (VARCHAR(4096))
DATA_ROWTIME (TIMESTAMP): The row time of the incoming record.
DATA_ROW (VARCHAR(49152)): The hex-encoded data in the original row. You can use standard libraries
to hex decode this value.
PUMP_NAME (VARCHAR(128)): The originating pump, as defined with CREATE PUMP.
Granting Amazon Kinesis Data Analytics
Permissions to Access Streaming and Reference
Sources (Creating an IAM Role)
Amazon Kinesis Data Analytics needs permissions to read records from a streaming source that you
specify in your application input configuration. Amazon Kinesis Data Analytics also needs permissions to
write your application output to streams that you specify in your application output configuration.
You can grant these permissions by creating an IAM role that Amazon Kinesis Data Analytics can assume.
Permissions that you grant to this role determine what Amazon Kinesis Data Analytics can do when the
service assumes the role.
Note
The information in this section is useful if you want to create an IAM role yourself. When you
create an application in the Amazon Kinesis Data Analytics console, the console can create an
IAM role for you at that time. The console uses the following naming convention for IAM roles
that it creates:
kinesis-analytics-ApplicationName
After the role is created, you can review the role and attached policies in the IAM console.
Each IAM role has two policies attached to it. In the trust policy, you specify who can assume the role. In
the permissions policy (there can be one or more), you specify the permissions that you want to grant to
this role. The following sections describe these policies, which you can use when you create an IAM role.
Trust Policy
To grant Amazon Kinesis Data Analytics permissions to assume a role to access a streaming or reference
source, you can attach the following trust policy to an IAM role:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": "kinesisanalytics.amazonaws.com"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
Permissions Policy
If you are creating an IAM role to allow Amazon Kinesis Data Analytics to read from an application's
streaming source, you must grant permissions for the relevant read actions. Depending on your source (for
example, a Kinesis stream, a Kinesis Data Firehose delivery stream, or a reference source in an Amazon
S3 bucket), you can attach the following permissions policy.
Permissions Policy for Reading a Kinesis Stream
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "ReadInputKinesis",
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:GetShardIterator",
        "kinesis:GetRecords",
        "kinesis:ListShards"
      ],
      "Resource": [
        "arn:aws:kinesis:aws-region:aws-account-id:stream/inputStreamName"
      ]
    }
  ]
}
Permissions Policy for Reading a Kinesis Data Firehose Delivery
Stream
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "ReadInputFirehose",
      "Effect": "Allow",
      "Action": [
        "firehose:DescribeDeliveryStream",
        "firehose:Get*"
      ],
      "Resource": [
        "arn:aws:firehose:aws-region:aws-account-id:deliverystream/inputFirehoseName"
      ]
    }
  ]
}
Note
The firehose:Get* permission refers to an internal accessor that Kinesis Data Analytics uses
to access the stream. There is no public accessor for a Kinesis Data Firehose delivery stream.
If you direct Amazon Kinesis Data Analytics to write output to external destinations in your application
output configuration, you need to grant the following permissions to the IAM role.
Permissions Policy for Writing to a Kinesis Stream
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "WriteOutputKinesis",
      "Effect": "Allow",
      "Action": [
        "kinesis:DescribeStream",
        "kinesis:PutRecord",
        "kinesis:PutRecords"
      ],
      "Resource": [
        "arn:aws:kinesis:aws-region:aws-account-id:stream/output-stream-name"
      ]
    }
  ]
}
Permissions Policy for Writing to a Firehose Delivery Stream
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Sid": "WriteOutputFirehose",
      "Effect": "Allow",
      "Action": [
        "firehose:DescribeDeliveryStream",
        "firehose:PutRecord",
        "firehose:PutRecordBatch"
      ],
      "Resource": [
        "arn:aws:firehose:aws-region:aws-account-id:deliverystream/output-firehose-name"
      ]
    }
  ]
}
Permissions Policy for Reading a Reference Data Source from an
Amazon S3 Bucket
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "s3:Get*",
        "s3:List*"
      ],
      "Resource": "*"
    }
  ]
}
Automatically Scaling Applications to Increase
Throughput
Amazon Kinesis Data Analytics elastically scales your application to accommodate the data throughput
of your source stream and your query complexity for most scenarios. Kinesis Data Analytics provisions
capacity in the form of Kinesis Processing Units (KPUs). A single KPU provides you with 4 GB of memory
and corresponding computing and networking capacity.
The default limit for KPUs for your application is eight. For instructions on how to request an increase to
this limit, see To request a limit increase in AWS Service Limits.
Note
The drop-down item that is used to select a limit increase for KPUs is not yet available. When
requesting an increase, choose the following options on the support form:
Regarding: Service limit increase
Limit Type: Kinesis Analytics
Region: Select your application's Region
Limit: Number of applications limit
New limit value: 100
Use Case Description: Provide your application prefix, and specify that you are requesting a
limit increase for KPUs.
Getting Started with Amazon Kinesis
Data Analytics
Following, you can find topics to help get you started using Amazon Kinesis Data Analytics. If you are
new to Kinesis Data Analytics, we recommend that you review the concepts and terminology presented
in Amazon Kinesis Data Analytics: How It Works (p. 3) before performing the steps in the Getting Started
section.
Topics
Step 1: Set Up an AWS Account and Create an Administrator User (p. 45)
Step 2: Set Up the AWS Command Line Interface (AWS CLI) (p. 46)
Step 3: Create Your Starter Amazon Kinesis Data Analytics Application (p. 47)
Step 4 (Optional) Edit the Schema and SQL Code Using the Console (p. 56)
Step 1: Set Up an AWS Account and Create an
Administrator User
Before you use Amazon Kinesis Data Analytics for the first time, complete the following tasks:
1. Sign Up for AWS (p. 45)
2. Create an IAM User (p. 46)
Sign Up for AWS
When you sign up for Amazon Web Services (AWS), your AWS account is automatically signed up for all
services in AWS, including Amazon Kinesis Data Analytics. You are charged only for the services that you
use.
With Kinesis Data Analytics, you pay only for the resources you use. If you are a new AWS customer, you
can get started with Kinesis Data Analytics for free. For more information, see AWS Free Usage Tier.
If you already have an AWS account, skip to the next task. If you don't have an AWS account, perform the
steps in the following procedure to create one.
To create an AWS account
1. Open https://aws.amazon.com/, and then choose Create an AWS Account.
Note
If you previously signed in to the AWS Management Console using AWS account root user
credentials, choose Sign in to a different account. If you previously signed in to the console
using IAM credentials, choose Sign-in using root account credentials. Then choose Create
a new AWS account.
2. Follow the online instructions.
Part of the sign-up procedure involves receiving a phone call and entering a verification code using
the phone keypad.
Note your AWS account ID because you'll need it for the next task.
Create an IAM User
Services in AWS, such as Amazon Kinesis Data Analytics, require that you provide credentials when you
access them so that the service can determine whether you have permissions to access the resources
owned by that service. The console requires your password. You can create access keys for your AWS
account to access the AWS CLI or API. However, we don't recommend that you access AWS using the
credentials for your AWS account. Instead, we recommend that you use AWS Identity and Access
Management (IAM). Create an IAM user, add the user to an IAM group with administrative permissions,
and then grant administrative permissions to the IAM user that you created. You can then access AWS
using a special URL and that IAM user's credentials.
If you signed up for AWS, but you haven't created an IAM user for yourself, you can create one using the
IAM console.
The Getting Started exercises in this guide assume that you have a user (adminuser) with administrator
privileges. Follow the procedure to create adminuser in your account.
To create an administrator user and sign in to the console
1. Create an administrator user called adminuser in your AWS account. For instructions, see Creating
Your First IAM User and Administrators Group in the IAM User Guide.
2. A user can sign in to the AWS Management Console using a special URL. For more information, see
How Users Sign In to Your Account in the IAM User Guide.
For more information about IAM, see the following:
AWS Identity and Access Management (IAM)
Getting Started
IAM User Guide
Next Step
Step 2: Set Up the AWS Command Line Interface (AWS CLI) (p. 46)
Step 2: Set Up the AWS Command Line Interface
(AWS CLI)
Follow the steps to download and configure the AWS Command Line Interface (AWS CLI).
Important
You don't need the AWS CLI to perform the steps in the Getting Started exercise. However, some
of the exercises in this guide use the AWS CLI. You can skip this step and go to Step 3: Create
Your Starter Amazon Kinesis Data Analytics Application (p. 47), and then set up the AWS CLI
later when you need it.
To set up the AWS CLI
1. Download and configure the AWS CLI. For instructions, see the following topics in the AWS
Command Line Interface User Guide:
Getting Set Up with the AWS Command Line Interface
Configuring the AWS Command Line Interface
2. Add a named profile for the administrator user in the AWS CLI config file. You use this profile when
executing the AWS CLI commands. For more information about named profiles, see Named Profiles
in the AWS Command Line Interface User Guide.
[profile adminuser]
aws_access_key_id = adminuser access key ID
aws_secret_access_key = adminuser secret access key
region = aws-region
For a list of available AWS Regions, see Regions and Endpoints in the Amazon Web Services General
Reference.
3. Verify the setup by entering the following help command at the command prompt:
aws help
Next Step
Step 3: Create Your Starter Amazon Kinesis Data Analytics Application (p. 47)
Step 3: Create Your Starter Amazon Kinesis Data
Analytics Application
By following the steps in this section, you can create your first Kinesis Data Analytics application using
the console.
Note
We suggest that you review Amazon Kinesis Data Analytics: How It Works (p. 3) before trying the
Getting Started exercise.
For this Getting Started exercise, you can use the console to work with either the demo stream or
templates with application code.
If you choose to use the demo stream, the console creates a Kinesis data stream in your account that is
called kinesis-analytics-demo-stream.
A Kinesis data analytics application requires a streaming source. For this source, several SQL examples
in this guide use the demo stream kinesis-analytics-demo-stream. The console also runs a
script that continuously adds sample data (simulated stock trade records) to this stream, as shown
following.
You can use kinesis-analytics-demo-stream as the streaming source for your application in this
exercise.
Note
The demo stream remains in your account. You can use it to test other examples in this guide.
However, when you leave the console, the script that the console uses stops populating the
data. When needed, the console provides the option to start populating the stream again.
If you choose to use the templates with example application code, you use template code that the
console provides to perform simple analytics on the demo stream.
You use these features to quickly set up your first application as follows:
1. Create an application – You only need to provide a name. The console creates the application and the
service sets the application state to READY.
2. Configure input – First, you add a streaming source, the demo stream. You must create a demo stream
in the console before you can use it. Then, the console takes a random sample of records on the demo
stream and infers a schema for the in-application input stream that is created. The console names the
in-application stream SOURCE_SQL_STREAM_001.
The console uses the discovery API to infer the schema. If necessary, you can edit the inferred schema.
For more information, see DiscoverInputSchema (p. 210). Kinesis Data Analytics uses this schema to
create an in-application stream.
When you start the application, Kinesis Data Analytics reads the demo stream continuously on your
behalf and inserts rows in the SOURCE_SQL_STREAM_001 in-application input stream.
3. Specify application code – You use a template (called Continuous filter) that provides the following
code:
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"
(symbol VARCHAR(4), sector VARCHAR(12), CHANGE DOUBLE, price DOUBLE);
-- Create pump to insert into output.
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM ticker_symbol, sector, CHANGE, price
FROM "SOURCE_SQL_STREAM_001"
WHERE sector SIMILAR TO '%TECH%';
The application code queries the in-application stream SOURCE_SQL_STREAM_001. The code then
inserts the resulting rows in another in-application stream DESTINATION_SQL_STREAM, using pumps.
For more information about this coding pattern, see Application Code (p. 30).
For information about the SQL language elements that are supported by Kinesis Data Analytics, see
Amazon Kinesis Data Analytics SQL Reference.
4. Configuring output – In this exercise, you don't configure any output. That is, you don't persist data in
the in-application stream that your application creates to any external destination. Instead, you verify
query results in the console. Additional examples in this guide show how to configure output. For one
example, see Example: Creating Simple Alerts (p. 139).
Important
The exercise uses the US East (N. Virginia) Region (us-east-1) to set up the application. You can
use any of the supported AWS Regions.
Next Step
Step 3.1: Create an Application (p. 49)
Step 3.1: Create an Application
In this section, you create an Amazon Kinesis Data Analytics application. You configure application input
in the next step.
To create a data analytics application
1. Sign in to the AWS Management Console and open the Kinesis Data Analytics console at https://
console.aws.amazon.com/kinesisanalytics.
2. Choose Create new application.
3. On the New application page, type an application name, type a description, and then choose Save
and continue.
Doing this creates a Kinesis data analytics application with a status of READY. The console shows the
application hub where you can configure input and output.
Note
To create an application, the CreateApplication (p. 191) operation requires only the
application name. You can add input and output configuration after you create an
application in the console.
In the next step, you configure input for the application. In the input configuration, you add a
streaming data source to the application and discover a schema for an in-application input stream
by sampling data on the streaming source.
Next Step
Step 3.2: Configure Input (p. 50)
Step 3.2: Configure Input
Your application needs a streaming source. To help you get started, the console can create a demo
stream (called kinesis-analytics-demo-stream). The console also runs a script that populates
records in the stream.
To add a streaming source to your application
1. On the application hub page in the console, choose Connect to a source.
2. On the page that appears, review the following:
Source section, where you specify a streaming source for your application. You can select an
existing stream source or create one. In this exercise, you create a new stream, the demo stream.
By default, the console names the in-application input stream that is created as
SOURCE_SQL_STREAM_001. For this exercise, keep this name as it appears.
Stream reference name – This option shows the name of the in-application input stream that is
created, SOURCE_SQL_STREAM_001. You can change the name, but for this exercise, keep this
name.
In the input configuration, you map the demo stream to an in-application input stream that is
created. When you start the application, Amazon Kinesis Data Analytics continuously reads the
demo stream and inserts rows in the in-application input stream. You query this in-application
input stream in your application code.
Record pre-processing with AWS Lambda: This option is where you specify an AWS Lambda
function that modifies the records in the input stream before your application code executes.
In this exercise, leave the Disabled option selected. For more information about Lambda
preprocessing, see Preprocessing Data Using a Lambda Function (p. 21).
After you provide all the information on this page, the console sends an update request (see
UpdateApplication (p. 220)) to add the input configuration to the application.
3. On the Source page, choose Configure a new stream.
4. Choose Create demo stream. The console configures the application input by doing the following:
The console creates a Kinesis data stream called kinesis-analytics-demo-stream.
The console populates the stream with sample stock ticker data.
Using the DiscoverInputSchema (p. 210) input action, the console infers a schema by reading
sample records on the stream. The schema that is inferred is the schema for the in-application
input stream that is created. For more information, see Configuring Application Input (p. 5).
The console shows the inferred schema and the sample data it read from the streaming source to
infer the schema.
The console displays the sample records on the streaming source.
The following appear on the Stream sample console page:
The Raw stream sample tab shows the raw stream records sampled by the
DiscoverInputSchema (p. 210) API action to infer the schema.
The Formatted stream sample tab shows the tabular version of the data in the Raw stream
sample tab.
If you choose Edit schema, you can edit the inferred schema. For this exercise, don't change the
inferred schema. For more information about editing a schema, see Working with the Schema
Editor (p. 56).
If you choose Rediscover schema, you can request the console to run
DiscoverInputSchema (p. 210) again and infer the schema.
5. Choose Save and continue.
You now have an application with input configuration added to it. In the next step, you add SQL
code to perform some analytics on the data in the in-application input stream.
Next Step
Step 3.3: Add Real-Time Analytics (Add Application Code) (p. 52)
Step 3.3: Add Real-Time Analytics (Add Application
Code)
You can write your own SQL queries against the in-application stream, but for the following step you use
one of the templates that provides sample code.
1. On the application hub page, choose Go to SQL editor.
2. In the Would you like to start running "GSExample1"? dialog box, choose Yes, start application.
The console sends a request to start the application (see StartApplication (p. 216)), and then the
SQL editor page appears.
3. The console opens the SQL editor page. Review the page, including the buttons (Add SQL from
templates, Save and run SQL) and various tabs.
4. In the SQL editor, choose Add SQL from templates.
5. From the available template list, choose Continuous filter. The sample code reads data from one in-
application stream (the WHERE clause filters the rows) and inserts it in another in-application stream
as follows:
It creates the in-application stream DESTINATION_SQL_STREAM.
It creates a pump STREAM_PUMP, and uses it to select rows from SOURCE_SQL_STREAM_001 and
insert them in the DESTINATION_SQL_STREAM.
6. Choose Add this SQL to editor.
7. Test the application code as follows:
Remember, you already started the application (status is RUNNING). Therefore, Amazon Kinesis
Data Analytics is already continuously reading from the streaming source and adding rows to the in-
application stream SOURCE_SQL_STREAM_001.
a. In the SQL Editor, choose Save and run SQL. The console first sends an update request to save the
application code. Then, the code continuously executes.
b. You can see the results in the Real-time analytics tab.
The SQL editor has the following tabs:
The Source data tab shows an in-application input stream that is mapped to the streaming
source. Choose the in-application stream, and you can see data coming in. Note the additional
columns in the in-application input stream that weren't specified in the input configuration.
These include the following timestamp columns:
ROWTIME – Each row in an in-application stream has a special column called ROWTIME.
This column is the timestamp when Amazon Kinesis Data Analytics inserted the row in the
first in-application stream (the in-application input stream that is mapped to the streaming
source).
Approximate_Arrival_Time – Each Kinesis Data Analytics record includes a value called
Approximate_Arrival_Time. This value is the approximate arrival timestamp that is
set when the streaming source successfully receives and stores the record. When Kinesis
Data Analytics reads records from a streaming source, it fetches this column into the in-
application input stream.
These timestamp values are useful in windowed queries that are time-based. For more
information, see Windowed Queries (p. 70).
The Real-time analytics tab shows all the other in-application streams created by your
application code. It also includes the error stream. Kinesis Data Analytics sends any rows it
cannot process to the error stream. For more information, see Error Handling (p. 40).
Choose DESTINATION_SQL_STREAM to view the rows your application code inserted. Note
the additional columns that your application code didn't create. These columns include the
ROWTIME timestamp column. Kinesis Data Analytics simply copies these values from the
source (SOURCE_SQL_STREAM_001).
The Destination tab shows the external destination where Kinesis Data Analytics writes the
query results. You haven't configured any external destination for your application output yet.
Next Step
Step 3.4: (Optional) Update the Application Code (p. 54)
Step 3.4: (Optional) Update the Application Code
In this step, you explore how to update the application code.
To update application code
1. Create another in-application stream as follows:
Create another in-application stream called DESTINATION_SQL_STREAM_2.
Create a pump, and then use it to insert rows in the newly created stream by selecting rows from
the DESTINATION_SQL_STREAM.
In the SQL editor, append the following code to the existing application code:
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM_2"
(ticker_symbol VARCHAR(4),
change DOUBLE,
price DOUBLE);
CREATE OR REPLACE PUMP "STREAM_PUMP_2" AS
INSERT INTO "DESTINATION_SQL_STREAM_2"
SELECT STREAM ticker_symbol, change, price
FROM "DESTINATION_SQL_STREAM";
Save and run the code. Additional in-application streams appear on the Real-time analytics tab.
2. Create two in-application streams. Filter rows in the SOURCE_SQL_STREAM_001 based on the stock
ticker, and then insert them into these separate streams.
Append the following SQL statements to your application code:
CREATE OR REPLACE STREAM "AMZN_STREAM"
(ticker_symbol VARCHAR(4),
change DOUBLE,
price DOUBLE);
CREATE OR REPLACE PUMP "AMZN_PUMP" AS
INSERT INTO "AMZN_STREAM"
SELECT STREAM ticker_symbol, change, price
FROM "SOURCE_SQL_STREAM_001"
WHERE ticker_symbol SIMILAR TO '%AMZN%';
CREATE OR REPLACE STREAM "TGT_STREAM"
(ticker_symbol VARCHAR(4),
change DOUBLE,
price DOUBLE);
CREATE OR REPLACE PUMP "TGT_PUMP" AS
INSERT INTO "TGT_STREAM"
SELECT STREAM ticker_symbol, change, price
FROM "SOURCE_SQL_STREAM_001"
WHERE ticker_symbol SIMILAR TO '%TGT%';
Save and run the code. Notice additional in-application streams on the Real-time analytics tab.
You now have your first working Amazon Kinesis Data Analytics application. In this exercise, you did the
following:
Created your first Kinesis data analytics application.
Configured application input that identified the demo stream as the streaming source and mapped
it to the in-application stream (SOURCE_SQL_STREAM_001) that Kinesis Data Analytics creates.
Kinesis Data Analytics continuously reads the demo stream and inserts records in the in-application stream.
Your application code queried the SOURCE_SQL_STREAM_001 and wrote output to another in-
application stream called DESTINATION_SQL_STREAM.
Now you can optionally configure application output to write the application output to an
external destination. That is, you can configure the application output to write records in the
DESTINATION_SQL_STREAM to an external destination. For this exercise, this is an optional step. To
learn how to configure the destination, go to the next step.
Next Step
Step 4 (Optional) Edit the Schema and SQL Code Using the Console (p. 56).
Step 4 (Optional) Edit the Schema and SQL Code
Using the Console
Following, you can find information about how to edit an inferred schema and how to edit SQL code for
Amazon Kinesis Data Analytics. You do so by working with the schema editor and SQL editor that are
part of the Kinesis Data Analytics console.
Topics
Working with the Schema Editor (p. 56)
Working with the SQL Editor (p. 63)
Working with the Schema Editor
The schema for an Amazon Kinesis Data Analytics application's input stream defines how data from the
stream is made available to SQL queries in the application.
The schema contains selection criteria for determining what part of the streaming input is transformed
into a data column in the in-application input stream. This input can be one of the following:
A JSONPath expression for JSON input streams. JSONPath is a tool for querying JSON data.
A column number for input streams in comma-separated values (CSV) format.
A column name and a SQL data type for presenting the data in the in-application data stream. The
data type also contains a length for character or binary data.
The console attempts to generate the schema using DiscoverInputSchema (p. 210). If schema discovery
fails or returns an incorrect or incomplete schema, you must edit the schema manually by using the
schema editor.
Schema Editor Main Screen
The following screenshot shows the main screen for the Schema Editor.
You can apply the following edits to the schema:
Add a column (1): You might need to add a data column if a data item is not detected automatically.
Delete a column (2): You can exclude data from the source stream if your application doesn't require
it. This exclusion doesn't affect the data in the source stream. If data is excluded, that data simply isn't
made available to the application.
Rename a column (3): A column name can't be blank, must be longer than a single character, and
must not contain reserved SQL keywords. The name must also meet naming criteria for SQL ordinary
identifiers: The name must start with a letter and contain only letters, underscore characters, and
digits.
Change the data type (4) or length (5) of a column: You can specify a compatible data type for a
column. If you specify an incompatible data type, the column is either populated with NULL or the in-
application stream is not populated at all. In the latter case, errors are written to the error stream. If
you specify a length for a column that is too small, the incoming data is truncated.
Change the selection criteria of a column (6): You can edit the JSONPath expression or CSV column
order used to determine the source of the data in a column. To change the selection criteria for a
JSON schema, enter a new value for the row path expression. A CSV schema uses the column order as
selection criteria. To change the selection criteria for a CSV schema, change the order of the columns.
Editing the Schema for a Streaming Source
If you need to edit a schema for a streaming source, follow these steps.
To edit the schema for a streaming source
1. On the Source page, choose Edit schema.
2. On the Edit schema page, edit the source schema.
3. For Format, choose JSON or CSV. For JSON or CSV format, the supported encoding is ISO 8859-1.
For further information on editing the schema for JSON or CSV format, see the procedures in the next
sections.
Editing a JSON Schema
You can edit a JSON schema by using the following steps.
To edit a JSON schema
1. In the schema editor, choose Add column to add a column.
A new column appears in the first column position. To change the column order, choose the up and
down arrows next to the column name.
For a new column, provide the following information:
For Column name, type a name.
A column name cannot be blank, must be longer than a single character, and must not contain
reserved SQL keywords. It must also meet naming criteria for SQL ordinary identifiers: It must
start with a letter and contain only letters, underscore characters, and digits.
For Column type, type an SQL data type.
A column type can be any supported SQL data type. If the new data type is CHAR, VARBINARY, or
VARCHAR, specify a data length for Length. For more information, see Data Types.
For Row path, provide a row path. A row path is a valid JSONPath expression that maps to a JSON
element.
Note
The base Row path value is the path to the top-level parent that contains the data to
be imported. This value is $ by default. For more information, see RecordRowPath in
JSONMappingParameters.
2. To delete a column, choose the x icon next to the column number.
3. To rename a column, enter a new name for Column name. The new column name cannot be blank,
must be longer than a single character, and must not contain reserved SQL keywords. It must also
meet naming criteria for SQL ordinary identifiers: It must start with a letter and contain only letters,
underscore characters, and digits.
4. To change the data type of a column, choose a new data type for Column type. If the new data type
is CHAR, VARBINARY, or VARCHAR, specify a data length for Length. For more information, see Data
Types.
5. Choose Save schema and update stream to save your changes.
The modified schema appears in the editor and looks similar to the following.
If your schema has many rows, you can filter the rows using Filter by column name. For example, to edit
column names that start with P, such as a Price column, enter P in the Filter by column name box.
Editing a CSV Schema
You can edit a CSV schema by using the following steps.
To edit a CSV schema
1. In the schema editor, for Row delimiter, choose the delimiter used by your incoming data stream.
This is the delimiter between records of data in your stream, such as a newline character.
2. For Column delimiter, choose the delimiter used by your incoming data stream. This is the delimiter
between fields of data in your stream, such as a comma.
3. To add a column, choose Add column.
A new column appears in the first column position. To change the column order, choose the up and
down arrows next to the column name.
For a new column, provide the following information:
For Column name, enter a name.
A column name cannot be blank, must be longer than a single character, and must not contain
reserved SQL keywords. It must also meet naming criteria for SQL ordinary identifiers: It must
start with a letter and contain only letters, underscore characters, and digits.
For Column type, enter a SQL data type.
A column type can be any supported SQL data type. If the new data type is CHAR, VARBINARY, or
VARCHAR, specify a data length for Length. For more information, see Data Types.
4. To delete a column, choose the x icon next to the column number.
5. To rename a column, enter a new name in Column name. The new column name cannot be blank,
must be longer than a single character, and must not contain reserved SQL keywords. It must also
meet naming criteria for SQL ordinary identifiers: It must start with a letter and contain only letters,
underscore characters, and digits.
6. To change the data type of a column, choose a new data type for Column type. If the new data type
is CHAR, VARBINARY, or VARCHAR, specify a data length for Length. For more information, see Data
Types.
7. Choose Save schema and update stream to save your changes.
The modified schema appears in the editor and looks similar to the following.
If your schema has many rows, you can filter the rows using Filter by column name. For example, to edit
column names that start with P, such as a Price column, enter P in the Filter by column name box.
Working with the SQL Editor
Following, you can find information about sections of the SQL editor and how each works. In the
SQL editor, you can either author your own code yourself or choose Add SQL from templates. A SQL
template gives you example SQL code that can help you write common Amazon Kinesis Data Analytics
applications. The example applications in this guide use some of these templates. For more information,
see Example Applications (p. 82).
Source Data Tab
The Source data tab identifies a streaming source. It also identifies the in-application input stream that
this source maps to and that provides the application input configuration.
Amazon Kinesis Data Analytics provides the following timestamp columns, so that you don't need to
provide explicit mapping in your input configuration:
ROWTIME – Each row in an in-application stream has a special column called ROWTIME. This column
is the timestamp for the point when Kinesis Data Analytics inserted the row in the first in-application
stream.
Approximate_Arrival_Time – Records on your streaming source include the
Approximate_Arrival_Timestamp column. It is the approximate arrival timestamp that is set when
the streaming source successfully receives and stores the related record. Kinesis Data Analytics fetches
this column into the in-application input stream as Approximate_Arrival_Time. Amazon Kinesis
Data Analytics provides this column only in the in-application input stream that is mapped to the
streaming source.
These timestamp values are useful in windowed queries that are time-based. For more information, see
Windowed Queries (p. 70).
Real-Time Analytics Tab
The Real-time analytics tab shows all the in-application streams that your application code creates.
This group of streams includes the error stream (error_stream) that Amazon Kinesis Data Analytics
provides for all applications.
Destination Tab
The Destination tab enables you to configure the application output to persist in-application streams
to external destinations. You can configure output to persist data in any of the in-application streams to
external destinations. For more information, see Configuring Application Output (p. 32).
Streaming SQL Concepts
Amazon Kinesis Data Analytics implements the ANSI 2008 SQL standard with extensions. These
extensions enable you to process streaming data. The following topics cover key streaming SQL
concepts.
Topics
In-Application Streams and Pumps (p. 66)
Timestamps and the ROWTIME Column (p. 67)
Continuous Queries (p. 69)
Windowed Queries (p. 70)
Streaming Data Operations: Stream Joins (p. 80)
In-Application Streams and Pumps
When you configure application input, you map a streaming source to an in-application stream that
is created. Data continuously flows from the streaming source into the in-application stream. An in-
application stream works like a table that you can query using SQL statements, but it's called a stream
because it represents continuous data flow.
Note
Do not confuse in-application streams with Amazon Kinesis data streams and Kinesis Data
Firehose delivery streams. In-application streams exist only in the context of an Amazon Kinesis
Data Analytics application. Kinesis data streams and Kinesis Data Firehose delivery streams
exist independent of your application. You can configure them as a streaming source in your
application input configuration or as a destination in output configuration.
You can also create more in-application streams as needed to store intermediate query results. Creating
an in-application stream is a two-step process. First, you create an in-application stream, and then
you pump data into it. For example, suppose that the input configuration of your application creates
an in-application stream named INPUTSTREAM. In the following example, you create another stream
(TEMPSTREAM), and then you pump data from INPUTSTREAM into it.
1. Create an in-application stream (TEMPSTREAM) with three columns, as shown following:
CREATE OR REPLACE STREAM "TEMPSTREAM" (
"column1" BIGINT NOT NULL,
"column2" INTEGER,
"column3" VARCHAR(64));
The column names are specified in quotes, making them case sensitive. For more information, see
Identifiers in the Amazon Kinesis Data Analytics SQL Reference.
2. Insert data into the stream using a pump. A pump is a continuously running insert query that inserts
data from one in-application stream into another in-application stream. The following statement
creates a pump (SAMPLEPUMP) and inserts data into the TEMPSTREAM by selecting records from
another stream (INPUTSTREAM).
CREATE OR REPLACE PUMP "SAMPLEPUMP" AS
INSERT INTO "TEMPSTREAM" ("column1",
"column2",
"column3")
SELECT STREAM inputcolumn1,
inputcolumn2,
inputcolumn3
FROM "INPUTSTREAM";
You can have multiple writers insert into an in-application stream, and there can be multiple readers
selecting from the stream. Think of an in-application stream as implementing a publish/subscribe
messaging paradigm. In this paradigm, the data row, including the time of creation and time of receipt,
can be processed, interpreted, and forwarded by a cascade of streaming SQL statements, without having
to be stored in a traditional RDBMS.
After an in-application stream is created, you can perform normal SQL queries.
Note
When you query streams, most SQL statements are bound using a row-based or time-based
window. For more information, see Windowed Queries (p. 70).
You can also join streams. For examples of joining streams, see Streaming Data Operations: Stream
Joins (p. 80).
Timestamps and the ROWTIME Column
In-application streams include a special column called ROWTIME. It stores a timestamp when Amazon
Kinesis Data Analytics inserts a row in the first in-application stream. ROWTIME reflects the timestamp at
which Amazon Kinesis Data Analytics inserted a record into the first in-application stream after reading
from the streaming source. This ROWTIME value is then maintained throughout your application.
Note
When you pump records from one in-application stream into another, you don't need to
explicitly copy the ROWTIME column; Amazon Kinesis Data Analytics copies this column for you.
Amazon Kinesis Data Analytics guarantees that the ROWTIME values are monotonically increasing.
You use this timestamp in time-based windowed queries. For more information, see Windowed
Queries (p. 70).
You can access the ROWTIME column in your SELECT statement like any other columns in your in-
application stream. For example:
SELECT STREAM ROWTIME,
some_col_1,
some_col_2
FROM SOURCE_SQL_STREAM_001
Understanding Various Times in Streaming Analytics
In addition to ROWTIME, there are other types of times in real-time streaming applications. These are:
Event time – The timestamp when the event occurred. This is also sometimes called the client-side
time. It is often desirable to use this time in analytics because it is the time when an event occurred.
However, many event sources, such as mobile phones and web clients, do not have reliable clocks,
which can lead to inaccurate times. In addition, connectivity issues can lead to records appearing on a
stream not in the same order the events occurred.
Ingest time – The timestamp when the record was added to the streaming source. Amazon Kinesis
Data Streams includes a field called APPROXIMATE_ARRIVAL_TIME in every record that provides this
timestamp. This is also sometimes referred to as the server-side time. The ingest time is often a close
approximation of the event time. If there is a delay in ingesting a record into the stream, the ingest time
can be inaccurate, but such delays are typically rare. Also, the ingest time is rarely out of order, although
that can occur because of the distributed nature of streaming data. Therefore, the ingest time is a mostly
accurate and in-order reflection of the event time.
Processing time – The timestamp when Amazon Kinesis Data Analytics inserts a row in the first in-
application stream. Amazon Kinesis Data Analytics provides this timestamp in the ROWTIME column
that exists in each in-application stream. The processing time is always monotonically increasing. But it
will not be accurate if your application falls behind. (If an application falls behind, the processing time
does not accurately reflect the event time.) This ROWTIME is accurate in relation to the wall clock, but it
might not be the time when the event actually occurred.
Using each of these times in windowed queries that are time-based has advantages and disadvantages.
We recommend that you choose one or more of these times, and a strategy to deal with the relevant
disadvantages based on your use case scenario.
Note
If you are using row-based windows, time is not an issue and you can ignore this section.
We recommend a two-window strategy that uses two time-based windows: ROWTIME and one of the
other times (ingest or event time).
Use ROWTIME as the first window, which controls how frequently the query emits the results, as shown
in the following example. It is not used as a logical time.
Use one of the other times that is the logical time that you want to associate with your analytics. This
time represents when the event occurred. In the following example, the analytics goal is to group the
records and return count by ticker.
The advantage of this strategy is that it can use a time that represents when the event occurred. It
can gracefully handle when your application falls behind or when events arrive out of order. If the
application falls behind when bringing records into the in-application stream, they are still grouped by
the logical time in the second window. The query uses ROWTIME to guarantee the order of processing.
Any records that are late (the ingest timestamp shows an earlier value compared to the ROWTIME value)
are also processed successfully.
Consider the following query against the demo stream used in the Getting Started Exercise. The query
uses the GROUP BY clause and emits a ticker count in a one-minute tumbling window.
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    "ingest_time" timestamp,
    "APPROXIMATE_ARRIVAL_TIME" timestamp,
    "ticker_symbol" VARCHAR(12),
    "symbol_count" integer);
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM
    STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND) AS "ingest_time",
    STEP("SOURCE_SQL_STREAM_001".APPROXIMATE_ARRIVAL_TIME BY INTERVAL '60' SECOND) AS "APPROXIMATE_ARRIVAL_TIME",
    "TICKER_SYMBOL",
    COUNT(*) AS "symbol_count"
FROM "SOURCE_SQL_STREAM_001"
GROUP BY "TICKER_SYMBOL",
    STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND),
    STEP("SOURCE_SQL_STREAM_001".APPROXIMATE_ARRIVAL_TIME BY INTERVAL '60' SECOND);
In GROUP BY, you first group the records based on ROWTIME in a one-minute window and then by
APPROXIMATE_ARRIVAL_TIME.
The timestamp values in the result are rounded down to the nearest 60-second interval. The first group
of results emitted by the query shows records in the first minute. The second group of results shows
records in the next minute based on ROWTIME. The last record indicates that the application was late in
bringing the record into the in-application stream (it shows a later ROWTIME value compared to the ingest
timestamp).
ROWTIME               INGEST_TIME           TICKER_SYMBOL SYMBOL_COUNT
--First one-minute window.
2016-07-19 17:05:00.0 2016-07-19 17:05:00.0 ABC           10
2016-07-19 17:05:00.0 2016-07-19 17:05:00.0 DEF           15
2016-07-19 17:05:00.0 2016-07-19 17:05:00.0 XYZ           6
--Second one-minute window.
2016-07-19 17:06:00.0 2016-07-19 17:06:00.0 ABC           11
2016-07-19 17:06:00.0 2016-07-19 17:06:00.0 DEF           11
2016-07-19 17:06:00.0 2016-07-19 17:05:00.0 XYZ           1 ***
***late-arriving record; instead of appearing in the result of the
first one-minute window (based on ingest_time), it is in the result
of the second one-minute window.
You can combine the results for a final accurate count per minute by pushing the results to a
downstream database. For example, you can configure the application output to persist the results to a
Kinesis Data Firehose delivery stream that can write to an Amazon Redshift table. After results are in an
Amazon Redshift table, you can query the table to compute the total count grouped by Ticker_Symbol.
In the case of XYZ, the total is accurate (6+1) even though a record arrived late.
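The following is a minimal sketch of such a downstream query. It assumes a hypothetical Amazon Redshift
table named ticker_symbol_counts with columns rowtime, ingest_time, ticker_symbol, and symbol_count
that the delivery stream loads; the table and column names are illustrative only.
-- Combine the per-window counts by the logical (ingest) time rather than by ROWTIME.
SELECT ingest_time,
    ticker_symbol,
    SUM(symbol_count) AS symbol_count_total
FROM ticker_symbol_counts
GROUP BY ingest_time, ticker_symbol;
Grouping on ingest_time rather than rowtime combines the late-arriving XYZ record with the records from
the first window, so the XYZ total for 17:05 is the accurate value of 7 (6 + 1).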
Continuous Queries
A query over a stream executes continuously over streaming data. This continuous execution enables
scenarios, such as the ability for applications to continuously query a stream and generate alerts.
In the Getting Started exercise, you have an in-application stream named SOURCE_SQL_STREAM_001. It
continuously receives stock prices from a demo stream (a Kinesis data stream). The schema is as follows:
(TICKER_SYMBOL VARCHAR(4),
SECTOR varchar(16),
CHANGE REAL,
PRICE REAL)
Suppose that you are interested in stock price changes greater than 1 percent. You can use the
following query in your application code. This query runs continuously and emits records when a stock
price change greater than 1 percent is detected.
SELECT STREAM TICKER_SYMBOL, PRICE
FROM "SOURCE_SQL_STREAM_001"
WHERE (ABS((CHANGE / (PRICE-CHANGE)) * 100)) > 1
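For example, if PRICE is 101 and CHANGE is 2, the previous price is PRICE - CHANGE = 99, and the percent
change is ABS((2 / 99) * 100), or about 2.02 percent. Because this value exceeds the 1 percent threshold, the
query emits the record.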
Use the following procedure to set up an Amazon Kinesis Data Analytics application and test this query.
To test the query
1. Create an application by following the Getting Started Exercise.
2. Replace the SELECT statement in the application code with the preceding SELECT query. The
resulting application code is shown following:
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ticker_symbol VARCHAR(4),
price DOUBLE);
-- CREATE OR REPLACE PUMP to insert into output
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM TICKER_SYMBOL,
PRICE
FROM "SOURCE_SQL_STREAM_001"
WHERE (ABS((CHANGE / (PRICE-CHANGE)) * 100)) > 1;
Windowed Queries
SQL queries in your application code execute continuously over in-application streams. An in-application
stream represents unbounded data that flows continuously through your application. Therefore, to get
result sets from this continuously updating input, you often bound queries using a window defined in
terms of time or rows. Such bounded queries are also called windowed SQL queries.
For a time-based windowed query, you specify the window size in terms of time (for example, a one-
minute window). This requires a timestamp column in your in-application stream that is monotonically
increasing. (The timestamp for a new row is greater than or equal to the previous row.) Amazon Kinesis
Data Analytics provides such a timestamp column called ROWTIME for each in-application stream. You
can use this column when specifying time-based queries. For your application, you might choose some
other timestamp option. For more information, see Timestamps and the ROWTIME Column (p. 67).
For a row-based windowed query, you specify the window size in terms of the number of rows.
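For example, the following is a minimal sketch (not taken from a console template) of a row-based window
over the Getting Started stream SOURCE_SQL_STREAM_001. For each incoming row, it computes the average
price for that ticker over the current row and the 10 preceding rows; the WINDOW clause used here is
described in Sliding Windows (p. 76).
SELECT STREAM ticker_symbol,
    AVG(price) OVER LAST_10_ROWS AS avg_price
FROM "SOURCE_SQL_STREAM_001"
WINDOW LAST_10_ROWS AS (PARTITION BY ticker_symbol ROWS 10 PRECEDING);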
You can specify a query to process records in a tumbling window, sliding window, or stagger window
manner, depending on your application needs. Kinesis Data Analytics supports the following window
types:
Stagger Windows (p. 70): A query that aggregates data using keyed time-based windows that open
as data arrives. The keys allow for multiple overlapping windows. This is the recommended way to
aggregate data using time-based windows, because stagger windows reduce the issues caused by late or
out-of-order data compared to tumbling windows.
Tumbling Windows (p. 75): A query that aggregates data using distinct time-based windows that
open and close at regular intervals.
Sliding Windows (p. 76): A query that aggregates data continuously, using a fixed time or rowcount
interval.
Stagger Windows
Using stagger windows is a windowing method that is suited for analyzing groups of data that arrive at
inconsistent times. It is well suited for any time-series analytics use case, such as a set of related sales or
log records.
For example, VPC Flow Logs have a capture window of approximately 10 minutes. But they can have a
capture window of up to 15 minutes if you're aggregating data on the client. Stagger windows are ideal
for aggregating these logs for analysis.
Stagger windows address the issue of related records not falling into the same time-restricted window,
which can happen when tumbling windows are used.
Partial Results with Tumbling Windows
There are certain limitations with using Tumbling Windows (p. 75) for aggregating late or out-of-
order data.
If tumbling windows are used to analyze groups of time-related data, the individual records might fall
into separate windows. So then the partial results from each window must be combined later to yield
complete results for each group of records.
In the following tumbling window query, records are grouped into windows by row time, event time, and
ticker symbol:
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    TICKER_SYMBOL VARCHAR(4),
    EVENT_TIME TIMESTAMP,
    TICKER_COUNT DOUBLE);
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM
    TICKER_SYMBOL,
    FLOOR(EVENT_TIME TO MINUTE),
    COUNT(TICKER_SYMBOL) AS TICKER_COUNT
FROM "SOURCE_SQL_STREAM_001"
GROUP BY ticker_symbol, FLOOR(EVENT_TIME TO MINUTE),
    STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '1' MINUTE);
In the following diagram, an application is counting the number of trades it receives, based on when
the trades happened (event time) with one minute of granularity. The application can use a tumbling
window for grouping data based on row time and event time. The application receives four records
that all arrive within one minute of each other. It groups the records by row time, event time, and ticker
symbol. Because some of the records arrive after the first tumbling window ends, the records do not all
fall within the same one-minute tumbling window.
The preceding diagram has the following events.
ROWTIME EVENT_TIME TICKER_SYMBOL
11:00:20 11:00:10 AMZN
11:00:30 11:00:20 AMZN
11:01:05 11:00:55 AMZN
11:01:15 11:01:05 AMZN
The result set from the tumbling window application looks similar to the following.
ROWTIME EVENT_TIME TICKER_SYMBOL COUNT
11:01:00 11:00:00 AMZN 2
11:02:00 11:00:00 AMZN 1
11:02:00 11:01:00 AMZN 1
In the result set preceding, three results are returned:
A record with a ROWTIME of 11:01:00 that aggregates the first two records.
A record at 11:02:00 that aggregates just the third record. This record has a ROWTIME within the
second window, but an EVENT_TIME within the first window.
A record at 11:02:00 that aggregates just the fourth record.
To analyze the complete result set, the records must be aggregated in the persistence store. This adds
complexity and processing requirements to the application.
Complete Results with Stagger Windows
To improve the accuracy of analyzing time-related data records, Kinesis Data Analytics offers a new
window type called stagger windows. In this window type, windows open when the first event matching
the partition key arrives, and not on a fixed time interval. The windows close based on the age specified,
which is measured from the time when the window opened.
A stagger window is a separate time-restricted window for each key grouping in a window clause. The
application aggregates each result of the window clause inside its own time window, rather than using a
single window for all results.
In the following stagger window query, records are grouped into windows by event time and ticker
symbol:
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
ticker_symbol VARCHAR(4),
event_time TIMESTAMP,
ticker_count DOUBLE);
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM
TICKER_SYMBOL,
FLOOR(EVENT_TIME TO MINUTE),
COUNT(TICKER_SYMBOL) AS ticker_count
FROM "SOURCE_SQL_STREAM_001"
WINDOWED BY STAGGER (
PARTITION BY FLOOR(EVENT_TIME TO MINUTE), TICKER_SYMBOL RANGE INTERVAL '1' MINUTE);
In the following diagram, events are aggregated by event time and ticker symbol into stagger windows.
The preceding diagram has the following events, which are the same events as the tumbling window
application analyzed:
ROWTIME EVENT_TIME TICKER_SYMBOL
11:00:20 11:00:10 AMZN
11:00:30 11:00:20 AMZN
11:01:05 11:00:55 AMZN
11:01:15 11:01:05 AMZN
The result set from the stagger window application looks similar to the following.
ROWTIME EVENT_TIME TICKER_SYMBOL Count
11:01:20 11:00:00 AMZN 3
11:02:15 11:01:00 AMZN 1
The first record that is returned aggregates the first three input records. The records are grouped by
one-minute stagger windows. The stagger window starts when the application receives the first AMZN
record (with a ROWTIME of 11:00:20). When the 1-minute stagger window expires (at 11:01:20), a record
with the results that fall within the stagger window (based on ROWTIME and EVENT_TIME) is written to the
output stream. Using a stagger window, all of the records with a ROWTIME and EVENT_TIME within a
one-minute window are emitted in a single result.
The last record (with an EVENT_TIME outside the one-minute aggregation) is aggregated separately. This
is because EVENT_TIME is one of the partition keys that is used to separate the records into result sets,
and the partition key for EVENT_TIME for the first window is 11:00.
The syntax for a stagger window is defined in a special clause, WINDOWED BY. This clause is used instead
of the GROUP BY clause for streaming aggregations. The clause appears immediately after the optional
WHERE clause and before the HAVING clause.
The stagger window is defined in the WINDOWED BY clause and takes two parameters: partition keys
and window length. The partition keys partition the incoming data stream and define when the window
opens. A stagger window opens when the first event with a unique partition key appears on the stream.
The stagger window closes after a fixed time period defined by the window length. The syntax is shown
in the following code example:
...
FROM <stream-name>
WHERE <... optional statements...>
WINDOWED BY STAGGER(
PARTITION BY <partition key(s)>
RANGE INTERVAL <window length, interval>
);
Tumbling Windows (Aggregations Using GROUP BY)
When a windowed query processes each window in a non-overlapping manner, the window is referred to
as a tumbling window. In this case, each record on an in-application stream belongs to a specific window.
It is processed only once (when the query processes the window to which the record belongs).
For example, an aggregation query using a GROUP BY clause processes rows in a tumbling window. The
demo stream in the getting started exercise receives stock price data that is mapped to the in-application
stream SOURCE_SQL_STREAM_001 in your application. This stream has the following schema.
(TICKER_SYMBOL VARCHAR(4),
SECTOR varchar(16),
CHANGE REAL,
PRICE REAL)
In your application code, suppose that you want to find aggregate (min, max) prices for each ticker over a
one-minute window. You can use the following query.
SELECT STREAM ROWTIME,
    Ticker_Symbol,
    MIN(Price) AS Min_Price,
    MAX(Price) AS Max_Price
FROM "SOURCE_SQL_STREAM_001"
GROUP BY Ticker_Symbol,
STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND);
The preceding is an example of a windowed query that is time-based. The query groups records by
ROWTIME values. For reporting on a per-minute basis, the STEP function rounds down the ROWTIME
values to the nearest minute.
Note
You can also use the FLOOR function to group records into windows. However, FLOOR can
only round time values down to a whole time unit (hour, minute, second, and so on). STEP is
recommended for grouping records into tumbling windows because it can round values down to
an arbitrary interval, for example, 30 seconds.
This query is an example of a nonoverlapping (tumbling) window. The GROUP BY clause groups records
in a one-minute window, and each record belongs to a specific window (no overlapping). The query
emits one output record per minute, providing the min/max ticker price recorded at the specific minute.
This type of query is useful for generating periodic reports from the input data stream. In this example,
reports are generated each minute.
To test the query
1. Set up an application by following the getting started exercise.
2. Replace the SELECT statement in the application code with the preceding SELECT query. The resulting
application code is shown following:
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
ticker_symbol VARCHAR(4),
Min_Price DOUBLE,
Max_Price DOUBLE);
-- CREATE OR REPLACE PUMP to insert into output
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM Ticker_Symbol,
MIN(Price) AS Min_Price,
MAX(Price) AS Max_Price
FROM "SOURCE_SQL_STREAM_001"
GROUP BY Ticker_Symbol,
STEP("SOURCE_SQL_STREAM_001".ROWTIME BY INTERVAL '60' SECOND);
Sliding Windows
Instead of grouping records using GROUP BY, you can define a time-based or row-based window. You do
this by adding an explicit WINDOW clause.
In this case, as the window slides with time, Amazon Kinesis Data Analytics emits an output when new
records appear on the stream. Kinesis Data Analytics emits this output by processing rows in the window.
Windows can overlap in this type of processing, and a record can be part of multiple windows and be
processed with each window. The following example illustrates a sliding window.
Consider a simple query that counts records on the stream. This example assumes a 5-second window. In
the following example stream, new records arrive at times t1, t2, t6, and t7, and three records arrive at time
t8.
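A minimal sketch of such a counting query, using the WINDOW clause covered later in this section, might look like the following. The stream name and the use of COUNT over the window are assumptions based on the demo stream, not part of the original example.
-- Sketch only: count the records that arrived in the preceding
-- 5 seconds, emitting a row for each new record on the stream.
SELECT STREAM ROWTIME,
    COUNT(*) OVER W1 AS record_count
FROM "SOURCE_SQL_STREAM_001"
WINDOW W1 AS (RANGE INTERVAL '5' SECOND PRECEDING);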
Keep the following in mind:
The example assumes a 5-second window. The 5-second window slides continuously with time.
For every row that enters a window, an output row is emitted by the sliding window. Soon after the
application starts, you see the query emit output for every new record that appears on the stream,
even though a 5-second window hasn't passed yet. For example, the query emits output when a record
appears in the first second and again in the second second. Later, the query processes records in the 5-second
window.
The windows slide with time. If an old record on the stream falls out of the window, the query doesn't
emit output unless there is also a new record on the stream that falls within that 5-second window.
Suppose that the query starts executing at t0. Then the following occurs:
1. At the time t0, the query starts. The query doesn't emit output (count value) because there are no
records at this time.
2. At time t1, a new record appears on the stream, and the query emits count value 1.
3. At time t2, another record appears, and the query emits count 2.
4. The 5-second window slides with time:
At t3, the sliding window covers t3 to t0.
At t4, the sliding window covers t4 to t0.
At t5, the sliding window covers t5 to t0.
At all of these times, the 5-second window has the same records; there are no new records.
Therefore, the query doesn't emit any output.
5. At time t6, the 5-second window is (t6 to t1). The query detects one new record at t6 so it emits output
2. The record at t1 is no longer in the window and doesn't count.
6. At time t7, the 5-second window is t7 to t2. The query detects one new record at t7 so it emits output
2. The record at t2 is no longer in the 5-second window, and therefore isn't counted.
7. At time t8, the 5-second window is t8 to t3. The query detects three new records, and therefore emits
record count 5.
In summary, the window is a fixed size and slides with time. The query emits output when new records
appear.
Note
We recommend that you use a sliding window no longer than one hour. If you use a longer
window, the application takes longer to restart after regular system maintenance. This is
because the source data must be read from the stream again.
The following example queries use the WINDOW clause to define windows and perform aggregates.
Because the queries don't specify GROUP BY, they use the sliding window approach to process
records on the stream.
Example 1: Process a Stream Using a 1-Minute Sliding Window
Consider the demo stream in the Getting Started exercise that populates the in-application stream,
SOURCE_SQL_STREAM_001. The following is the schema.
(TICKER_SYMBOL VARCHAR(4),
SECTOR varchar(16),
CHANGE REAL,
PRICE REAL)
Suppose that you want your application to compute aggregates using a sliding 1-minute window. That is,
for each new record that appears on the stream, you want the application to emit an output by applying
aggregates on records in the preceding 1-minute window.
You can use the following time-based windowed query. The query uses the WINDOW clause to define the
1-minute range interval. The PARTITION BY in the WINDOW clause groups records by ticker values within
the sliding window.
SELECT STREAM ticker_symbol,
MIN(Price) OVER W1 AS Min_Price,
MAX(Price) OVER W1 AS Max_Price,
AVG(Price) OVER W1 AS Avg_Price
FROM "SOURCE_SQL_STREAM_001"
WINDOW W1 AS (
PARTITION BY ticker_symbol
RANGE INTERVAL '1' MINUTE PRECEDING);
To test the query
1. Set up an application by following the Getting Started Exercise.
2. Replace the SELECT statement in the application code with the preceding SELECT query. The
resulting application code is the following.
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
ticker_symbol VARCHAR(10),
Min_Price double,
Max_Price double,
Avg_Price double);
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM ticker_symbol,
MIN(Price) OVER W1 AS Min_Price,
MAX(Price) OVER W1 AS Max_Price,
AVG(Price) OVER W1 AS Avg_Price
FROM "SOURCE_SQL_STREAM_001"
WINDOW W1 AS (
PARTITION BY ticker_symbol
RANGE INTERVAL '1' MINUTE PRECEDING);
Example 2: Query Applying Aggregates on a Sliding Window
The following query on the demo stream returns the average of the percent change in the price of each
ticker in a 10-second window.
SELECT STREAM Ticker_Symbol,
AVG(Change / (Price - Change)) over W1 as Avg_Percent_Change
FROM "SOURCE_SQL_STREAM_001"
WINDOW W1 AS (
PARTITION BY ticker_symbol
RANGE INTERVAL '10' SECOND PRECEDING);
To test the query
1. Set up an application by following the Getting Started Exercise.
2. Replace the SELECT statement in the application code with the preceding SELECT query. The
resulting application code is the following.
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
ticker_symbol VARCHAR(10),
Avg_Percent_Change double);
CREATE OR REPLACE PUMP "STREAM_PUMP" AS
INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM Ticker_Symbol,
AVG(Change / (Price - Change)) over W1 as Avg_Percent_Change
FROM "SOURCE_SQL_STREAM_001"
WINDOW W1 AS (
PARTITION BY ticker_symbol
RANGE INTERVAL '10' SECOND PRECEDING);
Example 3: Query Data from Multiple Sliding Windows on the
Same Stream
You can write queries to emit output in which each column value is calculated using different sliding
windows defined over the same stream.
In the following example, the query emits ticker_symbol, price, average_last2rows, and average_last10rows.
You can use this output to detect when a ticker symbol's two-row moving average crosses its ten-row
moving average. The average_last2rows and average_last10rows column values are derived from two-row
and ten-row sliding windows over the same stream.
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
ticker_symbol VARCHAR(12),
price double,
average_last2rows double,
average_last10rows double);
CREATE OR REPLACE PUMP "myPump" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM ticker_symbol,
price,
avg(price) over last2rows,
avg(price) over last10rows
FROM SOURCE_SQL_STREAM_001
WINDOW
last2rows AS (PARTITION BY ticker_symbol ROWS 2 PRECEDING),
last10rows AS (PARTITION BY ticker_symbol ROWS 10 PRECEDING);
To test this query against the demo stream, follow the test procedure described in Example 1 (p. 78).
Streaming Data Operations: Stream Joins
You can have multiple in-application streams in your application. You can write JOIN queries to correlate
data arriving on these streams. For example, suppose that you have the following in-application streams:
OrderStream – Receives stock orders being placed.
(orderId SqlType, ticker SqlType, amount SqlType, ROWTIME TimeStamp)
TradeStream – Receives resulting stock trades for those orders.
(tradeId SqlType, orderId SqlType, ticker SqlType, amount SqlType, ROWTIME TimeStamp)
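The SqlType placeholders stand in for whatever column types your data requires. As a rough illustration only (the concrete types below are assumptions, and ROWTIME is an implicit column on every in-application stream, so it isn't declared), the two streams might be created as follows:
-- Sketch only: column types are assumptions for illustration.
-- Names are left unquoted so that the unquoted references in the
-- join queries that follow resolve to the same identifiers.
CREATE OR REPLACE STREAM OrderStream (
    orderId INTEGER,
    ticker  VARCHAR(4),
    amount  DOUBLE);
CREATE OR REPLACE STREAM TradeStream (
    tradeId INTEGER,
    orderId INTEGER,
    ticker  VARCHAR(4),
    amount  DOUBLE);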
The following are JOIN query examples that correlate data on these streams.
Example 1: Report Orders Where There Are Trades
Within One Minute of the Order Being Placed
In this example, your query joins both the OrderStream and the TradeStream. However, because we
want only trades that occur within one minute of the order being placed, the query defines a 1-minute
window over the TradeStream. For information about windowed queries, see Sliding Windows (p. 76).
SELECT STREAM
ROWTIME,
o.orderId, o.ticker, o.amount AS orderAmount,
t.amount AS tradeAmount
FROM OrderStream AS o
JOIN TradeStream OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS t
ON o.orderId = t.orderId;
You can define the window explicitly by using the WINDOW clause and writing the preceding query as
follows:
SELECT STREAM
ROWTIME,
o.orderId, o.ticker, o.amount AS orderAmount,
t.amount AS tradeAmount
FROM OrderStream AS o
JOIN TradeStream OVER t
ON o.orderId = t.orderId
WINDOW t AS
(RANGE INTERVAL '1' MINUTE PRECEDING)
When you include this query in your application code, the application code runs continuously. For each
arriving record on the OrderStream, the application emits an output if there are trades within the 1-
minute window following the order being placed.
The join in the preceding query is an inner join, where the query emits records in OrderStream for which
there is a matching record in TradeStream (and vice versa). Using an outer join, you can create another
interesting scenario. Suppose that you want stock orders for which there are no trades within one minute
of the stock order being placed, and trades reported within the same window but for some other orders.
The following is an example of an outer join.
SELECT STREAM
ROWTIME,
o.orderId, o.ticker, o.amount AS orderAmount,
t.ticker, t.tradeId, t.amount AS tradeAmount
FROM OrderStream AS o
OUTER JOIN TradeStream OVER (RANGE INTERVAL '1' MINUTE PRECEDING) AS t
ON o.orderId = t.orderId;
Example Applications
This section provides examples of creating and working with applications in Amazon Kinesis Data
Analytics. They include example code and step-by-step instructions to help you create Kinesis data
analytics applications and test your results.
Before you explore these examples, we recommend that you first review Amazon Kinesis Data Analytics:
How It Works (p. 3) and Getting Started with Amazon Kinesis Data Analytics (p. 45).
Topics
Examples: Transforming Data (p. 82)
Examples: Windows and Aggregation (p. 104)
Examples: Joins (p. 116)
Examples: Machine Learning (p. 119)
Examples: Alerts and Errors (p. 139)
Examples: Solution Accelerators (p. 142)
Examples: Transforming Data
There are times when your application code must preprocess incoming records before performing any
analytics in Amazon Kinesis Data Analytics. This can happen for various reasons, such as when records
don't conform to the supported record formats, resulting in unnormalized columns in the in-application
input streams.
This section provides examples of how to use the available string functions to normalize data, how to
extract information that you need from string columns, and so on. The section also points to date time
functions that you might find useful.
Preprocessing Streams with Lambda
For information about preprocessing streams with AWS Lambda, see Preprocessing Data Using a Lambda
Function (p. 21).
Topics
Examples: Transforming String Values (p. 82)
Example: Transforming DateTime Values (p. 96)
Example: Transforming Multiple Data Types (p. 99)
Examples: Transforming String Values
Amazon Kinesis Data Analytics supports formats such as JSON and CSV for records on a streaming
source. For details, see RecordFormat (p. 276). These records then map to rows in an in-application
stream as per the input configuration. For details, see Configuring Application Input (p. 5). The input
configuration specifies how record fields in the streaming source map to columns in an in-application
stream.
This mapping works when records on the streaming source follow the supported formats, which results
in an in-application stream with normalized data. But what if data on your streaming source does not
conform to supported standards? For example, what if your streaming source contains data such as
clickstream data, IoT sensor data, and application logs?
Consider these examples:
Streaming source contains application logs – The application logs follow the standard Apache log
format, and are written to the stream using JSON format.
{
    "Log":"192.168.254.30 - John [24/May/2004:22:01:02 -0700] "GET /icons/apache_pb.gif HTTP/1.1" 304 0"
}
For more information about the standard Apache log format, see Log Files on the Apache website.
Streaming source contains semi-structured data – The following example shows two records. The
Col_E_Unstructured field value is a series of comma-separated values. There are five columns: the
first four have string type values, and the last column contains comma-separated values.
{ "Col_A" : "string",
"Col_B" : "string",
"Col_C" : "string",
"Col_D" : "string",
"Col_E_Unstructured" : "value,value,value,value"}
{ "Col_A" : "string",
"Col_B" : "string",
"Col_C" : "string",
"Col_D" : "string",
"Col_E_Unstructured" : "value,value,value,value"}
Records on your streaming source contain URLs, and you need a portion of the URL domain name for
analytics.
{ "referrer" : "http://www.amazon.com"}
{ "referrer" : "http://www.stackoverflow.com" }
In such cases, the following two-step process generally works for creating in-application streams that
contain normalized data:
1. Configure application input to map the unstructured field to a column of the VARCHAR(N) type in the
in-application input stream that is created.
2. In your application code, use string functions to split this single column into multiple columns and
then save the rows in another in-application stream. This in-application stream that your application
code creates will have normalized data. You can then perform analytics on this in-application stream.
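As a minimal sketch of step 2 for the semi-structured example shown earlier, the following code pulls the first comma-separated value out of the unstructured column by using the SUBSTRING and POSITION functions described later in this section. It assumes that step 1 mapped the record fields to in-application columns named Col_A through Col_E_Unstructured; the stream, pump, and output column names are placeholders.
-- Sketch only: extract the first comma-separated token from the
-- unstructured column into a normalized in-application stream.
CREATE OR REPLACE STREAM "NORMALIZED_SQL_STREAM" (
    "col_a"       VARCHAR(16),
    "first_value" VARCHAR(16));
CREATE OR REPLACE PUMP "NORMALIZE_PUMP" AS
INSERT INTO "NORMALIZED_SQL_STREAM"
SELECT STREAM
    "Col_A",
    SUBSTRING("Col_E_Unstructured", 1, POSITION(',' IN "Col_E_Unstructured") - 1)
FROM "SOURCE_SQL_STREAM_001";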
Amazon Kinesis Data Analytics provides the following string operations, standard SQL functions, and
extensions to the SQL standard for working with string columns:
String operators – Operators such as LIKE and SIMILAR are useful in comparing strings. For more
information, see String Operators in the Amazon Kinesis Data Analytics SQL Reference.
SQL functions – The following functions are useful when manipulating individual strings. For more
information, see String and Search Functions in the Amazon Kinesis Data Analytics SQL Reference.
CHAR_LENGTH – Provides the length of a string.
INITCAP – Returns a converted version of the input string such that the first character of each
space-delimited word is uppercase, and all other characters are lowercase.
LOWER/UPPER – Converts a string to lowercase or uppercase.
OVERLAY – Replaces a portion of the first string argument (the original string) with the second string
argument (the replacement string).
POSITION – Searches for a string within another string.
REGEX_REPLACE – Replaces a substring with an alternative substring.
SUBSTRING – Extracts a portion of a source string starting at a specific position.
TRIM – Removes instances of the specified character from the beginning or end of the source string.
SQL extensions – These are useful for working with unstructured strings such as logs and URIs. For
more information, see Log Parsing Functions in the Amazon Kinesis Data Analytics SQL Reference.
FAST_REGEX_LOG_PARSER – Works similarly to the regex parser, but takes several shortcuts to
ensure faster results. For example, the fast regex parser stops at the first match that it finds (known as
lazy semantics).
FIXED_COLUMN_LOG_PARSE – Parses fixed-width fields and automatically converts them to the
given SQL types.
REGEX_LOG_PARSE – Parses a string based on default Java regular expression patterns.
SYS_LOG_PARSE – Parses entries commonly found in UNIX/Linux system logs.
VARIABLE_COLUMN_LOG_PARSE – Splits an input string into fields separated by a delimiter
character or a delimiter string.
W3C_LOG_PARSE – Can be used for quickly formatting Apache logs.
For examples using these functions, see the following topics:
Topics
Example: Extracting a Portion of a String (SUBSTRING Function) (p. 84)
Example: Replacing a Substring using Regex (REGEX_REPLACE Function) (p. 86)
Example: Parsing Log Strings Based on Regular Expressions (REGEX_LOG_PARSE Function) (p. 89)
Example: Parsing Web Logs (W3C_LOG_PARSE Function) (p. 91)
Example: Split Strings into Multiple Fields (VARIABLE_COLUMN_LOG_PARSE Function) (p. 93)
Example: Extracting a Portion of a String (SUBSTRING Function)
This example uses the SUBSTRING function to transform a string in Amazon Kinesis Data Analytics.
The SUBSTRING function extracts a portion of a source string starting at a specific position. For more
information, see SUBSTRING in the Amazon Kinesis Data Analytics SQL Reference.
In this example, you write the following records to an Amazon Kinesis data stream.
{ "REFERRER" : "http://www.amazon.com" }
{ "REFERRER" : "http://www.amazon.com"}
{ "REFERRER" : "http://www.amazon.com"}
...
You then create an Amazon Kinesis data analytics application on the console, using the Kinesis data
stream as the streaming source. The discovery process reads sample records on the streaming source and
infers an in-application schema with one column (REFERRER), as shown.
Then, you use the application code with the SUBSTRING function to parse the URL string to retrieve
the company name. Then you insert the resulting data into another in-application stream, as shown
following:
Topics
Step 1: Create a Kinesis Data Stream (p. 85)
Step 2: Create the Kinesis Data Analytics Application (p. 86)
Step 1: Create a Kinesis Data Stream
Create an Amazon Kinesis data stream and populate the log records as follows:
1. Sign in to the AWS Management Console and open the Kinesis console at https://
console.aws.amazon.com/kinesis.
2. Choose Data Streams in the navigation pane.
3. Choose Create Kinesis stream, and create a stream with one shard. For more information, see Create
a Stream in the Amazon Kinesis Data Streams Developer Guide.
4. Run the following Python code to populate sample log records. This simple code continuously writes
the same log record to the stream.
import json
import boto3
import random

kinesis = boto3.client('kinesis')

def getReferrer():
    data = {}
    data['REFERRER'] = 'http://www.amazon.com'
    return data

while True:
    data = json.dumps(getReferrer())
    print(data)
    kinesis.put_record(
        StreamName="teststreamforkinesisanalyticsapps",
        Data=data,
        PartitionKey="partitionkey")
Step 2: Create the Kinesis Data Analytics Application
Next, create an Amazon Kinesis data analytics application as follows:
1. Open the Kinesis Data Analytics console at https://console.aws.amazon.com/kinesisanalytics.
2. Choose Create application, type an application name, and choose Create application.
3. On the application details page, choose Connect streaming data.
4. On the Connect to source page, do the following:
a. Choose the stream that you created in the preceding section.
b. Choose the option to create an IAM role.
c. Choose Discover schema. Wait for the console to show the inferred schema and the sample
records used to infer the schema for the in-application stream that is created. The inferred schema has
only one column.
d. Choose Save and continue.
5. On the application details page, choose Go to SQL editor. To start the application, choose Yes, start
application in the dialog box that appears.
6. In the SQL editor, write the application code, and verify the results as follows:
a. Copy the following application code and paste it into the editor.
-- CREATE OR REPLACE STREAM for cleaned up referrer
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
"ingest_time" TIMESTAMP,
"referrer" VARCHAR(32));
CREATE OR REPLACE PUMP "myPUMP" AS
INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM
"APPROXIMATE_ARRIVAL_TIME",
SUBSTRING("referrer", 12, (POSITION('.com' IN "referrer") -
POSITION('www.' IN "referrer") - 4))
FROM "SOURCE_SQL_STREAM_001";
b. Choose Save and run SQL. On the Real-time analytics tab, you can see all the in-application
streams that the application created and verify the data.
Example: Replacing a Substring using Regex (REGEX_REPLACE
Function)
This example uses the REGEX_REPLACE function to transform a string in Amazon Kinesis Data
Analytics. REGEX_REPLACE replaces a substring with an alternative substring. For more information, see
REGEX_REPLACE in the Amazon Kinesis Data Analytics SQL Reference.
In this example, you write the following records to an Amazon Kinesis data stream:
{ "REFERRER" : "http://www.amazon.com" }
{ "REFERRER" : "http://www.amazon.com"}
{ "REFERRER" : "http://www.amazon.com"}
...
You then create an Amazon Kinesis data analytics application on the console, with the Kinesis data
stream as the streaming source. The discovery process reads sample records on the streaming source and
infers an in-application schema with one column (REFERRER) as shown.
Then, you use the application code with the REGEX_REPLACE function to convert the URL to use
https:// instead of http://. You insert the resulting data into another in-application stream, as
shown following:
Topics
Step 1: Create a Kinesis Data Stream (p. 87)
Step 2: Create the Kinesis Data Analytics Application (p. 88)
Step 1: Create a Kinesis Data Stream
Create an Amazon Kinesis data stream and populate the log records as follows:
1. Sign in to the AWS Management Console and open the Kinesis console at https://
console.aws.amazon.com/kinesis.
2. Choose Data Streams in the navigation pane.
3. Choose Create Kinesis stream, and create a stream with one shard. For more information, see Create
a Stream in the Amazon Kinesis Data Streams Developer Guide.
4. Run the following Python code to populate the sample log records. This simple code continuously
writes the same log record to the stream.
import json
import boto3
import random

kinesis = boto3.client('kinesis')

def getReferrer():
    data = {}
    data['REFERRER'] = 'http://www.amazon.com'
    return data

while True:
    data = json.dumps(getReferrer())
    print(data)
    kinesis.put_record(
        StreamName="teststreamforkinesisanalyticsapps",
        Data=data,
        PartitionKey="partitionkey")
Step 2: Create the Kinesis Data Analytics Application
Next, create an Amazon Kinesis data analytics application as follows:
1. Open the Kinesis Data Analytics console at https://console.aws.amazon.com/kinesisanalytics.
2. Choose Create application, type an application name, and choose Create application.
3. On the application details page, choose Connect streaming data.
4. On the Connect to source page, do the following:
a. Choose the stream that you created in the preceding section.
b. Choose the option to create an IAM role.
c. Choose Discover schema. Wait for the console to show the inferred schema and the sample
records used to infer the schema for the in-application stream that is created. The inferred schema has
only one column.
d. Choose Save and continue.
5. On the application details page, choose Go to SQL editor. To start the application, choose Yes, start
application in the dialog box that appears.
6. In the SQL editor, write the application code and verify the results as follows:
a. Copy the following application code, and paste it into the editor:
-- CREATE OR REPLACE STREAM for cleaned up referrer
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
"ingest_time" TIMESTAMP,
"referrer" VARCHAR(32));
CREATE OR REPLACE PUMP "myPUMP" AS
INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM
"APPROXIMATE_ARRIVAL_TIME",
REGEX_REPLACE("REFERRER", 'http://', 'https://', 1, 0)
FROM "SOURCE_SQL_STREAM_001";
b. Choose Save and run SQL. On the Real-time analytics tab, you can see all the in-application
streams that the application created and verify the data.
Example: Parsing Log Strings Based on Regular Expressions
(REGEX_LOG_PARSE Function)
This example uses the REGEX_LOG_PARSE function to transform a string in Amazon Kinesis Data
Analytics. REGEX_LOG_PARSE parses a string based on default Java regular expression patterns. For
more information, see REGEX_LOG_PARSE in the Amazon Kinesis Data Analytics SQL Reference.
In this example, you write the following records to an Amazon Kinesis stream:
{"LOGENTRY": "203.0.113.24 - - [25/Mar/2018:15:25:37 -0700] \"GET /index.php HTTP/1.1\" 200
125 \"-\" \"Mozilla/5.0 [en] Gecko/20100101 Firefox/52.0\""}
{"LOGENTRY": "203.0.113.24 - - [25/Mar/2018:15:25:37 -0700] \"GET /index.php HTTP/1.1\" 200
125 \"-\" \"Mozilla/5.0 [en] Gecko/20100101 Firefox/52.0\""}
{"LOGENTRY": "203.0.113.24 - - [25/Mar/2018:15:25:37 -0700] \"GET /index.php HTTP/1.1\" 200
125 \"-\" \"Mozilla/5.0 [en] Gecko/20100101 Firefox/52.0\""}
...
You then create an Amazon Kinesis data analytics application on the console, with the Kinesis data
stream as the streaming source. The discovery process reads sample records on the streaming source and
infers an in-application schema with one column (LOGENTRY), as shown following.
Then, you use the application code with the REGEX_LOG_PARSE function to parse the log string to
retrieve the data elements. You insert the resulting data into another in-application stream, as shown in
the following screenshot:
Topics
Step 1: Create a Kinesis Data Stream (p. 90)
Step 2: Create the Kinesis Data Analytics Application (p. 90)
Step 1: Create a Kinesis Data Stream
Create an Amazon Kinesis data stream and populate the log records as follows:
1. Sign in to the AWS Management Console and open the Kinesis console at https://
console.aws.amazon.com/kinesis.
2. Choose Data Streams in the navigation pane.
3. Choose Create Kinesis stream, and create a stream with one shard. For more information, see Create
a Stream in the Amazon Kinesis Data Streams Developer Guide.
4. Run the following Python code to populate sample log records. This simple code continuously writes
the same log record to the stream.
import json
import boto3
import random

kinesis = boto3.client('kinesis')

def getReferrer():
    data = {}
    data['LOGENTRY'] = '203.0.113.24 - - [25/Mar/2018:15:25:37 -0700] "GET /index.php HTTP/1.1" 200 125 "-" "Mozilla/5.0 [en] Gecko/20100101 Firefox/52.0"'
    return data

while True:
    data = json.dumps(getReferrer())
    print(data)
    kinesis.put_record(
        StreamName="teststreamforkinesisanalyticsapps",
        Data=data,
        PartitionKey="partitionkey")
Step 2: Create the Kinesis Data Analytics Application
Next, create an Amazon Kinesis data analytics application as follows:
1. Open the Kinesis Data Analytics console at https://console.aws.amazon.com/kinesisanalytics.
2. Choose Create application, and specify an application name.
3. On the application details page, choose Connect streaming data.
4. On the Connect to source page, do the following:
a. Choose the stream that you created in the preceding section.
b. Choose the option to create an IAM role.
c. Choose Discover schema. Wait for the console to show the inferred schema and the sample
records used to infer the schema for the in-application stream that is created. The inferred schema has
only one column.
d. Choose Save and continue.
5. On the application details page, choose Go to SQL editor. To start the application, choose Yes, start
application in the dialog box that appears.
6. In the SQL editor, write the application code, and verify the results as follows:
a. Copy the following application code and paste it into the editor.
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (logentry VARCHAR(24), match1
VARCHAR(24), match2 VARCHAR(24));
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
SELECT STREAM T.LOGENTRY, T.REC.COLUMN1, T.REC.COLUMN2
FROM
(SELECT STREAM LOGENTRY,
REGEX_LOG_PARSE(LOGENTRY, '(\w.+) (\d.+) (\w.+) (\w.+)') AS REC
FROM