lib/phobos/producer.rb in phobos-1.8.2.pre.beta1 vs lib/phobos/producer.rb in phobos-1.8.2.pre.beta2

- old
+ new

@@ -50,10 +50,11 @@ end class PublicAPI NAMESPACE = :phobos_producer_store ASYNC_PRODUCER_PARAMS = [:max_queue_size, :delivery_threshold, :delivery_interval].freeze + INTERNAL_PRODUCER_PARAMS = [:persistent_connections].freeze # This method configures the kafka client used with publish operations # performed by the host class # # @param kafka_client [Kafka::Client] @@ -65,22 +66,39 @@ def kafka_client producer_store[:kafka_client] end + def create_sync_producer + client = kafka_client || configure_kafka_client(Phobos.create_kafka_client) + sync_producer = client.producer(regular_configs) + if Phobos.config.producer_hash[:persistent_connections] + producer_store[:sync_producer] = sync_producer + end + sync_producer + end + + def sync_producer + producer_store[:sync_producer] + end + + def sync_producer_shutdown + sync_producer&.shutdown + producer_store[:sync_producer] = nil + end + def publish(topic, payload, key = nil, partition_key = nil) publish_list([{ topic: topic, payload: payload, key: key, partition_key: partition_key }]) end def publish_list(messages) - client = kafka_client || configure_kafka_client(Phobos.create_kafka_client) - producer = client.producer(regular_configs) + producer = sync_producer || create_sync_producer produce_messages(producer, messages) producer.deliver_messages ensure - producer&.shutdown + producer&.shutdown unless Phobos.config.producer_hash[:persistent_connections] end def create_async_producer client = kafka_client || configure_kafka_client(Phobos.create_kafka_client) async_producer = client.async_producer(async_configs) @@ -107,14 +125,17 @@ async_producer&.shutdown producer_store[:async_producer] = nil end def regular_configs - Phobos.config.producer_hash.reject { |k, _| ASYNC_PRODUCER_PARAMS.include?(k) } + Phobos.config.producer_hash + .reject { |k, _| ASYNC_PRODUCER_PARAMS.include?(k) } + .reject { |k, _| INTERNAL_PRODUCER_PARAMS.include?(k) } end def async_configs Phobos.config.producer_hash + .reject { |k, _| INTERNAL_PRODUCER_PARAMS.include?(k) } end private def produce_messages(producer, messages)