This repository has been archived by the owner on Feb 28, 2023. It is now read-only.
Derek Bennett edited this page Jul 27, 2016 · 10 revisions

Presto Kinesis Connector

This is the StitchFix update of the presto-kinesis wiki. This repository was cloned from qubole/presto-kinesis. We'll use this Wiki to document local updates and configuration.

Presto uses connectors to work with multiple data sources (Hive being one of our primary ones here). The Kinesis connector allows a Kinesis event stream to be used as a data source. The Presto documentation contains a section that explains what is required to develop a connector: Presto Dev Guide.

Please see the additional wiki pages for details on specific topics.

Changes in this version

Git work branch: upgrade-148 (now merged into master and PR submitted to qubole)

The major change is an upgrade of the connector to work with the latest version of Presto (now 0.149). The original connector was developed against version 0.105, and the SPI has changed quite a bit since then. The AWS SDK has also changed (now 1.11.x; it was 1.9.x when the original connector was written). Other changes besides the API upgrades include:

  • Added the ability to read table definitions from Amazon S3 rather than only from a local file
  • Added a more configurable shard iterator, which by default starts iterating 24 hours before the current time (in our configuration settings we use 1 hour before the current time instead of the default)
  • Added more unit tests to verify the read logic, including a simple mock Kinesis client
  • Improved the record reading logic in the record set class (the original logic had several issues, including some queries that led to endless loops), and added several configuration and session variables for more control over it
  • Added more configuration properties, and exposed all of the session variables to Presto
  • Enabled the other field decoders to be used with the JSON row decoder
  • Made minor extensions to the table definition JSON

Local notes

Amazon S3 Table Definitions

Table definitions for Kinesis streams are stored in JSON files in a location on S3 that we have defined. The server will check the S3 directory for changes (new or updated files) every 10 minutes. If new tables are added or existing tables are updated, the server will update its internal table definitions. The location can be configured to point to the desired folder in S3.
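The refresh check described above can be illustrated with a minimal sketch. This is not the connector's actual code (the connector is written in Java); the function name `find_changed_definitions` and the constant `REFRESH_INTERVAL_SECONDS` are hypothetical, and the listing is assumed to be a mapping from object key to last-modified time, such as one built from an S3 list-objects call.

```python
from datetime import datetime, timezone

# Assumption: the poll interval matches the 10 minutes mentioned above.
REFRESH_INTERVAL_SECONDS = 600


def find_changed_definitions(listing, last_seen):
    """Return the keys that are new or modified since the previous poll.

    listing:   dict of object key -> last-modified datetime (current poll)
    last_seen: dict of object key -> last-modified datetime (previous poll)
    """
    changed = []
    for key, modified in listing.items():
        if not key.endswith(".json"):
            continue  # only JSON files are treated as table definitions
        previous = last_seen.get(key)
        if previous is None or modified > previous:
            changed.append(key)  # new file, or updated since last poll
    return sorted(changed)


if __name__ == "__main__":
    earlier = datetime(2016, 7, 27, 12, 0, tzinfo=timezone.utc)
    later = datetime(2016, 7, 27, 12, 10, tzinfo=timezone.utc)
    previous_poll = {"tools/presto-kinesis/events.json": earlier}
    current_poll = {
        "tools/presto-kinesis/events.json": later,   # updated
        "tools/presto-kinesis/clicks.json": later,   # new
    }
    print(find_changed_definitions(current_poll, previous_poll))
```

Only the keys returned by the check would need to be re-read and re-parsed; unchanged definitions can be left as they are. The file names in the usage example are made up for illustration.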

Amazon bucket and folder used for table definitions:

  • Bucket : stitchfix.aa.config
  • Folder : tools/presto-kinesis
  • Full URL: s3://stitchfix.aa.config/tools/presto-kinesis

Details on Reading from Streams

Kinesis streams are read in two steps: 1) obtaining an initial shard iterator, and 2) making a series of getRecords calls, each of which retrieves a batch of records. There are several types of shard iterators, and the type defines the starting point for reading records. By default, the connector starts reading from 24 hours before the current time (using the AT_TIMESTAMP shard iterator type). We will configure it on our servers to start 1 hour before the current time, since our main use case is looking at recent records. Reading moves forward in time from there until either 1) there are no more records, or 2) Presto determines that it does not need to read any more (e.g., a LIMIT query).
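The two-step read can be sketched as follows. This is a simplified illustration rather than the connector's implementation: `read_from_offset`, `offset_seconds`, and `max_records` are hypothetical names, and `client` is assumed to be any object exposing boto3-style `get_shard_iterator` and `get_records` methods. An empty batch is treated here as "no more records", which is a simplification.

```python
from datetime import datetime, timedelta, timezone


def read_from_offset(client, stream_name, shard_id, offset_seconds=86400,
                     batch_size=1000, max_records=None, now=None):
    """Read one shard, starting `offset_seconds` before `now`."""
    now = now or datetime.now(timezone.utc)
    start = now - timedelta(seconds=offset_seconds)

    # Step 1: obtain the initial shard iterator at the chosen timestamp.
    iterator = client.get_shard_iterator(
        StreamName=stream_name,
        ShardId=shard_id,
        ShardIteratorType="AT_TIMESTAMP",
        Timestamp=start,
    )["ShardIterator"]

    # Step 2: a series of getRecords calls, one batch per call.
    records = []
    while iterator is not None:
        response = client.get_records(ShardIterator=iterator, Limit=batch_size)
        batch = response["Records"]
        if not batch:
            break  # stop condition 1: no more records (simplified)
        records.extend(batch)
        if max_records is not None and len(records) >= max_records:
            return records[:max_records]  # stop condition 2: enough rows (LIMIT)
        iterator = response.get("NextShardIterator")
    return records
```

Passing `offset_seconds=3600` would correspond to the 1-hour setting described above, while the default of 86400 corresponds to 24 hours.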

But when should we stop reading from the stream? Records are added all of the time, and if we keep reading forever the query will never come back with an answer! Each getRecords response includes a "milliseconds behind latest" value (MillisBehindLatest) that tells us how far behind the tip of the stream we are, so we know when we have caught up. We stop reading whenever we get close enough to the current time.
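A minimal sketch of this stopping rule, under stated assumptions: `read_until_caught_up`, `max_lag_ms`, and `max_batches` are hypothetical names, not the connector's settings, and `get_records` is assumed to be a callable that takes a shard iterator and returns a getRecords-style response containing `Records`, `NextShardIterator`, and `MillisBehindLatest`. The batch cap is an extra guard against endless loops, added here for illustration.

```python
def read_until_caught_up(get_records, iterator, max_lag_ms=1000, max_batches=100):
    """Collect records until the shard is within `max_lag_ms` of the stream tip."""
    records = []
    for _ in range(max_batches):  # hard cap so the loop always terminates
        if iterator is None:
            break  # no further iterator: the shard has been closed
        response = get_records(iterator)
        records.extend(response["Records"])
        if response["MillisBehindLatest"] <= max_lag_ms:
            break  # close enough to the current time
        iterator = response.get("NextShardIterator")
    return records
```

Note that an empty batch does not end the loop on its own here; only the lag value, the batch cap, or a closed shard does.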

We have also provided session variables to configure the reading process above, and to facilitate querying Kinesis in a loop to get the latest set of events.

More details on the Kinesis API: Kinesis Consumers