# async-observer - Rails plugin for asynchronous job execution
# Copyright (C) 2007 Philotic Inc.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see .
require 'beanstalk-client'
module AsyncObserver; end
class AsyncObserver::Queue; end
class << AsyncObserver::Queue
DEFAULT_PRI = 512
DEFAULT_FUZZ = 0
DEFAULT_DELAY = 0
DEFAULT_TTR = 120
DEFAULT_TUBE = 'default'
attr_accessor :queue, :after_put
# This is a fake worker instance for running jobs synchronously.
def sync_worker()
require 'async_observer/worker'
@sync_worker ||= AsyncObserver::Worker.new(binding)
end
# This runs jobs synchronously; it's used when no queue is configured.
def sync_run(obj)
body = YAML.dump(obj)
job = Beanstalk::Job.new(AsyncObserver::FakeConn.new(), 0, body)
sync_worker.dispatch(job)
sync_worker.do_all_work()
return 0, '0.0.0.0'
end
def put!(obj, pri=DEFAULT_PRI, delay=DEFAULT_DELAY, ttr=DEFAULT_TTR,
tube=DEFAULT_TUBE)
return sync_run(obj) if pri == :direct || !queue
queue.connect()
queue.use(tube)
info = [queue.yput(obj, pri, delay, ttr), queue.last_server]
f = AsyncObserver::Queue.after_put
f.call(*info) if f
return info
end
SUBMIT_OPTS = [:pri, :fuzz, :delay, :ttr, :tube]
def put_call!(obj, sel, opts, args=[])
pri = opts.fetch(:pri, DEFAULT_PRI)
fuzz = opts.fetch(:fuzz, DEFAULT_FUZZ)
delay = opts.fetch(:delay, DEFAULT_DELAY)
ttr = opts.fetch(:ttr, DEFAULT_TTR)
tube = opts.fetch(:tube, DEFAULT_TUBE)
worker_opts = opts.reject{|k,v| SUBMIT_OPTS.include?(k)}
interpolator = opts.fetch(:interpolator, nil)
pri = pri + rand(fuzz + 1) if pri != :direct
if interpolator
code = packed = interpolator
else
code = gen(obj, sel, args)
packed = pkg(code, worker_opts)
end
RAILS_DEFAULT_LOGGER.info("put #{pri} #{code} to #{tube}")
put!(packed, pri, delay, ttr, tube)
end
def pkg(code, opts)
opts.merge(:type => :rails, :code => code)
end
# Be careful not to pass in a selector that's not valid ruby source.
def gen(obj, selector, args)
obj.rrepr + '.' + selector.to_s + '(' + gen_args(args) + ')'
end
def gen_args(args)
args.rrepr[1...-1]
end
end
class AsyncObserver::FakeConn
def delete(x)
end
def release(x, y, z)
end
def bury(x, y)
end
def addr
''
end
def job_stats(id)
{
'id' => id,
'state' => 'reserved',
'age' => 0,
'delay' => 0,
'time-left' => 5000,
'timeouts' => 0,
'releases' => 0,
'buries' => 0,
'kicks' => 0,
}
end
end
# This is commented out to workaround (what we think is) a ruby bug in method
# lookup. Somehow the methods defined here are being used instead of ones in
# ActiveRecord::Base.
#class Object
# def rrepr()
# raise ArgumentError.new('no consistent external repr for ' + self.inspect)
# end
#end
class Symbol
def rrepr() inspect end
end
class Module
def rrepr() name end
end
class NilClass
def rrepr() inspect end
end
class FalseClass
def rrepr() inspect end
end
class TrueClass
def rrepr() inspect end
end
class Numeric
def rrepr() inspect end
end
class String
def rrepr() inspect end
end
class Array
def rrepr() '[' + map(&:rrepr).join(', ') + ']' end
end
class Hash
def rrepr() '{' + map{|k,v| k.rrepr + '=>' + v.rrepr}.join(', ') + '}' end
end
class Range
def rrepr() "(#{first.rrepr}#{exclude_end? ? '...' : '..'}#{last.rrepr})" end
end
class Time
def rrepr() "Time.parse('#{self.inspect}')" end
end
class Date
def rrepr() "Date.parse('#{self.inspect}')" end
end
module AsyncObserver::Extensions
def rrepr()
method = (respond_to? :get_cache) ? 'get_cache' : 'find'
"#{self.class.rrepr}.#{method}(#{id.rrepr})"
end
end