lib/resque_admin/worker.rb in resque_admin-1.0.4 vs lib/resque_admin/worker.rb in resque_admin-1.0.5

- old
+ new

@@ -1,50 +1,50 @@ require 'time' require 'set' require 'redis/distributed' -module Resque - # A Resque Worker processes jobs. On platforms that support fork(2), +module ResqueAdmin + # A ResqueAdmin Worker processes jobs. On platforms that support fork(2), # the worker will fork off a child to process each job. This ensures # a clean slate when beginning the next job and cuts down on gradual # memory growth as well as low level failures. # # It also ensures workers are always listening to signals from you, # their master, and can react accordingly. class Worker - include Resque::Helpers - extend Resque::Helpers - include Resque::Logging + include ResqueAdmin::Helpers + extend ResqueAdmin::Helpers + include ResqueAdmin::Logging @@all_heartbeat_threads = [] def self.kill_all_heartbeat_threads @@all_heartbeat_threads.each(&:kill).each(&:join) @@all_heartbeat_threads = [] end def redis - Resque.redis + ResqueAdmin.redis end alias :data_store :redis def self.redis - Resque.redis + ResqueAdmin.redis end def self.data_store self.redis end # Given a Ruby object, returns a string suitable for storage in a # queue. def encode(object) - Resque.encode(object) + ResqueAdmin.encode(object) end # Given a string, returns a Ruby object. def decode(object) - Resque.decode(object) + ResqueAdmin.decode(object) end attr_accessor :term_timeout attr_accessor :pre_shutdown_timeout @@ -198,11 +198,11 @@ return @static_queues if @static_queues @queues.map { |queue| glob_match(queue) }.flatten.uniq end def glob_match(pattern) - Resque.queues.select do |queue| + ResqueAdmin.queues.select do |queue| File.fnmatch?(pattern, queue) end.sort end # This is the main workhorse method. Called on a Worker instance, @@ -311,11 +311,11 @@ # Attempts to grab a job off one of the provided queues. Returns # nil if no job can be found. def reserve queues.each do |queue| log_with_severity :debug, "Checking #{queue}" - if job = Resque.reserve(queue) + if job = ResqueAdmin.reserve(queue) log_with_severity :debug, "Found job on #{queue}" return job end end @@ -487,26 +487,26 @@ id = worker.to_s heartbeat = heartbeats[id] if heartbeat seconds_since_heartbeat = (now - Time.parse(heartbeat)).to_i - seconds_since_heartbeat > Resque.prune_interval + seconds_since_heartbeat > ResqueAdmin.prune_interval else false end end end def start_heartbeat remove_heartbeat - @heartbeat_thread_signal = Resque::ThreadSignal.new + @heartbeat_thread_signal = ResqueAdmin::ThreadSignal.new @heartbeat_thread = Thread.new do loop do heartbeat! - signaled = @heartbeat_thread_signal.wait_for_signal(Resque.heartbeat_interval) + signaled = @heartbeat_thread_signal.wait_for_signal(ResqueAdmin.heartbeat_interval) break if signaled end end @@all_heartbeat_threads << @heartbeat_thread @@ -577,11 +577,11 @@ # Looks for any workers which should be running on this server # and, if they're not, removes them from Redis. # # This is a form of garbage collection. If a server is killed by a # hard shutdown, power failure, or something else beyond our - # control, the Resque workers will not die gracefully and therefore + # control, the ResqueAdmin workers will not die gracefully and therefore # will leave stale state information in Redis. # # By checking the current Redis state against the actual # environment, we can determine if Redis is old and clean it up a bit. def prune_dead_workers @@ -596,11 +596,11 @@ # If the worker hasn't sent a heartbeat, remove it from the registry. # # If the worker hasn't ever sent a heartbeat, we won't remove it since # the first heartbeat is sent before the worker is registred it means # that this is a worker that doesn't support heartbeats, e.g., another - # client library or an older version of Resque. We won't touch these. + # client library or an older version of ResqueAdmin. We won't touch these. if all_workers_with_expired_heartbeats.include?(worker) log_with_severity :info, "Pruning dead worker: #{worker}" worker.unregister_worker(PruneDeadWorkerDirtyExit.new(worker.to_s)) next end @@ -630,11 +630,11 @@ data_store.register_worker(self) end # Runs a named hook, passing along any arguments. def run_hook(name, *args) - return unless hooks = Resque.send(name) + return unless hooks = ResqueAdmin.send(name) return if name == :before_first_fork && @before_first_fork_hook_ran msg = "Running #{name} hooks" msg << " with #{args.inspect}" if args.any? log_with_severity :info, msg @@ -806,37 +806,37 @@ def windows_worker_pids tasklist_output = `tasklist /FI "IMAGENAME eq ruby.exe" /FO list`.encode("UTF-8", Encoding.locale_charmap) tasklist_output.split($/).select { |line| line =~ /^PID:/}.collect{ |line| line.gsub /PID:\s+/, '' } end - # Find Resque worker pids on Linux and OS X. + # Find ResqueAdmin worker pids on Linux and OS X. # def linux_worker_pids `ps -A -o pid,command | grep -E "[r]esque:work|[r]esque:\sStarting|[r]esque-[0-9]" | grep -v "resque-web"`.split("\n").map do |line| line.split(' ')[0] end end - # Find Resque worker pids on Solaris. + # Find ResqueAdmin worker pids on Solaris. # # Returns an Array of string pids of all the other workers on this # machine. Useful when pruning dead workers on startup. def solaris_worker_pids `ps -A -o pid,comm | grep "[r]uby" | grep -v "resque-web"`.split("\n").map do |line| real_pid = line.split(' ')[0] pargs_command = `pargs -a #{real_pid} 2>/dev/null | grep [r]esque | grep -v "resque-web"` - if pargs_command.split(':')[1] == " resque-#{Resque::Version}" + if pargs_command.split(':')[1] == " resque-#{ResqueAdmin::Version}" real_pid end end.compact end # Given a string, sets the procline ($0) and logs. # Procline is always in the format of: # RESQUE_PROCLINE_PREFIXresque-VERSION: STRING def procline(string) - $0 = "#{ENV['RESQUE_PROCLINE_PREFIX']}resque-#{Resque::Version}: #{string}" + $0 = "#{ENV['RESQUE_PROCLINE_PREFIX']}resque-#{ResqueAdmin::Version}: #{string}" log_with_severity :debug, $0 end def log(message) info(message) @@ -855,27 +855,27 @@ @very_verbose end def verbose=(value); if value && !very_verbose - Resque.logger.formatter = VerboseFormatter.new - Resque.logger.level = Logger::INFO + ResqueAdmin.logger.formatter = VerboseFormatter.new + ResqueAdmin.logger.level = Logger::INFO elsif !value - Resque.logger.formatter = QuietFormatter.new + ResqueAdmin.logger.formatter = QuietFormatter.new end @verbose = value end def very_verbose=(value) if value - Resque.logger.formatter = VeryVerboseFormatter.new - Resque.logger.level = Logger::DEBUG + ResqueAdmin.logger.formatter = VeryVerboseFormatter.new + ResqueAdmin.logger.level = Logger::DEBUG elsif !value && verbose - Resque.logger.formatter = VerboseFormatter.new - Resque.logger.level = Logger::INFO + ResqueAdmin.logger.formatter = VerboseFormatter.new + ResqueAdmin.logger.level = Logger::INFO else - Resque.logger.formatter = QuietFormatter.new + ResqueAdmin.logger.formatter = QuietFormatter.new end @very_verbose = value end