lib/jruby-kafka/group.rb in jruby-kafka-0.0.12 vs lib/jruby-kafka/group.rb in jruby-kafka-0.1.0

- old
+ new

@@ -53,10 +53,11 @@ @fetch_wait_max_ms = '100' @refresh_leader_backoff_ms = '200' @consumer_timeout_ms = '-1' @consumer_restart_on_error = "#{false}" @consumer_restart_sleep_ms = '0' + @consumer_id = nil if options[:zk_connect_timeout] @zk_connect_timeout = "#{options[:zk_connect_timeout]}" end if options[:zk_session_timeout] @@ -127,10 +128,14 @@ @auto_offset_reset = 'smallest' else @auto_offset_reset = 'largest' end end + + if options[:consumer_id] + @consumer_id = options[:consumer_id] + end end private def validate_required_arguments(options={}) [:zk_connect, :group_id, :topic_id].each do |opt| @@ -201,8 +206,11 @@ properties.put("queued.max.message.chunks", @queued_max_message_chunks) properties.put("fetch.min.bytes", @fetch_min_bytes) properties.put("fetch.wait.max.ms", @fetch_wait_max_ms) properties.put("refresh.leader.backoff.ms", @refresh_leader_backoff_ms) properties.put("consumer.timeout.ms", @consumer_timeout_ms) + unless @consumer_id.nil? + properties.put('consumer.id', @consumer_id) + end return Java::kafka::consumer::ConsumerConfig.new(properties) end end