Sha256: 8043c7ee82972b87fc18488ce7cdaa7a61cc865364e46ea6bee7b74d2a4b1ccf

Contents?: true

Size: 1.47 KB

Versions: 7

Compression:

Stored size: 1.47 KB

Contents

class LogStash::Inputs::Kinesis::Worker
  include com.amazonaws.services.kinesis.clientlibrary.interfaces::IRecordProcessor

  attr_reader(
    :checkpoint_interval,
    :codec,
    :decorator,
    :logger,
    :output_queue,
  )

  def initialize(*args)
    # nasty hack, because this is the name of a method on IRecordProcessor, but also ruby's constructor
    if !@constructed
      @codec, @output_queue, @decorator, @checkpoint_interval, @logger = args
      @next_checkpoint = Time.now - 600
      @constructed = true
    else
      _shard_id, _ = args
      @decoder = java.nio.charset::Charset.forName("UTF-8").newDecoder()
    end
  end
  public :initialize

  def processRecords(records, checkpointer)
    records.each { |record| process_record(record) }
    if Time.now >= @next_checkpoint
      checkpoint(checkpointer)
      @next_checkpoint = Time.now + @checkpoint_interval
    end
  end

  def shutdown(checkpointer, reason)
    if reason == com.amazonaws.services.kinesis.clientlibrary.types::ShutdownReason::TERMINATE
      checkpoint(checkpointer)
    end
  end

  protected

  def checkpoint(checkpointer)
    checkpointer.checkpoint()
  rescue => error
    @logger.error("Kinesis worker failed checkpointing: #{error}")
  end

  def process_record(record)
    raw = @decoder.decode(record.getData).to_s
    @codec.decode(raw) do |event|
      @decorator.call(event)
      @output_queue << event
    end
  rescue => error
    @logger.error("Error processing record: #{error}")
  end
end

Version data entries

7 entries across 7 versions & 1 rubygems

Version Path
logstash-input-kinesis-1.3.1-java lib/logstash/inputs/kinesis/worker.rb
logstash-input-kinesis-1.3.0-java lib/logstash/inputs/kinesis/worker.rb
logstash-input-kinesis-1.2.4-java lib/logstash/inputs/kinesis/worker.rb
logstash-input-kinesis-1.2.3-java lib/logstash/inputs/kinesis/worker.rb
logstash-input-kinesis-1.2.2-java lib/logstash/inputs/kinesis/worker.rb
logstash-input-kinesis-1.2.1-java lib/logstash/inputs/kinesis/worker.rb
logstash-input-kinesis-1.2.0-java lib/logstash/inputs/kinesis/worker.rb