# async-observer - Rails plugin for asynchronous job execution

# Copyright (C) 2007 Philotic Inc.

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.

require 'beanstalker/queue'

# Classes that receive the Beanstalker::Extensions mixin when this file loads.
CLASSES_TO_EXTEND = [
  ActiveRecord::Base,
  Array,
  Hash,
  Module,
  Numeric,
  Range,
  String,
  Symbol,
].freeze

# Mixin that adds async_send / async_send_opts to instances of the including
# class and the async_method declaration to the including class itself.
module Beanstalker::Extensions
  # Default options registered through async_method, keyed by method name
  # (Symbol). It is a class variable of this module, so the table is shared by
  # every class that includes the module. It is initialised once here, not in
  # self.included, so that a later include cannot wipe options that have
  # already been registered.
  @@methods_async_options = {}

  # Include hook: gives the including class the ClassMethods below.
  def self.included(receiver)
    receiver.extend(ClassMethods)
  end

  module ClassMethods
    # Registers +options+ as the default options used when +method+ is
    # dispatched with async_send.
    #
    # method  - method name; converted with to_sym.
    # options - Hash of options; values may be Procs (see
    #           interpolate_async_options). Passing nil registers nothing.
    #
    # Returns the updated options table, or nil when +options+ is nil.
    def async_method(method, options = {})
      registered = class_variable_get(:@@methods_async_options)
      return unless options

      class_variable_set(:@@methods_async_options,
                         registered.merge(method.to_sym => options))
    end
  end

  # Returns a copy of +options+ in which every Proc value has been replaced
  # by the result of calling it with +object+; other values are copied as is.
  def interpolate_async_options(options, object)
    result = {}
    options.each do |key, value|
      result[key] = value.is_a?(Proc) ? value.call(object) : value
    end
    result
  end

  # Enqueues a call of +selector+ on self with +args+, using the options
  # registered for that selector via async_method (or none).
  def async_send(selector, *args)
    async_send_opts(selector, @@methods_async_options[selector.to_sym] || {}, *args)
  end

  # Enqueues a call of +selector+ on self with +args+ and explicit +opts+.
  # Proc values in +opts+ are resolved against self first, then everything is
  # handed to Beanstalker::Queue.put_call!.
  def async_send_opts(selector, opts, *args)
    interpolated_options = interpolate_async_options(opts, self)
    Beanstalker::Queue.put_call!(self, selector, interpolated_options, args)
  end
end

CLASSES_TO_EXTEND.each do |c|
  c.send :include, Beanstalker::Extensions
end

# Range helpers used to fan a large range of values out into many async calls.
class Range
  DEFAULT_FANOUT_FUZZ = 0
  DEFAULT_FANOUT_DEGREE = 1000

  # Yields consecutive subranges that together cover self, using a slice size
  # of ceil(size / n), i.e. at most +n+ subranges.
  def split_to(n)
    split_by((size + n - 1) / n) { |group| yield(group) }
  end

  # Yields consecutive subranges of at most +n+ elements each. Every subrange
  # has the same end-exclusivity as self.
  #
  # Raises ArgumentError if +n+ is less than 1.
  def split_by(n)
    raise ArgumentError, 'invalid slice size' if n < 1

    # For an inclusive range a slice i..j holds j - i + 1 elements, so the
    # step from i to j is one less than the slice size.
    n -= 1 unless exclude_end?
    i = first
    while member?(i)
      j = [i + n, last].min
      yield(Range.new(i, j, exclude_end?))
      # An exclusive slice stops just before j, so the next one starts at j;
      # an inclusive slice contains j, so the next one starts after it.
      i = j + (exclude_end? ? 0 : 1)
    end
  end

  # Number of elements, computed arithmetically from the endpoints.
  def size
    last - first + (exclude_end? ? 0 : 1)
  end

  # Enqueues rcv.selector(i, *extra) for every element i of the range.
  #
  # When the range holds at most opts[:fanout_degree] elements (default
  # DEFAULT_FANOUT_DEGREE) one call per element is enqueued directly.
  # Otherwise the range is split into at most that many subranges and, for
  # each subrange, a call of async_each_opts on the subrange is enqueued
  # instead, so the fan-out continues when those calls run.
  #
  # The intermediate fan-out calls are enqueued with +opts+ plus
  # :fuzz (from opts[:fanout_fuzz], default DEFAULT_FANOUT_FUZZ) and
  # :pri (from opts[:fanout_pri], falling back to opts[:pri]); entries whose
  # value is nil are dropped.
  def async_each_opts(rcv, selector, opts, *extra)
    fanout_degree = opts.fetch(:fanout_degree, DEFAULT_FANOUT_DEGREE)
    if size <= fanout_degree
      each { |i| rcv.async_send_opts(selector, opts, i, *extra) }
    else
      fanout_opts = opts.merge(:fuzz => opts.fetch(:fanout_fuzz, DEFAULT_FANOUT_FUZZ))
      fanout_opts[:pri] = opts[:fanout_pri] || opts[:pri]
      fanout_opts = fanout_opts.reject { |_k, v| v.nil? }
      split_to(fanout_degree) do |subrange|
        subrange.async_send_opts(:async_each_opts, fanout_opts, rcv, selector, opts, *extra)
      end
    end
  end

  # Same as async_each_opts with empty options.
  def async_each(rcv, selector, *extra)
    async_each_opts(rcv, selector, {}, *extra)
  end
