**Table of Contents**

- [Telekinesis](#telekinesis)
  - [Requirements](#requirements)
  - [Installing](#installing)
  - [Producers](#producers)
    - [SyncProducer](#syncproducer)
    - [AsyncProducer](#asyncproducer)
  - [Consumers](#consumers)
    - [DistributedConsumer](#distributedconsumer)
      - [Client State](#client-state)
      - [Errors while processing records](#errors-while-processing-records)
      - [Checkpoints and `INITIAL_POSITION_IN_STREAM`](#checkpoints-and-initial_position_in_stream)
  - [Java client logging](#java-client-logging)
- [Building](#building)
  - [Prerequisites](#prerequisites)
  - [Build](#build)
- [Testing](#testing)

# Telekinesis

Telekinesis is a high-level client for Amazon Kinesis. The library provides a high-throughput asynchronous producer and wraps the [Kinesis Client Library](https://github.com/awslabs/amazon-kinesis-client) to provide an easy interface for writing consumers.

## Requirements

Telekinesis runs on JRuby 1.7.x or later, with at least Java 6. If you want to build from source, you also need Apache Maven installed.

## Installing

```
gem install telekinesis
```

## Producers

Telekinesis includes two high-level [Producers](http://docs.aws.amazon.com/kinesis/latest/dev/amazon-kinesis-producers.html).

Telekinesis assumes that records are `[key, value]` pairs of strings. The key *must* be a string, as enforced by Kinesis itself. Keys are used by the service to partition data into shards. Values can be any old blob of data, but for simplicity, Telekinesis expects strings. Both keys and values should respect the Kinesis [limits](http://docs.aws.amazon.com/kinesis/latest/dev/service-sizes-and-limits.html) and all of the [restrictions](http://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html) in the PutRecords API documentation.

### SyncProducer

The `SyncProducer` sends data to Kinesis every time `put` or `put_all` is called. These calls block until the call to Kinesis returns.

```ruby
require 'telekinesis'

producer = Telekinesis::Producer::SyncProducer.create(
  stream: 'my stream',
  credentials: {
    access_key_id: 'foo',
    secret_access_key: 'bar'
  }
)
```

Calls to `put` send a single record at a time to Kinesis, while calls to `put_all` can send up to 500 records at a time (the Kinesis service limit for a single PutRecords call). If more than 500 records are passed to `put_all`, they're grouped into batches and sent.

> NOTE: To send fewer records to Kinesis at a time when using `put_all`,
> you can adjust the `:send_size` parameter in the `create` method.

Using `put_all` over `put` is recommended if you have any way to batch your data. Since each call to Kinesis is an HTTP round trip with relatively high latency, batching is the easiest way to increase throughput.

```ruby
# file is an instance of File containing CSV data that looks like:
#
#   "some,very,important,data,with,a,partition_key"
#
lines = file.each_line.map do |line|
  key = line.split(/,/).last
  data = line
  [key, data]
end

# One record at a time
lines.each do |key, data|
  producer.put(key, data)
end

# Manually control your batches
lines.each_slice(200) do |batch|
  producer.put_all(batch)
end

# Go hog wild
producer.put_all(lines)
```

When something goes wrong and the Kinesis client throws an exception, it bubbles up as a `Telekinesis::Aws::KinesisError` with the underlying exception accessible as the `cause` field.

When only some of the records passed to `put_all` cause problems, they're returned as an array of `[key, value, error_code, error_message]` tuples.
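If you want to do something with those failures, you can inspect the tuples directly. Here's a minimal sketch that logs each failure and optimistically retries the failed records once. The retry policy, and the assumption that `put_all` returns an empty array when every record succeeds, are ours, not the library's:

```ruby
failures = producer.put_all(lines)

# Each failure is a [key, value, error_code, error_message] tuple.
failures.each do |_key, _value, code, message|
  $stderr.puts "record failed with #{code}: #{message}"
end

# Optimistically retry just the failed records once, assuming the
# errors (e.g. throttling) were transient.
retryable = failures.map { |key, value, _code, _message| [key, value] }
producer.put_all(retryable) unless retryable.empty?
```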
### AsyncProducer

The `AsyncProducer` queues events internally and uses background threads to send data to Kinesis. Data is sent when a batch reaches the Kinesis limit of 500 records, when the producer's timeout is reached, or when the producer is shut down.

> NOTE: You can configure the size at which a batch is sent by passing the
> `:send_size` parameter to `create`. The producer's internal timeout can be
> set with the `:send_every_ms` parameter.

The API for the `AsyncProducer` looks similar to the `SyncProducer`'s, but all `put` and `put_all` calls return immediately. Both `put` and `put_all` return `true` if the producer enqueued the data for sending later, and `false` if the producer is not accepting data for any reason. If the producer's internal queue fills up, calls to `put` and `put_all` block.

Since sending (and therefore failure) happens on background threads, you can provide an `AsyncProducer` with a failure handler that's called whenever something bad happens.

```ruby
require 'telekinesis'

class MyFailureHandler
  def on_record_failure(kv_pairs_and_errors)
    items = kv_pairs_and_errors.map do |k, v, code, message|
      maybe_log_error(code, message)
      [k, v]
    end
    save_for_later(items)
  end

  def on_kinesis_error(err, items)
    log_exception(err.cause)
    save_for_later(items)
  end
end

producer = Telekinesis::Producer::AsyncProducer.create(
  stream: 'my stream',
  failure_handler: MyFailureHandler.new,
  send_every_ms: 1500,
  credentials: {
    access_key_id: 'foo',
    secret_access_key: 'bar'
  }
)
```
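Because enqueueing can be refused, it's worth checking the return value of `put`. The sketch below assumes the producer exposes a `shutdown` method (implied by the batching behavior described above); the `stash_somewhere` helper is hypothetical and not part of Telekinesis:

```ruby
record = ['some-key', 'some,very,important,data']

unless producer.put(*record)
  # The producer refused the data (for example, it's shutting down).
  # stash_somewhere is a hypothetical fallback, not part of Telekinesis.
  stash_somewhere(record)
end

# Flush any buffered records and stop the background threads before exiting.
producer.shutdown
```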
## Consumers

### DistributedConsumer

The `DistributedConsumer` is a wrapper around Amazon's [Kinesis Client Library (also called the KCL)](http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-record-processor-app.html#kinesis-record-processor-overview-kcl).

Each `DistributedConsumer` is considered to be part of a group of consumers that make up an _application_. An application can be running on any number of hosts. Consumers identify themselves uniquely within an application by specifying a `worker_id`.

All of the consumers within an application attempt to distribute work evenly between themselves by coordinating through a DynamoDB table. This coordination ensures that each shard is processed by a single consumer, and that if one consumer fails for any reason, another consumer can pick up from the point at which it last checkpointed. This is all part of the KCL! Telekinesis just makes it easier to use from JRuby.

Each `DistributedConsumer` has to know how to process all the data it's retrieving from Kinesis. That's done by creating a [record processor](http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-record-processor-implementation-app-java.html#kinesis-record-processor-implementation-interface-java) and telling a `DistributedConsumer` how to create a processor when it becomes responsible for a shard.

We highly recommend reading the [official docs](http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-record-processor-implementation-app-java.html#kinesis-record-processor-implementation-interface-java) on implementing the `IRecordProcessor` interface before you continue.

> NOTE: Since `initialize` is a reserved method in Ruby, Telekinesis takes care
> of calling your `init` method whenever the KCL calls `IRecordProcessor`'s
> `initialize` method.

> NOTE: Make sure you read the Kinesis Record Processor documentation carefully.
> Failures, checkpoints, and shutting down require some attention. More on that
> later.

After it's created, a record processor is initialized with the ID of the shard it's processing, and is handed an enumerable of [Records](http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/index.html?com/amazonaws/services/kinesis/AmazonKinesisClient.html) and a checkpointer (see below) every time the consumer detects new data to process. Defining and creating a simple processor might look like:

```ruby
require 'telekinesis'

class MyProcessor
  def init(shard_id)
    @shard_id = shard_id
    $stderr.puts "Started processing #{@shard_id}"
  end

  def process_records(records, checkpointer)
    records.each { |r| puts "key=#{r.partition_key} value=#{String.from_java_bytes(r.data.array)}" }
  end

  def shutdown
    $stderr.puts "Shutting down #{@shard_id}"
  end
end

Telekinesis::Consumer::DistributedConsumer.new(stream: 'some-events', app: 'example') do
  MyProcessor.new
end
```

To make defining record processors easier, Telekinesis comes with a `Block` processor that lets you use a block to specify your `process_records` method. Use this if you don't need to do any explicit startup or shutdown in a record processor.

```ruby
require 'telekinesis'

Telekinesis::Consumer::DistributedConsumer.new(stream: 'some-events', app: 'example') do
  Telekinesis::Consumer::Block.new do |records, checkpointer|
    records.each { |r| puts "key=#{r.partition_key} value=#{String.from_java_bytes(r.data.array)}" }
  end
end
```

Once you get into building a client application, you'll probably want to know about some of the following advanced tips and tricks.

#### Client State

Each KCL application gets its own DynamoDB table that stores all of this coordination state (shard leases and checkpoints). The `:app` name is used as the DynamoDB table name, so beware of namespace collisions if you use DynamoDB on its own. Altering or resetting any of this state means manually modifying the application's DynamoDB table.

#### Errors while processing records

When a call to `process_records` fails, the KCL expects you to handle the failure and try to reprocess. If you let an exception escape, the KCL happily moves on to the next batch of records from Kinesis and will let you checkpoint further on down the road. From the [official docs](http://docs.aws.amazon.com/kinesis/latest/dev/kinesis-record-processor-implementation-app-java.html):

> The KCL relies on processRecords to handle any exceptions that arise from
> processing the data records. If an exception is thrown from processRecords,
> the KCL skips over the data records that were passed prior to the exception;
> that is, these records are not re-sent to the record processor that threw the
> exception or to any other record processor in the application.

The moral of the story is that you should be absolutely sure you catch any exceptions that get thrown in your `process_records` implementation. If you don't, you can (silently) drop data on the floor.

If something terrible happens and you can't re-read the list of records and re-do whatever work you needed to do in `process_records`, we've been advised by the Kinesis team that killing the entire JVM that's running the worker is the safest thing to do. On restart, the consumer (or another consumer in the application group) will pick up the orphaned shards and attempt to restart from the last available checkpoint.
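To make that concrete, here's a minimal sketch of a record processor that retries a failed batch a few times and only checkpoints after the whole batch succeeds (the checkpointer is covered in the next section). The `handle` method and the retry limit are illustrative, not part of Telekinesis:

```ruby
class CarefulProcessor
  MAX_ATTEMPTS = 3

  def init(shard_id)
    @shard_id = shard_id
  end

  def process_records(records, checkpointer)
    attempts = 0
    begin
      records.each { |r| handle(r) }
      # Only checkpoint once the entire batch has been processed.
      checkpointer.checkpoint
    rescue => e
      attempts += 1
      $stderr.puts "error processing batch on #{@shard_id}: #{e}"
      retry if attempts < MAX_ATTEMPTS
      # Out of retries. Per the advice above, crashing the JVM is safer
      # than silently dropping this batch.
      java.lang.System.exit(1)
    end
  end

  def shutdown; end

  private

  # Stand-in for your own per-record processing logic.
  def handle(record)
    puts "key=#{record.partition_key}"
  end
end
```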
#### Checkpoints and `INITIAL_POSITION_IN_STREAM`

The second object passed to `process_records` is a checkpointer. It can be used to checkpoint all records that have been passed to the processor so far (by just calling `checkpointer.checkpoint`) or up to a particular sequence number (by calling `checkpointer.checkpoint(record.sequence_number)`).

While a `DistributedConsumer` can be initialized with an `:initial_position_in_stream` option, any existing checkpoint for a shard takes precedence over that value. Since checkpoints live in the application's DynamoDB table, starting a consumer with `initial_position_in_stream: 'LATEST'` and then restarting it with `initial_position_in_stream: 'TRIM_HORIZON'` resumes from the existing checkpoint, not from the trim horizon.

## Java client logging

The AWS Java SDK can be extremely noisy and hard to control, since it logs through `java.util.logging`.

Telekinesis comes with a shim that can silence all of that logging or redirect it to a Ruby `Logger` of your choice. This isn't fine-grained control - you're capturing or disabling ALL logging from any Java dependency that uses `java.util.logging` - so use it with care.

To entirely disable logging:

```ruby
Telekinesis::Logging.disable_java_logging
```

To capture all logging and send it through a Ruby logger:

```ruby
Telekinesis::Logging.capture_java_logging(Logger.new($stderr))
```

----

# Building

## Prerequisites

* JRuby 1.7.9 or later.
* Apache Maven

## Build

Install JRuby 1.7.9 or later. For example, with `rbenv`:

```
$ rbenv install jruby-1.7.9
```

Install Bundler and the required gems.

```
$ gem install bundler
$ bundle install
```

Install Apache Maven. On Ubuntu or related distributions, use:

```
$ sudo apt-get install maven
```

On OSX, the easiest method is via `brew`:

```
$ brew install maven
```

Ensure your `JAVA_HOME` environment variable is set. In Bash on OSX, for example, add the following to `~/.bash_profile`:

```
export JAVA_HOME=$(/usr/libexec/java_home)
```

Then run:

```
$ source ~/.bash_profile
```

Build the Java shim and jar:

```
$ rake build:ext
```

The `rake build:ext` task builds the Java shim and packages all of the required Java classes into a single jar. Since bytecode is portable, the jar is shipped with the built gem.

Build the gem. Use the `rake build:gem` task to build the complete gem, uberjar and all.

```
$ rake build:gem
```

# Testing

Telekinesis comes with a small set of unit tests. Run those with plain ol' `rake test`.

> NOTE: The Java extension *must* be built and installed before you can run
> the unit tests.

Integration tests coming soon.