lib/rdkafka/producer.rb in karafka-rdkafka-0.14.4 vs lib/rdkafka/producer.rb in karafka-rdkafka-0.14.5
- old
+ new
@@ -224,15 +224,16 @@
# @param key [String, nil] The message's key
# @param partition [Integer,nil] Optional partition to produce to
# @param partition_key [String, nil] Optional partition key based on which partition assignment can happen
# @param timestamp [Time,Integer,nil] Optional timestamp of this message. Integer timestamp is in milliseconds since Jan 1 1970.
# @param headers [Hash<String,String>] Optional message headers
+ # @param label [Object, nil] a label that can be assigned when producing a message that will be part of the delivery handle and the delivery report
#
# @return [DeliveryHandle] Delivery handle that can be used to wait for the result of producing this message
#
# @raise [RdkafkaError] When adding the message to rdkafka's queue failed
- def produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil, timestamp: nil, headers: nil)
+ def produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil, timestamp: nil, headers: nil, label: nil)
closed_producer_check(__method__)
# Start by checking and converting the input
# Get payload length
@@ -270,9 +271,10 @@
else
raise TypeError.new("Timestamp has to be nil, an Integer or a Time")
end
delivery_handle = DeliveryHandle.new
+ delivery_handle.label = label
delivery_handle[:pending] = true
delivery_handle[:response] = -1
delivery_handle[:partition] = -1
delivery_handle[:offset] = -1
DeliveryHandle.register(delivery_handle)