Parent

Resque::Worker

Public Class Methods

start(ips, queues) click to toggle source
# File lib/resque_ui/overrides/resque/worker.rb, line 207
# Launches a resque worker from a background thread and returns that Thread.
#
# In the development and test environments the worker is started locally with
# `rake resque:work`; in every other environment the command returned by
# ResqueUi::Cap.path is run from Rails.root with the given host list.
#
# ips    - value interpolated into the `host=` argument (not used in
#          development/test).
# queues - value interpolated into the `QUEUE=` / `queue=` argument.
#
# Returns the Thread that executes the shell command.
def self.start(ips, queues)
  unless Rails.env =~ /development|test/
    return Thread.new(queues, ips) do |queue, ip_list|
      system("cd #{Rails.root}; #{ResqueUi::Cap.path} #{Rails.env} resque:work host=#{ip_list} queue=#{queue}")
    end
  end

  Thread.new(queues) do |queue|
    system("rake RAILS_ENV=#{Rails.env} QUEUE=#{queue} resque:work")
  end
end
working() click to toggle source

The logic for mapped_mget changed: in the latest redis gem it returns keys with nil values.

# File lib/resque_ui/overrides/resque/worker.rb, line 181
# Returns the workers that currently have a value stored under their
# "worker:<id>" key in Redis.
#
# mapped_mget may return keys whose value is nil, so those entries are
# skipped, as are ids for which `find` returns nil.
#
# Returns an Array (empty when `all` yields nothing).
def self.working
  names = all
  return [] if names.none?

  keys = names.map! { |name| "worker:#{name}" }

  active = []
  redis.mapped_mget(*keys).each do |key, value|
    next if value.nil?

    worker = find(key.sub("worker:", ''))
    active << worker unless worker.nil?
  end
  active
end

Public Instance Methods

all_workers_in_pid_working() click to toggle source
# File lib/resque_ui/overrides/resque/worker.rb, line 99
# Returns the workers from `workers_in_pid` whose `processing` value is
# present and not empty.
def all_workers_in_pid_working
  workers_in_pid.select do |worker|
    payload = worker.processing
    payload ? !payload.empty? : false
  end
end
id() click to toggle source
Alias for: to_s
ip() click to toggle source
# File lib/resque_ui/overrides/resque/worker.rb, line 41
# Returns the first dotted-quad looking substring found in the part of
# `to_s` that precedes the first ':', or nil when that part contains none.
def ip
  host_segment = to_s.split(':').first
  host_segment.slice(/\b(?:\d{1,3}\.){3}\d{1,3}\b/)
end
local_ip() click to toggle source
# File lib/resque_ui/overrides/resque/worker.rb, line 6
# Determines the address of the local interface used for outbound traffic.
#
# A UDP socket is connected to 64.233.187.99 port 1 (presumably only so the OS
# picks the outgoing interface - confirm) and the last element of the
# socket's local address info is returned. Reverse DNS resolution is switched
# off for the duration of the call and the previous setting is restored
# afterwards, even if an error is raised.
def local_ip
  orig = Socket.do_not_reverse_lookup
  Socket.do_not_reverse_lookup = true # turn off reverse DNS resolution temporarily

  UDPSocket.open do |sock|
    sock.connect('64.233.187.99', 1)
    sock.addr.last
  end
ensure
  Socket.do_not_reverse_lookup = orig
end
pid() click to toggle source
# File lib/resque_ui/overrides/resque/worker.rb, line 25
# Returns the second ':'-separated segment of `to_s` (the process id portion
# of the worker id), or nil when there is no such segment.
def pid
  _host, process_id = to_s.split(':')
  process_id
end
prune_dead_workers() click to toggle source

Looks for any workers which should be running on this server and, if they’re not, removes them from Redis.

This is a form of garbage collection. If a server is killed by a hard shutdown, power failure, or something else beyond our control, the Resque workers will not die gracefully and therefore will leave stale state information in Redis.

