lib/phobos/producer.rb in phobos-1.8.3.pre.beta1 vs lib/phobos/producer.rb in phobos-1.8.3.pre.beta2
- old
+ new
@@ -13,22 +13,22 @@
class PublicAPI
def initialize(host_obj)
@host_obj = host_obj
end
- def publish(topic, payload, key = nil, partition_key = nil)
- class_producer.publish(topic, payload, key, partition_key)
+ def publish(topic, payload, key = nil, partition_key = nil, headers = nil)
+ class_producer.publish(topic, payload, key, partition_key, headers)
end
- def async_publish(topic, payload, key = nil, partition_key = nil)
- class_producer.async_publish(topic, payload, key, partition_key)
+ def async_publish(topic, payload, key = nil, partition_key = nil, headers = nil)
+ class_producer.async_publish(topic, payload, key, partition_key, headers)
end
- # @param messages [Array(Hash(:topic, :payload, :key))]
+ # @param messages [Array(Hash(:topic, :payload, :key, :headers))]
# e.g.: [
- # { topic: 'A', payload: 'message-1', key: '1' },
- # { topic: 'B', payload: 'message-2', key: '2' }
+ # { topic: 'A', payload: 'message-1', key: '1', headers: { foo: 'bar' } },
+ # { topic: 'B', payload: 'message-2', key: '2', headers: { foo: 'bar' } }
# ]
#
def publish_list(messages)
class_producer.publish_list(messages)
end
@@ -84,13 +84,13 @@
def sync_producer_shutdown
sync_producer&.shutdown
producer_store[:sync_producer] = nil
end
- def publish(topic, payload, key = nil, partition_key = nil)
+ def publish(topic, payload, key = nil, partition_key = nil, headers = nil)
publish_list([{ topic: topic, payload: payload, key: key,
- partition_key: partition_key }])
+ partition_key: partition_key, headers: headers }])
end
def publish_list(messages)
producer = sync_producer || create_sync_producer
produce_messages(producer, messages)
@@ -107,13 +107,13 @@
def async_producer
producer_store[:async_producer]
end
- def async_publish(topic, payload, key = nil, partition_key = nil)
+ def async_publish(topic, payload, key = nil, partition_key = nil, headers = nil)
async_publish_list([{ topic: topic, payload: payload, key: key,
- partition_key: partition_key }])
+ partition_key: partition_key, headers: headers }])
end
def async_publish_list(messages)
producer = async_producer || create_async_producer
produce_messages(producer, messages)
@@ -142,9 +142,10 @@
def produce_messages(producer, messages)
messages.each do |message|
partition_key = message[:partition_key] || message[:key]
producer.produce(message[:payload], topic: message[:topic],
key: message[:key],
+ headers: message[:headers],
partition_key: partition_key)
end
end
def async_automatic_delivery?