lib/racecar/runner.rb in racecar-0.4.2 vs lib/racecar/runner.rb in racecar-0.5.0.beta1
- old
+ new
@@ -8,12 +8,14 @@
@processor, @config, @logger = processor, config, logger
@instrumenter = instrumenter
end
def stop
- processor.teardown
- consumer.stop unless consumer.nil?
+ Thread.new do
+ processor.teardown
+ consumer.stop unless consumer.nil?
+ end.join
end
def run
kafka = Kafka.new(
client_id: config.client_id,
@@ -62,10 +64,10 @@
# Configure the consumer with a producer so it can produce messages.
producer = kafka.producer(
compression_codec: config.producer_compression_codec,
)
- processor.configure(producer: producer)
+ processor.configure(consumer: consumer, producer: producer)
begin
if processor.respond_to?(:process)
consumer.each_message(max_wait_time: config.max_wait_time, max_bytes: config.max_bytes) do |message|
payload = {