lib/jruby-kafka/producer.rb in jruby-kafka-0.0.10 vs lib/jruby-kafka/producer.rb in jruby-kafka-0.0.11

- old
+ new

@@ -3,128 +3,70 @@ require "java" require "jruby-kafka/namespace" require "jruby-kafka/error" -java_import 'org.I0Itec.zkclient.exception.ZkException' +java_import 'kafka.common.FailedToSendMessageException' class Kafka::Producer @topic - @zk_connect # Create a Kafka Producer # # options: - # :zk_connect => "localhost:2181" - REQUIRED: The connection string for the - # zookeeper connection in the form host:port. Multiple URLS can be given to allow fail-over. - # :zk_connect_timeout => "6000" - (optional) The max time that the client waits while establishing a connection to zookeeper. # :topic_id => "topic" - REQUIRED: The topic id to consume on. # :broker_list => "localhost:9092" - REQUIRED: a seed list of kafka brokers def initialize(options={}) validate_required_arguments(options) - @zk_connect = options[:zk_connect] - @topic = options[:topic_id] @brokers = options[:broker_list] - @zk_session_timeout = '6000' - @zk_connect_timeout = '6000' - @zk_sync_time = '2000' - @auto_offset_reset = 'largest' - @auto_commit_interval = '1000' - @running = false - @rebalance_max_retries = '4' - @rebalance_backoff_ms = '2000' - @socket_timeout_ms = "#{30 * 1000}" - @socket_receive_buffer_bytes = "#{64 * 1024}" - @auto_commit_enable = "#{true}" - @queued_max_message_chunks = '10' - @refresh_leader_backoff_ms = '200' - @consumer_timeout_ms = '-1' + @serializer_class = 'kafka.serializer.StringEncoder' + @partitioner_class = nil + @request_required_acks = '0' - if options[:zk_connect_timeout] - @zk_connect_timeout = "#{options[:zk_connect_timeout]}" + if options[:partitioner_class] + @partitioner_class = "#{options[:partitioner_class]}" end - if options[:zk_session_timeout] - @zk_session_timeout = "#{options[:zk_session_timeout]}" - end - if options[:zk_sync_time] - @zk_sync_time = "#{options[:zk_sync_time]}" - end - if options[:auto_commit_interval] - @auto_commit_interval = "#{options[:auto_commit_interval]}" - end - if options[:rebalance_max_retries] - @rebalance_max_retries = "#{options[:rebalance_max_retries]}" + if options[:request_required_acks] + @request_required_acks = "#{options[:request_required_acks]}" end - - if options[:rebalance_backoff_ms] - @rebalance_backoff_ms = "#{options[:rebalance_backoff_ms]}" - end - - if options[:socket_timeout_ms] - @socket_timeout_ms = "#{options[:socket_timeout_ms]}" - end - - if options[:socket_receive_buffer_bytes] - @socket_receive_buffer_bytes = "#{options[:socket_receive_buffer_bytes]}" - end - - if options[:auto_commit_enable] - @auto_commit_enable = "#{options[:auto_commit_enable]}" - end - - if options[:refresh_leader_backoff_ms] - @refresh_leader_backoff_ms = "#{options[:refresh_leader_backoff_ms]}" - end - - if options[:consumer_timeout_ms] - @consumer_timeout_ms = "#{options[:consumer_timeout_ms]}" - end - end private def validate_required_arguments(options={}) - [:zk_connect, :broker_list, :topic_id].each do |opt| + [:broker_list].each do |opt| raise(ArgumentError, "#{opt} is required.") unless options[opt] end end public - def shutdown() - @running = false - end - - public def connect() @producer = Java::kafka::producer::Producer.new(createProducerConfig) end public - def sendMsg(key,msg) - m = Java::kafka::producer::KeyedMessage.new(topic=@topic,key=key, message=msg) + def sendMsg(topic, key, msg) + m = Java::kafka::producer::KeyedMessage.new(topic=topic, key=key, message=msg) #the send message for a producer is scala varargs, which doesn't seem to play nice w/ jruby # this is the best I could come up with ms = Java::scala::collection::immutable::Vector.new(0,0,0) ms = ms.append_front(m) - @producer.send(ms) + begin + @producer.send(ms) + rescue FailedToSendMessageException => e + raise KafkaError.new(e), "Got FailedToSendMessageException: #{e}" + end end - public - def running? - @running - end - def createProducerConfig() # TODO lots more options avaiable here: http://kafka.apache.org/documentation.html#producerconfigs properties = java.util.Properties.new() - properties.put("zookeeper.connect", @zk_connect) - properties.put("zookeeper.connection.timeout.ms", @zk_connect_timeout) - properties.put("zookeeper.session.timeout.ms", @zk_session_timeout) - properties.put("zookeeper.sync.time.ms", @zk_sync_time) - properties.put("serializer.class", "kafka.serializer.StringEncoder") - properties.put("request.required.acks", "1") properties.put("metadata.broker.list", @brokers) + properties.put("request.required.acks", @request_required_acks) + if not @partitioner_class.nil? + properties.put("partitioner.class", @partitioner_class) + end + properties.put("serializer.class", @serializer_class) return Java::kafka::producer::ProducerConfig.new(properties) end end