lib/phobos/producer.rb in phobos-1.4.1 vs lib/phobos/producer.rb in phobos-1.4.2
- old
+ new
@@ -71,10 +71,11 @@
def publish_list(messages)
client = kafka_client || configure_kafka_client(Phobos.create_kafka_client)
producer = client.producer(regular_configs)
produce_messages(producer, messages)
+ producer.deliver_messages
ensure
producer&.shutdown
end
def create_async_producer
@@ -92,10 +93,11 @@
end
def async_publish_list(messages)
producer = async_producer || create_async_producer
produce_messages(producer, messages)
+ producer.deliver_messages unless async_automatic_delivery?
end
def async_producer_shutdown
async_producer&.deliver_messages
async_producer&.shutdown
@@ -117,10 +119,14 @@
producer.produce(message[:payload], topic: message[:topic],
key: message[:key],
partition_key: message[:key]
)
end
- producer.deliver_messages
+ end
+
+ def async_automatic_delivery?
+ async_configs.fetch(:delivery_threshold, 0) > 0 ||
+ async_configs.fetch(:delivery_interval, 0) > 0
end
def producer_store
Thread.current[NAMESPACE] ||= {}
end