lib/reactor/event.rb in reactor-0.3.2 vs lib/reactor/event.rb in reactor-0.4.0

- old
+ new

@@ -20,68 +20,66 @@ def to_s name end - def self.publish(name, data = {}) - message = new(data.merge(event: name)) - #if (message.at) - # delay_until(message.at).process name, message.data - #else - # delay.process name, message.data - #end + class << self + def publish(name, data = {}) + message = new(data.merge(event: name)) - if message.at.nil? - delay.process name, message.data - elsif message.at.future? - delay_until(message.at).process name, message.data + if message.at.nil? + delay.process name, message.data + elsif message.at.future? + delay_until(message.at).process name, message.data + end end - end - def self.reschedule(name, data = {}) - job = scheduled_jobs.detect do |job| - job['class'] == name.to_s.camelize && job['at'].to_i == data[:was].to_i + def reschedule(name, data = {}) + job = scheduled_jobs.detect do |job| + job['class'] == name.to_s.camelize && job['at'].to_i == data[:was].to_i + end + remove_scheduled_job job if job + delay.publish(name, data.except(:was)) if data[:at].future? end - remove_scheduled_job job if job - delay.publish(name, data.except(:was)) if data[:at].future? - end - def self.process(name, data) - # fire database listeners - Reactor::Subscriber.where(event: name.to_s).each do |subscriber| - Reactor::Subscriber.delay.fire subscriber.id, data - end + def process(name, data) + data.merge!(fired_at: Time.current) + Reactor::Subscriber.where(event: name.to_s).each do |subscriber| + Reactor::Subscriber.delay.fire subscriber.id, data + end - #TODO: support more matching? - Reactor::Subscriber.where(event: '*').each do |s| - Reactor::Subscriber.delay.fire s.id, data - end + #TODO: support more matching? + Reactor::Subscriber.where(event: '*').each do |s| + Reactor::Subscriber.delay.fire s.id, data + end - if (static_subscribers = Reactor::SUBSCRIBERS[name.to_s] || []).any? - static_subscribers.each do |callback| - case callback - when Hash - callback.keys.first.send callback.values.first, Reactor::Event.new(data.merge(event: name.to_s)) - else - callback.call Reactor::Event.new(data.merge(event: name.to_s)) + if (static_subscribers = Reactor::SUBSCRIBERS[name.to_s] || []).any? + static_subscribers.each do |callback| + delay = callback[:options].try(:[], :delay) || 0 + case method = callback[:method] + when Symbol + callback[:source].delay_for(delay).send method, Reactor::Event.new(data.merge(event: name.to_s)) + else + method.call Reactor::Event.new(data.merge(event: name.to_s)) + end end end end - end - private + def scheduled_jobs(options = {}) + Sidekiq.redis do |r| + from = options[:from] ? options[:from].to_f.to_s : '-inf' + to = options[:to] ? options[:to].to_f.to_s : '+inf' + r.zrangebyscore('schedule', from, to).map{|job| MultiJson.decode(job)} + end + end - def self.scheduled_jobs(options = {}) - Sidekiq.redis do |r| - from = options[:from] ? options[:from].to_f.to_s : '-inf' - to = options[:to] ? options[:to].to_f.to_s : '+inf' - r.zrangebyscore('schedule', from, to).map{|job| MultiJson.decode(job)} + def remove_scheduled_job(job) + Sidekiq.redis { |r| r.zrem 'schedule', MultiJson.encode(job) } end end - def self.remove_scheduled_job(job) - Sidekiq.redis { |r| r.zrem 'schedule', MultiJson.encode(job) } - end + private def try_setter(method, object, *args) if object.is_a? ActiveRecord::Base send("#{method}_id", object.id) send("#{method}_type", object.class.to_s)