lib/fluent/plugin/kafka_producer_ext.rb in fluent-plugin-kafka-0.10.0 vs lib/fluent/plugin/kafka_producer_ext.rb in fluent-plugin-kafka-0.11.0

- old
+ new

@@ -10,13 +10,11 @@ # for out_kafka_buffered module Kafka EMPTY_HEADER = {} class Producer - def produce_for_buffered(value, key: nil, topic:, partition: nil, partition_key: nil) - create_time = Time.now - + def produce_for_buffered(value, key: nil, topic:, partition: nil, partition_key: nil, create_time: Time.now) message = PendingMessage.new( value: value, key: key, headers: EMPTY_HEADER, topic: topic, @@ -97,16 +95,14 @@ # Messages added by `#produce` but not yet assigned a partition. @pending_message_queue = PendingMessageQueue.new end - def produce(value, key: nil, partition: nil, partition_key: nil) - create_time = Time.now - + def produce(value, key: nil, partition: nil, partition_key: nil, headers: EMPTY_HEADER, create_time: Time.now) message = PendingMessage.new( value: value, key: key, - headers: EMPTY_HEADER, + headers: headers, topic: @topic, partition: partition, partition_key: partition_key, create_time: create_time )