lib/kafka/producer.rb in ruby-kafka-0.7.0.alpha4 vs lib/kafka/producer.rb in ruby-kafka-0.7.0.beta1

- old
+ new

@@ -170,25 +170,27 @@ # so can carry semantic value--whether you want to have the message key double # as a partition key is up to you. # # @param value [String] the message data. # @param key [String] the message key. + # @param headers [Hash<String, String>] the headers for the message. # @param topic [String] the topic that the message should be written to. # @param partition [Integer] the partition that the message should be written to. # @param partition_key [String] the key that should be used to assign a partition. # @param create_time [Time] the timestamp that should be set on the message. # # @raise [BufferOverflow] if the maximum buffer size has been reached. # @return [nil] - def produce(value, key: nil, topic:, partition: nil, partition_key: nil, create_time: Time.now) + def produce(value, key: nil, headers: {}, topic:, partition: nil, partition_key: nil, create_time: Time.now) message = PendingMessage.new( - value && value.to_s, - key && key.to_s, - topic.to_s, - partition && Integer(partition), - partition_key && partition_key.to_s, - create_time, + value: value && value.to_s, + key: key && key.to_s, + headers: headers, + topic: topic.to_s, + partition: partition && Integer(partition), + partition_key: partition_key && partition_key.to_s, + create_time: create_time ) if buffer_size >= @max_buffer_size buffer_overflow topic, "Cannot produce to #{topic}, max buffer size (#{@max_buffer_size} messages) reached" @@ -352,10 +354,11 @@ end @buffer.write( value: message.value, key: message.key, + headers: message.headers, topic: message.topic, partition: partition, create_time: message.create_time, ) rescue Kafka::Error => e @@ -388,15 +391,16 @@ end @buffer.each do |topic, partition, messages_for_partition| messages_for_partition.each do |message| messages << PendingMessage.new( - message.value, - message.key, - topic, - partition, - nil, - message.create_time + value: message.value, + key: message.key, + headers: message.headers, + topic: topic, + partition: partition, + partition_key: nil, + create_time: message.create_time ) end end messages