lib/jruby-kafka/group.rb in jruby-kafka-0.0.12 vs lib/jruby-kafka/group.rb in jruby-kafka-0.1.0
- old
+ new
@@ -53,10 +53,11 @@
@fetch_wait_max_ms = '100'
@refresh_leader_backoff_ms = '200'
@consumer_timeout_ms = '-1'
@consumer_restart_on_error = "#{false}"
@consumer_restart_sleep_ms = '0'
+ @consumer_id = nil
if options[:zk_connect_timeout]
@zk_connect_timeout = "#{options[:zk_connect_timeout]}"
end
if options[:zk_session_timeout]
@@ -127,10 +128,14 @@
@auto_offset_reset = 'smallest'
else
@auto_offset_reset = 'largest'
end
end
+
+ if options[:consumer_id]
+ @consumer_id = options[:consumer_id]
+ end
end
private
def validate_required_arguments(options={})
[:zk_connect, :group_id, :topic_id].each do |opt|
@@ -201,8 +206,11 @@
properties.put("queued.max.message.chunks", @queued_max_message_chunks)
properties.put("fetch.min.bytes", @fetch_min_bytes)
properties.put("fetch.wait.max.ms", @fetch_wait_max_ms)
properties.put("refresh.leader.backoff.ms", @refresh_leader_backoff_ms)
properties.put("consumer.timeout.ms", @consumer_timeout_ms)
+ unless @consumer_id.nil?
+ properties.put('consumer.id', @consumer_id)
+ end
return Java::kafka::consumer::ConsumerConfig.new(properties)
end
end