Sha256: 7c2919d1d521a24e42b4ba8ec2b214eee31dcccdc78a138b4fd5f3b853ce1af3

Contents?: true

Size: 1.53 KB

Versions: 3

Compression:

Stored size: 1.53 KB

Contents

class LogStash::Inputs::Kinesis::Worker
  include com.amazonaws.services.kinesis.clientlibrary.interfaces.v2::IRecordProcessor

  attr_reader(
    :checkpoint_interval,
    :codec,
    :decorator,
    :logger,
    :output_queue,
  )

  def initialize(*args)
    # nasty hack, because this is the name of a method on IRecordProcessor, but also ruby's constructor
    if !@constructed
      @codec, @output_queue, @decorator, @checkpoint_interval, @logger = args
      @next_checkpoint = Time.now - 600
      @constructed = true
    else
      _shard_id = args[0].shardId
      @decoder = java.nio.charset::Charset.forName("UTF-8").newDecoder()
    end
  end
  public :initialize

  def processRecords(records_input)
    records_input.records.each { |record| process_record(record) }
    if Time.now >= @next_checkpoint
      checkpoint(records_input.checkpointer)
      @next_checkpoint = Time.now + @checkpoint_interval
    end
  end

  def shutdown(shutdown_input)
    if shutdown_input.shutdown_reason == com.amazonaws.services.kinesis.clientlibrary.types::ShutdownReason::TERMINATE
      checkpoint(shutdown_input.checkpointer)
    end
  end

  protected

  def checkpoint(checkpointer)
    checkpointer.checkpoint()
  rescue => error
    @logger.error("Kinesis worker failed checkpointing: #{error}")
  end

  def process_record(record)
    raw = @decoder.decode(record.getData).to_s
    @codec.decode(raw) do |event|
      @decorator.call(event)
      @output_queue << event
    end
  rescue => error
    @logger.error("Error processing record: #{error}")
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
logstash-input-kinesis-1.4.3-java lib/logstash/inputs/kinesis/worker.rb
logstash-input-kinesis-1.4.2-java lib/logstash/inputs/kinesis/worker.rb
logstash-input-kinesis-1.4.1-java lib/logstash/inputs/kinesis/worker.rb