lib/racecar/runner.rb in racecar-0.3.7 vs lib/racecar/runner.rb in racecar-0.3.8

- old
+ new

@@ -33,10 +33,11 @@ group_id: config.group_id, offset_commit_interval: config.offset_commit_interval, offset_commit_threshold: config.offset_commit_threshold, session_timeout: config.session_timeout, heartbeat_interval: config.heartbeat_interval, + offset_retention_time: config.offset_retention_time, ) # Stop the consumer on SIGINT, SIGQUIT or SIGTERM. trap("QUIT") { stop } trap("INT") { stop } @@ -105,9 +106,14 @@ # Pause fetches from the partition. We'll continue processing the other partitions in the topic. # The partition is automatically resumed after the specified timeout, and will continue where we # left off. @logger.warn "Pausing partition #{e.topic}/#{e.partition} for #{config.pause_timeout} seconds" consumer.pause(e.topic, e.partition, timeout: config.pause_timeout) + elsif config.pause_timeout == -1 + # A pause timeout of -1 means indefinite pausing, which in ruby-kafka is done by passing nil as + # the timeout. + @logger.warn "Pausing partition #{e.topic}/#{e.partition} indefinitely, or until the process is restarted" + consumer.pause(e.topic, e.partition, timeout: nil) end config.error_handler.call(e.cause, { topic: e.topic, partition: e.partition,