lib/async_observer/worker.rb in beanstalker-0.0.1 vs lib/async_observer/worker.rb in beanstalker-0.1.1

- old
+ new

@@ -13,24 +13,15 @@ # GNU General Public License for more details. # You should have received a copy of the GNU General Public License # along with this program. If not, see <http://www.gnu.org/licenses/>. - -begin - require 'mysql' -rescue LoadError - # Ignore case where we don't have mysql -end require 'async_observer/queue' -require 'async_observer/util' module AsyncObserver; end class AsyncObserver::Worker - extend AsyncObserver::Util - include AsyncObserver::Util SLEEP_TIME = 60 if !defined?(SLEEP_TIME) # rails loads this file twice class << self attr_accessor :finish @@ -53,104 +44,94 @@ def before_reserve(&block) before_reserves << block end def run_before_reserve - before_reserves.each {|b| b.call()} + before_reserves.each {|b| b.call} end end + + def logger + $logger or RAILS_DEFAULT_LOGGER + end - def initialize(top_binding) + def initialize(top_binding, options = {}) @top_binding = top_binding @stop = false + @options = options + if @options && @options[:servers] + AsyncObserver::Queue.queue = Beanstalk::Pool.new(@options[:servers]) + end end - def main_loop() + def main_loop trap('TERM') { @stop = true } loop do break if @stop - safe_dispatch(get_job()) + safe_dispatch(get_job) end end - def startup() - log_bracketed('worker-startup') do - appver = AsyncObserver::Queue.app_version - RAILS_DEFAULT_LOGGER.info "pid is #{$$}" - RAILS_DEFAULT_LOGGER.info "app version is #{appver}" - mark_db_socket_close_on_exec() - if AsyncObserver::Queue.queue.nil? - RAILS_DEFAULT_LOGGER.info 'no queue has been configured' - exit(1) - end - AsyncObserver::Queue.queue.watch(appver) if appver - end + def startup + tube = @options[:tube] || "default" + logger.info "Using tube #{tube}" + AsyncObserver::Queue.queue.watch(tube) flush_logger end - # This prevents us from leaking fds when we exec. Only works for mysql. - def mark_db_socket_close_on_exec() - ActiveRecord::Base.active_connections.each(&:set_close_on_exec) - rescue NoMethodError + def shutdown + do_all_work end - def shutdown() - log_bracketed('worker-shutdown') do - do_all_work() - end - end - - def run() - startup() - main_loop() + def run + startup + main_loop rescue Interrupt - shutdown() + shutdown end - def q_hint() + def q_hint @q_hint || AsyncObserver::Queue.queue end # This heuristic is to help prevent one queue from starving. The idea is that # if the connection returns a job right away, it probably has more available. # But if it takes time, then it's probably empty. So reuse the same # connection as long as it stays fast. Otherwise, have no preference. - def reserve_and_set_hint() + def reserve_and_set_hint t1 = Time.now.utc - return job = q_hint().reserve() + return job = q_hint.reserve ensure t2 = Time.now.utc @q_hint = if brief?(t1, t2) and job then job.conn else nil end end def brief?(t1, t2) ((t2 - t1) * 100).to_i.abs < 10 end - def get_job() - log_bracketed('worker-get-job') do - loop do - begin - AsyncObserver::Queue.queue.connect() - self.class.run_before_reserve - return reserve_and_set_hint() - rescue Interrupt => ex - raise ex - rescue SignalException => ex - raise ex - rescue Beanstalk::DeadlineSoonError - # Do nothing; immediately try again, giving the user a chance to - # clean up in the before_reserve hook. - RAILS_DEFAULT_LOGGER.info 'Job deadline soon; you should clean up.' - rescue Exception => ex - @q_hint = nil # in case there's something wrong with this conn - RAILS_DEFAULT_LOGGER.info( - "#{ex.class}: #{ex}\n" + ex.backtrace.join("\n")) - RAILS_DEFAULT_LOGGER.info 'something is wrong. We failed to get a job.' - RAILS_DEFAULT_LOGGER.info "sleeping for #{SLEEP_TIME}s..." - sleep(SLEEP_TIME) - end + def get_job + loop do + begin + AsyncObserver::Queue.queue.connect + self.class.run_before_reserve + return reserve_and_set_hint + rescue Interrupt => ex + raise ex + rescue SignalException => ex + raise ex + rescue Beanstalk::DeadlineSoonError + # Do nothing; immediately try again, giving the user a chance to + # clean up in the before_reserve hook. + logger.info 'Job deadline soon; you should clean up.' + rescue Exception => ex + @q_hint = nil # in case there's something wrong with this conn + logger.info( + "#{ex.class}: #{ex}\n" + ex.backtrace.join("\n")) + logger.info 'something is wrong. We failed to get a job.' + logger.info "sleeping for #{SLEEP_TIME}s..." + sleep(SLEEP_TIME) end end end def dispatch(job) @@ -158,34 +139,30 @@ return run_ao_job(job) if async_observer_job?(job) return run_other(job) end def safe_dispatch(job) - log_bracketed('worker-dispatch') do - RAILS_DEFAULT_LOGGER.info "got #{job.inspect}:\n" + job.body - log_bracketed('job-stats') do - job.stats.each do |k,v| - RAILS_DEFAULT_LOGGER.info "#{k}=#{v}" - end - end - begin - return dispatch(job) - rescue Interrupt => ex - begin job.release() rescue :ok end - raise ex - rescue Exception => ex - handle_error(job, ex) - ensure - flush_logger - end + logger.info "got #{job.inspect}:\n" + job.body + job.stats.each do |k,v| + logger.info "#{k}=#{v}" end + begin + return dispatch(job) + rescue Interrupt => ex + begin job.release rescue :ok end + raise ex + rescue Exception => ex + handle_error(job, ex) + ensure + flush_logger + end end def flush_logger - if defined?(RAILS_DEFAULT_LOGGER) && - RAILS_DEFAULT_LOGGER.respond_to?(:flush) - RAILS_DEFAULT_LOGGER.flush + if defined?(logger) && + logger.respond_to?(:flush) + logger.flush end end def handle_error(job, ex) if self.class.custom_error_handler @@ -194,29 +171,34 @@ self.class.default_handle_error(job, ex) end end def self.default_handle_error(job, ex) - RAILS_DEFAULT_LOGGER.info "Job failed: #{job.server}/#{job.id}" - RAILS_DEFAULT_LOGGER.info("#{ex.class}: #{ex}\n" + ex.backtrace.join("\n")) - job.decay() + logger.info "Job failed: #{job.server}/#{job.id}" + logger.info("#{ex.class}: #{ex}\n" + ex.backtrace.join("\n")) + if job.stats['releases'] > 10 + job.bury + logger.info "BURY job due to many releases" + else + job.decay + end rescue Beanstalk::UnexpectedResponse end def run_ao_job(job) - RAILS_DEFAULT_LOGGER.info 'running as async observer job' + logger.info 'running as async observer job' f = self.class.before_filter f.call(job) if f job.delete if job.ybody[:delete_first] run_code(job) - job.delete() unless job.ybody[:delete_first] + job.delete unless job.ybody[:delete_first] rescue ActiveRecord::RecordNotFound => ex unless job.ybody[:delete_first] if job.age > 60 - job.delete() # it's old; this error is most likely permanent + job.delete # it's old; this error is most likely permanent else - job.decay() # it could be replication delay so retry quietly + job.decay # it could be replication delay so retry quietly end end end def run_code(job) @@ -226,38 +208,16 @@ def async_observer_job?(job) begin job.ybody[:type] == :rails rescue false end end def run_other(job) - RAILS_DEFAULT_LOGGER.info 'trying custom handler' + logger.info 'trying custom handler' self.class.handle.call(job) end - def do_all_work() - RAILS_DEFAULT_LOGGER.info 'finishing all running jobs. interrupt again to kill them.' + def do_all_work + logger.info 'finishing all running jobs. interrupt again to kill them.' f = self.class.finish - f.call() if f + f.call if f end end -class ActiveRecord::ConnectionAdapters::MysqlAdapter < ActiveRecord::ConnectionAdapters::AbstractAdapter - def set_close_on_exec() - @connection.set_close_on_exec() - end -end - -class Mysql - def set_close_on_exec() - if @net - @net.set_close_on_exec() - else - # we are in the c mysql binding - RAILS_DEFAULT_LOGGER.info "Warning: we are using the C mysql binding, can't set close-on-exec" - end - end -end - -class Mysql::Net - def set_close_on_exec() - @sock.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC) - end -end