lib/phobos/producer.rb in phobos-1.8.3.pre.beta2 vs lib/phobos/producer.rb in phobos-1.9.0.pre.beta1
- old
+ new
@@ -9,20 +9,32 @@
def producer
Phobos::Producer::PublicAPI.new(self)
end
class PublicAPI
+ MissingRequiredArgumentsError = Class.new(StandardError) do
+ def initialize
+ super('You need to provide a topic name and a payload')
+ end
+ end
+
def initialize(host_obj)
@host_obj = host_obj
end
- def publish(topic, payload, key = nil, partition_key = nil, headers = nil)
- class_producer.publish(topic, payload, key, partition_key, headers)
+ def publish(*args, **kwargs)
+ Phobos.deprecate(deprecate_positional_args_message('publish')) if kwargs.empty?
+
+ args = normalize_arguments(*args, **kwargs)
+ class_producer.publish(**args)
end
- def async_publish(topic, payload, key = nil, partition_key = nil, headers = nil)
- class_producer.async_publish(topic, payload, key, partition_key, headers)
+ def async_publish(*args, **kwargs)
+ Phobos.deprecate(deprecate_positional_args_message('async_publish')) if kwargs.empty?
+
+ args = normalize_arguments(*args, **kwargs)
+ class_producer.async_publish(**args)
end
# @param messages [Array(Hash(:topic, :payload, :key, :headers))]
# e.g.: [
# { topic: 'A', payload: 'message-1', key: '1', headers: { foo: 'bar' } },
@@ -40,10 +52,40 @@
private
def class_producer
@host_obj.class.producer
end
+
+ # rubocop:disable Metrics/ParameterLists
+ def normalize_arguments(p_topic = nil, p_payload = nil, p_key = nil,
+ p_partition_key = nil, p_headers = {},
+ **kwargs)
+ {}.tap do |args|
+ {
+ topic: p_topic,
+ payload: p_payload,
+ key: p_key,
+ partition_key: p_partition_key,
+ headers: p_headers
+ }.each { |k, v| args[k] = kwargs[k] || v }
+
+ raise MissingRequiredArgumentsError if [:topic, :payload].any? { |k| args[k].nil? }
+
+ kwargs.each do |k, v|
+ next if args.key?(k)
+
+ args[:headers][k] = v
+ end
+ end
+ end
+ # rubocop:enable Metrics/ParameterLists
+
+ def deprecate_positional_args_message(method_name)
+ "The `#{method_name}` method should now receive keyword arguments " \
+ 'rather than positional ones. Please update your publishers. This will ' \
+ 'not be backwards compatible in the future.'
+ end
end
module ClassMethods
def producer
Phobos::Producer::ClassMethods::PublicAPI.new
@@ -84,11 +126,11 @@
def sync_producer_shutdown
sync_producer&.shutdown
producer_store[:sync_producer] = nil
end
- def publish(topic, payload, key = nil, partition_key = nil, headers = nil)
+ def publish(topic:, payload:, key: nil, partition_key: nil, headers: nil)
publish_list([{ topic: topic, payload: payload, key: key,
partition_key: partition_key, headers: headers }])
end
def publish_list(messages)
@@ -107,10 +149,10 @@
def async_producer
producer_store[:async_producer]
end
- def async_publish(topic, payload, key = nil, partition_key = nil, headers = nil)
+ def async_publish(topic:, payload:, key: nil, partition_key: nil, headers: nil)
async_publish_list([{ topic: topic, payload: payload, key: key,
partition_key: partition_key, headers: headers }])
end
def async_publish_list(messages)