lib/jruby-kafka/group.rb in jruby-kafka-0.0.6 vs lib/jruby-kafka/group.rb in jruby-kafka-0.0.7

- old
+ new

@@ -26,11 +26,12 @@ # :zk_connect_timeout => "6000" - (optional) The max time that the client waits while establishing a connection to zookeeper. # :group_id => "group" - REQUIRED: The group id to consume on. # :topic_id => "topic" - REQUIRED: The topic id to consume on. # :reset_beginning => "from-beginning" - (optional) If the consumer does not already have an established offset # to consume from, start with the earliest message present in the log rather than the latest message. - # + # :consumer_restart_on_error => "true" - (optional) Controls if consumer threads are to restart on caught exceptions. + # exceptions are logged. def initialize(options={}) validate_required_arguments(options) @zk_connect = options[:zk_connect] @group_id = options[:group_id] @@ -50,10 +51,12 @@ @queued_max_message_chunks = '10' @fetch_min_bytes = '1' @fetch_wait_max_ms = '100' @refresh_leader_backoff_ms = '200' @consumer_timeout_ms = '-1' + @consumer_restart_on_error = "#{true}" + @consumer_restart_sleep_ms = '2000' if options[:zk_connect_timeout] @zk_connect_timeout = options[:zk_connect_timeout] end if options[:zk_session_timeout] @@ -108,10 +111,19 @@ if options[:consumer_timeout_ms] @consumer_timeout_ms = options[:consumer_timeout_ms] end + if options[:consumer_restart_on_error] + @consumer_restart_on_error = options[:consumer_restart_on_error] + end + + if options[:consumer_restart_sleep_ms] + @consumer_restart_sleep_ms = options[:consumer_restart_sleep_ms] + end + + if options[:reset_beginning] if options[:reset_beginning] == 'from-beginning' @auto_offset_reset = 'smallest' else @auto_offset_reset = 'largest' @@ -157,10 +169,10 @@ @executor = Executors.newFixedThreadPool(a_numThreads) @executor_submit = @executor.java_method(:submit, [Java::JavaLang::Runnable.java_class]) threadNumber = 0 for stream in streams - @executor_submit.call(Kafka::Consumer.new(stream, threadNumber, a_queue)) + @executor_submit.call(Kafka::Consumer.new(stream, threadNumber, a_queue, @consumer_restart_on_error, @consumer_restart_sleep_ms)) threadNumber += 1 end @running = true end