lib/phobos/producer.rb in phobos-1.4.1 vs lib/phobos/producer.rb in phobos-1.4.2

- old
+ new

@@ -71,10 +71,11 @@ def publish_list(messages) client = kafka_client || configure_kafka_client(Phobos.create_kafka_client) producer = client.producer(regular_configs) produce_messages(producer, messages) + producer.deliver_messages ensure producer&.shutdown end def create_async_producer @@ -92,10 +93,11 @@ end def async_publish_list(messages) producer = async_producer || create_async_producer produce_messages(producer, messages) + producer.deliver_messages unless async_automatic_delivery? end def async_producer_shutdown async_producer&.deliver_messages async_producer&.shutdown @@ -117,10 +119,14 @@ producer.produce(message[:payload], topic: message[:topic], key: message[:key], partition_key: message[:key] ) end - producer.deliver_messages + end + + def async_automatic_delivery? + async_configs.fetch(:delivery_threshold, 0) > 0 || + async_configs.fetch(:delivery_interval, 0) > 0 end def producer_store Thread.current[NAMESPACE] ||= {} end