Sha256: e13c153fde21fb3f1abe0772a7539dc1d7c2f306bd042a0e815562c3a81b84da
Contents?: true
Size: 1.25 KB
Versions: 4
Compression:
Stored size: 1.25 KB
Contents
class Localjob
  # Takes jobs off a Channel one at a time and performs them in the current
  # process, optionally after detaching from the terminal as a daemon.
  #
  # Shutdown protocol: on SIGQUIT the worker exits immediately if it is idle
  # (waiting on the channel); if it is in the middle of a job it finishes that
  # job and exits before taking the next one.
  class Worker
    attr_accessor :logger, :channel
    attr_reader :options

    # @param queues forwarded unchanged to Channel.new
    # @param logger [Logger] destination for progress and failure messages
    # @param options [Hash] recognised keys:
    #   :pid_file - path to write the worker's pid to (skipped when nil)
    #   :deamon   - truthy to daemonize before working (historical spelling)
    #   :daemon   - accepted as an alias for :deamon
    def initialize(queues, logger: Logger.new(STDOUT), **options)
      @channel = Channel.new(queues)
      @logger = logger
      @options = options
      @shutdown = false
      @waiting = false
    end

    # Logs the job and runs it by calling +job.perform+.
    #
    # @param job any object responding to +perform+
    # @return whatever +job.perform+ returns
    def process(job)
      logger.info "Worker #{pid}: #{job.inspect}"
      job.perform
    end

    # @return [Integer] pid of the process this worker is running in
    def pid
      Process.pid
    end

    # Starts the worker and loops forever taking and processing jobs.
    #
    # Daemonizing happens first: Process.daemon forks and the original process
    # exits, so the pid file and the start-up log line must be produced
    # afterwards or they would record a pid that no longer exists.
    def work
      deamonize if daemonize?
      trap_signals
      create_pid_file(@options[:pid_file])
      logger.info "Worker #{pid} now listening!"
      loop { shift_and_process }
    end

    private

    # True when either spelling of the daemon option was given.
    def daemonize?
      @options[:deamon] || @options[:daemon]
    end

    # Takes one job from the channel and processes it. Any failure is logged
    # and swallowed so the work loop keeps running.
    def shift_and_process
      # A shutdown was requested while the previous job was running.
      exit! if @shutdown

      job = wait { @channel.shift }
      process job
    rescue Exception => e # rubocop:disable Lint/RescueException
      # NOTE(review): this catch-all matches the original `rescue Object`; it
      # also swallows SignalException (e.g. SIGINT/SIGTERM) and SystemExit
      # raised while working - confirm that is intended.
      # `job` is nil here when the failure happened while taking from the channel.
      logger.error "Worker #{pid} job failed: #{job}"
      logger.error "#{e}\n#{Array(e.backtrace).join("\n")}"
    end

    # Installs the QUIT handler that triggers a graceful shutdown.
    def trap_signals
      Signal.trap("QUIT") { graceful_shutdown }
    end

    # Runs the block with the worker flagged as idle and returns the block's
    # value. The flag is cleared in +ensure+ so a failing block cannot leave
    # the worker marked as idle.
    def wait
      @waiting = true
      yield
    ensure
      @waiting = false
    end

    # Detaches from the controlling terminal, keeping the current working
    # directory and the standard streams (both Process.daemon flags are true).
    def deamonize
      Process.daemon(true, true)
    end

    # Writes the current pid to +path+; does nothing when no path was given.
    def create_pid_file(path)
      return unless path

      File.open(path, 'w') { |file| file << pid }
    end

    # QUIT handler: exit right away when idle, otherwise let the running job
    # finish and exit before the next one is taken.
    def graceful_shutdown
      exit! if @waiting
      @shutdown = true
    end
  end
end
Version data entries
4 entries across 4 versions & 1 rubygems
Version | Path |
---|---|
localjob-0.1.1 | lib/localjob/worker.rb |
localjob-0.1.0 | lib/localjob/worker.rb |
localjob-0.0.2 | lib/localjob/worker.rb |
localjob-0.0.1 | lib/localjob/worker.rb |