Sha256: 9e38ea6773684629900c07759c2cc399db208bc799c146c293b26d92a0f7f9d7

Contents?: true

Size: 1.84 KB

Versions: 6

Compression:

Stored size: 1.84 KB

Contents

# encoding: utf-8
# Record processor handed to the Kinesis Client Library (KCL). It decodes each
# Kinesis record with the configured codec, decorates the resulting events,
# attaches record metadata under '@metadata', and pushes them onto the output
# queue. Checkpoints are taken at most once per +checkpoint_interval+ seconds.
class LogStash::Inputs::Kinesis::Worker
  include com.amazonaws.services.kinesis.clientlibrary.interfaces.v2::IRecordProcessor

  attr_reader(
    :checkpoint_interval,
    :codec,
    :decorator,
    :logger,
    :output_queue,
  )

  # Serves two roles, because IRecordProcessor declares a method that is also
  # called `initialize`, which collides with Ruby's constructor name:
  #
  # * First call (Ruby construction):
  #   args = [codec, output_queue, decorator, checkpoint_interval, logger]
  # * Any later call (made through the IRecordProcessor interface):
  #   args[0] is expected to respond to `shardId`.
  #
  # `@constructed` distinguishes the two cases.
  def initialize(*args)
    if @constructed
      # Interface call: nothing to set up; the shard id is read but not used.
      _shard_id = args[0].shardId
      return
    end

    @codec, @output_queue, @decorator, @checkpoint_interval, @logger = args
    # Start with a checkpoint time in the past so the first processRecords
    # call checkpoints immediately.
    @next_checkpoint = Time.now - 600
    @constructed = true
  end
  # `initialize` is private by default in Ruby; it has to be public so that it
  # can be invoked as the interface method.
  public :initialize

  # Processes every record in the batch, then checkpoints if the checkpoint
  # interval has elapsed.
  #
  # @param records_input object exposing `records` and `checkpointer`
  def processRecords(records_input)
    records_input.records.each { |record| process_record(record) }
    return if Time.now < @next_checkpoint

    checkpoint(records_input.checkpointer)
    @next_checkpoint = Time.now + @checkpoint_interval
  end

  # Checkpoints one final time when the shutdown reason is TERMINATE; any
  # other reason results in no checkpoint.
  #
  # @param shutdown_input object exposing `shutdown_reason` and `checkpointer`
  def shutdown(shutdown_input)
    terminate = com.amazonaws.services.kinesis.clientlibrary.types::ShutdownReason::TERMINATE
    checkpoint(shutdown_input.checkpointer) if shutdown_input.shutdown_reason == terminate
  end

  protected

  # Best-effort checkpoint: failures are logged and not re-raised.
  def checkpoint(checkpointer)
    checkpointer.checkpoint()
  rescue => error
    @logger.error("Kinesis worker failed checkpointing: #{error}")
  end

  # Decodes a single record and enqueues every event the codec yields.
  # Errors are logged and swallowed so that one bad record does not abort the
  # rest of the batch.
  def process_record(record)
    raw = record_payload(record)
    metadata = build_metadata(record)
    @codec.decode(raw) do |event|
      @decorator.call(event)
      event.set('@metadata', metadata)
      @output_queue << event
    end
  rescue => error
    @logger.error("Error processing record: #{error}")
  end

  # Returns the record's payload as a Ruby String.
  #
  # Copies only the bytes between the buffer's position and limit instead of
  # calling ByteBuffer#array, which returns the whole backing array (ignoring
  # position, limit and array offset) and raises for read-only buffers or
  # buffers without an accessible backing array. Working on a duplicate keeps
  # the record's own buffer position untouched.
  def record_payload(record)
    buffer = record.getData.duplicate
    bytes = Java::byte[buffer.remaining].new
    buffer.get(bytes)
    String.from_java_bytes(bytes)
  end

  # Builds the hash stored under '@metadata' on each event from this record.
  # NOTE(review): 'approximate_arrival_timestamp' is whatever `getTime`
  # returns on the arrival timestamp - presumably epoch milliseconds; confirm.
  def build_metadata(record)
    {
      'approximate_arrival_timestamp' => record.getApproximateArrivalTimestamp.getTime,
      'partition_key' => record.getPartitionKey,
      'sequence_number' => record.getSequenceNumber,
    }
  end

end

Version data entries

6 entries across 6 versions & 1 rubygems

Version Path
logstash-input-kinesis-2.0.7-java lib/logstash/inputs/kinesis/worker.rb
logstash-input-kinesis-2.0.6-java lib/logstash/inputs/kinesis/worker.rb
logstash-input-kinesis-2.0.5-java lib/logstash/inputs/kinesis/worker.rb
logstash-input-kinesis-2.0.4-java lib/logstash/inputs/kinesis/worker.rb
logstash-input-kinesis-2.0.3-java lib/logstash/inputs/kinesis/worker.rb
logstash-input-kinesis-2.0.2-java lib/logstash/inputs/kinesis/worker.rb