Sha256: fcda1a35126e1d4ad77dfdd24d157459c068c5270af5b798001b59379ba18ac3

Contents?: true

Size: 1.75 KB

Versions: 50

Compression:

Stored size: 1.75 KB

Contents

module Qu
  # A worker reserves jobs through Qu.reserve and performs them, either until
  # a non-blocking reserve comes back empty (#work_off) or in an endless loop
  # that is ended by an INT/TERM signal (#start).
  class Worker
    include Logger

    attr_accessor :queues

    # Raised by the signal traps installed in #handle_signals to unwind out of
    # the work loop in #start. It inherits from Exception rather than
    # StandardError, so a bare `rescue` (which only catches StandardError)
    # between the raise point and #start does not intercept it.
    class Abort < Exception
    end

    # Builds a worker for the given queues.
    #
    # @param queues [Array] queue names, optionally nested in arrays (the list
    #   is flattened). If the last element after flattening is a Hash, it is
    #   removed from the list and applied through #attributes=. When no queue
    #   remains afterwards, 'default' is used.
    def initialize(*queues)
      @queues = queues.flatten
      self.attributes = @queues.pop if @queues.last.is_a?(Hash)
      @queues << 'default' if @queues.empty?
    end

    # Copies every key/value pair of +attrs+ into the instance variable named
    # after the key (for example, 'pid' => 42 sets @pid).
    #
    # @param attrs [Hash] instance variable names (without "@") mapped to values
    def attributes=(attrs)
      attrs.each do |attr, value|
        instance_variable_set("@#{attr}", value)
      end
    end

    # @return [Hash] the worker's hostname, pid and queues under the string
    #   keys 'hostname', 'pid' and 'queues'
    def attributes
      {'hostname' => hostname, 'pid' => pid, 'queues' => queues}
    end

    # Installs traps for INT and TERM that raise Abort with the signal name as
    # the exception message.
    #
    # The handler deliberately does nothing but raise: since Ruby 2.0, Mutex
    # operations raise ThreadError inside a trap handler, and the standard
    # library Logger writes under a mutex, so a log call made here would be
    # lost. #start logs the shutdown message when it rescues Abort instead.
    def handle_signals
      logger.debug "Worker #{id} registering traps for INT and TERM signals"
      %w[INT TERM].each do |sig|
        trap(sig) do
          raise Abort, sig
        end
      end
    end

    # Reserves and performs jobs one after another until a non-blocking
    # Qu.reserve returns nil (or false).
    def work_off
      logger.debug "Worker #{id} working of all jobs"
      while job = Qu.reserve(self, :block => false)
        logger.debug "Worker #{id} reserved job #{job}"
        job.perform
        logger.debug "Worker #{id} completed job #{job}"
      end
    end

    # Reserves a single job and performs it.
    #
    # NOTE(review): Qu.reserve is called without :block => false here, so it
    # presumably waits until a job is available -- confirm against the backend.
    def work
      logger.debug "Worker #{id} waiting for next job"
      job = Qu.reserve(self)
      logger.debug "Worker #{id} reserved job #{job}"
      job.perform
      logger.debug "Worker #{id} completed job #{job}"
    end

    # Installs the signal traps, registers the worker with the backend and
    # calls #work in an endless loop. An Abort (raised by the traps) ends the
    # loop quietly; in every case the worker is unregistered on the way out.
    def start
      logger.warn "Worker #{id} starting"
      handle_signals
      Qu.backend.register_worker(self)
      loop { work }
    rescue Abort => e
      # Ok, we'll shut down, but give us a sec.
      # Logged here rather than in the trap handler, where logging is unsafe;
      # e.message is the signal name supplied by #handle_signals.
      logger.info "Worker #{id} received #{e.message}, shutting down"
    ensure
      Qu.backend.unregister_worker(self)
      logger.debug "Worker #{id} done"
    end

    # @return [String] "hostname:pid:queue1,queue2,...", computed on first use
    #   and cached in @id
    def id
      @id ||= "#{hostname}:#{pid}:#{queues.join(',')}"
    end

    # @return [Integer] the cached pid; defaults to the current process id
    #   unless @pid was already set (e.g. through #attributes=)
    def pid
      @pid ||= Process.pid
    end

    # @return [String] the cached hostname; defaults to the stripped output of
    #   the `hostname` shell command unless @hostname was already set
    def hostname
      @hostname ||= `hostname`.strip
    end
  end
end

Version data entries

50 entries across 50 versions & 2 rubygems

Version Path
classiccms-0.7.5 vendor/bundle/gems/qu-0.1.4/lib/qu/worker.rb
classiccms-0.7.4 vendor/bundle/gems/qu-0.1.4/lib/qu/worker.rb
classiccms-0.7.3 vendor/bundle/gems/qu-0.1.4/lib/qu/worker.rb
classiccms-0.7.2 vendor/bundle/gems/qu-0.1.4/lib/qu/worker.rb
classiccms-0.7.1 vendor/bundle/gems/qu-0.1.4/lib/qu/worker.rb
classiccms-0.7.0 vendor/bundle/gems/qu-0.1.4/lib/qu/worker.rb
classiccms-0.6.9 vendor/bundle/gems/qu-0.1.4/lib/qu/worker.rb
classiccms-0.6.8 vendor/bundle/gems/qu-0.1.4/lib/qu/worker.rb
classiccms-0.6.7 vendor/bundle/gems/qu-0.1.4/lib/qu/worker.rb
classiccms-0.6.6 vendor/bundle/gems/qu-0.1.4/lib/qu/worker.rb
classiccms-0.6.5 vendor/bundle/gems/qu-0.1.4/lib/qu/worker.rb
classiccms-0.6.4 vendor/bundle/gems/qu-0.1.4/lib/qu/worker.rb
classiccms-0.6.3 vendor/bundle/gems/qu-0.1.4/lib/qu/worker.rb
classiccms-0.6.2 vendor/bundle/gems/qu-0.1.4/lib/qu/worker.rb
classiccms-0.6.1 vendor/bundle/gems/qu-0.1.4/lib/qu/worker.rb
classiccms-0.6.0 vendor/bundle/gems/qu-0.1.4/lib/qu/worker.rb
classiccms-0.5.17 vendor/bundle/gems/qu-0.1.4/lib/qu/worker.rb
classiccms-0.5.16 vendor/bundle/gems/qu-0.1.4/lib/qu/worker.rb
classiccms-0.5.15 vendor/bundle/gems/qu-0.1.4/lib/qu/worker.rb
classiccms-0.5.14 vendor/bundle/gems/qu-0.1.4/lib/qu/worker.rb