lib/kafka/producer.rb in ruby-kafka-0.7.0.alpha4 vs lib/kafka/producer.rb in ruby-kafka-0.7.0.beta1
- old
+ new
@@ -170,25 +170,27 @@
# so can carry semantic value--whether you want to have the message key double
# as a partition key is up to you.
#
# @param value [String] the message data.
# @param key [String] the message key.
+ # @param headers [Hash<String, String>] the headers for the message.
# @param topic [String] the topic that the message should be written to.
# @param partition [Integer] the partition that the message should be written to.
# @param partition_key [String] the key that should be used to assign a partition.
# @param create_time [Time] the timestamp that should be set on the message.
#
# @raise [BufferOverflow] if the maximum buffer size has been reached.
# @return [nil]
- def produce(value, key: nil, topic:, partition: nil, partition_key: nil, create_time: Time.now)
+ def produce(value, key: nil, headers: {}, topic:, partition: nil, partition_key: nil, create_time: Time.now)
message = PendingMessage.new(
- value && value.to_s,
- key && key.to_s,
- topic.to_s,
- partition && Integer(partition),
- partition_key && partition_key.to_s,
- create_time,
+ value: value && value.to_s,
+ key: key && key.to_s,
+ headers: headers,
+ topic: topic.to_s,
+ partition: partition && Integer(partition),
+ partition_key: partition_key && partition_key.to_s,
+ create_time: create_time
)
if buffer_size >= @max_buffer_size
buffer_overflow topic,
"Cannot produce to #{topic}, max buffer size (#{@max_buffer_size} messages) reached"
@@ -352,10 +354,11 @@
end
@buffer.write(
value: message.value,
key: message.key,
+ headers: message.headers,
topic: message.topic,
partition: partition,
create_time: message.create_time,
)
rescue Kafka::Error => e
@@ -388,15 +391,16 @@
end
@buffer.each do |topic, partition, messages_for_partition|
messages_for_partition.each do |message|
messages << PendingMessage.new(
- message.value,
- message.key,
- topic,
- partition,
- nil,
- message.create_time
+ value: message.value,
+ key: message.key,
+ headers: message.headers,
+ topic: topic,
+ partition: partition,
+ partition_key: nil,
+ create_time: message.create_time
)
end
end
messages