Sha256: 8db3422365a5910c569559d4d32631302698df62d88bedb74d2c1d9058460965

Contents?: true

Size: 1.35 KB

Versions: 3

Compression:

Stored size: 1.35 KB

Contents

module Magent
  class Processor
    def initialize(channel)
      @channel = channel
      @shutdown = false

#       @actor.class.actions.each do |action|
#         if !@actor.respond_to?(action)
#           raise ArgumentError, "action '#{action}' is not defined"
#         end
#       end
    end

    def run!
      processed_messages = 0
      delay = 0

      trap('TERM') { shutdown!; exit 0 }
      trap('SIGINT') { shutdown!; exit 0 }

      loop do
        break if @shutdown

        message = @channel.dequeue
        begin
          if message && @channel.process!(message)
            puts "Processed #{message.inspect}"
            delay = 0
            processed_messages += 1
            if processed_messages > 20
              processed_messages = 0
              GC.start
            end
          else
            delay += 0.1 if delay <= 5
          end
        rescue SystemExit
        rescue Exception => e
          $stderr.puts "Error processing #{message.inspect} => #{e.message}"
          @channel.failed(:error => e.message, :message => message, :backtrace => e.backtrace, :date => Time.now.utc)
        ensure
        end

        sleep (delay*100.0).to_i/100.0
      end
    end

    def shutdown!
      @shutdown = true
      @channel.on_shutdown if @channel.respond_to?(:on_shutdown)
      $stderr.puts "Shutting down..."
    end
  end #Processor
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
magent-0.6.0 lib/magent/processor.rb
magent-0.5.4 lib/magent/processor.rb
magent-0.5.3 lib/magent/processor.rb