lib/rdkafka/producer.rb in karafka-rdkafka-0.14.4 vs lib/rdkafka/producer.rb in karafka-rdkafka-0.14.5

- old
+ new

@@ -224,15 +224,16 @@ # @param key [String, nil] The message's key # @param partition [Integer,nil] Optional partition to produce to # @param partition_key [String, nil] Optional partition key based on which partition assignment can happen # @param timestamp [Time,Integer,nil] Optional timestamp of this message. Integer timestamp is in milliseconds since Jan 1 1970. # @param headers [Hash<String,String>] Optional message headers + # @param label [Object, nil] a label that can be assigned when producing a message that will be part of the delivery handle and the delivery report # # @return [DeliveryHandle] Delivery handle that can be used to wait for the result of producing this message # # @raise [RdkafkaError] When adding the message to rdkafka's queue failed - def produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil, timestamp: nil, headers: nil) + def produce(topic:, payload: nil, key: nil, partition: nil, partition_key: nil, timestamp: nil, headers: nil, label: nil) closed_producer_check(__method__) # Start by checking and converting the input # Get payload length @@ -270,9 +271,10 @@ else raise TypeError.new("Timestamp has to be nil, an Integer or a Time") end delivery_handle = DeliveryHandle.new + delivery_handle.label = label delivery_handle[:pending] = true delivery_handle[:response] = -1 delivery_handle[:partition] = -1 delivery_handle[:offset] = -1 DeliveryHandle.register(delivery_handle)