Sha256: b17617fd87194cfdc0fa4fc232238b08260c1fc6d96a2f990f539d8c39b680ff
Contents?: true
Size: 1.65 KB
Versions: 2
Compression:
Stored size: 1.65 KB
Contents
require 'socket' # Socket.gethostname is used for the default worker identity

module Magent
  # Worker loop that pulls messages off a channel and asks the channel to
  # process them, reporting successes and failures back to that channel.
  #
  # The channel is duck-typed. This class calls the following on it:
  # +name+ (only to build the default identity), +on_start+, +dequeue+,
  # +process!+, +current_job+, +on_job_processed+, +on_job_failed+, +failed+,
  # +on_quit+ and, when the channel responds to it, +on_shutdown+.
  class Processor
    # A GC run is forced once more than this many messages have been
    # processed since the previous forced run.
    GC_MESSAGE_THRESHOLD = 20

    # The idle delay keeps growing while it is at or below this many seconds.
    MAX_IDLE_DELAY = 5

    # Seconds added to the idle delay each time a pass processes nothing.
    IDLE_DELAY_STEP = 0.1

    # Stores the channel and announces this worker to it via +on_start+.
    #
    # channel  - object implementing the channel methods listed above.
    # identity - label passed to the channel callbacks; defaults to
    #            "<channel name>-<first dot-separated part of the hostname>".
    def initialize(channel, identity = "#{channel.name}-#{Socket.gethostname.split('.').first}")
      @channel = channel
      @shutdown = false
      @identity = identity

      @channel.on_start(identity)

      # Disabled validation kept from the original source:
      # @actor.class.actions.each do |action|
      #   if !@actor.respond_to?(action)
      #     raise ArgumentError, "action '#{action}' is not defined"
      #   end
      # end
    end

    # Runs the processing loop until #shutdown! has been called.
    #
    # Installs TERM and INT handlers that call #shutdown! and then exit with
    # status 0. Each pass dequeues one message; a processed message resets the
    # idle delay, while an empty or unprocessed pass lengthens it. Errors
    # raised while handling a message are reported to the channel and the
    # loop carries on.
    def run!
      processed_messages = 0
      delay = 0

      # NOTE(review): these handlers run the channel hooks from inside the
      # signal handler. Ruby restricts what is allowed in trap context (for
      # example Mutex#lock raises ThreadError there) -- confirm the channel's
      # on_quit/on_shutdown hooks are safe to call from a trap.
      trap('TERM') { shutdown!; exit 0 }
      trap('SIGINT') { shutdown!; exit 0 }

      loop do
        break if @shutdown

        message = @channel.dequeue
        begin
          started_at = Time.now
          if message && @channel.process!(message)
            puts "Processed #{message.inspect}"
            @channel.on_job_processed(@channel.current_job, Time.now - started_at, @identity)
            delay = 0
            processed_messages += 1
            if processed_messages > GC_MESSAGE_THRESHOLD
              processed_messages = 0
              GC.start
            end
          elsif delay <= MAX_IDLE_DELAY
            delay += IDLE_DELAY_STEP
          end
        rescue SystemExit
          # Deliberately swallowed: an exit raised while a message is being
          # handled must not be recorded as a job failure. When it came from
          # the signal handlers above, @shutdown is already set, so the loop
          # stops at the top of the next pass.
        rescue Exception => e
          # Broad on purpose: any other error is reported to the channel
          # instead of ending the worker loop.
          $stderr.puts "Error processing #{message.inspect} => #{e.message}"
          @channel.on_job_failed(@identity)
          @channel.failed(:error => e.message, :message => message, :backtrace => e.backtrace, :date => Time.now.utc)
        end

        # Truncate the delay to two decimal places before sleeping.
        sleep((delay * 100.0).to_i / 100.0)
      end
    end

    # Flags the loop to stop and notifies the channel: +on_quit+ always,
    # +on_shutdown+ only when the channel defines it.
    def shutdown!
      @shutdown = true
      @channel.on_quit
      @channel.on_shutdown if @channel.respond_to?(:on_shutdown)
      $stderr.puts "Shutting down..."
    end
  end # Processor
end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
magent-0.6.2 | lib/magent/processor.rb |
magent-0.6.1 | lib/magent/processor.rb |