Sha256: b5d31b378be43f47f5e8193bd6899941b5948a871f98eb33b76eac4e2299ff15

Contents?: true

Size: 1.05 KB

Versions: 3

Compression:

Stored size: 1.05 KB

Contents

# Top-level namespace; this file contributes the MessageHandler mixin.
module RosettaQueue
  # Mixin for classes that publish to or consume from a destination.
  #
  # Including it extends the host class with the ClassMethods DSL
  # (+publishes_to+, +subscribes_to+, +options+) and adds instance helpers
  # that read those class-level settings, dispatch incoming messages, and
  # acknowledge them through an optional adapter proxy.
  #
  # The host class must define +on_message+; +handle_message+ calls it with
  # the filtered message.
  module MessageHandler

    # Hook run by Ruby when the module is included: installs the DSL on the
    # including class.
    def self.included(base)
      base.extend(ClassMethods)
    end

    # Class-level DSL. Values are stored in instance variables of the class
    # object that calls the DSL methods.
    module ClassMethods
      # Readers return nil until the corresponding DSL method has been called.
      attr_reader :destination, :options_hash

      # Records the options for this handler class (an empty hash when called
      # without an argument). Returns the stored value.
      def options(options = {})
        @options_hash = options
      end

      # Records the destination this class publishes to.
      def publishes_to(destination)
        @destination = destination
      end

      # Records the destination this class subscribes to. Writes the same
      # instance variable as +publishes_to+.
      def subscribes_to(destination)
        @destination = destination
      end
    end

    # Object used by +ack+; when left nil, +ack+ does nothing.
    attr_accessor :adapter_proxy

    # The destination declared on this object's class.
    def destination
      self.class.destination
    end

    # The options declared on this object's class.
    def options_hash
      self.class.options_hash
    end

    # Filters +unfiltered_message+ and hands the result to +on_message+,
    # running that work inside ExceptionHandler.handle.
    #
    # Returns whatever ExceptionHandler.handle returns.
    def handle_message(unfiltered_message)
      # Wrapped in a lambda so the context hash is only built if the
      # exception handler actually calls it.
      failure_context = lambda do
        {
          :message     => Filters.safe_process_receiving(unfiltered_message),
          :destination => destination,
          :action      => :consuming,
          :options     => options_hash
        }
      end

      # NOTE(review): the handler type is :publishing although the context
      # above reports :action => :consuming -- confirm this is intentional.
      ExceptionHandler.handle(:publishing, failure_context) do
        on_message(Filters.process_receiving(unfiltered_message))
      end
    end

    # Acknowledges via the adapter proxy. Returns nil without doing anything
    # when no proxy has been assigned; otherwise returns the proxy's result.
    def ack
      return if adapter_proxy.nil?

      adapter_proxy.ack
    end

  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
rosetta_queue-0.5.2 lib/rosetta_queue/message_handler.rb
rosetta_queue-0.5.0 lib/rosetta_queue/message_handler.rb
rosetta_queue-0.4.0 lib/rosetta_queue/message_handler.rb