lib/water_drop/producer/buffer.rb in waterdrop-2.0.6 vs lib/water_drop/producer/buffer.rb in waterdrop-2.0.7
- old
+ new
@@ -21,11 +21,11 @@
ensure_active!
validate_message!(message)
@monitor.instrument(
'message.buffered',
- producer: self,
+ producer_id: id,
message: message
) { @messages << message }
end
# Adds given messages into the internal producer buffer without flushing them to Kafka
@@ -38,11 +38,11 @@
ensure_active!
messages.each { |message| validate_message!(message) }
@monitor.instrument(
'messages.buffered',
- producer: self,
+ producer_id: id,
messages: messages
) do
messages.each { |message| @messages << message }
messages
end
@@ -54,11 +54,11 @@
def flush_async
ensure_active!
@monitor.instrument(
'buffer.flushed_async',
- producer: self,
+ producer_id: id,
messages: @messages
) { flush(false) }
end
# Flushes the internal buffer to Kafka in a sync way
@@ -67,11 +67,11 @@
def flush_sync
ensure_active!
@monitor.instrument(
'buffer.flushed_sync',
- producer: self,
+ producer_id: id,
messages: @messages
) { flush(true) }
end
private
@@ -102,10 +102,10 @@
wait_timeout: @config.wait_timeout
)
end
rescue *RESCUED_ERRORS => e
key = sync ? 'buffer.flushed_sync.error' : 'buffer.flush_async.error'
- @monitor.instrument(key, producer: self, error: e, dispatched: dispatched)
+ @monitor.instrument(key, producer_id: id, error: e, dispatched: dispatched)
raise Errors::FlushFailureError.new(dispatched)
end
end
end