lib/reactor/event.rb in reactor-0.4.0 vs lib/reactor/event.rb in reactor-0.4.2

- old
+ new

@@ -1,17 +1,42 @@ class Reactor::Event include Reactor::OptionallySubclassable + include Sidekiq::Worker attr_accessor :data def initialize(data = {}) self.data = {}.with_indifferent_access data.each do |key, value| self.send("#{key}=", value) end end + def perform(name, data) + data.merge!(fired_at: Time.current) + Reactor::Subscriber.where(event: name).each do |subscriber| + Reactor::Subscriber.delay.fire subscriber.id, data + end + + #TODO: support more matching? + Reactor::Subscriber.where(event: '*').each do |s| + Reactor::Subscriber.delay.fire s.id, data + end + + if (static_subscribers = Reactor::SUBSCRIBERS[name] || []).any? + static_subscribers.each do |callback| + delay = callback[:options].try(:[], :delay) || 0 + case method = callback[:method] + when Symbol + callback[:source].delay_for(delay).send method, Reactor::Event.new(data.merge(event: name)) + else + method.call Reactor::Event.new(data.merge(event: name)) + end + end + end + end + def method_missing(method, *args) if method.to_s.include?('=') try_setter(method, *args) else try_getter(method) @@ -21,61 +46,31 @@ def to_s name end class << self + def perform(name, data) + new.perform(name, data) + end + def publish(name, data = {}) message = new(data.merge(event: name)) if message.at.nil? - delay.process name, message.data + perform_async name, message.data elsif message.at.future? - delay_until(message.at).process name, message.data + perform_at message.at, name, message.data end end def reschedule(name, data = {}) + scheduled_jobs = Sidekiq::ScheduledSet.new job = scheduled_jobs.detect do |job| - job['class'] == name.to_s.camelize && job['at'].to_i == data[:was].to_i + job['args'].first == name.to_s && + job.score == data[:was].to_f end - remove_scheduled_job job if job - delay.publish(name, data.except(:was)) if data[:at].future? - end - - def process(name, data) - data.merge!(fired_at: Time.current) - Reactor::Subscriber.where(event: name.to_s).each do |subscriber| - Reactor::Subscriber.delay.fire subscriber.id, data - end - - #TODO: support more matching? - Reactor::Subscriber.where(event: '*').each do |s| - Reactor::Subscriber.delay.fire s.id, data - end - - if (static_subscribers = Reactor::SUBSCRIBERS[name.to_s] || []).any? - static_subscribers.each do |callback| - delay = callback[:options].try(:[], :delay) || 0 - case method = callback[:method] - when Symbol - callback[:source].delay_for(delay).send method, Reactor::Event.new(data.merge(event: name.to_s)) - else - method.call Reactor::Event.new(data.merge(event: name.to_s)) - end - end - end - end - - def scheduled_jobs(options = {}) - Sidekiq.redis do |r| - from = options[:from] ? options[:from].to_f.to_s : '-inf' - to = options[:to] ? options[:to].to_f.to_s : '+inf' - r.zrangebyscore('schedule', from, to).map{|job| MultiJson.decode(job)} - end - end - - def remove_scheduled_job(job) - Sidekiq.redis { |r| r.zrem 'schedule', MultiJson.encode(job) } + job.delete + publish(name, data.except(:was)) if data[:at].future? end end private