Sha256: 359750ad5f684b093a38246f7adfbaac7fcffbf7962801ea171632a3128efb95

Contents?: true

Size: 1.49 KB

Versions: 1

Compression:

Stored size: 1.49 KB

Contents

# encoding: utf-8
# Record processor that implements the Kinesis client library's v2
# IRecordProcessor interface. Each record's payload is decoded with the
# configured codec, every resulting event is passed to the decorator and then
# appended to the output queue. Checkpoints are taken from processRecords once
# the checkpoint interval has elapsed, and on shutdown when the reason is
# TERMINATE.
class LogStash::Inputs::Kinesis::Worker
  include com.amazonaws.services.kinesis.clientlibrary.interfaces.v2::IRecordProcessor

  attr_reader :checkpoint_interval, :codec, :decorator, :logger, :output_queue

  # Serves two roles, because IRecordProcessor declares a method with the same
  # name as Ruby's constructor hook.
  #
  # First call (Ruby construction): args are, in order,
  #   codec, output_queue, decorator, checkpoint_interval, logger.
  # Any later call (after @constructed is set): args[0] is expected to respond
  #   to shardId; the value is read and otherwise unused.
  def initialize(*args)
    if @constructed
      # Already built as a Ruby object, so this is the interface-level call.
      _shard_id = args.first.shardId
    else
      @codec, @output_queue, @decorator, @checkpoint_interval, @logger = args
      # Start in the past so the first processRecords call checkpoints.
      @next_checkpoint = Time.now - 600
      @constructed = true
    end
  end
  # Ruby makes #initialize private by default; expose it so it can also be
  # invoked as a regular method on an existing instance.
  public :initialize

  # Processes every record in the batch, then checkpoints if the next
  # checkpoint time has been reached and schedules the following one
  # @checkpoint_interval seconds from now.
  #
  # records_input must respond to #records and #checkpointer.
  def processRecords(records_input)
    records_input.records.each do |record|
      process_record(record.getData)
    end

    return if Time.now < @next_checkpoint

    checkpoint(records_input.checkpointer)
    @next_checkpoint = Time.now + @checkpoint_interval
  end

  # Checkpoints one last time, but only when the shutdown reason is TERMINATE;
  # for any other reason nothing is done.
  #
  # shutdown_input must respond to #shutdown_reason and #checkpointer.
  def shutdown(shutdown_input)
    reason = shutdown_input.shutdown_reason
    terminate = com.amazonaws.services.kinesis.clientlibrary.types::ShutdownReason::TERMINATE
    checkpoint(shutdown_input.checkpointer) if reason == terminate
  end

  protected

  # Asks the given checkpointer to checkpoint. Errors caught by the rescue are
  # logged and swallowed rather than propagated to the caller.
  def checkpoint(checkpointer)
    begin
      checkpointer.checkpoint
    rescue => failure
      @logger.error("Kinesis worker failed checkpointing: #{failure}")
    end
  end

  # Converts the record's backing byte array into a Ruby String, decodes it
  # with the codec, and for each decoded event runs the decorator before
  # pushing the event onto the output queue. Errors caught by the rescue are
  # logged and the record is dropped.
  #
  # record must respond to #array (processRecords passes record.getData).
  def process_record(record)
    payload = String.from_java_bytes(record.array)
    @codec.decode(payload) do |decoded_event|
      @decorator.call(decoded_event)
      @output_queue << decoded_event
    end
  rescue => failure
    @logger.error("Error processing record: #{failure}")
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
logstash-input-kinesis-2.0.1-java lib/logstash/inputs/kinesis/worker.rb