Sha256: ee07fcd572132495e9a571663f70f4d355a2f2d60e7fea4cfe4f9c3f0004e5ff
Contents?: true
Size: 857 Bytes
Versions: 11
Compression:
Stored size: 857 Bytes
Contents
require 'liquid/tracker/base'

module Tracker
  # Tracker that hands events to a Kafka producer (Java client classes
  # pulled in with java_import).
  class KafkaTracker < Base
    java_import 'kafka.javaapi.producer.Producer'
    java_import 'kafka.producer.ProducerConfig'
    java_import 'kafka.producer.KeyedMessage'

    # Sets up the tracker and its Kafka producer.
    #
    # properties - handed unchanged to ProducerConfig.new.
    # dimensions - forwarded to the superclass initializer (default: {}).
    def initialize(properties, dimensions = {})
      super(dimensions)
      config = ProducerConfig.new(properties)
      @producer = Producer.new(config)
    end

    # Health probe; currently hard-wired to false.
    #
    # Returns false.
    def down?
      # TODO: async is fire and forget. we might want to handle
      # QueueFullExceptions later
      false
    end

    # Wraps topic and data in a KeyedMessage and passes it to the producer.
    # Errors caught by the bare rescue are logged through $log and not
    # re-raised.
    #
    # topic - first argument to KeyedMessage.new.
    # data  - second argument to KeyedMessage.new.
    def event(topic, data)
      begin
        message = KeyedMessage.new(topic, data)
        @producer.send(message)
      rescue => e
        # TODO: maybe fall back to FileTracker here
        $log.exception(e, "failed to log #{topic}=#{data.inspect}")
      end
    end

    # Closes the producer when one is set. A ProducerClosedException raised
    # during close is deliberately ignored.
    def shutdown
      return unless @producer
      @producer.close
    rescue Java::KafkaProducer::ProducerClosedException
      # pass
    end
  end
end
Version data entries
11 entries across 11 versions & 1 rubygems