lib/reactor/event.rb in reactor-0.3.2 vs lib/reactor/event.rb in reactor-0.4.0
- old
+ new
@@ -20,68 +20,66 @@
def to_s
name
end
- def self.publish(name, data = {})
- message = new(data.merge(event: name))
- #if (message.at)
- # delay_until(message.at).process name, message.data
- #else
- # delay.process name, message.data
- #end
+ class << self
+ def publish(name, data = {})
+ message = new(data.merge(event: name))
- if message.at.nil?
- delay.process name, message.data
- elsif message.at.future?
- delay_until(message.at).process name, message.data
+ if message.at.nil?
+ delay.process name, message.data
+ elsif message.at.future?
+ delay_until(message.at).process name, message.data
+ end
end
- end
- def self.reschedule(name, data = {})
- job = scheduled_jobs.detect do |job|
- job['class'] == name.to_s.camelize && job['at'].to_i == data[:was].to_i
+ def reschedule(name, data = {})
+ job = scheduled_jobs.detect do |job|
+ job['class'] == name.to_s.camelize && job['at'].to_i == data[:was].to_i
+ end
+ remove_scheduled_job job if job
+ delay.publish(name, data.except(:was)) if data[:at].future?
end
- remove_scheduled_job job if job
- delay.publish(name, data.except(:was)) if data[:at].future?
- end
- def self.process(name, data)
- # fire database listeners
- Reactor::Subscriber.where(event: name.to_s).each do |subscriber|
- Reactor::Subscriber.delay.fire subscriber.id, data
- end
+ def process(name, data)
+ data.merge!(fired_at: Time.current)
+ Reactor::Subscriber.where(event: name.to_s).each do |subscriber|
+ Reactor::Subscriber.delay.fire subscriber.id, data
+ end
- #TODO: support more matching?
- Reactor::Subscriber.where(event: '*').each do |s|
- Reactor::Subscriber.delay.fire s.id, data
- end
+ #TODO: support more matching?
+ Reactor::Subscriber.where(event: '*').each do |s|
+ Reactor::Subscriber.delay.fire s.id, data
+ end
- if (static_subscribers = Reactor::SUBSCRIBERS[name.to_s] || []).any?
- static_subscribers.each do |callback|
- case callback
- when Hash
- callback.keys.first.send callback.values.first, Reactor::Event.new(data.merge(event: name.to_s))
- else
- callback.call Reactor::Event.new(data.merge(event: name.to_s))
+ if (static_subscribers = Reactor::SUBSCRIBERS[name.to_s] || []).any?
+ static_subscribers.each do |callback|
+ delay = callback[:options].try(:[], :delay) || 0
+ case method = callback[:method]
+ when Symbol
+ callback[:source].delay_for(delay).send method, Reactor::Event.new(data.merge(event: name.to_s))
+ else
+ method.call Reactor::Event.new(data.merge(event: name.to_s))
+ end
end
end
end
- end
- private
+ def scheduled_jobs(options = {})
+ Sidekiq.redis do |r|
+ from = options[:from] ? options[:from].to_f.to_s : '-inf'
+ to = options[:to] ? options[:to].to_f.to_s : '+inf'
+ r.zrangebyscore('schedule', from, to).map{|job| MultiJson.decode(job)}
+ end
+ end
- def self.scheduled_jobs(options = {})
- Sidekiq.redis do |r|
- from = options[:from] ? options[:from].to_f.to_s : '-inf'
- to = options[:to] ? options[:to].to_f.to_s : '+inf'
- r.zrangebyscore('schedule', from, to).map{|job| MultiJson.decode(job)}
+ def remove_scheduled_job(job)
+ Sidekiq.redis { |r| r.zrem 'schedule', MultiJson.encode(job) }
end
end
- def self.remove_scheduled_job(job)
- Sidekiq.redis { |r| r.zrem 'schedule', MultiJson.encode(job) }
- end
+ private
def try_setter(method, object, *args)
if object.is_a? ActiveRecord::Base
send("#{method}_id", object.id)
send("#{method}_type", object.class.to_s)