Sha256: 9205a593a5944ce8fffbd6d3e1d088a424526c7b13f2272c4effdc2fc8eb5a16

Contents?: true

Size: 1.79 KB

Versions: 4

Compression:

Stored size: 1.79 KB

Contents

module Magent
  # Worker loop that pulls messages off an actor's channel and dispatches
  # them to the actor.
  #
  # As used here, the actor's class must respond to +actions+ (the list of
  # dispatchable method names) and +channel+ (an object responding to
  # +dequeue+, +enqueue+ and +failed+); the actor itself must respond to
  # +_run_tasks+ and to every listed action.
  class Processor
    # Seconds added to the idle delay each time the channel yields nothing.
    DELAY_STEP = 0.1
    # The idle delay stops growing once it exceeds this many seconds.
    MAX_DELAY = 5
    # A GC run is forced once more than this many messages were dispatched.
    GC_INTERVAL = 20

    attr_reader :actor

    # Wraps +actor+ and verifies that it implements every declared action.
    #
    # Raises ArgumentError if the actor does not respond to one of the
    # names returned by <tt>actor.class.actions</tt>.
    def initialize(actor)
      @actor = actor
      @shutdown = false
      @method = @payload = nil
      @processed_messages = 0

      @actor.class.actions.each do |action|
        unless @actor.respond_to?(action)
          raise ArgumentError, "action '#{action}' is not defined"
        end
      end
    end

    # Runs the processing loop until #shutdown! has been called.
    #
    # Installs TERM/INT handlers, then repeatedly runs the actor's tasks,
    # dequeues one message and dispatches it. While the channel is empty
    # the sleep between polls grows by DELAY_STEP per iteration; it is
    # reset to zero whenever a message arrives or +_run_tasks+ returns a
    # truthy value.
    def run!
      @processed_messages = 0
      delay = 0

      install_signal_handlers

      loop do
        break if @shutdown

        delay = 0 if @actor._run_tasks

        @method, @payload = @actor.class.channel.dequeue

        if @method.nil?
          delay += DELAY_STEP if delay <= MAX_DELAY
        else
          delay = 0
          process_current_message
        end

        sleep delay
      end
    end

    # Flags the loop to stop and, if a message is currently held in
    # @method/@payload, puts it back on the channel.
    def shutdown!
      @shutdown = true
      $stderr.puts "Shutting down..."
      if @method
        @actor.class.channel.enqueue(@method, @payload)
      end
    end

    private

    # Traps TERM and INT so that either one triggers #shutdown! and exit.
    def install_signal_handlers
      trap('TERM') { shutdown!; exit 0 }
      trap('SIGINT') { shutdown!; exit 0 }
    end

    # Logs and dispatches the message held in @method/@payload, then
    # clears both so #shutdown! will not re-enqueue a finished message.
    def process_current_message
      $stderr.puts "#{@actor.class}##{@method}(#{@payload.inspect})"
      begin
        if @actor.class.actions.include?(@method)
          @processed_messages += 1
          @actor.send(@method, @payload)
          collect_garbage_if_due
        else
          $stderr.puts "Unknown action: #{@method} (payload=#{@payload.inspect})"
        end
      rescue SystemExit
        # Swallowed on purpose: the signal handlers call `exit` after
        # #shutdown!, and the loop in #run! then stops via @shutdown.
        # Note that an `exit` issued by the action itself is swallowed too.
      rescue Exception => e
        # NOTE(review): this also catches non-StandardError exceptions
        # (e.g. untrapped signals, NoMemoryError) -- confirm that is intended.
        report_failure(e)
      ensure
        @method = @payload = nil
      end
    end

    # Forces a GC run once more than GC_INTERVAL messages were dispatched
    # since the last forced run.
    def collect_garbage_if_due
      return unless @processed_messages > GC_INTERVAL

      @processed_messages = 0
      GC.start
    end

    # Logs +error+ with a tab-indented backtrace and records the failure
    # on the channel together with the message that caused it.
    def report_failure(error)
      # Exception#backtrace may be nil; Array() turns that into [].
      frames = Array(error.backtrace).map { |frame| "\t#{frame}" }

      $stderr.puts "Error while executing #{@method.inspect} #{@payload.inspect}"
      $stderr.puts "#{error}\n#{frames.join("\n")}"
      @actor.class.channel.failed(:message => error.message, :method => @method, :payload => @payload, :backtrace => error.backtrace, :date => Time.now.utc)
    end
  end #Processor
end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
magent-0.1.3 lib/magent/processor.rb
magent-0.1.2 lib/magent/processor.rb
magent-0.1.1 lib/magent/processor.rb
magent-0.1.0 lib/magent/processor.rb