lib/jruby-kafka/group.rb in jruby-kafka-0.0.3 vs lib/jruby-kafka/group.rb in jruby-kafka-0.0.4

- old
+ new

@@ -39,10 +39,21 @@ @zk_connect_timeout = '6000' @zk_sync_time = '2000' @auto_offset_reset = 'largest' @auto_commit_interval = '1000' @running = false + @rebalance_max_retries = '4' + @rebalance_backoff_ms = '2000' + @socket_timeout_ms = "#{30 * 1000}" + @socket_receive_buffer_bytes = "#{64 * 1024}" + @fetch_message_max_bytes = "#{1024 * 1024}" + @auto_commit_enable = true + @queued_max_message_chunks = '10' + @fetch_min_bytes = '1' + @fetch_wait_max_ms = '100' + @refresh_leader_backoff_ms = '200' + @consumer_timeout_ms = '-1' if options[:zk_connect_timeout] @zk_connect_timeout = options[:zk_connect_timeout] end if options[:zk_session_timeout] @@ -53,11 +64,54 @@ end if options[:auto_commit_interval] @auto_commit_interval = options[:auto_commit_interval] end + if options[:rebalance_max_retries] + @rebalance_max_retries = options[:rebalance_max_retries] + end + if options[:rebalance_backoff_ms] + @rebalance_backoff_ms = options[:rebalance_backoff_ms] + end + + if options[:socket_timeout_ms] + @socket_timeout_ms = options[:socket_timeout_ms] + end + + if options[:socket_receive_buffer_bytes] + @socket_receive_buffer_bytes = options[:socket_receive_buffer_bytes] + end + + if options[:fetch_message_max_bytes] + @fetch_message_max_bytes = options[:fetch_message_max_bytes] + end + + if options[:auto_commit_enable] + @auto_commit_enable = options[:auto_commit_enable] + end + + if options[:queued_max_message_chunks] + @queued_max_message_chunks = options[:queued_max_message_chunks] + end + + if options[:fetch_min_bytes] + @fetch_min_bytes = options[:fetch_min_bytes] + end + + if options[:fetch_wait_max_ms] + @fetch_wait_max_ms = options[:fetch_wait_max_ms] + end + + if options[:refresh_leader_backoff_ms] + @refresh_leader_backoff_ms = options[:refresh_leader_backoff_ms] + end + + if options[:consumer_timeout_ms] + @consumer_timeout_ms = options[:consumer_timeout_ms] + end + if options[:reset_beginning] if options[:reset_beginning] == 'from-beginning' @auto_offset_reset = 'smallest' else @auto_offset_reset = 'largest' @@ -124,8 +178,19 @@ properties.put("zookeeper.connection.timeout.ms", @zk_connect_timeout) properties.put("zookeeper.session.timeout.ms", @zk_session_timeout) properties.put("zookeeper.sync.time.ms", @zk_sync_time) properties.put("auto.commit.interval.ms", @auto_commit_interval) properties.put("auto.offset.reset", @auto_offset_reset) + properties.put("rebalance.max.retries", @rebalance_max_retries) + properties.put("rebalance.backoff.ms", @rebalance_backoff_ms) + properties.put("socket.timeout.ms", @socket_timeout_ms) + properties.put("socket.receive.buffer.bytes", @socket_receive_buffer_bytes) + properties.put("fetch.message.max.bytes", @fetch_message_max_bytes) + properties.put("auto.commit.enable", @auto_commit_enable) + properties.put("queued.max.message.chunks", @queued_max_message_chunks) + properties.put("fetch.min.bytes", @fetch_min_bytes) + properties.put("fetch.wait.max.ms", @fetch_wait_max_ms) + properties.put("refresh.leader.backoff.ms", @refresh_leader_backoff_ms) + properties.put("consumer.timeout.ms", @consumer_timeout_ms) return Java::kafka::consumer::ConsumerConfig.new(properties) end end