lib/phobos/producer.rb in phobos-1.8.2.pre.beta1 vs lib/phobos/producer.rb in phobos-1.8.2.pre.beta2
- old
+ new
@@ -50,10 +50,11 @@
end
class PublicAPI
NAMESPACE = :phobos_producer_store
ASYNC_PRODUCER_PARAMS = [:max_queue_size, :delivery_threshold, :delivery_interval].freeze
+ INTERNAL_PRODUCER_PARAMS = [:persistent_connections].freeze
# This method configures the kafka client used with publish operations
# performed by the host class
#
# @param kafka_client [Kafka::Client]
@@ -65,22 +66,39 @@
def kafka_client
producer_store[:kafka_client]
end
+ def create_sync_producer
+ client = kafka_client || configure_kafka_client(Phobos.create_kafka_client)
+ sync_producer = client.producer(regular_configs)
+ if Phobos.config.producer_hash[:persistent_connections]
+ producer_store[:sync_producer] = sync_producer
+ end
+ sync_producer
+ end
+
+ def sync_producer
+ producer_store[:sync_producer]
+ end
+
+ def sync_producer_shutdown
+ sync_producer&.shutdown
+ producer_store[:sync_producer] = nil
+ end
+
def publish(topic, payload, key = nil, partition_key = nil)
publish_list([{ topic: topic, payload: payload, key: key,
partition_key: partition_key }])
end
def publish_list(messages)
- client = kafka_client || configure_kafka_client(Phobos.create_kafka_client)
- producer = client.producer(regular_configs)
+ producer = sync_producer || create_sync_producer
produce_messages(producer, messages)
producer.deliver_messages
ensure
- producer&.shutdown
+ producer&.shutdown unless Phobos.config.producer_hash[:persistent_connections]
end
def create_async_producer
client = kafka_client || configure_kafka_client(Phobos.create_kafka_client)
async_producer = client.async_producer(async_configs)
@@ -107,14 +125,17 @@
async_producer&.shutdown
producer_store[:async_producer] = nil
end
def regular_configs
- Phobos.config.producer_hash.reject { |k, _| ASYNC_PRODUCER_PARAMS.include?(k) }
+ Phobos.config.producer_hash
+ .reject { |k, _| ASYNC_PRODUCER_PARAMS.include?(k) }
+ .reject { |k, _| INTERNAL_PRODUCER_PARAMS.include?(k) }
end
def async_configs
Phobos.config.producer_hash
+ .reject { |k, _| INTERNAL_PRODUCER_PARAMS.include?(k) }
end
private
def produce_messages(producer, messages)