By checking the current Redis state against the actual environment, we can determine if Redis is old and clean it up a bit.

# File lib/resque_ui/overrides/resque/worker.rb, line 89
# Unregisters workers recorded for this host whose process is no longer
# running.
#
# A worker belongs to this host when the first ':'-separated segment of its
# id is exactly `hostname` or `hostname` followed by "(" (the "(ip)" suffix
# produced by #to_s). A plain substring test is not enough: it would let host
# "web1" claim - and prune - the workers of "web10".
#
# The list of live pids comes from `worker_pids`; it is fetched lazily and at
# most once per call rather than once per registered worker.
#
# Returns whatever `Worker.all.each` returns.
def prune_dead_workers
  live_pids = nil
  own_prefix = "#{hostname}("

  Worker.all.each do |worker|
    host, pid = worker.id.split(':')
    next unless host == hostname || host.start_with?(own_prefix)

    live_pids ||= worker_pids
    next if live_pids.include?(pid)

    log! "Pruning dead worker: #{worker}"
    worker.unregister_worker
  end
end
queue() click to toggle source
# File lib/resque_ui/overrides/resque/worker.rb, line 33
# Returns the final ':'-separated segment of `to_s` (the queue portion of the
# worker id), or nil when `to_s` splits into nothing.
def queue
  segments = to_s.split(':')
  segments[-1]
end
queues() click to toggle source

OVERRIDE for multithreaded workers

# File lib/resque_ui/overrides/resque/worker.rb, line 50
# OVERRIDE for multithreaded workers.
#
# Returns the queue names for the current thread, read from the thread-local
# :queues value: "*" expands to every queue known to Resque (sorted), anything
# else is split on commas.
def queues
  selection = Thread.current[:queues]
  return Resque.queues.sort if selection == "*"

  selection.split(',')
end
queues_in_pid() click to toggle source
# File lib/resque_ui/overrides/resque/worker.rb, line 45
# Returns the `queue` value of every worker returned by `workers_in_pid`,
# in the same order.
def queues_in_pid
  workers_in_pid.map(&:queue)
end
quit() click to toggle source
# File lib/resque_ui/overrides/resque/worker.rb, line 215
# Asks this worker's process to stop.
#
# In development/test an INT signal is sent to the worker's pid with `kill`;
# in every other environment the `resque:quit_worker` task is run through the
# command returned by ResqueUi::Cap.path, from Rails.root, for this worker's
# pid and ip.
#
# Returns the result of `system`.
def quit
  command =
    if Rails.env =~ /development|test/
      "kill -INT  #{pid}"
    else
      "cd #{Rails.root}; #{ResqueUi::Cap.path} #{Rails.env} resque:quit_worker pid=#{pid} host=#{ip}"
    end

  system(command)
end
register_signal_handlers() click to toggle source

JRuby won’t allow you to trap the QUIT signal, so the INT signal is used in its place on JRuby.

# File lib/resque_ui/overrides/resque/worker.rb, line 104
# Installs the worker's signal handlers.
#
# TERM triggers `shutdown!` and INT triggers `shutdown`; both are always
# registered. QUIT, USR1, USR2 and CONT are then registered in that order.
# A warning is emitted for each of them whose `trap` call returns a falsy
# value, and if `trap` raises ArgumentError the remaining signals are skipped
# and a single combined warning is emitted instead.
def register_signal_handlers
  trap('TERM') { shutdown! }
  trap('INT') { shutdown }

  # Signal name paired with the worker method it should invoke.
  optional_handlers = [
    ['QUIT', :shutdown],
    ['USR1', :kill_child],
    ['USR2', :pause_processing],
    ['CONT', :unpause_processing]
  ]

  begin
    optional_handlers.each do |signal, action|
      previous = trap(signal) { __send__(action) }
      warn "Signal #{signal} not supported." unless previous
    end
    nil
  rescue ArgumentError
    warn "Signals QUIT, USR1, USR2, and/or CONT not supported."
  end
