lib/racecar/runner.rb in racecar-0.4.2 vs lib/racecar/runner.rb in racecar-0.5.0.beta1

- old
+ new

@@ -8,12 +8,14 @@ @processor, @config, @logger = processor, config, logger @instrumenter = instrumenter end def stop - processor.teardown - consumer.stop unless consumer.nil? + Thread.new do + processor.teardown + consumer.stop unless consumer.nil? + end.join end def run kafka = Kafka.new( client_id: config.client_id, @@ -62,10 +64,10 @@ # Configure the consumer with a producer so it can produce messages. producer = kafka.producer( compression_codec: config.producer_compression_codec, ) - processor.configure(producer: producer) + processor.configure(consumer: consumer, producer: producer) begin if processor.respond_to?(:process) consumer.each_message(max_wait_time: config.max_wait_time, max_bytes: config.max_bytes) do |message| payload = {