Sha256: 4c3619b64da16a43a69d9d8bdd7cc8ea32fc0a318ab6b5f99137efe36bad342d
Contents?: true
Size: 1.16 KB
Versions: 2
Compression:
Stored size: 1.16 KB
Contents
# -*- encoding : utf-8 -*-
module Untied
  module Consumer
    # Parses raw event messages (JSON strings) and forwards each decoded
    # event to every registered observer.
    class Processor
      # Observer instances built in #initialize from the class-level list.
      attr_reader :observers

      # Builds the observer list: each entry registered on the class is
      # stringified, camelized, resolved to a constant, and asked for its
      # +instance+.
      def initialize
        @observers = self.class.observers.map do |o|
          o.to_s.camelize.constantize.instance
        end
      end

      # Decodes +message+ and notifies all observers about the event.
      #
      # The decoded document is read as
      #   { "event" => { "name" => ..., "origin" => ..., "payload" => { ... } } }
      # where the first key of the payload is passed on as the class name.
      #
      # headers - not used by this method.
      # message - String containing the JSON document.
      #
      # Returns nil (after logging the failure) when +message+ is not valid
      # JSON; otherwise returns the result of iterating the observers.
      def process(headers, message)
        begin
          message = JSON.parse(message)
        rescue JSON::ParserError => e
          # The message must be sent to a log method on the logger object;
          # passing it as an argument to the +logger+ accessor itself fails
          # instead of logging.
          Consumer.config.logger.error "Untied::Processor: Parsing error #{e}"
          return
        end

        message = message.fetch("event", {})
        payload = message.fetch("payload", {})
        service = message["origin"].try(:to_sym)
        event_name = message["name"].try(:to_sym)
        klass = payload.keys.first.try(:to_sym)

        Consumer.config.logger.info \
          "Untied::Processor: processing event #{event_name} from #{service} with " \
          "payload #{payload}"

        observers.each do |observer|
          observer.notify(event_name, klass, service, payload)
        end
      end

      class << self
        # Registers the observer identifiers. Accepts either a single
        # array or a comma-separated list on the right-hand side; the
        # result is always stored as one flat array.
        def observers=(*obs)
          @observers = obs.flatten
        end

        # Registered observer identifiers; an empty array when none were set.
        def observers
          @observers ||= []
        end
      end
    end
  end
end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
untied-consumer-0.0.5 | lib/untied-consumer/processor.rb |
untied-consumer-0.0.4 | lib/untied-consumer/processor.rb |