end
restart() click to toggle source
# File lib/resque_ui/overrides/resque/worker.rb, line 223
# Stops this worker and starts a replacement for the same queues.
#
# The queue list is captured (joined with '#') before `quit` is called, then
# the class-level `start` is invoked with this worker's ip and that list.
#
# Returns whatever `start` returns.
def restart
  joined_queues = queues_in_pid.join('#')
  quit
  self.class.start(ip, joined_queues)
end
shutdown() click to toggle source
Schedule this worker for shutdown. Will finish processing the
current job.

OVERRIDE for multithreaded workers

# File lib/resque_ui/overrides/resque/worker.rb, line 73
# OVERRIDE for multithreaded workers.
#
# Schedules a shutdown: logs the exit, sets the thread-local :shutdown flag on
# every thread in Thread.list, and sets this worker's own @shutdown flag.
def shutdown
  log 'Exiting...'

  Thread.list.each do |thread|
    thread[:shutdown] = true
  end

  @shutdown = true
end
startup() click to toggle source
Runs all the methods needed when a worker begins its lifecycle.

OVERRIDE for multithreaded workers

# File lib/resque_ui/overrides/resque/worker.rb, line 56
# OVERRIDE for multithreaded workers.
#
# Runs the steps needed when a worker begins its life cycle. Signal handler
# registration and dead-worker pruning happen only when called on the main
# thread; the :before_first_fork hook and worker registration happen on every
# thread.
def startup
  enable_gc_optimizations

  on_main_thread = Thread.current == Thread.main
  register_signal_handlers if on_main_thread
  prune_dead_workers if on_main_thread

  run_hook :before_first_fork
  register_worker

  # Unbuffered stdout, so `rake resque:work > resque.log` also captures the
  # child's output.
  $stdout.sync = true
end
status() click to toggle source
# File lib/resque_ui/overrides/resque/worker.rb, line 203
# Returns the value stored under the 'status' key of this worker's `job`.
def status
  current_job = job
  current_job['status']
end
status=(status) click to toggle source
# File lib/resque_ui/overrides/resque/worker.rb, line 198
# Stores `status` under the 'status' key of this worker's job data.
#
# The current `job` is merged with the new status, encoded, and written to
# Redis under "worker:<id>".
#
# status - the value to record.
def status=(status)
  merged = job.merge('status' => status)
  payload = encode(merged)
  redis.set("worker:#{self}", payload)
end
thread() click to toggle source
# File lib/resque_ui/overrides/resque/worker.rb, line 29
# Returns the third ':'-separated segment of `to_s` (the thread portion of the
# worker id), or nil when there is no such segment.
def thread
  _host, _pid, thread_id = to_s.split(':')
  thread_id
end
to_s() click to toggle source

The string representation is the same as the id for this worker instance. Can be used with `Worker.find`.

# File lib/resque_ui/overrides/resque/worker.rb, line 19
# Returns the worker id string.
#
# A previously assigned @to_s wins. Otherwise the id is built as
# "<hostname>(<local_ip>):<process pid>:<current thread object_id>:<queues>",
# where <queues> is the current thread's thread-local :queues value.
def to_s
  return @to_s if @to_s

  current = Thread.current
  "#{hostname}(#{local_ip}):#{Process.pid}:#{current.object_id}:#{current[:queues]}"
end
Also aliased as: id
work(interval = 5, &block) click to toggle source
This is the main workhorse method. Called on a Worker instance,
it begins the worker life cycle.

The following events occur during a worker's life cycle:

1. Startup:   Signals are registered, dead workers are pruned,
              and this worker is registered.
2. Work loop: Jobs are pulled from a queue and processed.
3. Teardown:  This worker is unregistered.

Can be passed an integer representing the polling frequency.
The default is 5 seconds, but for a semi-active site you may
want to use a smaller value.

Also accepts a block which will be passed the job as soon as it
has completed processing. Useful for testing.

OVERRIDE for multithreaded workers

