Sha256: 9a85fabad73133f2d10cf70839c4275a4d03f2e32a4495036bce27278da7d8be
Contents?: true
Size: 1.54 KB
Versions: 2
Compression:
Stored size: 1.54 KB
Contents
module Cloudist
  # Registers callbacks for reply messages and dispatches incoming reply
  # payloads to them.
  #
  # Callbacks are registered through a small DSL backed by +method_missing+;
  # each call stores the given block (wrapped in a Callback) under a string key:
  #
  #   listener.event(:started) { |payload| ... }  # key "event:started"
  #   listener.progress { |payload| ... }         # key "progress"
  class Listener
    # Method names accepted as callback registrations.
    VALID_CALLBACKS = %w[event progress reply update].freeze

    # Retained so any existing reference to the class variable keeps working.
    @@valid_callbacks = VALID_CALLBACKS

    attr_reader :job_queue_name, :job_id, :callbacks

    # @param job_or_queue_name [Cloudist::Job, String] a job instance (its
    #   :master_queue header and id are used) or a plain queue name (job_id
    #   is then nil)
    # @raise [ArgumentError] when given anything else
    def initialize(job_or_queue_name)
      @callbacks = {}

      case job_or_queue_name
      when Cloudist::Job
        @job_queue_name = Utils.reply_prefix(job_or_queue_name.payload.headers[:master_queue])
        @job_id = job_or_queue_name.id
      when String
        @job_queue_name = Utils.reply_prefix(job_or_queue_name)
        @job_id = nil
      else
        raise ArgumentError, "Invalid listener type, accepts job queue name or Cloudist::Job instance"
      end
    end

    # Creates a ReplyQueue for +job_queue_name+, evaluates the optional block
    # in the context of this listener (so it can register callbacks with the
    # DSL), then subscribes to the queue and calls every callback whose key
    # matches the incoming payload.
    #
    # The block is optional: callbacks may already have been registered by
    # calling the DSL methods directly on the listener.
    def subscribe(&block)
      reply_queue = Cloudist::ReplyQueue.new(job_queue_name)
      reply_queue.setup(job_id) if job_id

      # instance_eval raises ArgumentError when given neither block nor string.
      instance_eval(&block) if block

      reply_queue.subscribe do |request|
        payload = request.payload
        # Same key format as used at registration: "type" or "type:event".
        key = [payload.message_type.to_s, payload.headers[:event]].compact.join(':')

        handlers = callbacks[key]
        next unless handlers

        handlers.each { |handler| handler.call(payload) }
      end
    end

    # Implements the callback-registration DSL. A call to one of
    # VALID_CALLBACKS appends the block to the list stored under
    # "name" or "name:first_argument"; any other method goes to +super+.
    def method_missing(meth, *args, &blk)
      return super unless VALID_CALLBACKS.include?(meth.to_s)

      # callback key should be in the format "event:started" or "progress"
      key = [meth.to_s, args.shift].compact.join(':')
      (@callbacks[key] ||= []) << Callback.new(blk)
    end

    # Keeps +respond_to?+ consistent with the names +method_missing+ handles.
    def respond_to_missing?(meth, include_private = false)
      VALID_CALLBACKS.include?(meth.to_s) || super
    end
  end
end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
cloudist-0.1.1 | lib/cloudist/listener.rb |
cloudist-0.1.0 | lib/cloudist/listener.rb |