lib/racecar/runner.rb in racecar-0.3.7 vs lib/racecar/runner.rb in racecar-0.3.8
- old
+ new
@@ -33,10 +33,11 @@
group_id: config.group_id,
offset_commit_interval: config.offset_commit_interval,
offset_commit_threshold: config.offset_commit_threshold,
session_timeout: config.session_timeout,
heartbeat_interval: config.heartbeat_interval,
+ offset_retention_time: config.offset_retention_time,
)
# Stop the consumer on SIGINT, SIGQUIT or SIGTERM.
trap("QUIT") { stop }
trap("INT") { stop }
@@ -105,9 +106,14 @@
# Pause fetches from the partition. We'll continue processing the other partitions in the topic.
# The partition is automatically resumed after the specified timeout, and will continue where we
# left off.
@logger.warn "Pausing partition #{e.topic}/#{e.partition} for #{config.pause_timeout} seconds"
consumer.pause(e.topic, e.partition, timeout: config.pause_timeout)
+ elsif config.pause_timeout == -1
+ # A pause timeout of -1 means indefinite pausing, which in ruby-kafka is done by passing nil as
+ # the timeout.
+ @logger.warn "Pausing partition #{e.topic}/#{e.partition} indefinitely, or until the process is restarted"
+ consumer.pause(e.topic, e.partition, timeout: nil)
end
config.error_handler.call(e.cause, {
topic: e.topic,
partition: e.partition,