Sha256: c1420600eb161299ccb69a62524ac5aab0e97ef78995a83c69e35f71e745f148

Contents?: true

Size: 1.91 KB

Versions: 11

Compression:

Stored size: 1.91 KB

Contents

module ResqueBus
  # Mixin that gives a class a small class-level DSL for registering bus
  # subscriptions and for routing an incoming event to one of its instance
  # methods.
  #
  # Including this module extends the including class with ClassMethods.
  module Subscriber
    # Hook invoked by Ruby when the module is included.
    #
    # @param base [Class, Module] the includer; it is extended with ClassMethods
    def self.included(base)
      base.extend ClassMethods
    end

    # Class-level API added to every includer. State (@app_key, @transform) is
    # kept in instance variables of the extended class object itself.
    module ClassMethods
      # Explicitly sets the application key for this subscriber.
      #
      # @param app_key [Object] raw key; stored after being passed through
      #   ::ResqueBus::Application.normalize
      # @return [Object] the normalized key
      def application(app_key)
        @app_key = ::ResqueBus::Application.normalize(app_key)
      end

      # Returns the application key, resolving and caching it on first use.
      #
      # Resolution order: a key already stored on this class, then
      # ::ResqueBus.default_app_key, then the underscored first segment of the
      # class name (the outermost module, or the class itself when it is not
      # namespaced).
      #
      # @return [Object] the resolved key
      def app_key
        @app_key ||= ::ResqueBus.default_app_key ||
                     ::ResqueBus::Util.underscore(name.to_s.split("::").first)
      end

      # Subscribes +method_name+ on a queue chosen automatically.
      #
      # Queue resolution order: ::Resque.queue_from_class(self), then
      # ::ResqueBus.default_queue, then "<app_key>_default". Later candidates
      # are only evaluated when the earlier ones are nil/false.
      #
      # @param method_name [String, Symbol] instance method that handles the event
      # @param matcher_hash [Hash, nil] forwarded to #subscribe_queue
      # @return [Object] whatever #subscribe_queue returns
      def subscribe(method_name, matcher_hash = nil)
        queue_name = ::Resque.queue_from_class(self) ||
                     ::ResqueBus.default_queue ||
                     "#{app_key}_default"
        subscribe_queue(queue_name, method_name, matcher_hash)
      end

      # Registers a subscription for +method_name+ on an explicit queue with
      # the dispatcher obtained from ::ResqueBus.dispatcher_by_key(app_key).
      #
      # The subscription key is "<ClassName>.<method_name>"; #perform later
      # recovers the method name from the part after the last ".".
      #
      # @param queue_name [String] queue to register on
      # @param method_name [String, Symbol] instance method that handles the event
      # @param matcher_hash [Hash, nil] matcher passed to the dispatcher;
      #   defaults to {"bus_event_type" => method_name}
      # @return [Object] the result of the dispatcher's add_subscription call
      def subscribe_queue(queue_name, method_name, matcher_hash = nil)
        klass = self
        matcher_hash ||= { "bus_event_type" => method_name }
        sub_key = "#{name}.#{method_name}"
        dispatcher = ::ResqueBus.dispatcher_by_key(app_key)
        dispatcher.add_subscription(queue_name, sub_key, klass.name.to_s, matcher_hash, lambda { |att| klass.perform(att) })
      end

      # Declares a class method used to turn the incoming attributes into the
      # argument list for the handler (see #resque_bus_execute).
      #
      # @param method_name [String, Symbol] name of a method callable on this class
      # @return [String, Symbol] the stored method name
      def transform(method_name)
        @transform = method_name
      end

      # Entry point for an incoming event: derives the handler method from
      # attributes["bus_rider_sub_key"] and runs it.
      #
      # The work is wrapped in ResqueBus.with_global_attributes(attributes);
      # NOTE(review): that helper is defined elsewhere — presumably it exposes
      # the attributes for the duration of the block; confirm there.
      #
      # @param attributes [Hash] event payload; must carry a
      #   "bus_rider_sub_key" of the form "<ClassName>.<method_name>"
      # @return [Object] whatever with_global_attributes returns
      # @raise [ArgumentError] when no method name can be derived from the
      #   sub key (missing, nil or empty)
      def perform(attributes)
        ResqueBus.with_global_attributes(attributes) do
          sub_key = attributes["bus_rider_sub_key"]
          meth_key = sub_key.to_s.split(".").last
          # Fail with an actionable message instead of a NoMethodError on nil.
          if meth_key.nil? || meth_key.empty?
            raise ArgumentError,
                  "cannot dispatch to #{name}: attributes have no usable \"bus_rider_sub_key\" (got #{sub_key.inspect})"
          end
          resque_bus_execute(meth_key, attributes)
        end
      end

      # Builds a subscriber instance and invokes +key+ on it.
      #
      # Arguments are the attributes themselves, or the result of the declared
      # transform method when one was set. An Array result is splatted into
      # the call; anything else is passed as a single argument.
      #
      # The instance comes from the class method subscriber_with_attributes
      # when the class responds to it, otherwise from a plain +new+.
      #
      # @param key [String, Symbol] instance method to invoke
      # @param attributes [Hash] event payload
      # @return [Object] the handler's return value
      def resque_bus_execute(key, attributes)
        args = @transform ? send(@transform, attributes) : attributes
        # Deliberately not Array(args): that would explode a Hash into pairs.
        args = [args] unless args.is_a?(Array)
        me = if respond_to?(:subscriber_with_attributes)
               subscriber_with_attributes(attributes)
             else
               new
             end
        me.send(key, *args)
      end
    end
  end
end

Version data entries

11 entries across 11 versions & 1 rubygems

Version Path
resque-bus-0.3.7 lib/resque_bus/subscriber.rb
resque-bus-0.3.6 lib/resque_bus/subscriber.rb
resque-bus-0.3.5 lib/resque_bus/subscriber.rb
resque-bus-0.3.4 lib/resque_bus/subscriber.rb
resque-bus-0.3.3 lib/resque_bus/subscriber.rb
resque-bus-0.3.2 lib/resque_bus/subscriber.rb
resque-bus-0.3.1 lib/resque_bus/subscriber.rb
resque-bus-0.3.0 lib/resque_bus/subscriber.rb
resque-bus-0.2.10 lib/resque_bus/subscriber.rb
resque-bus-0.2.9 lib/resque_bus/subscriber.rb
resque-bus-0.2.8 lib/resque_bus/subscriber.rb