lib/racecar/runner.rb in racecar-0.3.0 vs lib/racecar/runner.rb in racecar-0.3.1

- old
+ new

@@ -1,15 +1,20 @@ require "kafka" module Racecar class Runner - attr_reader :processor, :config, :logger + attr_reader :processor, :config, :logger, :consumer def initialize(processor, config:, logger:) @processor, @config, @logger = processor, config, logger end + def stop + processor.teardown + consumer.stop unless consumer.nil? + end + def run kafka = Kafka.new( client_id: config.client_id, seed_brokers: config.brokers, logger: logger, @@ -18,20 +23,21 @@ ssl_ca_cert: config.ssl_ca_cert, ssl_client_cert: config.ssl_client_cert, ssl_client_cert_key: config.ssl_client_cert_key, ) - consumer = kafka.consumer( + @consumer = kafka.consumer( group_id: config.group_id, offset_commit_interval: config.offset_commit_interval, offset_commit_threshold: config.offset_commit_threshold, + session_timeout: config.session_timeout, heartbeat_interval: config.heartbeat_interval, ) # Stop the consumer on SIGINT, SIGQUIT or SIGTERM. - trap("QUIT") { consumer.stop } - trap("INT") { consumer.stop } - trap("TERM") { consumer.stop } + trap("QUIT") { stop } + trap("INT") { stop } + trap("TERM") { stop } # Print the consumer config to STDERR on USR1. trap("USR1") { $stderr.puts config.inspect } config.subscriptions.each do |subscription|