# File lib/resque_ui/overrides/resque/worker.rb, line 139
# OVERRIDE for multithreaded workers.
#
# Runs the worker life cycle: startup, the polling/work loop, then teardown.
#
# interval - polling frequency in seconds (converted with to_i). When it is 0
#            the work loop exits as soon as no job can be reserved.
# block    - optional block, passed through to `perform` together with each
#            job.
#
# The worker is unregistered when the method finishes, including when an
# exception is raised (see the `ensure` clause).
def work(interval = 5, &block)
  # Advertise the start-up phase in the process title.
  $0 = "resque: Starting"
  startup

  loop do
    # Stop polling once shutdown was requested on this worker instance or on
    # the current thread.
    break if @shutdown || Thread.current[:shutdown]

    # Only try to reserve a job while not paused.
    if not @paused and job = reserve
      log "got: #{job.inspect}"
      run_hook :before_fork
      working_on job

      # A truthy result from `fork` means we are the parent and @child holds
      # the child's pid; otherwise the job is performed right here.
      # (presumably `fork` is the worker's own wrapper that returns a falsy
      # value when forking is unavailable - confirm)
      if @child = fork
        rand # Reseeding
        procline "Forked #{@child} at #{Time.now.to_i}"
        # NOTE(review): Process.wait without a pid waits for any child
        # process; with several worker threads forking in one process this may
        # reap another thread's child - confirm whether @child should be
        # passed.
        Process.wait
      else
        procline "Processing #{job.queue} since #{Time.now.to_i}"
        perform(job, &block)
        # Leave immediately unless @cant_fork is set, in which case execution
        # continues below in this same process.
        exit! unless @cant_fork
      end

      done_working
      @child = nil
    else
      # Nothing to do (or paused): a zero interval means "run once and
      # return", otherwise wait before polling again.
      break if interval.to_i == 0
      log! "Sleeping for #{interval.to_i}"
      procline @paused ? "Paused" : "Waiting for #{@queues.join(',')}"
      sleep interval.to_i
    end
  end
  # Best-effort unregistration; any StandardError raised here is swallowed
  # and the `ensure` clause below unregisters again.
  unregister_worker rescue nil
  loop do
    # Hang onto the process until no worker in this pid is still processing
    # a job.
    # NOTE(review): with a zero interval this loop does not pause between
    # checks.
    break if all_workers_in_pid_working.blank?
    sleep interval.to_i
  end
ensure
  unregister_worker
end
worker_pids() click to toggle source

Returns an array of string pids of all the other workers on this machine. Useful when pruning dead workers on startup.

# File lib/resque_ui/overrides/resque/worker.rb, line 192
# Returns an Array of pid Strings for the processes on this machine whose
# `ps` line contains "resque".
#
# The grep pattern is written as [r]esque so that the grep process does not
# match itself. It is quoted so the shell passes it to grep verbatim; left
# unquoted, it is subject to pathname expansion and would be replaced by
# "resque" if a file of that name exists in the working directory.
def worker_pids
  ps_output = `ps -A -o pid,command | grep '[r]esque'`
  ps_output.split("\n").map { |line| line.split(' ').first }
end
workers_in_pid() click to toggle source
# File lib/resque_ui/overrides/resque/worker.rb, line 37
# Returns the workers registered in Redis that run inside this worker's
# process, i.e. whose id contains "(<ip>):<pid>" for this worker's ip and pid.
#
# The ip and pid are regex-escaped, and the pid must be followed by ':' or the
# end of the id, so that pid "12" does not also select pid "123" and the dots
# of the ip only match literal dots. The pattern is built once per call.
#
# Ids for which Resque::Worker.find returns nil are dropped.
def workers_in_pid
  id_pattern = /\(#{Regexp.escape(ip.to_s)}\):#{Regexp.escape(pid.to_s)}(?::|\z)/

  Array(redis.smembers(:workers))
    .select { |id| id =~ id_pattern }
    .map { |id| Resque::Worker.find(id) }
    .compact
end

[Validate]

Generated with the Darkfish Rdoc Generator 2.