lib/resque_admin/worker.rb in resque_admin-1.0.4 vs lib/resque_admin/worker.rb in resque_admin-1.0.5
- old
+ new
@@ -1,50 +1,50 @@
require 'time'
require 'set'
require 'redis/distributed'
-module Resque
- # A Resque Worker processes jobs. On platforms that support fork(2),
+module ResqueAdmin
+ # A ResqueAdmin Worker processes jobs. On platforms that support fork(2),
# the worker will fork off a child to process each job. This ensures
# a clean slate when beginning the next job and cuts down on gradual
# memory growth as well as low level failures.
#
# It also ensures workers are always listening to signals from you,
# their master, and can react accordingly.
class Worker
- include Resque::Helpers
- extend Resque::Helpers
- include Resque::Logging
+ include ResqueAdmin::Helpers
+ extend ResqueAdmin::Helpers
+ include ResqueAdmin::Logging
@@all_heartbeat_threads = []
def self.kill_all_heartbeat_threads
@@all_heartbeat_threads.each(&:kill).each(&:join)
@@all_heartbeat_threads = []
end
def redis
- Resque.redis
+ ResqueAdmin.redis
end
alias :data_store :redis
def self.redis
- Resque.redis
+ ResqueAdmin.redis
end
def self.data_store
self.redis
end
# Given a Ruby object, returns a string suitable for storage in a
# queue.
def encode(object)
- Resque.encode(object)
+ ResqueAdmin.encode(object)
end
# Given a string, returns a Ruby object.
def decode(object)
- Resque.decode(object)
+ ResqueAdmin.decode(object)
end
attr_accessor :term_timeout
attr_accessor :pre_shutdown_timeout
@@ -198,11 +198,11 @@
return @static_queues if @static_queues
@queues.map { |queue| glob_match(queue) }.flatten.uniq
end
def glob_match(pattern)
- Resque.queues.select do |queue|
+ ResqueAdmin.queues.select do |queue|
File.fnmatch?(pattern, queue)
end.sort
end
# This is the main workhorse method. Called on a Worker instance,
@@ -311,11 +311,11 @@
# Attempts to grab a job off one of the provided queues. Returns
# nil if no job can be found.
def reserve
queues.each do |queue|
log_with_severity :debug, "Checking #{queue}"
- if job = Resque.reserve(queue)
+ if job = ResqueAdmin.reserve(queue)
log_with_severity :debug, "Found job on #{queue}"
return job
end
end
@@ -487,26 +487,26 @@
id = worker.to_s
heartbeat = heartbeats[id]
if heartbeat
seconds_since_heartbeat = (now - Time.parse(heartbeat)).to_i
- seconds_since_heartbeat > Resque.prune_interval
+ seconds_since_heartbeat > ResqueAdmin.prune_interval
else
false
end
end
end
def start_heartbeat
remove_heartbeat
- @heartbeat_thread_signal = Resque::ThreadSignal.new
+ @heartbeat_thread_signal = ResqueAdmin::ThreadSignal.new
@heartbeat_thread = Thread.new do
loop do
heartbeat!
- signaled = @heartbeat_thread_signal.wait_for_signal(Resque.heartbeat_interval)
+ signaled = @heartbeat_thread_signal.wait_for_signal(ResqueAdmin.heartbeat_interval)
break if signaled
end
end
@@all_heartbeat_threads << @heartbeat_thread
@@ -577,11 +577,11 @@
# Looks for any workers which should be running on this server
# and, if they're not, removes them from Redis.
#
# This is a form of garbage collection. If a server is killed by a
# hard shutdown, power failure, or something else beyond our
- # control, the Resque workers will not die gracefully and therefore
+ # control, the ResqueAdmin workers will not die gracefully and therefore
# will leave stale state information in Redis.
#
# By checking the current Redis state against the actual
# environment, we can determine if Redis is old and clean it up a bit.
def prune_dead_workers
@@ -596,11 +596,11 @@
# If the worker hasn't sent a heartbeat, remove it from the registry.
#
# If the worker hasn't ever sent a heartbeat, we won't remove it since
# the first heartbeat is sent before the worker is registred it means
# that this is a worker that doesn't support heartbeats, e.g., another
- # client library or an older version of Resque. We won't touch these.
+ # client library or an older version of ResqueAdmin. We won't touch these.
if all_workers_with_expired_heartbeats.include?(worker)
log_with_severity :info, "Pruning dead worker: #{worker}"
worker.unregister_worker(PruneDeadWorkerDirtyExit.new(worker.to_s))
next
end
@@ -630,11 +630,11 @@
data_store.register_worker(self)
end
# Runs a named hook, passing along any arguments.
def run_hook(name, *args)
- return unless hooks = Resque.send(name)
+ return unless hooks = ResqueAdmin.send(name)
return if name == :before_first_fork && @before_first_fork_hook_ran
msg = "Running #{name} hooks"
msg << " with #{args.inspect}" if args.any?
log_with_severity :info, msg
@@ -806,37 +806,37 @@
def windows_worker_pids
tasklist_output = `tasklist /FI "IMAGENAME eq ruby.exe" /FO list`.encode("UTF-8", Encoding.locale_charmap)
tasklist_output.split($/).select { |line| line =~ /^PID:/}.collect{ |line| line.gsub /PID:\s+/, '' }
end
- # Find Resque worker pids on Linux and OS X.
+ # Find ResqueAdmin worker pids on Linux and OS X.
#
def linux_worker_pids
`ps -A -o pid,command | grep -E "[r]esque:work|[r]esque:\sStarting|[r]esque-[0-9]" | grep -v "resque-web"`.split("\n").map do |line|
line.split(' ')[0]
end
end
- # Find Resque worker pids on Solaris.
+ # Find ResqueAdmin worker pids on Solaris.
#
# Returns an Array of string pids of all the other workers on this
# machine. Useful when pruning dead workers on startup.
def solaris_worker_pids
`ps -A -o pid,comm | grep "[r]uby" | grep -v "resque-web"`.split("\n").map do |line|
real_pid = line.split(' ')[0]
pargs_command = `pargs -a #{real_pid} 2>/dev/null | grep [r]esque | grep -v "resque-web"`
- if pargs_command.split(':')[1] == " resque-#{Resque::Version}"
+ if pargs_command.split(':')[1] == " resque-#{ResqueAdmin::Version}"
real_pid
end
end.compact
end
# Given a string, sets the procline ($0) and logs.
# Procline is always in the format of:
# RESQUE_PROCLINE_PREFIXresque-VERSION: STRING
def procline(string)
- $0 = "#{ENV['RESQUE_PROCLINE_PREFIX']}resque-#{Resque::Version}: #{string}"
+ $0 = "#{ENV['RESQUE_PROCLINE_PREFIX']}resque-#{ResqueAdmin::Version}: #{string}"
log_with_severity :debug, $0
end
def log(message)
info(message)
@@ -855,27 +855,27 @@
@very_verbose
end
def verbose=(value);
if value && !very_verbose
- Resque.logger.formatter = VerboseFormatter.new
- Resque.logger.level = Logger::INFO
+ ResqueAdmin.logger.formatter = VerboseFormatter.new
+ ResqueAdmin.logger.level = Logger::INFO
elsif !value
- Resque.logger.formatter = QuietFormatter.new
+ ResqueAdmin.logger.formatter = QuietFormatter.new
end
@verbose = value
end
def very_verbose=(value)
if value
- Resque.logger.formatter = VeryVerboseFormatter.new
- Resque.logger.level = Logger::DEBUG
+ ResqueAdmin.logger.formatter = VeryVerboseFormatter.new
+ ResqueAdmin.logger.level = Logger::DEBUG
elsif !value && verbose
- Resque.logger.formatter = VerboseFormatter.new
- Resque.logger.level = Logger::INFO
+ ResqueAdmin.logger.formatter = VerboseFormatter.new
+ ResqueAdmin.logger.level = Logger::INFO
else
- Resque.logger.formatter = QuietFormatter.new
+ ResqueAdmin.logger.formatter = QuietFormatter.new
end
@very_verbose = value
end