lib/phobos/producer.rb in phobos-1.0.0 vs lib/phobos/producer.rb in phobos-1.1.0
- old
+ new
@@ -47,10 +47,11 @@
Phobos::Producer::ClassMethods::PublicAPI.new
end
class PublicAPI
NAMESPACE = :phobos_producer_store
+ ASYNC_PRODUCER_PARAMS = %i(max_queue_size delivery_threshold delivery_interval).freeze
# This method configures the kafka client used with publish operations
# performed by the host class
#
# @param kafka_client [Kafka::Client]
@@ -68,19 +69,19 @@
publish_list([{ topic: topic, payload: payload, key: key }])
end
def publish_list(messages)
client = kafka_client || configure_kafka_client(Phobos.create_kafka_client)
- producer = client.producer(Phobos.config.producer_hash)
+ producer = client.producer(regular_configs)
produce_messages(producer, messages)
ensure
producer&.shutdown
end
def create_async_producer
client = kafka_client || configure_kafka_client(Phobos.create_kafka_client)
- async_producer = client.async_producer(Phobos.config.producer_hash)
+ async_producer = client.async_producer(async_configs)
producer_store[:async_producer] = async_producer
end
def async_producer
producer_store[:async_producer]
@@ -97,9 +98,17 @@
def async_producer_shutdown
async_producer&.deliver_messages
async_producer&.shutdown
producer_store[:async_producer] = nil
+ end
+
+ def regular_configs
+ Phobos.config.producer_hash.reject { |k, _| ASYNC_PRODUCER_PARAMS.include?(k)}
+ end
+
+ def async_configs
+ Phobos.config.producer_hash
end
private
def produce_messages(producer, messages)