lib/racecar/runner.rb in racecar-2.0.0.alpha2 vs lib/racecar/runner.rb in racecar-2.0.0.alpha3
- old
+ new
@@ -1,7 +1,8 @@
require "rdkafka"
require "racecar/pause"
+require "racecar/message"
module Racecar
class Runner
attr_reader :processor, :config, :logger
@@ -152,11 +153,11 @@
offset: message.offset,
}
@instrumenter.instrument("process_message.racecar", payload) do
with_pause(message.topic, message.partition, message.offset..message.offset) do
- processor.process(message)
+ processor.process(Racecar::Message.new(message))
processor.deliver!
consumer.store_offset(message)
end
end
end
@@ -171,10 +172,10 @@
}
@instrumenter.instrument("process_batch.racecar", payload) do
first, last = messages.first, messages.last
with_pause(first.topic, first.partition, first.offset..last.offset) do
- processor.process_batch(messages)
+ processor.process_batch(messages.map {|message| Racecar::Message.new(message) })
processor.deliver!
consumer.store_offset(messages.last)
end
end
end