lib/phobos/producer.rb in phobos-1.8.0 vs lib/phobos/producer.rb in phobos-1.8.1
- old
+ new
@@ -1,5 +1,7 @@
+# frozen_string_literal: true
+
module Phobos
module Producer
def self.included(base)
base.extend(Phobos::Producer::ClassMethods)
end
@@ -11,16 +13,16 @@
class PublicAPI
def initialize(host_obj)
@host_obj = host_obj
end
- def publish(topic, payload, key = nil)
- class_producer.publish(topic, payload, key)
+ def publish(topic, payload, key = nil, partition_key = nil)
+ class_producer.publish(topic, payload, key, partition_key)
end
- def async_publish(topic, payload, key = nil)
- class_producer.async_publish(topic, payload, key)
+ def async_publish(topic, payload, key = nil, partition_key = nil)
+ class_producer.async_publish(topic, payload, key, partition_key)
end
# @param messages [Array(Hash(:topic, :payload, :key))]
# e.g.: [
# { topic: 'A', payload: 'message-1', key: '1' },
@@ -47,11 +49,11 @@
Phobos::Producer::ClassMethods::PublicAPI.new
end
class PublicAPI
NAMESPACE = :phobos_producer_store
- ASYNC_PRODUCER_PARAMS = %i(max_queue_size delivery_threshold delivery_interval).freeze
+ ASYNC_PRODUCER_PARAMS = [:max_queue_size, :delivery_threshold, :delivery_interval].freeze
# This method configures the kafka client used with publish operations
# performed by the host class
#
# @param kafka_client [Kafka::Client]
@@ -63,12 +65,13 @@
def kafka_client
producer_store[:kafka_client]
end
- def publish(topic, payload, key = nil)
- publish_list([{ topic: topic, payload: payload, key: key }])
+ def publish(topic, payload, key = nil, partition_key = nil)
+ publish_list([{ topic: topic, payload: payload, key: key,
+ partition_key: partition_key }])
end
def publish_list(messages)
client = kafka_client || configure_kafka_client(Phobos.create_kafka_client)
producer = client.producer(regular_configs)
@@ -86,12 +89,13 @@
def async_producer
producer_store[:async_producer]
end
- def async_publish(topic, payload, key = nil)
- async_publish_list([{ topic: topic, payload: payload, key: key }])
+ def async_publish(topic, payload, key = nil, partition_key = nil)
+ async_publish_list([{ topic: topic, payload: payload, key: key,
+ partition_key: partition_key }])
end
def async_publish_list(messages)
producer = async_producer || create_async_producer
produce_messages(producer, messages)
@@ -103,30 +107,30 @@
async_producer&.shutdown
producer_store[:async_producer] = nil
end
def regular_configs
- Phobos.config.producer_hash.reject { |k, _| ASYNC_PRODUCER_PARAMS.include?(k)}
+ Phobos.config.producer_hash.reject { |k, _| ASYNC_PRODUCER_PARAMS.include?(k) }
end
def async_configs
Phobos.config.producer_hash
end
private
def produce_messages(producer, messages)
messages.each do |message|
+ partition_key = message[:partition_key] || message[:key]
producer.produce(message[:payload], topic: message[:topic],
key: message[:key],
- partition_key: message[:key]
- )
+ partition_key: partition_key)
end
end
def async_automatic_delivery?
- async_configs.fetch(:delivery_threshold, 0) > 0 ||
- async_configs.fetch(:delivery_interval, 0) > 0
+ async_configs.fetch(:delivery_threshold, 0).positive? ||
+ async_configs.fetch(:delivery_interval, 0).positive?
end
def producer_store
Thread.current[NAMESPACE] ||= {}
end