lib/racecar/runner.rb in racecar-2.0.0.beta1 vs lib/racecar/runner.rb in racecar-2.0.0.beta2

- old
+ new

@@ -51,17 +51,20 @@ producer: producer, consumer: consumer, instrumenter: @instrumenter, ) - instrument_payload = { consumer_class: processor.class.to_s, consumer_set: consumer } + instrumentation_payload = { + consumer_class: processor.class.to_s, + consumer_set: consumer + } # Main loop loop do break if @stop_requested resume_paused_partitions - @instrumenter.instrument("main_loop.racecar", instrument_payload) do + @instrumenter.instrument("main_loop", instrumentation_payload) do case process_method when :batch then msg_per_part = consumer.batch_poll(config.max_wait_time).group_by(&:partition) msg_per_part.each_value do |messages| process_batch(messages) @@ -75,11 +78,13 @@ logger.info "Gracefully shutting down" processor.deliver! processor.teardown consumer.commit - consumer.close + @instrumenter.instrument('leave_group') do + consumer.close + end end def stop @stop_requested = true end @@ -104,11 +109,11 @@ # Manually store offset after messages have been processed successfully # to avoid marking failed messages as committed. The call just updates # a value within librdkafka and is asynchronously written to proper # storage through auto commits. config.consumer << "enable.auto.offset.store=false" - ConsumerSet.new(config, logger) + ConsumerSet.new(config, logger, @instrumenter) end end def producer @producer ||= Rdkafka::Config.new(producer_config).producer.tap do |producer| @@ -128,12 +133,15 @@ producer_config end def delivery_callback ->(delivery_report) do - data = {offset: delivery_report.offset, partition: delivery_report.partition} - @instrumenter.instrument("acknowledged_message.racecar", data) + payload = { + offset: delivery_report.offset, + partition: delivery_report.partition + } + @instrumenter.instrument("acknowledged_message", payload) end end def install_signal_handlers # Stop the consumer on SIGINT, SIGQUIT or SIGTERM. @@ -144,37 +152,44 @@ # Print the consumer config to STDERR on USR1. trap("USR1") { $stderr.puts config.inspect } end def process(message) - payload = { + instrumentation_payload = { consumer_class: processor.class.to_s, - topic: message.topic, - partition: message.partition, - offset: message.offset, + topic: message.topic, + partition: message.partition, + offset: message.offset, + create_time: message.timestamp, + key: message.key, + value: message.payload, + headers: message.headers } - @instrumenter.instrument("process_message.racecar", payload) do + @instrumenter.instrument("start_process_message", instrumentation_payload) + @instrumenter.instrument("process_message", instrumentation_payload) do with_pause(message.topic, message.partition, message.offset..message.offset) do processor.process(Racecar::Message.new(message)) processor.deliver! consumer.store_offset(message) end end end def process_batch(messages) - payload = { + first, last = messages.first, messages.last + instrumentation_payload = { consumer_class: processor.class.to_s, - topic: messages.first.topic, - partition: messages.first.partition, - first_offset: messages.first.offset, - message_count: messages.size, + topic: first.topic, + partition: first.partition, + first_offset: first.offset, + last_offset: last.offset, + message_count: messages.size } - @instrumenter.instrument("process_batch.racecar", payload) do - first, last = messages.first, messages.last + @instrumenter.instrument("start_process_batch", instrumentation_payload) + @instrumenter.instrument("process_batch", instrumentation_payload) do with_pause(first.topic, first.partition, first.offset..last.offset) do processor.process_batch(messages.map {|message| Racecar::Message.new(message) }) processor.deliver! consumer.store_offset(messages.last) end @@ -202,14 +217,16 @@ def resume_paused_partitions return if config.pause_timeout == 0 pauses.each do |topic, partitions| partitions.each do |partition, pause| - @instrumenter.instrument("pause_status.racecar", { - topic: topic, - partition: partition, - duration: pause.pause_duration, - }) + instrumentation_payload = { + topic: topic, + partition: partition, + duration: pause.pause_duration, + consumer_class: processor.class.to_s, + } + @instrumenter.instrument("pause_status", instrumentation_payload) if pause.paused? && pause.expired? logger.info "Automatically resuming partition #{topic}/#{partition}, pause timeout expired" consumer.resume(topic, partition) pause.resume!