lib/deimos/kafka_message.rb in deimos-ruby-1.0.0.pre.beta25 vs lib/deimos/kafka_message.rb in deimos-ruby-1.0.0.pre.beta26
- old
+ new
@@ -12,9 +12,32 @@
# @param mess [Object]
def message=(mess)
write_attribute(:message, mess ? mess.to_s : nil)
end
+ # @return [Deimos::Consumer]
+ def decoder
+ producer = Deimos::Producer.descendants.find { |c| c.topic == self.topic }
+ return nil unless producer
+
+ consumer = Class.new(Deimos::Consumer)
+ consumer.config.merge!(producer.config)
+ consumer
+ end
+
+ # Decode the message. This assumes for now that we have access to a producer
+ # in the codebase which can decode it.
+ # @param decoder [Deimos::Consumer]
+ # @return [Hash]
+ def decoded_message(decoder=self.decoder)
+ return { key: self.key, message: self.message } unless decoder
+
+ {
+ key: self.key.present? ? decoder.new.decode_key(self.key) : nil,
+ payload: decoder.decoder.decode(self.message)
+ }
+ end
+
# @return [Hash]
def phobos_message
{
payload: self.message,
partition_key: self.partition_key,