lib/karafka/base_responder.rb in karafka-1.0.1 vs lib/karafka/base_responder.rb in karafka-1.1.0.alpha1
- old
+ new
@@ -145,16 +145,15 @@
messages_buffer.each do |topic, data_elements|
# We map this topic name, so it will match namespaced/etc topic in Kafka
# @note By default will not change topic (if default mapper used)
mapped_topic = Karafka::App.config.topic_mapper.outgoing(topic)
- data_elements.each do |(data, options)|
- ::WaterDrop::Message.new(
- mapped_topic,
+ data_elements.each do |data, options|
+ producer(options).call(
data,
- options
- ).send!
+ options.merge(topic: mapped_topic)
+ )
end
end
end
# Method that needs to be implemented in a subclass. It should handle responding
@@ -173,8 +172,14 @@
def respond_to(topic, data, options = {})
Karafka.monitor.notice(self.class, topic: topic, data: data, options: options)
messages_buffer[topic.to_s] ||= []
messages_buffer[topic.to_s] << [@parser_class.generate(data), options]
+ end
+
+ # @param options [Hash] options for waterdrop
+ # @return [Class] WaterDrop producer (sync or async based on the settings)
+ def producer(options)
+ options[:async] ? WaterDrop::AsyncProducer : WaterDrop::SyncProducer
end
end
end