lib/jruby-kafka/group.rb in jruby-kafka-0.0.3 vs lib/jruby-kafka/group.rb in jruby-kafka-0.0.4
- old
+ new
@@ -39,10 +39,21 @@
@zk_connect_timeout = '6000'
@zk_sync_time = '2000'
@auto_offset_reset = 'largest'
@auto_commit_interval = '1000'
@running = false
+ @rebalance_max_retries = '4'
+ @rebalance_backoff_ms = '2000'
+ @socket_timeout_ms = "#{30 * 1000}"
+ @socket_receive_buffer_bytes = "#{64 * 1024}"
+ @fetch_message_max_bytes = "#{1024 * 1024}"
+ @auto_commit_enable = true
+ @queued_max_message_chunks = '10'
+ @fetch_min_bytes = '1'
+ @fetch_wait_max_ms = '100'
+ @refresh_leader_backoff_ms = '200'
+ @consumer_timeout_ms = '-1'
if options[:zk_connect_timeout]
@zk_connect_timeout = options[:zk_connect_timeout]
end
if options[:zk_session_timeout]
@@ -53,11 +64,54 @@
end
if options[:auto_commit_interval]
@auto_commit_interval = options[:auto_commit_interval]
end
+ if options[:rebalance_max_retries]
+ @rebalance_max_retries = options[:rebalance_max_retries]
+ end
+ if options[:rebalance_backoff_ms]
+ @rebalance_backoff_ms = options[:rebalance_backoff_ms]
+ end
+
+ if options[:socket_timeout_ms]
+ @socket_timeout_ms = options[:socket_timeout_ms]
+ end
+
+ if options[:socket_receive_buffer_bytes]
+ @socket_receive_buffer_bytes = options[:socket_receive_buffer_bytes]
+ end
+
+ if options[:fetch_message_max_bytes]
+ @fetch_message_max_bytes = options[:fetch_message_max_bytes]
+ end
+
+ if options[:auto_commit_enable]
+ @auto_commit_enable = options[:auto_commit_enable]
+ end
+
+ if options[:queued_max_message_chunks]
+ @queued_max_message_chunks = options[:queued_max_message_chunks]
+ end
+
+ if options[:fetch_min_bytes]
+ @fetch_min_bytes = options[:fetch_min_bytes]
+ end
+
+ if options[:fetch_wait_max_ms]
+ @fetch_wait_max_ms = options[:fetch_wait_max_ms]
+ end
+
+ if options[:refresh_leader_backoff_ms]
+ @refresh_leader_backoff_ms = options[:refresh_leader_backoff_ms]
+ end
+
+ if options[:consumer_timeout_ms]
+ @consumer_timeout_ms = options[:consumer_timeout_ms]
+ end
+
if options[:reset_beginning]
if options[:reset_beginning] == 'from-beginning'
@auto_offset_reset = 'smallest'
else
@auto_offset_reset = 'largest'
@@ -124,8 +178,19 @@
properties.put("zookeeper.connection.timeout.ms", @zk_connect_timeout)
properties.put("zookeeper.session.timeout.ms", @zk_session_timeout)
properties.put("zookeeper.sync.time.ms", @zk_sync_time)
properties.put("auto.commit.interval.ms", @auto_commit_interval)
properties.put("auto.offset.reset", @auto_offset_reset)
+ properties.put("rebalance.max.retries", @rebalance_max_retries)
+ properties.put("rebalance.backoff.ms", @rebalance_backoff_ms)
+ properties.put("socket.timeout.ms", @socket_timeout_ms)
+ properties.put("socket.receive.buffer.bytes", @socket_receive_buffer_bytes)
+ properties.put("fetch.message.max.bytes", @fetch_message_max_bytes)
+ properties.put("auto.commit.enable", @auto_commit_enable)
+ properties.put("queued.max.message.chunks", @queued_max_message_chunks)
+ properties.put("fetch.min.bytes", @fetch_min_bytes)
+ properties.put("fetch.wait.max.ms", @fetch_wait_max_ms)
+ properties.put("refresh.leader.backoff.ms", @refresh_leader_backoff_ms)
+ properties.put("consumer.timeout.ms", @consumer_timeout_ms)
return Java::kafka::consumer::ConsumerConfig.new(properties)
end
end