lib/async_observer/extend.rb in beanstalker-0.0.1 vs lib/async_observer/extend.rb in beanstalker-0.1.1
- old
+ new
@@ -27,16 +27,43 @@
String,
Symbol,
]
module AsyncObserver::Extensions
+ def self.included(receiver)
+ @@methods_async_options = {}
+ receiver.extend(ClassMethods)
+ end
+
+ module ClassMethods
+ def async_method(method, options = {})
+ methods_async_options = class_variable_get(:@@methods_async_options)
+ if options
+ class_variable_set(:@@methods_async_options, methods_async_options.merge(method.to_sym => options))
+ end
+ end
+ end
+
+ def interpolate_async_options(options, object)
+ result = {}
+ options.each do |k,v|
+ result[k] = if v.is_a?(Proc)
+ v.call(object)
+ else
+ v
+ end
+ end
+ result
+ end
+
def async_send(selector, *args)
- async_send_opts(selector, {}, *args)
+ async_send_opts(selector, @@methods_async_options[selector.to_sym] || {}, *args)
end
def async_send_opts(selector, opts, *args)
- AsyncObserver::Queue.put_call!(self, selector, opts, args)
+ interpolated_options = interpolate_async_options(opts, self)
+ AsyncObserver::Queue.put_call!(self, selector, interpolated_options, args)
end
end
CLASSES_TO_EXTEND.each do |c|
c.send :include, AsyncObserver::Extensions
@@ -71,10 +98,10 @@
each {|i| rcv.async_send_opts(selector, opts, i, *extra)}
else
fanout_opts = opts.merge(:fuzz => opts.fetch(:fanout_fuzz,
DEFAULT_FANOUT_FUZZ))
fanout_opts[:pri] = opts[:fanout_pri] || opts[:pri]
- fanout_opts = fanout_opts.reject_hash{|k,v| nil.equal?(v)}
+ fanout_opts = fanout_opts.reject{ |k,v| v.nil? }
split_to(fanout_degree) do |subrange|
subrange.async_send_opts(:async_each_opts, fanout_opts, rcv, selector,
opts, *extra)
end
end