Sha256: 4c3619b64da16a43a69d9d8bdd7cc8ea32fc0a318ab6b5f99137efe36bad342d

Contents?: true

Size: 1.16 KB

Versions: 2

Compression:

Stored size: 1.16 KB

Contents

# -*- encoding : utf-8 -*-
module Untied
  module Consumer
    # Decodes raw JSON messages and dispatches the event they carry to every
    # registered observer.
    #
    # Observers are registered at class level (see .observers=) and resolved
    # to objects once, when a Processor is instantiated.
    #
    # NOTE(review): this class relies on String#camelize, String#constantize
    # and Object#try, none of which are core Ruby. Presumably they come from
    # ActiveSupport, loaded elsewhere -- confirm.
    class Processor
      # @return [Array] observer objects notified by #process
      attr_reader :observers

      # Resolves every registered observer name to a constant (via
      # camelize + constantize) and stores whatever that constant's
      # +instance+ method returns.
      def initialize
        @observers = self.class.observers.map do |name|
          name.to_s.camelize.constantize.instance
        end
      end

      # Parses +message+ and notifies each observer about the event in it.
      #
      # The parsed document is read as
      #   { "event" => { "name" => ..., "origin" => ..., "payload" => {...} } }
      # where the first key of the payload is passed to observers as the
      # class identifier. Missing "event"/"payload" keys fall back to an
      # empty Hash.
      #
      # Messages that are not valid JSON, or whose top-level value is not a
      # JSON object, are logged at error level and dropped.
      #
      # @param headers [Object] not used by this method
      # @param message [String] raw JSON document
      # @return [Array, nil] the observers list after dispatch, or nil when
      #   the message was dropped
      def process(headers, message)
        begin
          parsed = JSON.parse(message)
        rescue JSON::ParserError => e
          # Must go through a log-level method: passing the text straight to
          # the +logger+ accessor raises instead of logging.
          Consumer.config.logger.error "Untied::Processor: Parsing error #{e}"
          return
        end

        # Valid JSON is not necessarily an object (e.g. a top-level array);
        # treat that like any other malformed message instead of letting
        # #fetch blow up below.
        unless parsed.is_a?(Hash)
          Consumer.config.logger.error \
            "Untied::Processor: Unexpected message format #{parsed.inspect}"
          return
        end

        event = parsed.fetch("event", {})
        payload = event.fetch("payload", {})
        service = event["origin"].try(:to_sym)
        event_name = event["name"].try(:to_sym)
        klass = payload.keys.first.try(:to_sym)

        Consumer.config.logger.info \
          "Untied::Processor: processing event #{event_name} from #{service} with " \
          "payload #{payload}"

        observers.each do |observer|
          observer.notify(event_name, klass, service, payload)
        end
      end

      class << self
        # Registers the observers that new Processor instances will resolve.
        # Accepts a single name or an array of names; nested arrays are
        # flattened.
        def observers=(*obs)
          @observers = obs.flatten
        end

        # @return [Array] registered observer names (empty by default)
        def observers
          @observers ||= []
        end
      end

    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
untied-consumer-0.0.5 lib/untied-consumer/processor.rb
untied-consumer-0.0.4 lib/untied-consumer/processor.rb