lib/qs/queue.rb in qs-0.3.0 vs lib/qs/queue.rb in qs-0.4.0
- old
+ new
@@ -1,17 +1,19 @@
+require 'qs'
require 'qs/route'
module Qs
class Queue
- attr_reader :routes
+ attr_reader :routes, :event_route_names
attr_reader :enqueued_jobs
def initialize(&block)
- @routes = []
- @enqueued_jobs = []
+ @routes = []
+ @event_route_names = []
+ @enqueued_jobs = []
self.instance_eval(&block) if !block.nil?
raise InvalidError, "a queue must have a name" if self.name.nil?
end
def name(value = nil)
@@ -26,38 +28,69 @@
def job_handler_ns(value = nil)
@job_handler_ns = value if !value.nil?
@job_handler_ns
end
+ def event_handler_ns(value = nil)
+ @event_handler_ns = value if !value.nil?
+ @event_handler_ns
+ end
+
def job(name, handler_name)
if self.job_handler_ns && !(handler_name =~ /^::/)
handler_name = "#{self.job_handler_ns}::#{handler_name}"
end
- @routes.push(Qs::Route.new(name, handler_name))
+ route_id = Message::RouteId.new(Qs::Job::PAYLOAD_TYPE, name)
+ @routes.push(Qs::Route.new(route_id, handler_name))
end
+ def event(channel, name, handler_name)
+ if self.event_handler_ns && !(handler_name =~ /^::/)
+ handler_name = "#{self.event_handler_ns}::#{handler_name}"
+ end
+
+ route_name = Qs::Event::RouteName.new(channel, name)
+ route_id = Qs::Message::RouteId.new(Qs::Event::PAYLOAD_TYPE, route_name)
+
+ @event_route_names.push(route_name)
+ @routes.push(Qs::Route.new(route_id, handler_name))
+ end
+
def enqueue(job_name, params = nil)
Qs.enqueue(self, job_name, params)
end
alias :add :enqueue
+ def sync_subscriptions
+ Qs.sync_subscriptions(self)
+ end
+
+ def clear_subscriptions
+ Qs.clear_subscriptions(self)
+ end
+
+ def published_events
+ self.enqueued_jobs.map(&:event)
+ end
+
def reset!
self.enqueued_jobs.clear
end
def inspect
reference = '0x0%x' % (self.object_id << 1)
"#<#{self.class}:#{reference} " \
"@name=#{self.name.inspect} " \
- "@job_handler_ns=#{self.job_handler_ns.inspect}>"
+ "@job_handler_ns=#{self.job_handler_ns.inspect} " \
+ "@event_handler_ns=#{self.event_handler_ns.inspect}>"
end
InvalidError = Class.new(RuntimeError)
module RedisKey
def self.parse_name(key)
- key.split(':').last
+ key.split(':', 2).last
end
def self.new(name)
"queues:#{name}"
end