Sha256: a8794dd417baf47ed256814450b58c92ccd0d8c63182d01358ae35e1755eb77d
Contents?: true
Size: 1.08 KB
Versions: 1
Compression:
Stored size: 1.08 KB
Contents
require 'liquid/tracker/base'

# Jar files located one directory above this file.
require_relative '../scala-library-2.10.3.jar'
require_relative '../metrics-core-2.2.0.jar'
require_relative '../metrics-annotation-2.2.0.jar'
require_relative '../kafka_2.10-0.8.0.jar'
require_relative '../snappy-java-1.1.1-M1.jar'

module Tracker
  # Tracker that hands serialized events to a Kafka producer
  # (kafka.javaapi.producer.Producer) as KeyedMessage instances.
  class KafkaTracker < Base
    java_import 'kafka.javaapi.producer.Producer'
    java_import 'kafka.producer.ProducerConfig'
    java_import 'kafka.producer.KeyedMessage'

    # Builds the underlying Kafka producer.
    #
    # properties - passed unchanged to ProducerConfig.new.
    # dimensions - forwarded to Base#initialize (defaults to an empty Hash).
    def initialize(properties, dimensions = {})
      super(dimensions)
      producer_config = ProducerConfig.new(properties)
      @producer = Producer.new(producer_config)
    end

    # Always reports the tracker as available.
    #
    # TODO: async is fire and forget. we might want to handle
    # QueueFullExceptions later
    def down?
      false
    end

    # Serializes +obj+ and sends it to +topic+ through the producer.
    #
    # Errors caught by the bare rescue are reported through $log and are
    # not re-raised.
    #
    # NOTE(review): @serializer is not assigned anywhere in this file --
    # presumably it is provided by Base; confirm.
    def event(obj, topic)
      begin
        payload = @serializer.dump(obj)
        message = KeyedMessage.new(topic, payload)
        @producer.send(message)
      rescue => error
        # TODO: maybe fall back to FileTracker here
        $log.exception(error, "failed to log event=#{obj.inspect}")
      end
    end

    # Closes the producer if one was created.
    def shutdown
      begin
        if @producer
          @producer.close
        end
      rescue Java::KafkaProducer::ProducerClosedException
        # Ignored on purpose: the rescue body is intentionally empty.
      end
    end
  end
end
Version data entries
1 entry across 1 version & 1 rubygem
Version | Path |
---|---|
liquid-ext-3.0.0 | lib/liquid/tracker/kafka_tracker.rb |