Sha256: ffa0ed53e72a6ee4f8dac965a1e058804a8036ebab203c04e426a698beb4b522

Contents?: true

Size: 1.56 KB

Versions: 8

Compression:

Stored size: 1.56 KB

Contents

# frozen_string_literal: true

module QueueBus
  # only process local queues
  class Local
    class << self
      def publish(attributes = {})
        if ::QueueBus.local_mode == :suppress
          ::QueueBus.log_worker("Suppressed: #{attributes.inspect}")
          return # not doing anything
        end

        # To json and back to simlulate enqueueing
        json = ::QueueBus::Util.encode(attributes)
        attributes = ::QueueBus::Util.decode(json)

        ::QueueBus.log_worker("Local running: #{attributes.inspect}")

        # looking for subscriptions, not queues
        subscription_matches(attributes).each do |sub|
          bus_attr = {  'bus_driven_at' => Time.now.to_i,
                        'bus_rider_queue' => sub.queue_name,
                        'bus_rider_app_key' => sub.app_key,
                        'bus_rider_sub_key' => sub.key,
                        'bus_rider_class_name' => sub.class_name }
          to_publish = bus_attr.merge(attributes || {})
          if ::QueueBus.local_mode == :standalone
            ::QueueBus.enqueue_to(sub.queue_name, sub.class_name, bus_attr.merge(attributes || {}))
          else # defaults to inline mode
            sub.execute!(to_publish)
          end
        end
      end

      # looking directly at subscriptions loaded into dispatcher
      # so we don't need redis server up
      def subscription_matches(attributes)
        out = []
        ::QueueBus.dispatchers.each do |dispatcher|
          out.concat(dispatcher.subscription_matches(attributes))
        end
        out
      end
    end
  end
end

Version data entries

8 entries across 8 versions & 1 rubygems

Version Path
queue-bus-0.13.3 lib/queue_bus/local.rb
queue-bus-0.13.2 lib/queue_bus/local.rb
queue-bus-0.13.1 lib/queue_bus/local.rb
queue-bus-0.13.0 lib/queue_bus/local.rb
queue-bus-0.12.0 lib/queue_bus/local.rb
queue-bus-0.11.0 lib/queue_bus/local.rb
queue-bus-0.10.0 lib/queue_bus/local.rb
queue-bus-0.9.1 lib/queue_bus/local.rb