lib/karafka/base_responder.rb in karafka-1.0.1 vs lib/karafka/base_responder.rb in karafka-1.1.0.alpha1

- old
+ new

@@ -145,16 +145,15 @@ messages_buffer.each do |topic, data_elements| # We map this topic name, so it will match namespaced/etc topic in Kafka # @note By default will not change topic (if default mapper used) mapped_topic = Karafka::App.config.topic_mapper.outgoing(topic) - data_elements.each do |(data, options)| - ::WaterDrop::Message.new( - mapped_topic, + data_elements.each do |data, options| + producer(options).call( data, - options - ).send! + options.merge(topic: mapped_topic) + ) end end end # Method that needs to be implemented in a subclass. It should handle responding @@ -173,8 +172,14 @@ def respond_to(topic, data, options = {}) Karafka.monitor.notice(self.class, topic: topic, data: data, options: options) messages_buffer[topic.to_s] ||= [] messages_buffer[topic.to_s] << [@parser_class.generate(data), options] + end + + # @param options [Hash] options for waterdrop + # @return [Class] WaterDrop producer (sync or async based on the settings) + def producer(options) + options[:async] ? WaterDrop::AsyncProducer : WaterDrop::SyncProducer end end end