module Rdkafka # A producer for Kafka messages. To create a producer set up a {Config} and call {Config#producer producer} on that. class Producer # @private def initialize(native_kafka) @closing = false @native_kafka = native_kafka # Start thread to poll client for delivery callbacks @polling_thread = Thread.new do loop do Rdkafka::Bindings.rd_kafka_poll(@native_kafka, 250) # Exit thread if closing and the poll queue is empty if @closing && Rdkafka::Bindings.rd_kafka_outq_len(@native_kafka) == 0 break end end end @polling_thread.abort_on_exception = true end # Close this producer and wait for the internal poll queue to empty. def close # Indicate to polling thread that we're closing @closing = true # Wait for the polling thread to finish up @polling_thread.join end # Produces a message to a Kafka topic. The message is added to rdkafka's queue, call {DeliveryHandle#wait wait} on the returned delivery handle to make sure it is delivered. # # When no partition is specified the underlying Kafka library picks a partition based on the key. If no key is specified, a random partition will be used. # When a timestamp is provided this is used instead of the autogenerated timestamp. # # @param topic [String] The topic to produce to # @param payload [String] The message's payload # @param key [String] The message's key # @param partition [Integer] Optional partition to produce to # @param timestamp [Integer] Optional timestamp of this message # # @raise [RdkafkaError] When adding the message to rdkafka's queue failed # # @return [DeliveryHandle] Delivery handle that can be used to wait for the result of producing this message def produce(topic:, payload: nil, key: nil, partition: nil, timestamp: nil) # Start by checking and converting the input # Get payload length payload_size = if payload.nil? 0 else payload.bytesize end # Get key length key_size = if key.nil? 0 else key.bytesize end # If partition is nil use -1 to let Kafka set the partition based # on the key/randomly if there is no key partition = -1 if partition.nil? # If timestamp is nil use 0 and let Kafka set one timestamp = 0 if timestamp.nil? delivery_handle = DeliveryHandle.new delivery_handle[:pending] = true delivery_handle[:response] = -1 delivery_handle[:partition] = -1 delivery_handle[:offset] = -1 # Produce the message response = Rdkafka::Bindings.rd_kafka_producev( @native_kafka, :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_TOPIC, :string, topic, :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_MSGFLAGS, :int, Rdkafka::Bindings::RD_KAFKA_MSG_F_COPY, :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_VALUE, :buffer_in, payload, :size_t, payload_size, :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_KEY, :buffer_in, key, :size_t, key_size, :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_PARTITION, :int32, partition, :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_TIMESTAMP, :int64, timestamp, :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_OPAQUE, :pointer, delivery_handle, :int, Rdkafka::Bindings::RD_KAFKA_VTYPE_END ) # Raise error if the produce call was not successfull if response != 0 raise RdkafkaError.new(response) end delivery_handle end end end