Sha256: e1c286a6d70e5f08143598453a0ed1ab7f920806ab6002b3e4db503ea159278e
Contents?: true
Size: 1.79 KB
Versions: 5
Compression:
Stored size: 1.79 KB
Contents
module Magent class Processor attr_reader :actor def initialize(actor) @actor = actor @shutdown = false @actor.class.actions.each do |action| if !@actor.respond_to?(action) raise ArgumentError, "action '#{action}' is not defined" end end end def run! processed_messages = 0 delay = 0 trap('TERM') { shutdown!; exit 0 } trap('SIGINT') { shutdown!; exit 0 } loop do break if @shutdown delay = 0 if @actor._run_tasks @method, @payload = @actor.class.channel.dequeue if @method.nil? delay += 0.1 if delay <= 5 else delay = 0 $stderr.puts "#{@actor.class}##{@method}(#{@payload.inspect})" begin if @actor.class.can_handle?(@method) processed_messages += 1 @actor.send(@method, @payload) if processed_messages > 20 processed_messages = 0 GC.start end else $stderr.puts "Unknown action: #{@method} (payload=#{@payload.inspect})" end rescue SystemExit rescue Exception => e $stderr.puts "Error while executing #{@method.inspect} #{@payload.inspect}" $stderr.puts "#{e.to_s}\n#{e.backtrace.join("\t\n")}" @actor.class.channel.failed(:message => e.message, :method => @method, :payload => @payload, :backtrace => e.backtrace, :date => Time.now.utc) ensure @method, @payload = nil end end sleep delay end end def shutdown! @shutdown = true $stderr.puts "Shutting down..." if @method @actor.class.channel.enqueue(@method, @payload) end end end #Processor end
Version data entries
5 entries across 5 versions & 2 rubygems