module Cloudist class Listener attr_reader :job_queue_name, :job_id, :callbacks @@valid_callbacks = ["event", "progress", "reply", "update"] def initialize(job_or_queue_name) @callbacks = {} if job_or_queue_name.is_a?(Cloudist::Job) @job_queue_name = Utils.reply_prefix(job_or_queue_name.payload.headers[:master_queue]) @job_id = job_or_queue_name.id elsif job_or_queue_name.is_a?(String) @job_queue_name = Utils.reply_prefix(job_or_queue_name) @job_id = nil else raise ArgumentError, "Invalid listener type, accepts job queue name or Cloudist::Job instance" end end def subscribe(&block) reply_queue = Cloudist::ReplyQueue.new(job_queue_name) reply_queue.setup(job_id) if job_id self.instance_eval(&block) reply_queue.subscribe do |request| payload = request.payload key = [payload.message_type.to_s, payload.headers[:event]].compact.join(':') if callbacks.has_key?(key) callbacks_to_call = callbacks[key] callbacks_to_call.each do |c| c.call(payload) end end end end def method_missing(meth, *args, &blk) if @@valid_callbacks.include?(meth.to_s) # callback should in format of "event:started" or "progress" key = [meth.to_s, args.shift].compact.join(':') (@callbacks[key] ||= []) << Callback.new(blk) else super end end end end