lib/deimos/utils/db_producer.rb in deimos-ruby-1.0.0.pre.beta22 vs lib/deimos/utils/db_producer.rb in deimos-ruby-1.0.0.pre.beta23

- old
+ new

@@ -60,9 +60,25 @@ end @current_topic = topic messages = retrieve_messages while messages.any? + @logger.debug do + producer = Deimos::Producer.descendants.find { |c| c.topic == topic } + decoded_messages = if producer + consumer = Class.new(Deimos::Consumer) + consumer.config.merge!(producer.config) + messages.map do |message| + { + :key => message[:key].present? ? consumer.new.decode_key(message[:key]) : nil, + :message => consumer.decoder.decode(message[:payload]) + } + end + else + messages + end + "DB producer: Topic #{topic} Producing messages: #{decoded_messages}" + end produce_messages(messages.map(&:phobos_message)) messages.first.class.where(id: messages.map(&:id)).delete_all break if messages.size < BATCH_SIZE KafkaTopicInfo.heartbeat(@current_topic, @id) # keep alive