Sha256: 681cccd8424064e4c4b8de864e223cabb5ca37c6239f6beee6b2d3458660ea6f

Contents?: true

Size: 955 Bytes

Versions: 2

Compression:

Stored size: 955 Bytes

Contents

require 'eventmachine'
require 'rosetta_queue/consumer_managers/base'
require 'mq'

module RosettaQueue

  class EventedManager < BaseManager

    def start
      EM.run {
        trap_interruptions

        begin
          @consumers.each do |key, consumer|
            RosettaQueue.logger.info("Running consumer #{key} in event machine...")
            consumer.receive
          end
        rescue Exception => e
          RosettaQueue.logger.error("Exception thrown: #{$!}\n" + e.backtrace.join("\n\t"))
        end
      }
    end

    def stop
      RosettaQueue.logger.info("Shutting down event machine...")
      EM.stop
    end

    private

      def trap_interruptions
        trap("INT") {
          RosettaQueue.logger.warn("Interrupt received.  Shutting down...")
          EM.stop
        }

        trap("TERM") {
          RosettaQueue.logger.warn("Interrupt received.  Shutting down...")
          EM.stop
        }
      end

  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
rosetta_queue-0.5.2 lib/rosetta_queue/consumer_managers/evented.rb
rosetta_queue-0.5.0 lib/rosetta_queue/consumer_managers/evented.rb