lib/phobos/producer.rb in phobos-1.8.3.pre.beta1 vs lib/phobos/producer.rb in phobos-1.8.3.pre.beta2

- old
+ new

@@ -13,22 +13,22 @@ class PublicAPI def initialize(host_obj) @host_obj = host_obj end - def publish(topic, payload, key = nil, partition_key = nil) - class_producer.publish(topic, payload, key, partition_key) + def publish(topic, payload, key = nil, partition_key = nil, headers = nil) + class_producer.publish(topic, payload, key, partition_key, headers) end - def async_publish(topic, payload, key = nil, partition_key = nil) - class_producer.async_publish(topic, payload, key, partition_key) + def async_publish(topic, payload, key = nil, partition_key = nil, headers = nil) + class_producer.async_publish(topic, payload, key, partition_key, headers) end - # @param messages [Array(Hash(:topic, :payload, :key))] + # @param messages [Array(Hash(:topic, :payload, :key, :headers))] # e.g.: [ - # { topic: 'A', payload: 'message-1', key: '1' }, - # { topic: 'B', payload: 'message-2', key: '2' } + # { topic: 'A', payload: 'message-1', key: '1', headers: { foo: 'bar' } }, + # { topic: 'B', payload: 'message-2', key: '2', headers: { foo: 'bar' } } # ] # def publish_list(messages) class_producer.publish_list(messages) end @@ -84,13 +84,13 @@ def sync_producer_shutdown sync_producer&.shutdown producer_store[:sync_producer] = nil end - def publish(topic, payload, key = nil, partition_key = nil) + def publish(topic, payload, key = nil, partition_key = nil, headers = nil) publish_list([{ topic: topic, payload: payload, key: key, - partition_key: partition_key }]) + partition_key: partition_key, headers: headers }]) end def publish_list(messages) producer = sync_producer || create_sync_producer produce_messages(producer, messages) @@ -107,13 +107,13 @@ def async_producer producer_store[:async_producer] end - def async_publish(topic, payload, key = nil, partition_key = nil) + def async_publish(topic, payload, key = nil, partition_key = nil, headers = nil) async_publish_list([{ topic: topic, payload: payload, key: key, - partition_key: partition_key }]) + partition_key: partition_key, headers: headers }]) end def async_publish_list(messages) producer = async_producer || create_async_producer produce_messages(producer, messages) @@ -142,9 +142,10 @@ def produce_messages(producer, messages) messages.each do |message| partition_key = message[:partition_key] || message[:key] producer.produce(message[:payload], topic: message[:topic], key: message[:key], + headers: message[:headers], partition_key: partition_key) end end def async_automatic_delivery?