lib/deimos/kafka_message.rb in deimos-ruby-1.0.0.pre.beta25 vs lib/deimos/kafka_message.rb in deimos-ruby-1.0.0.pre.beta26

- old
+ new

@@ -12,9 +12,32 @@ # @param mess [Object] def message=(mess) write_attribute(:message, mess ? mess.to_s : nil) end + # @return [Deimos::Consumer] + def decoder + producer = Deimos::Producer.descendants.find { |c| c.topic == self.topic } + return nil unless producer + + consumer = Class.new(Deimos::Consumer) + consumer.config.merge!(producer.config) + consumer + end + + # Decode the message. This assumes for now that we have access to a producer + # in the codebase which can decode it. + # @param decoder [Deimos::Consumer] + # @return [Hash] + def decoded_message(decoder=self.decoder) + return { key: self.key, message: self.message } unless decoder + + { + key: self.key.present? ? decoder.new.decode_key(self.key) : nil, + payload: decoder.decoder.decode(self.message) + } + end + # @return [Hash] def phobos_message { payload: self.message, partition_key: self.partition_key,