lib/racecar/runner.rb in racecar-0.3.5 vs lib/racecar/runner.rb in racecar-0.3.6
- old
+ new
@@ -20,10 +20,11 @@
seed_brokers: config.brokers,
logger: logger,
connect_timeout: config.connect_timeout,
socket_timeout: config.socket_timeout,
ssl_ca_cert: config.ssl_ca_cert,
+ ssl_ca_cert_file_path: config.ssl_ca_cert_file_path,
ssl_client_cert: config.ssl_client_cert,
ssl_client_cert_key: config.ssl_client_cert_key,
sasl_plain_username: config.sasl_plain_username,
sasl_plain_password: config.sasl_plain_password,
)
@@ -50,10 +51,14 @@
start_from_beginning: subscription.start_from_beginning,
max_bytes_per_partition: subscription.max_bytes_per_partition,
)
end
+ # Configure the consumer with a producer so it can produce messages.
+ producer = kafka.producer
+ processor.configure(producer: producer)
+
begin
if processor.respond_to?(:process)
consumer.each_message(max_wait_time: config.max_wait_time) do |message|
payload = {
consumer_class: processor.class.to_s,
@@ -66,10 +71,11 @@
# message.
@instrumenter.instrument("start_process_message.racecar", payload)
@instrumenter.instrument("process_message.racecar", payload) do
processor.process(message)
+ producer.deliver_messages
end
end
elsif processor.respond_to?(:process_batch)
consumer.each_batch(max_wait_time: config.max_wait_time) do |batch|
payload = {
@@ -84,9 +90,10 @@
# message.
@instrumenter.instrument("start_process_batch.racecar", payload)
@instrumenter.instrument("process_batch.racecar", payload) do
processor.process_batch(batch)
+ producer.deliver_messages
end
end
else
raise NotImplementedError, "Consumer class must implement process or process_batch method"
end