Sha256: 359750ad5f684b093a38246f7adfbaac7fcffbf7962801ea171632a3128efb95
Contents?: true
Size: 1.49 KB
Versions: 1
Compression:
Stored size: 1.49 KB
Contents
# encoding: utf-8

# Kinesis record processor: decodes the data of every received record with
# the configured codec, runs each resulting event through the decorator,
# appends it to the output queue, and checkpoints at a configurable interval.
class LogStash::Inputs::Kinesis::Worker
  include com.amazonaws.services.kinesis.clientlibrary.interfaces.v2::IRecordProcessor

  attr_reader :checkpoint_interval, :codec, :decorator, :logger, :output_queue

  # Serves two purposes, because `initialize` is both Ruby's constructor and
  # the name of a method on IRecordProcessor.
  #
  # First call (construction): +args+ is
  #   codec, output_queue, decorator, checkpoint_interval, logger
  # Any later call on the same instance: +args[0]+ must respond to +shardId+;
  # the value is read and discarded.
  def initialize(*args)
    if @constructed
      # Already built, so this is the IRecordProcessor-style call.
      _shard_id = args[0].shardId
    else
      @codec, @output_queue, @decorator, @checkpoint_interval, @logger = args
      # Begin 600 seconds in the past so the first processRecords call
      # already satisfies the "checkpoint is due" condition.
      @next_checkpoint = Time.now - 600
      @constructed = true
    end
  end
  # Ruby makes `initialize` private by default; it is re-exposed here.
  # NOTE(review): presumably so the interface method can be invoked from
  # outside the object after construction -- confirm.
  public :initialize

  # Handles one batch: processes the data of each record, then checkpoints
  # if the next checkpoint time has been reached and schedules the following
  # one +checkpoint_interval+ seconds from now.
  #
  # @param records_input object responding to +records+ and +checkpointer+
  def processRecords(records_input)
    records_input.records.each do |kinesis_record|
      process_record(kinesis_record.getData)
    end

    return unless Time.now >= @next_checkpoint

    checkpoint(records_input.checkpointer)
    @next_checkpoint = Time.now + @checkpoint_interval
  end

  # Checkpoints one last time, but only when the shutdown reason is
  # ShutdownReason::TERMINATE; any other reason is ignored.
  #
  # @param shutdown_input object responding to +shutdown_reason+ and
  #   +checkpointer+
  def shutdown(shutdown_input)
    reason = shutdown_input.shutdown_reason
    terminate = com.amazonaws.services.kinesis.clientlibrary.types::ShutdownReason::TERMINATE
    checkpoint(shutdown_input.checkpointer) if reason == terminate
  end

  protected

  # Asks the checkpointer to checkpoint. A StandardError raised while doing
  # so is logged and swallowed rather than propagated.
  def checkpoint(checkpointer)
    checkpointer.checkpoint
  rescue => error
    @logger.error("Kinesis worker failed checkpointing: #{error}")
  end

  # Converts the record's backing byte array to a Ruby String, decodes it
  # with the codec, and for every decoded event applies the decorator and
  # pushes the event onto the output queue. A StandardError raised along the
  # way is logged and swallowed.
  #
  # @param record object responding to +array+
  #   (NOTE(review): presumably a java.nio.ByteBuffer -- confirm)
  def process_record(record)
    payload = String.from_java_bytes(record.array)
    @codec.decode(payload) do |decoded_event|
      @decorator.call(decoded_event)
      @output_queue << decoded_event
    end
  rescue => error
    @logger.error("Error processing record: #{error}")
  end
end
Version data entries
1 entry across 1 version & 1 rubygem
Version | Path |
---|---|
logstash-input-kinesis-2.0.1-java | lib/logstash/inputs/kinesis/worker.rb |