# basically we are porting this https://cwiki.apache.org/confluence/display/KAFKA/0.8.0+Producer+Example require "java" require "jruby-kafka/namespace" require "jruby-kafka/error" class Kafka::Producer java_import 'kafka.producer.ProducerConfig' java_import 'kafka.producer.KeyedMessage' KafkaProducer = Java::kafka.javaapi.producer.Producer java_import 'kafka.message.NoCompressionCodec' java_import 'kafka.message.GZIPCompressionCodec' java_import 'kafka.message.SnappyCompressionCodec' VALIDATIONS = { 'request.required.acks' => %w[ 0 1 -1 ], 'required.codecs' => [NoCompressionCodec.name, GZIPCompressionCodec.name, SnappyCompressionCodec.name], 'producer.type' => %w[ sync async ] } REQUIRED = %w[ metadata.broker.list ] # List of all available options extracted from http://kafka.apache.org/documentation.html#producerconfigs Apr. 27, 2014 # If new options are added, they should just work. Please add them to the list so that we can get handy warnings. KNOWN = %w[ metadata.broker.list request.required.acks request.timeout.ms producer.type serializer.class key.serializer.class partitioner.class compression.codec compressed.topics message.send.max.retries retry.backoff.ms topic.metadata.refresh.interval.ms queue.buffering.max.ms queue.buffering.max.messages queue.enqueue.timeout.ms batch.num.messages send.buffer.bytes client.id broker.list ] attr_reader :producer, :send_method, :options # Create a Kafka Producer # # options: # metadata_broker_list: ["localhost:9092"] - REQUIRED: a seed list of kafka brokers def initialize(opts = {}) @options = opts.reduce({}) do |opts, (k, v)| opts[k.to_s.gsub(/_/, '.')] = v opts end if options['broker.list'] options['metadata.broker.list'] = options.delete 'broker.list' end if options['compressed.topics'].to_s == 'none' options.delete 'compressed.topics' end if options['metadata.broker.list'].is_a? Array options['metadata.broker.list'] = options['metadata.broker.list'].join(',') end validate_arguments @send_method = proc { throw StandardError.new "Producer is not connected" } end def connect @producer = KafkaProducer.new(createProducerConfig) @send_method = producer.java_method :send, [KeyedMessage] end # throws FailedToSendMessageException or if not connected, StandardError. def sendMsg(topic, key, msg) send_method.call(KeyedMessage.new(topic, key, msg)) end def close @producer.close end private def validate_arguments errors = [] missing = REQUIRED.reject { |opt| options[opt] } errors = ["Required settings: #{ missing.join(', ')}"] if missing.any? invalid = VALIDATIONS.reject { |opt, valid| options[opt].nil? or valid.include? options[opt].to_s } errors += invalid.map { |opt, valid| "#{ opt } should be one of: [#{ valid.join(', ')}]" } fail StandardError.new "Invalid configuration arguments: #{ errors.join('; ') }" if errors.any? options.keys.each do |opt| STDERR.puts "WARNING: Unknown configuration key: #{opt}" unless KNOWN.include? opt end end def createProducerConfig properties = java.util.Properties.new() options.each { |opt, value| properties.put opt, value.to_s } return ProducerConfig.new(properties) end end