lib/async_observer/worker.rb in beanstalker-0.0.1 vs lib/async_observer/worker.rb in beanstalker-0.1.1
- old
+ new
@@ -13,24 +13,15 @@
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
-
-begin
- require 'mysql'
-rescue LoadError
- # Ignore case where we don't have mysql
-end
require 'async_observer/queue'
-require 'async_observer/util'
module AsyncObserver; end
class AsyncObserver::Worker
- extend AsyncObserver::Util
- include AsyncObserver::Util
SLEEP_TIME = 60 if !defined?(SLEEP_TIME) # rails loads this file twice
class << self
attr_accessor :finish
@@ -53,104 +44,94 @@
def before_reserve(&block)
before_reserves << block
end
def run_before_reserve
- before_reserves.each {|b| b.call()}
+ before_reserves.each {|b| b.call}
end
end
+
+ def logger
+ $logger or RAILS_DEFAULT_LOGGER
+ end
- def initialize(top_binding)
+ def initialize(top_binding, options = {})
@top_binding = top_binding
@stop = false
+ @options = options
+ if @options && @options[:servers]
+ AsyncObserver::Queue.queue = Beanstalk::Pool.new(@options[:servers])
+ end
end
- def main_loop()
+ def main_loop
trap('TERM') { @stop = true }
loop do
break if @stop
- safe_dispatch(get_job())
+ safe_dispatch(get_job)
end
end
- def startup()
- log_bracketed('worker-startup') do
- appver = AsyncObserver::Queue.app_version
- RAILS_DEFAULT_LOGGER.info "pid is #{$$}"
- RAILS_DEFAULT_LOGGER.info "app version is #{appver}"
- mark_db_socket_close_on_exec()
- if AsyncObserver::Queue.queue.nil?
- RAILS_DEFAULT_LOGGER.info 'no queue has been configured'
- exit(1)
- end
- AsyncObserver::Queue.queue.watch(appver) if appver
- end
+ def startup
+ tube = @options[:tube] || "default"
+ logger.info "Using tube #{tube}"
+ AsyncObserver::Queue.queue.watch(tube)
flush_logger
end
- # This prevents us from leaking fds when we exec. Only works for mysql.
- def mark_db_socket_close_on_exec()
- ActiveRecord::Base.active_connections.each(&:set_close_on_exec)
- rescue NoMethodError
+ def shutdown
+ do_all_work
end
- def shutdown()
- log_bracketed('worker-shutdown') do
- do_all_work()
- end
- end
-
- def run()
- startup()
- main_loop()
+ def run
+ startup
+ main_loop
rescue Interrupt
- shutdown()
+ shutdown
end
- def q_hint()
+ def q_hint
@q_hint || AsyncObserver::Queue.queue
end
# This heuristic is to help prevent one queue from starving. The idea is that
# if the connection returns a job right away, it probably has more available.
# But if it takes time, then it's probably empty. So reuse the same
# connection as long as it stays fast. Otherwise, have no preference.
- def reserve_and_set_hint()
+ def reserve_and_set_hint
t1 = Time.now.utc
- return job = q_hint().reserve()
+ return job = q_hint.reserve
ensure
t2 = Time.now.utc
@q_hint = if brief?(t1, t2) and job then job.conn else nil end
end
def brief?(t1, t2)
((t2 - t1) * 100).to_i.abs < 10
end
- def get_job()
- log_bracketed('worker-get-job') do
- loop do
- begin
- AsyncObserver::Queue.queue.connect()
- self.class.run_before_reserve
- return reserve_and_set_hint()
- rescue Interrupt => ex
- raise ex
- rescue SignalException => ex
- raise ex
- rescue Beanstalk::DeadlineSoonError
- # Do nothing; immediately try again, giving the user a chance to
- # clean up in the before_reserve hook.
- RAILS_DEFAULT_LOGGER.info 'Job deadline soon; you should clean up.'
- rescue Exception => ex
- @q_hint = nil # in case there's something wrong with this conn
- RAILS_DEFAULT_LOGGER.info(
- "#{ex.class}: #{ex}\n" + ex.backtrace.join("\n"))
- RAILS_DEFAULT_LOGGER.info 'something is wrong. We failed to get a job.'
- RAILS_DEFAULT_LOGGER.info "sleeping for #{SLEEP_TIME}s..."
- sleep(SLEEP_TIME)
- end
+ def get_job
+ loop do
+ begin
+ AsyncObserver::Queue.queue.connect
+ self.class.run_before_reserve
+ return reserve_and_set_hint
+ rescue Interrupt => ex
+ raise ex
+ rescue SignalException => ex
+ raise ex
+ rescue Beanstalk::DeadlineSoonError
+ # Do nothing; immediately try again, giving the user a chance to
+ # clean up in the before_reserve hook.
+ logger.info 'Job deadline soon; you should clean up.'
+ rescue Exception => ex
+ @q_hint = nil # in case there's something wrong with this conn
+ logger.info(
+ "#{ex.class}: #{ex}\n" + ex.backtrace.join("\n"))
+ logger.info 'something is wrong. We failed to get a job.'
+ logger.info "sleeping for #{SLEEP_TIME}s..."
+ sleep(SLEEP_TIME)
end
end
end
def dispatch(job)
@@ -158,34 +139,30 @@
return run_ao_job(job) if async_observer_job?(job)
return run_other(job)
end
def safe_dispatch(job)
- log_bracketed('worker-dispatch') do
- RAILS_DEFAULT_LOGGER.info "got #{job.inspect}:\n" + job.body
- log_bracketed('job-stats') do
- job.stats.each do |k,v|
- RAILS_DEFAULT_LOGGER.info "#{k}=#{v}"
- end
- end
- begin
- return dispatch(job)
- rescue Interrupt => ex
- begin job.release() rescue :ok end
- raise ex
- rescue Exception => ex
- handle_error(job, ex)
- ensure
- flush_logger
- end
+ logger.info "got #{job.inspect}:\n" + job.body
+ job.stats.each do |k,v|
+ logger.info "#{k}=#{v}"
end
+ begin
+ return dispatch(job)
+ rescue Interrupt => ex
+ begin job.release rescue :ok end
+ raise ex
+ rescue Exception => ex
+ handle_error(job, ex)
+ ensure
+ flush_logger
+ end
end
def flush_logger
- if defined?(RAILS_DEFAULT_LOGGER) &&
- RAILS_DEFAULT_LOGGER.respond_to?(:flush)
- RAILS_DEFAULT_LOGGER.flush
+ if defined?(logger) &&
+ logger.respond_to?(:flush)
+ logger.flush
end
end
def handle_error(job, ex)
if self.class.custom_error_handler
@@ -194,29 +171,34 @@
self.class.default_handle_error(job, ex)
end
end
def self.default_handle_error(job, ex)
- RAILS_DEFAULT_LOGGER.info "Job failed: #{job.server}/#{job.id}"
- RAILS_DEFAULT_LOGGER.info("#{ex.class}: #{ex}\n" + ex.backtrace.join("\n"))
- job.decay()
+ logger.info "Job failed: #{job.server}/#{job.id}"
+ logger.info("#{ex.class}: #{ex}\n" + ex.backtrace.join("\n"))
+ if job.stats['releases'] > 10
+ job.bury
+ logger.info "BURY job due to many releases"
+ else
+ job.decay
+ end
rescue Beanstalk::UnexpectedResponse
end
def run_ao_job(job)
- RAILS_DEFAULT_LOGGER.info 'running as async observer job'
+ logger.info 'running as async observer job'
f = self.class.before_filter
f.call(job) if f
job.delete if job.ybody[:delete_first]
run_code(job)
- job.delete() unless job.ybody[:delete_first]
+ job.delete unless job.ybody[:delete_first]
rescue ActiveRecord::RecordNotFound => ex
unless job.ybody[:delete_first]
if job.age > 60
- job.delete() # it's old; this error is most likely permanent
+ job.delete # it's old; this error is most likely permanent
else
- job.decay() # it could be replication delay so retry quietly
+ job.decay # it could be replication delay so retry quietly
end
end
end
def run_code(job)
@@ -226,38 +208,16 @@
def async_observer_job?(job)
begin job.ybody[:type] == :rails rescue false end
end
def run_other(job)
- RAILS_DEFAULT_LOGGER.info 'trying custom handler'
+ logger.info 'trying custom handler'
self.class.handle.call(job)
end
- def do_all_work()
- RAILS_DEFAULT_LOGGER.info 'finishing all running jobs. interrupt again to kill them.'
+ def do_all_work
+ logger.info 'finishing all running jobs. interrupt again to kill them.'
f = self.class.finish
- f.call() if f
+ f.call if f
end
end
-class ActiveRecord::ConnectionAdapters::MysqlAdapter < ActiveRecord::ConnectionAdapters::AbstractAdapter
- def set_close_on_exec()
- @connection.set_close_on_exec()
- end
-end
-
-class Mysql
- def set_close_on_exec()
- if @net
- @net.set_close_on_exec()
- else
- # we are in the c mysql binding
- RAILS_DEFAULT_LOGGER.info "Warning: we are using the C mysql binding, can't set close-on-exec"
- end
- end
-end
-
-class Mysql::Net
- def set_close_on_exec()
- @sock.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
- end
-end