lib/water_drop/producer/buffer.rb in waterdrop-2.0.6 vs lib/water_drop/producer/buffer.rb in waterdrop-2.0.7

- old
+ new

@@ -21,11 +21,11 @@ ensure_active! validate_message!(message) @monitor.instrument( 'message.buffered', - producer: self, + producer_id: id, message: message ) { @messages << message } end # Adds given messages into the internal producer buffer without flushing them to Kafka @@ -38,11 +38,11 @@ ensure_active! messages.each { |message| validate_message!(message) } @monitor.instrument( 'messages.buffered', - producer: self, + producer_id: id, messages: messages ) do messages.each { |message| @messages << message } messages end @@ -54,11 +54,11 @@ def flush_async ensure_active! @monitor.instrument( 'buffer.flushed_async', - producer: self, + producer_id: id, messages: @messages ) { flush(false) } end # Flushes the internal buffer to Kafka in a sync way @@ -67,11 +67,11 @@ def flush_sync ensure_active! @monitor.instrument( 'buffer.flushed_sync', - producer: self, + producer_id: id, messages: @messages ) { flush(true) } end private @@ -102,10 +102,10 @@ wait_timeout: @config.wait_timeout ) end rescue *RESCUED_ERRORS => e key = sync ? 'buffer.flushed_sync.error' : 'buffer.flush_async.error' - @monitor.instrument(key, producer: self, error: e, dispatched: dispatched) + @monitor.instrument(key, producer_id: id, error: e, dispatched: dispatched) raise Errors::FlushFailureError.new(dispatched) end end end