lib/reactor/event.rb in reactor-0.4.0 vs lib/reactor/event.rb in reactor-0.4.2
- old
+ new
@@ -1,17 +1,42 @@
class Reactor::Event
include Reactor::OptionallySubclassable
+ include Sidekiq::Worker
attr_accessor :data
def initialize(data = {})
self.data = {}.with_indifferent_access
data.each do |key, value|
self.send("#{key}=", value)
end
end
+ def perform(name, data)
+ data.merge!(fired_at: Time.current)
+ Reactor::Subscriber.where(event: name).each do |subscriber|
+ Reactor::Subscriber.delay.fire subscriber.id, data
+ end
+
+ #TODO: support more matching?
+ Reactor::Subscriber.where(event: '*').each do |s|
+ Reactor::Subscriber.delay.fire s.id, data
+ end
+
+ if (static_subscribers = Reactor::SUBSCRIBERS[name] || []).any?
+ static_subscribers.each do |callback|
+ delay = callback[:options].try(:[], :delay) || 0
+ case method = callback[:method]
+ when Symbol
+ callback[:source].delay_for(delay).send method, Reactor::Event.new(data.merge(event: name))
+ else
+ method.call Reactor::Event.new(data.merge(event: name))
+ end
+ end
+ end
+ end
+
def method_missing(method, *args)
if method.to_s.include?('=')
try_setter(method, *args)
else
try_getter(method)
@@ -21,61 +46,31 @@
def to_s
name
end
class << self
+ def perform(name, data)
+ new.perform(name, data)
+ end
+
def publish(name, data = {})
message = new(data.merge(event: name))
if message.at.nil?
- delay.process name, message.data
+ perform_async name, message.data
elsif message.at.future?
- delay_until(message.at).process name, message.data
+ perform_at message.at, name, message.data
end
end
def reschedule(name, data = {})
+ scheduled_jobs = Sidekiq::ScheduledSet.new
job = scheduled_jobs.detect do |job|
- job['class'] == name.to_s.camelize && job['at'].to_i == data[:was].to_i
+ job['args'].first == name.to_s &&
+ job.score == data[:was].to_f
end
- remove_scheduled_job job if job
- delay.publish(name, data.except(:was)) if data[:at].future?
- end
-
- def process(name, data)
- data.merge!(fired_at: Time.current)
- Reactor::Subscriber.where(event: name.to_s).each do |subscriber|
- Reactor::Subscriber.delay.fire subscriber.id, data
- end
-
- #TODO: support more matching?
- Reactor::Subscriber.where(event: '*').each do |s|
- Reactor::Subscriber.delay.fire s.id, data
- end
-
- if (static_subscribers = Reactor::SUBSCRIBERS[name.to_s] || []).any?
- static_subscribers.each do |callback|
- delay = callback[:options].try(:[], :delay) || 0
- case method = callback[:method]
- when Symbol
- callback[:source].delay_for(delay).send method, Reactor::Event.new(data.merge(event: name.to_s))
- else
- method.call Reactor::Event.new(data.merge(event: name.to_s))
- end
- end
- end
- end
-
- def scheduled_jobs(options = {})
- Sidekiq.redis do |r|
- from = options[:from] ? options[:from].to_f.to_s : '-inf'
- to = options[:to] ? options[:to].to_f.to_s : '+inf'
- r.zrangebyscore('schedule', from, to).map{|job| MultiJson.decode(job)}
- end
- end
-
- def remove_scheduled_job(job)
- Sidekiq.redis { |r| r.zrem 'schedule', MultiJson.encode(job) }
+ job.delete
+ publish(name, data.except(:was)) if data[:at].future?
end
end
private