end

# Callback names for which an async_<hook> class method is generated below.
HOOKS = [:after_create, :after_update, :after_save].freeze

class << ActiveRecord::Base
  # Defines async_after_create, async_after_update and async_after_save.
  # Each forwards its arguments and block to add_async_hook.
  HOOKS.each do |hook|
    code = %Q{def async_#{hook}(*methods, &b) add_async_hook(#{hook.inspect}, *methods, &b) end}
    class_eval(code, __FILE__, __LINE__ - 1)
  end

  # Registers callbacks to be run asynchronously for +hook+.
  #
  # args  - method names (Symbols); each is sent to the record when the hook
  #         runs. Non-Symbol arguments are ignored.
  # block - optional callable invoked with the record.
  #
  # Every given method name is registered, followed by the block if present.
  # Raises ArgumentError when neither a method name nor a block is supplied,
  # since there would be nothing to run.
  #
  # Returns the list of callbacks registered for +hook+.
  def add_async_hook(hook, *args, &block)
    callbacks = args.select { |arg| arg.is_a?(Symbol) }.map do |name|
      lambda { |o| o.send(name) }
    end
    callbacks << block if block
    raise ArgumentError, 'no method name or block given' if callbacks.empty?

    async_hooks[hook].concat(callbacks)
  end

  # Hash from hook name to the list of callbacks registered for it, stored in
  # an instance variable of the receiving class. The first lookup of a hook
  # also wires it up, as described in the block below.
  def async_hooks
    @async_hooks ||= Hash.new do |hash, hook|
      ahook = :"_async_#{hook}"

      # This is for the producer's benefit
      send(hook) { |o| async_send(ahook, o) }

      # This is for the worker's benefit
      code = "def #{ahook}(o) run_async_hooks(#{hook.inspect}, o) end"
      instance_eval(code, __FILE__, __LINE__ - 1)

      hash[hook] = []
    end
  end

  # Calls every callback registered for +hook+ with +o+.
  def run_async_hooks(hook, o)
    async_hooks[hook].each { |b| b.call(o) }
  end

  # Looks up the record with the given +id+ and, if it exists, sends it
  # +selector+ with +args+. Does nothing when no such record is found.
  def send_to_instance(id, selector, *args)
    x = find_by_id(id)
    x.send(selector, *args) if x
  end

  # Enqueues selector(*args) for every id between opts[:min] and opts[:max],
  # via Range#async_each_opts and send_to_instance.
  #
  # The bounds default to minimum(:id) and maximum(:id); those are only
  # computed when the corresponding option is absent. When either bound is
  # nil (for example because there are no rows) nothing is enqueued and nil
  # is returned.
  def async_each_opts(selector, opts, *args)
    min = opts.fetch(:min) { minimum(:id) }
    max = opts.fetch(:max) { maximum(:id) }
    return if min.nil? || max.nil?

    (min..max).async_each_opts(self, :send_to_instance, opts, selector, *args)
  end

  # Same as async_each_opts with empty options.
  def async_each(selector, *args)
    async_each_opts(selector, {}, *args)
  end
end