lib/jruby-kafka/group.rb in jruby-kafka-0.0.6 vs lib/jruby-kafka/group.rb in jruby-kafka-0.0.7
- old
+ new
@@ -26,11 +26,12 @@
# :zk_connect_timeout => "6000" - (optional) The max time that the client waits while establishing a connection to zookeeper.
# :group_id => "group" - REQUIRED: The group id to consume on.
# :topic_id => "topic" - REQUIRED: The topic id to consume on.
# :reset_beginning => "from-beginning" - (optional) If the consumer does not already have an established offset
# to consume from, start with the earliest message present in the log rather than the latest message.
- #
+ # :consumer_restart_on_error => "true" - (optional) Controls if consumer threads are to restart on caught exceptions.
+ # exceptions are logged.
def initialize(options={})
validate_required_arguments(options)
@zk_connect = options[:zk_connect]
@group_id = options[:group_id]
@@ -50,10 +51,12 @@
@queued_max_message_chunks = '10'
@fetch_min_bytes = '1'
@fetch_wait_max_ms = '100'
@refresh_leader_backoff_ms = '200'
@consumer_timeout_ms = '-1'
+ @consumer_restart_on_error = "#{true}"
+ @consumer_restart_sleep_ms = '2000'
if options[:zk_connect_timeout]
@zk_connect_timeout = options[:zk_connect_timeout]
end
if options[:zk_session_timeout]
@@ -108,10 +111,19 @@
if options[:consumer_timeout_ms]
@consumer_timeout_ms = options[:consumer_timeout_ms]
end
+ if options[:consumer_restart_on_error]
+ @consumer_restart_on_error = options[:consumer_restart_on_error]
+ end
+
+ if options[:consumer_restart_sleep_ms]
+ @consumer_restart_sleep_ms = options[:consumer_restart_sleep_ms]
+ end
+
+
if options[:reset_beginning]
if options[:reset_beginning] == 'from-beginning'
@auto_offset_reset = 'smallest'
else
@auto_offset_reset = 'largest'
@@ -157,10 +169,10 @@
@executor = Executors.newFixedThreadPool(a_numThreads)
@executor_submit = @executor.java_method(:submit, [Java::JavaLang::Runnable.java_class])
threadNumber = 0
for stream in streams
- @executor_submit.call(Kafka::Consumer.new(stream, threadNumber, a_queue))
+ @executor_submit.call(Kafka::Consumer.new(stream, threadNumber, a_queue, @consumer_restart_on_error, @consumer_restart_sleep_ms))
threadNumber += 1
end
@running = true
end