Sha256: 0b6478737080be6545e13a3304aabec1be4077a5454909e4e8a27513ac64c994

Contents?: true

Size: 1.07 KB

Versions: 6

Compression:

Stored size: 1.07 KB

Contents

require 'announce/adapters/base_adapter'

module Announce
  module Adapters
    class InlineAdapter < BaseAdapter

      def self.subscriptions
        @@subscriptions ||= {}
      end

      class BrokerManager < BaseAdapter::BrokerManager
        def configure; end
      end

      class Subscriber < BaseAdapter::Subscriber
        def subscribe(worker_class, subject, actions, options)
          Array(actions).each do |action|
            queue_name = Queue.name_for(subject, action)
            InlineAdapter.subscriptions[queue_name] = worker_class
          end
        end
      end

      class Topic < BaseAdapter::Topic
        def publish(message, options = {})
          queue_name = Queue.name_for(subject, action)
          worker_class = InlineAdapter.subscriptions[queue_name]
          if defined?(::ActiveJob)
            job = worker_class.new(message)
            ::ActiveJob::Base.execute(job.serialize)
          else
            worker_class.new.perform(message)
          end
        end
      end

      class Queue < BaseAdapter::Queue
      end
    end
  end
end

Version data entries

6 entries across 6 versions & 1 rubygems

Version Path
announce-0.2.3 lib/announce/adapters/inline_adapter.rb
announce-0.2.2 lib/announce/adapters/inline_adapter.rb
announce-0.2.1 lib/announce/adapters/inline_adapter.rb
announce-0.2.0 lib/announce/adapters/inline_adapter.rb
announce-0.1.1 lib/announce/adapters/inline_adapter.rb
announce-0.1.0 lib/announce/adapters/inline_adapter.rb