lib/deimos/utils/db_producer.rb in deimos-ruby-1.0.0.pre.beta22 vs lib/deimos/utils/db_producer.rb in deimos-ruby-1.0.0.pre.beta23
- old
+ new
@@ -60,9 +60,25 @@
end
@current_topic = topic
messages = retrieve_messages
while messages.any?
+ @logger.debug do
+ producer = Deimos::Producer.descendants.find { |c| c.topic == topic }
+ decoded_messages = if producer
+ consumer = Class.new(Deimos::Consumer)
+ consumer.config.merge!(producer.config)
+ messages.map do |message|
+ {
+ :key => message[:key].present? ? consumer.new.decode_key(message[:key]) : nil,
+ :message => consumer.decoder.decode(message[:payload])
+ }
+ end
+ else
+ messages
+ end
+ "DB producer: Topic #{topic} Producing messages: #{decoded_messages}"
+ end
produce_messages(messages.map(&:phobos_message))
messages.first.class.where(id: messages.map(&:id)).delete_all
break if messages.size < BATCH_SIZE
KafkaTopicInfo.heartbeat(@current_topic, @id) # keep alive