Sha256: ee07fcd572132495e9a571663f70f4d355a2f2d60e7fea4cfe4f9c3f0004e5ff

Contents?: true

Size: 857 Bytes

Versions: 11

Compression:

Stored size: 857 Bytes

Contents

require 'liquid/tracker/base'

module Tracker
  class KafkaTracker < Base

    java_import 'kafka.javaapi.producer.Producer'
    java_import 'kafka.producer.ProducerConfig'
    java_import 'kafka.producer.KeyedMessage'

    def initialize(properties, dimensions = {})
      super(dimensions)
      @producer = Producer.new(ProducerConfig.new(properties))
    end

    def down?
      # TODO: async is fire and forget. we might want to handle
      # QueueFullExceptions later
      false
    end

    def event(topic, data)
      @producer.send(KeyedMessage.new(topic, data))
    rescue => e
      # TODO: maybe fall back to FileTracker here
      $log.exception(e, "failed to log #{topic}=#{data.inspect}")
    end

    def shutdown
      @producer.close if @producer
    rescue Java::KafkaProducer::ProducerClosedException
      # pass
    end

  end
end

Version data entries

11 entries across 11 versions & 1 rubygems

Version Path
liquid-ext-3.5.3 lib/liquid/tracker/kafka_tracker.rb
liquid-ext-3.5.2 lib/liquid/tracker/kafka_tracker.rb
liquid-ext-3.5.0 lib/liquid/tracker/kafka_tracker.rb
liquid-ext-3.4.2 lib/liquid/tracker/kafka_tracker.rb
liquid-ext-3.4.1 lib/liquid/tracker/kafka_tracker.rb
liquid-ext-3.4.0 lib/liquid/tracker/kafka_tracker.rb
liquid-ext-3.3.10 lib/liquid/tracker/kafka_tracker.rb
liquid-ext-3.3.9 lib/liquid/tracker/kafka_tracker.rb
liquid-ext-3.3.8 lib/liquid/tracker/kafka_tracker.rb
liquid-ext-3.3.7 lib/liquid/tracker/kafka_tracker.rb
liquid-ext-3.3.6 lib/liquid/tracker/kafka_tracker.rb