Sha256: e6ed2b769b73cf12f31b9ad9c94e8879d670cf6e95869693847ba9e512133f73

Contents?: true

Size: 1.98 KB

Versions: 8

Compression:

Stored size: 1.98 KB

Contents

# frozen_string_literal: true

module QueueBus
  # A Subscription is the destination of an event.
  #
  # The subscription can be stored in redis but should only be executed on ruby processes that
  # have the application loaded. In general, this is controlled by having the background workers
  # listen to specific (and discrete) queues.
  class Subscription
    class << self
      def register(queue, key, class_name, matcher, block)
        Subscription.new(queue, key, class_name, matcher, block)
      end

      def from_redis(hash)
        queue_name = hash['queue_name'].to_s
        key        = hash['key'].to_s
        class_name = hash['class'].to_s
        matcher    = hash['matcher']
        return nil if key.empty? || queue_name.empty?

        Subscription.new(queue_name, key, class_name, matcher, nil)
      end

      def normalize(val)
        val.to_s.gsub(/\W/, '_').downcase
      end
    end

    attr_reader :matcher, :executor, :queue_name, :key, :class_name
    attr_accessor :app_key # dyanmically set on return from subscription_matches

    def initialize(queue_name, key, class_name, filters, executor = nil)
      @queue_name = self.class.normalize(queue_name)
      @key        = key.to_s
      @class_name = class_name.to_s
      @matcher    = Matcher.new(filters)
      @executor   = executor
    end

    # Executes the subscription. If this is run on a server/ruby process that did not subscribe
    # it will error as there will not be a proc.
    def execute!(attributes)
      if attributes.respond_to?(:with_indifferent_access)
        attributes = attributes.with_indifferent_access
      end
      ::QueueBus.with_global_attributes(attributes) do
        executor.call(attributes)
      end
    end

    def matches?(attributes)
      @matcher.matches?(attributes)
    end

    def to_redis
      out = {}
      out['queue_name'] = queue_name
      out['key']        = key
      out['class']      = class_name
      out['matcher']    = matcher.to_redis
      out
    end
  end
end

Version data entries

8 entries across 8 versions & 1 rubygems

Version Path
queue-bus-0.13.3 lib/queue_bus/subscription.rb
queue-bus-0.13.2 lib/queue_bus/subscription.rb
queue-bus-0.13.1 lib/queue_bus/subscription.rb
queue-bus-0.13.0 lib/queue_bus/subscription.rb
queue-bus-0.12.0 lib/queue_bus/subscription.rb
queue-bus-0.11.0 lib/queue_bus/subscription.rb
queue-bus-0.10.0 lib/queue_bus/subscription.rb
queue-bus-0.9.1 lib/queue_bus/subscription.rb