lib/async_observer/extend.rb in beanstalker-0.0.1 vs lib/async_observer/extend.rb in beanstalker-0.1.1

- old
+ new

@@ -27,16 +27,43 @@ String, Symbol, ] module AsyncObserver::Extensions + def self.included(receiver) + @@methods_async_options = {} + receiver.extend(ClassMethods) + end + + module ClassMethods + def async_method(method, options = {}) + methods_async_options = class_variable_get(:@@methods_async_options) + if options + class_variable_set(:@@methods_async_options, methods_async_options.merge(method.to_sym => options)) + end + end + end + + def interpolate_async_options(options, object) + result = {} + options.each do |k,v| + result[k] = if v.is_a?(Proc) + v.call(object) + else + v + end + end + result + end + def async_send(selector, *args) - async_send_opts(selector, {}, *args) + async_send_opts(selector, @@methods_async_options[selector.to_sym] || {}, *args) end def async_send_opts(selector, opts, *args) - AsyncObserver::Queue.put_call!(self, selector, opts, args) + interpolated_options = interpolate_async_options(opts, self) + AsyncObserver::Queue.put_call!(self, selector, interpolated_options, args) end end CLASSES_TO_EXTEND.each do |c| c.send :include, AsyncObserver::Extensions @@ -71,10 +98,10 @@ each {|i| rcv.async_send_opts(selector, opts, i, *extra)} else fanout_opts = opts.merge(:fuzz => opts.fetch(:fanout_fuzz, DEFAULT_FANOUT_FUZZ)) fanout_opts[:pri] = opts[:fanout_pri] || opts[:pri] - fanout_opts = fanout_opts.reject_hash{|k,v| nil.equal?(v)} + fanout_opts = fanout_opts.reject{ |k,v| v.nil? } split_to(fanout_degree) do |subrange| subrange.async_send_opts(:async_each_opts, fanout_opts, rcv, selector, opts, *extra) end end