lib/phobos/producer.rb in phobos-1.0.0 vs lib/phobos/producer.rb in phobos-1.1.0

- old
+ new

@@ -47,10 +47,11 @@ Phobos::Producer::ClassMethods::PublicAPI.new end class PublicAPI NAMESPACE = :phobos_producer_store + ASYNC_PRODUCER_PARAMS = %i(max_queue_size delivery_threshold delivery_interval).freeze # This method configures the kafka client used with publish operations # performed by the host class # # @param kafka_client [Kafka::Client] @@ -68,19 +69,19 @@ publish_list([{ topic: topic, payload: payload, key: key }]) end def publish_list(messages) client = kafka_client || configure_kafka_client(Phobos.create_kafka_client) - producer = client.producer(Phobos.config.producer_hash) + producer = client.producer(regular_configs) produce_messages(producer, messages) ensure producer&.shutdown end def create_async_producer client = kafka_client || configure_kafka_client(Phobos.create_kafka_client) - async_producer = client.async_producer(Phobos.config.producer_hash) + async_producer = client.async_producer(async_configs) producer_store[:async_producer] = async_producer end def async_producer producer_store[:async_producer] @@ -97,9 +98,17 @@ def async_producer_shutdown async_producer&.deliver_messages async_producer&.shutdown producer_store[:async_producer] = nil + end + + def regular_configs + Phobos.config.producer_hash.reject { |k, _| ASYNC_PRODUCER_PARAMS.include?(k)} + end + + def async_configs + Phobos.config.producer_hash end private def produce_messages(producer, messages)