lib/racecar/runner.rb in racecar-2.0.0.beta1 vs lib/racecar/runner.rb in racecar-2.0.0.beta2
- old
+ new
@@ -51,17 +51,20 @@
producer: producer,
consumer: consumer,
instrumenter: @instrumenter,
)
- instrument_payload = { consumer_class: processor.class.to_s, consumer_set: consumer }
+ instrumentation_payload = {
+ consumer_class: processor.class.to_s,
+ consumer_set: consumer
+ }
# Main loop
loop do
break if @stop_requested
resume_paused_partitions
- @instrumenter.instrument("main_loop.racecar", instrument_payload) do
+ @instrumenter.instrument("main_loop", instrumentation_payload) do
case process_method
when :batch then
msg_per_part = consumer.batch_poll(config.max_wait_time).group_by(&:partition)
msg_per_part.each_value do |messages|
process_batch(messages)
@@ -75,11 +78,13 @@
logger.info "Gracefully shutting down"
processor.deliver!
processor.teardown
consumer.commit
- consumer.close
+ @instrumenter.instrument('leave_group') do
+ consumer.close
+ end
end
def stop
@stop_requested = true
end
@@ -104,11 +109,11 @@
# Manually store offset after messages have been processed successfully
# to avoid marking failed messages as committed. The call just updates
# a value within librdkafka and is asynchronously written to proper
# storage through auto commits.
config.consumer << "enable.auto.offset.store=false"
- ConsumerSet.new(config, logger)
+ ConsumerSet.new(config, logger, @instrumenter)
end
end
def producer
@producer ||= Rdkafka::Config.new(producer_config).producer.tap do |producer|
@@ -128,12 +133,15 @@
producer_config
end
def delivery_callback
->(delivery_report) do
- data = {offset: delivery_report.offset, partition: delivery_report.partition}
- @instrumenter.instrument("acknowledged_message.racecar", data)
+ payload = {
+ offset: delivery_report.offset,
+ partition: delivery_report.partition
+ }
+ @instrumenter.instrument("acknowledged_message", payload)
end
end
def install_signal_handlers
# Stop the consumer on SIGINT, SIGQUIT or SIGTERM.
@@ -144,37 +152,44 @@
# Print the consumer config to STDERR on USR1.
trap("USR1") { $stderr.puts config.inspect }
end
def process(message)
- payload = {
+ instrumentation_payload = {
consumer_class: processor.class.to_s,
- topic: message.topic,
- partition: message.partition,
- offset: message.offset,
+ topic: message.topic,
+ partition: message.partition,
+ offset: message.offset,
+ create_time: message.timestamp,
+ key: message.key,
+ value: message.payload,
+ headers: message.headers
}
- @instrumenter.instrument("process_message.racecar", payload) do
+ @instrumenter.instrument("start_process_message", instrumentation_payload)
+ @instrumenter.instrument("process_message", instrumentation_payload) do
with_pause(message.topic, message.partition, message.offset..message.offset) do
processor.process(Racecar::Message.new(message))
processor.deliver!
consumer.store_offset(message)
end
end
end
def process_batch(messages)
- payload = {
+ first, last = messages.first, messages.last
+ instrumentation_payload = {
consumer_class: processor.class.to_s,
- topic: messages.first.topic,
- partition: messages.first.partition,
- first_offset: messages.first.offset,
- message_count: messages.size,
+ topic: first.topic,
+ partition: first.partition,
+ first_offset: first.offset,
+ last_offset: last.offset,
+ message_count: messages.size
}
- @instrumenter.instrument("process_batch.racecar", payload) do
- first, last = messages.first, messages.last
+ @instrumenter.instrument("start_process_batch", instrumentation_payload)
+ @instrumenter.instrument("process_batch", instrumentation_payload) do
with_pause(first.topic, first.partition, first.offset..last.offset) do
processor.process_batch(messages.map {|message| Racecar::Message.new(message) })
processor.deliver!
consumer.store_offset(messages.last)
end
@@ -202,14 +217,16 @@
def resume_paused_partitions
return if config.pause_timeout == 0
pauses.each do |topic, partitions|
partitions.each do |partition, pause|
- @instrumenter.instrument("pause_status.racecar", {
- topic: topic,
- partition: partition,
- duration: pause.pause_duration,
- })
+ instrumentation_payload = {
+ topic: topic,
+ partition: partition,
+ duration: pause.pause_duration,
+ consumer_class: processor.class.to_s,
+ }
+ @instrumenter.instrument("pause_status", instrumentation_payload)
if pause.paused? && pause.expired?
logger.info "Automatically resuming partition #{topic}/#{partition}, pause timeout expired"
consumer.resume(topic, partition)
pause.resume!