lib/racecar/runner.rb in racecar-0.3.0 vs lib/racecar/runner.rb in racecar-0.3.1
- old
+ new
@@ -1,15 +1,20 @@
require "kafka"
module Racecar
class Runner
- attr_reader :processor, :config, :logger
+ attr_reader :processor, :config, :logger, :consumer
def initialize(processor, config:, logger:)
@processor, @config, @logger = processor, config, logger
end
+ def stop
+ processor.teardown
+ consumer.stop unless consumer.nil?
+ end
+
def run
kafka = Kafka.new(
client_id: config.client_id,
seed_brokers: config.brokers,
logger: logger,
@@ -18,20 +23,21 @@
ssl_ca_cert: config.ssl_ca_cert,
ssl_client_cert: config.ssl_client_cert,
ssl_client_cert_key: config.ssl_client_cert_key,
)
- consumer = kafka.consumer(
+ @consumer = kafka.consumer(
group_id: config.group_id,
offset_commit_interval: config.offset_commit_interval,
offset_commit_threshold: config.offset_commit_threshold,
+ session_timeout: config.session_timeout,
heartbeat_interval: config.heartbeat_interval,
)
# Stop the consumer on SIGINT, SIGQUIT or SIGTERM.
- trap("QUIT") { consumer.stop }
- trap("INT") { consumer.stop }
- trap("TERM") { consumer.stop }
+ trap("QUIT") { stop }
+ trap("INT") { stop }
+ trap("TERM") { stop }
# Print the consumer config to STDERR on USR1.
trap("USR1") { $stderr.puts config.inspect }
config.subscriptions.each do |subscription|