lib/fluent/plugin/kafka_producer_ext.rb in fluent-plugin-kafka-0.10.0 vs lib/fluent/plugin/kafka_producer_ext.rb in fluent-plugin-kafka-0.11.0
- old
+ new
@@ -10,13 +10,11 @@
# for out_kafka_buffered
module Kafka
EMPTY_HEADER = {}
class Producer
- def produce_for_buffered(value, key: nil, topic:, partition: nil, partition_key: nil)
- create_time = Time.now
-
+ def produce_for_buffered(value, key: nil, topic:, partition: nil, partition_key: nil, create_time: Time.now)
message = PendingMessage.new(
value: value,
key: key,
headers: EMPTY_HEADER,
topic: topic,
@@ -97,16 +95,14 @@
# Messages added by `#produce` but not yet assigned a partition.
@pending_message_queue = PendingMessageQueue.new
end
- def produce(value, key: nil, partition: nil, partition_key: nil)
- create_time = Time.now
-
+ def produce(value, key: nil, partition: nil, partition_key: nil, headers: EMPTY_HEADER, create_time: Time.now)
message = PendingMessage.new(
value: value,
key: key,
- headers: EMPTY_HEADER,
+ headers: headers,
topic: @topic,
partition: partition,
partition_key: partition_key,
create_time: create_time
)