lib/racecar/runner.rb in racecar-0.3.5 vs lib/racecar/runner.rb in racecar-0.3.6

- old
+ new

@@ -20,10 +20,11 @@ seed_brokers: config.brokers, logger: logger, connect_timeout: config.connect_timeout, socket_timeout: config.socket_timeout, ssl_ca_cert: config.ssl_ca_cert, + ssl_ca_cert_file_path: config.ssl_ca_cert_file_path, ssl_client_cert: config.ssl_client_cert, ssl_client_cert_key: config.ssl_client_cert_key, sasl_plain_username: config.sasl_plain_username, sasl_plain_password: config.sasl_plain_password, ) @@ -50,10 +51,14 @@ start_from_beginning: subscription.start_from_beginning, max_bytes_per_partition: subscription.max_bytes_per_partition, ) end + # Configure the consumer with a producer so it can produce messages. + producer = kafka.producer + processor.configure(producer: producer) + begin if processor.respond_to?(:process) consumer.each_message(max_wait_time: config.max_wait_time) do |message| payload = { consumer_class: processor.class.to_s, @@ -66,10 +71,11 @@ # message. @instrumenter.instrument("start_process_message.racecar", payload) @instrumenter.instrument("process_message.racecar", payload) do processor.process(message) + producer.deliver_messages end end elsif processor.respond_to?(:process_batch) consumer.each_batch(max_wait_time: config.max_wait_time) do |batch| payload = { @@ -84,9 +90,10 @@ # message. @instrumenter.instrument("start_process_batch.racecar", payload) @instrumenter.instrument("process_batch.racecar", payload) do processor.process_batch(batch) + producer.deliver_messages end end else raise NotImplementedError, "Consumer class must implement process or process_batch method" end