Sha256: e0f6fdf228d40779d659eef13a14273fd02dc86e34012de96795413ffaca0755
Contents?: true
Size: 1.83 KB
Versions: 3
Compression:
Stored size: 1.83 KB
Contents
require 'poseidon'

module LogStashLogger
  module Device
    # Log device that publishes messages to a Kafka topic through a
    # Poseidon producer.
    #
    # Buffering (+buffer_receive+, +buffer_flush+) and the +@sync+ flag are
    # not defined in this class; they are expected to come from Connectable
    # or one of its ancestors.
    class Kafka < Connectable
      DEFAULT_HOST = 'localhost'
      DEFAULT_PORT = 9092
      DEFAULT_TOPIC = 'logstash'
      DEFAULT_PRODUCER = 'logstash-logger'
      DEFAULT_BACKOFF = 1

      attr_accessor :hosts, :topic, :producer, :backoff

      # @param opts [Hash] device options. Keys read here:
      #   :host     - comma-separated host names (default DEFAULT_HOST)
      #   :port     - port appended to every entry of :host (default DEFAULT_PORT)
      #   :hosts    - ready-made list of "host:port" strings; overrides :host/:port
      #   :path     - Kafka topic to publish to (default DEFAULT_TOPIC)
      #   :producer - value passed as the second argument to Poseidon::Producer.new
      #   :backoff  - seconds to sleep before a reconnect attempt; a falsy
      #               value falls back to DEFAULT_BACKOFF
      def initialize(opts)
        super
        host = opts[:host] || DEFAULT_HOST
        port = opts[:port] || DEFAULT_PORT
        @hosts = opts[:hosts] || host.split(',').map { |h| "#{h}:#{port}" }
        @topic = opts[:path] || DEFAULT_TOPIC
        @producer = opts[:producer] || DEFAULT_PRODUCER
        @backoff = opts[:backoff] || DEFAULT_BACKOFF
      end

      # Creates a new Poseidon producer and stores it in @io.
      def connect
        @io = ::Poseidon::Producer.new(@hosts, @producer)
      end

      # Closes the current producer (if there is one) and opens a new one.
      def reconnect
        # @io may be nil when the failure happened before a producer was
        # ever assigned; calling #close on nil would raise NoMethodError
        # from inside the rescue handler in #with_connection.
        @io.close if @io
        connect
      end

      # Yields with a producer available in @io.
      #
      # ChecksumError and UnableToFetchMetadata trigger a backoff sleep, a
      # reconnect and a retry of the whole block. Any other StandardError is
      # reported with +warn+, the producer is discarded and nil is returned.
      #
      # NOTE(review): the retry is unbounded, so a permanently unreachable
      # cluster keeps the caller in this loop.
      def with_connection
        connect unless @io
        yield
      rescue ::Poseidon::Errors::ChecksumError, ::Poseidon::Errors::UnableToFetchMetadata => e
        warn "#{self.class} - #{e.class} -> reconnect/retry"
        sleep backoff if backoff
        reconnect
        retry
      rescue => e
        warn "#{self.class} - #{e.class} - #{e.message} -> giving up"
        begin
          # Release the producer's connections instead of just dropping the
          # reference; failures while closing are deliberately ignored.
          @io.close if @io
        rescue StandardError
          nil
        ensure
          @io = nil
        end
        nil
      end

      # Wraps +message+ for @topic and hands it to the buffer; forces a
      # buffer flush when @sync is set.
      def write(message)
        buffer_receive ::Poseidon::MessageToSend.new(@topic, message)
        buffer_flush(force: true) if @sync
      end

      # Sends +messages+ through the producer in a single call.
      def write_batch(messages)
        with_connection do
          @io.send_messages messages
        end
      end

      # Performs a final buffer flush and closes the producer. Errors are
      # reported with +warn+; @io is always reset to nil.
      def close
        buffer_flush(final: true)
        @io && @io.close
      rescue => e
        warn "#{self.class} - #{e.class} - #{e.message}"
      ensure
        @io = nil
      end

      # With no arguments, flushes the buffer. Otherwise treats the first
      # argument as the message(s) to send and writes them as one batch.
      def flush(*args)
        if args.empty?
          buffer_flush
        else
          messages = *args.first
          write_batch(messages)
        end
      end
    end
  end
end
Version data entries
3 entries across 3 versions & 1 rubygems
Version | Path |
---|---|
logstash-logger-0.15.2 | lib/logstash-logger/device/kafka.rb |
logstash-logger-0.15.1 | lib/logstash-logger/device/kafka.rb |
logstash-logger-0.15.0 | lib/logstash-logger/device/kafka.rb |