Sha256: 9a85fabad73133f2d10cf70839c4275a4d03f2e32a4495036bce27278da7d8be

Contents?: true

Size: 1.54 KB

Versions: 2

Compression:

Stored size: 1.54 KB

Contents

module Cloudist
  # Registers callbacks for messages arriving on a reply queue and dispatches
  # each incoming payload to the callbacks whose key matches it.
  #
  # Callbacks are registered through the dynamic methods named in
  # @@valid_callbacks (event, progress, reply, update). Each takes an optional
  # name argument and a block:
  #
  #   event("started") { |payload| ... }   # stored under "event:started"
  #   progress { |payload| ... }           # stored under "progress"
  class Listener
    attr_reader :job_queue_name, :job_id, :callbacks

    # Method names that method_missing accepts as callback registrations.
    @@valid_callbacks = ["event", "progress", "reply", "update"]

    # @param job_or_queue_name [Cloudist::Job, String] a job (its
    #   :master_queue header and id are used) or a plain queue name (no job id)
    # @raise [ArgumentError] if the argument is neither a job nor a String
    def initialize(job_or_queue_name)
      @callbacks = {}

      if job_or_queue_name.is_a?(Cloudist::Job)
        @job_queue_name = Utils.reply_prefix(job_or_queue_name.payload.headers[:master_queue])
        @job_id = job_or_queue_name.id
      elsif job_or_queue_name.is_a?(String)
        @job_queue_name = Utils.reply_prefix(job_or_queue_name)
        @job_id = nil
      else
        raise ArgumentError, "Invalid listener type, accepts job queue name or Cloudist::Job instance"
      end
    end

    # Creates a ReplyQueue for job_queue_name, evaluates the given block in the
    # context of this listener (so bare calls such as `event "started" do ... end`
    # inside it register callbacks here), then subscribes to the queue and
    # invokes the matching callbacks for every request received.
    def subscribe(&block)
      reply_queue = Cloudist::ReplyQueue.new(job_queue_name)
      # setup is only called when the listener was built from a job.
      reply_queue.setup(job_id) if job_id

      instance_eval(&block)

      reply_queue.subscribe do |request|
        payload = request.payload

        # Same key format as used at registration: "<type>" or "<type>:<event>".
        key = [payload.message_type.to_s, payload.headers[:event]].compact.join(':')

        if callbacks.key?(key)
          callbacks[key].each do |callback|
            callback.call(payload)
          end
        end
      end
    end

    # Handles the callback registration methods listed in @@valid_callbacks.
    # The block is wrapped in a Callback and appended to the list stored under
    # the computed key; any other method name falls through to super.
    def method_missing(meth, *args, &blk)
      if @@valid_callbacks.include?(meth.to_s)
        # The key is in the format "event:started" (name given) or "progress"
        # (no name given).
        key = [meth.to_s, args.shift].compact.join(':')

        (@callbacks[key] ||= []) << Callback.new(blk)
      else
        super
      end
    end

    # Keeps respond_to? and method consistent with method_missing: the
    # callback registration methods are reported as available.
    def respond_to_missing?(meth, include_private = false)
      @@valid_callbacks.include?(meth.to_s) || super
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
cloudist-0.1.1 lib/cloudist/listener.rb
cloudist-0.1.0 lib/cloudist/listener.rb