Sha256: 8e6554b91787e9c53672796b8e73f9d5ce2b5fc12c0e313a0f9de9847d274c43

Contents?: true

Size: 1.29 KB

Versions: 1

Compression:

Stored size: 1.29 KB

Contents

#TODO: Seems to be a delay after shutting down Kafka and ZK updating
module Druid
  class Writer
    attr_reader :config, :producer, :zk
    def initialize(config, zk)
      @config = config
      @zk = zk
      init_producer
      zk.register_listener(self, :handle_kafka_state_change)
    end

    def write_point(datasource, datapoint)
      begin
        raise Druid::ConnectionError, 'no kafka brokers available' if producer.nil?
        producer.send_msg(datasource, nil, datapoint)
      rescue Java::KafkaCommon::FailedToSendMessageException => e
        init_producer #TODO: This may not be the best way to handle it
        producer.send_msg(datasource, nil, datapoint)
      end
    end

    private

    def broker_list
      zk.registry["/brokers/ids"].map{|instance| "#{instance[:host]}:#{instance[:port]}" }.join(',')
    end

    def handle_kafka_state_change(service)
      if service == config.kafka_broker_path
        init_producer
      end
    end

    def init_producer
      producer_options = {:broker_list => broker_list, "serializer.class" => "kafka.serializer.StringEncoder"}
      if producer_options[:broker_list].present?
        producer = Kafka::Producer.new(producer_options)
        producer.connect()
      else
        producer = nil
      end
      @producer = producer
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
jruby-druid-2.0.0.edge.1 lib/druid/writer.rb