Sha256: 433674610cfdaca1b77db3f75c8c0d85899ce02178c6d87d077c0c64adfa405b
Contents?: true
Size: 1.87 KB
Versions: 1
Compression:
Stored size: 1.87 KB
Contents
class Localjob class Worker TERMINATION_MESSAGE = "__TERMINATE__" attr_accessor :logger attr_reader :options, :queue def initialize(queue, logger: Logger.new(STDOUT), **options) @queue, @logger = queue, logger @queue = Localjob.new(@queue) if queue.kind_of?(Fixnum) @options = options @shutdown = false end def process(job) logger.info "Worker #{pid}: #{job.inspect}" job.perform end def pid Process.pid end def work(thread: false) logger.info "Worker #{pid} now listening!" trap_signals return work_thread if thread create_pid_file(@options[:pid_file]) loop { break unless shift_and_process } end def kill if @thread Thread.kill(@thread) @thread.join else shutdown end end private def work_thread @thread = Thread.new do begin loop do shift_and_process end # Respond to Thread.kill by sending termination message ensure shutdown work end end end def shutdown! logger.info "Worker #{pid} shutting down.." File.rm(@options[:pid_file]) if @options[:pid_file] return false if @thread exit! end def shutdown @queue << TERMINATION_MESSAGE end def shift_and_process job = queue.shift return shutdown! if job == TERMINATION_MESSAGE || !job process(job) return true # Explicit return of true, job#perform may return nil rescue Object logger.error "Worker #{pid} job failed: #{job}" logger.error "#{$!}\n#{$@.join("\n")}" end def trap_signals Signal.trap("QUIT") { shutdown } Signal.trap("INT") { shutdown } end def create_pid_file(path) File.open(path, 'w') { |f| f << self.pid } if path end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
localjob-0.4.0 | lib/localjob/worker.rb |