Sha256: eda7a2b4941093b88efc36bcbf6b8551af154978e2a9d96d093fb30420c73f60

Contents?: true

Size: 1.84 KB

Versions: 7

Compression:

Stored size: 1.84 KB

Contents

# encoding: utf-8
# Record processor that decodes Kinesis records with a codec, decorates the
# resulting events, and pushes them onto an output queue. It also checkpoints
# at a configurable interval and on TERMINATE shutdowns.
class LogStash::Inputs::Kinesis::Worker
  include com.amazonaws.services.kinesis.clientlibrary.interfaces.v2::IRecordProcessor

  attr_reader(
    :checkpoint_interval,
    :codec,
    :decorator,
    :logger,
    :output_queue,
  )

  # Doubles as the Ruby constructor and as the IRecordProcessor method of the
  # same name, so it has to tell the two kinds of call apart.
  #
  # First call (construction): args are, in order,
  #   codec, output_queue, decorator, checkpoint_interval, logger.
  # Any later call: args[0] is expected to respond to +shardId+
  #   (presumably the KCL InitializationInput -- confirm against the KCL docs).
  def initialize(*args)
    if @constructed
      # Interface-style invocation; the shard id is read but not used yet.
      _shard_id = args[0].shardId
    else
      @codec, @output_queue, @decorator, @checkpoint_interval, @logger = args
      # Start in the past so the first processRecords call checkpoints right away.
      @next_checkpoint = Time.now - 600
      @constructed = true
    end
  end
  # Constructors are private by default in Ruby; it is made public here,
  # presumably so it can be invoked as the interface method -- confirm.
  public :initialize

  # Processes every record in the batch, then checkpoints if the checkpoint
  # interval has elapsed since the last scheduled checkpoint.
  #
  # records_input must respond to +records+ and +checkpointer+.
  def processRecords(records_input)
    records_input.records.each { |record| process_record(record) }
    if Time.now >= @next_checkpoint
      checkpoint(records_input.checkpointer)
      @next_checkpoint = Time.now + @checkpoint_interval
    end
  end

  # Checkpoints one last time, but only when the shutdown reason is TERMINATE.
  #
  # shutdown_input must respond to +shutdown_reason+ and +checkpointer+.
  def shutdown(shutdown_input)
    if shutdown_input.shutdown_reason == com.amazonaws.services.kinesis.clientlibrary.lib.worker::ShutdownReason::TERMINATE
      checkpoint(shutdown_input.checkpointer)
    end
  end

  protected

  # Best-effort checkpoint: failures are logged and swallowed so that record
  # processing can continue.
  def checkpoint(checkpointer)
    checkpointer.checkpoint()
  rescue => error
    @logger.error("Kinesis worker failed checkpointing: #{error}")
  end

  # Decodes a single record into events, decorates each event, attaches the
  # record metadata under '@metadata', and enqueues it. Any error is logged
  # and the record is skipped.
  def process_record(record)
    raw = String.from_java_bytes(payload_bytes(record.getData))
    metadata = build_metadata(record)
    @codec.decode(raw) do |event|
      @decorator.call(event)
      event.set('@metadata', metadata)
      @output_queue << event
    end
  rescue => error
    @logger.error("Error processing record: #{error}")
  end

  # Copies exactly the readable bytes (position...limit) out of a
  # java.nio.ByteBuffer and returns them as a Java byte[].
  #
  # ByteBuffer#array is avoided on purpose: it returns the whole backing array
  # regardless of position/limit/offset and raises for read-only or direct
  # buffers. Reading through a duplicate leaves the caller's buffer position
  # untouched.
  def payload_bytes(buffer)
    view = buffer.duplicate
    bytes = Java::byte[view.remaining].new
    view.get(bytes)
    bytes
  end

  # Builds the metadata hash stored on each event produced from the record.
  #
  # Returns a Hash with the keys 'approximate_arrival_timestamp' (the value of
  # getTime on the arrival timestamp, or nil when the record carries none),
  # 'partition_key' and 'sequence_number'.
  def build_metadata(record)
    arrival = record.getApproximateArrivalTimestamp
    {
      # Guard against a missing timestamp; calling getTime on nil would raise
      # and make process_record drop the whole record.
      'approximate_arrival_timestamp' => (arrival.nil? ? nil : arrival.getTime),
      'partition_key' => record.getPartitionKey,
      'sequence_number' => record.getSequenceNumber,
    }
  end

end

Version data entries

7 entries across 7 versions & 2 rubygems

Version Path
logstash-input-kinesis-2.1.2-java lib/logstash/inputs/kinesis/worker.rb
logstash-input-kinesis-2.1.1-java lib/logstash/inputs/kinesis/worker.rb
logstash-input-kinesis-2.1.0-java lib/logstash/inputs/kinesis/worker.rb
logstash-input-kinesis-2.0.11-java lib/logstash/inputs/kinesis/worker.rb
logstash-input-kinesis-2.0.10-java lib/logstash/inputs/kinesis/worker.rb
logstash-input-kinesis-jordanforks-2.0.9-java lib/logstash/inputs/kinesis/worker.rb
logstash-input-kinesis-2.0.8-java lib/logstash/inputs/kinesis/worker.rb