Sha256: e13c153fde21fb3f1abe0772a7539dc1d7c2f306bd042a0e815562c3a81b84da

Contents?: true

Size: 1.25 KB

Versions: 4

Compression:

Stored size: 1.25 KB

Contents

class Localjob
  class Worker
    attr_accessor :logger, :channel
    attr_reader :options

    def initialize(queues, logger: Logger.new(STDOUT), **options)
      @channel, @logger = Channel.new(queues), logger
      @options = options
      @shutdown = false
    end

    def process(job)
      logger.info "Worker #{pid}: #{job.inspect}"
      job.perform
    end

    def pid
      Process.pid
    end

    def work
      logger.info "Worker #{pid} now listening!"
      trap_signals
      create_pid_file(@options[:pid_file])
      deamonize if @options[:deamon]
      loop { shift_and_process }
    end

    private

    def shift_and_process
      exit! if @shutdown

      job = wait { @channel.shift }
      process job
    rescue Object => e
      logger.error "Worker #{pid} job failed: #{job}"
      logger.error "#{$!}\n#{$@.join("\n")}"
    end

    def trap_signals
      Signal.trap("QUIT") { graceful_shutdown }
    end

    def wait
      @waiting = true
      job = yield
      @waiting = false
      job
    end

    def deamonize
      Process.daemon(true, true)
    end

    def create_pid_file(path)
      File.open(path, 'w') { |f| f << self.pid } if path
    end

    def graceful_shutdown
      exit! if @waiting
      @shutdown = true
    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
localjob-0.1.1 lib/localjob/worker.rb
localjob-0.1.0 lib/localjob/worker.rb
localjob-0.0.2 lib/localjob/worker.rb
localjob-0.0.1 lib/localjob/worker.rb