Sha256: b17617fd87194cfdc0fa4fc232238b08260c1fc6d96a2f990f539d8c39b680ff

Contents?: true

Size: 1.65 KB

Versions: 2

Compression:

Stored size: 1.65 KB

Contents

module Magent
  class Processor
    def initialize(channel, identity = "#{channel.name}-#{Socket.gethostname.split('.')[0]}")
      @channel = channel
      @shutdown = false
      @identity = identity

      @channel.on_start(identity)

#       @actor.class.actions.each do |action|
#         if !@actor.respond_to?(action)
#           raise ArgumentError, "action '#{action}' is not defined"
#         end
#       end
    end

    def run!
      processed_messages = 0
      delay = 0

      trap('TERM') { shutdown!; exit 0 }
      trap('SIGINT') { shutdown!; exit 0 }

      loop do
        break if @shutdown

        message = @channel.dequeue
        begin
          t = Time.now
          if message && @channel.process!(message)
            puts "Processed #{message.inspect}"

            @channel.on_job_processed(@channel.current_job, Time.now - t, @identity)

            delay = 0
            processed_messages += 1
            if processed_messages > 20
              processed_messages = 0
              GC.start
            end
          else
            delay += 0.1 if delay <= 5
          end
        rescue SystemExit
        rescue Exception => e
          $stderr.puts "Error processing #{message.inspect} => #{e.message}"
          @channel.on_job_failed(@identity)
          @channel.failed(:error => e.message, :message => message, :backtrace => e.backtrace, :date => Time.now.utc)
        ensure
        end

        sleep (delay*100.0).to_i/100.0
      end
    end

    def shutdown!
      @shutdown = true
      @channel.on_quit

      @channel.on_shutdown if @channel.respond_to?(:on_shutdown)
      $stderr.puts "Shutting down..."
    end
  end #Processor
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
magent-0.6.2 lib/magent/processor.rb
magent-0.6.1 lib/magent/processor.rb