Sha256: acadd356020ebd0d819eebc10f250c0762a79f75d6be306a54a63a062961a14d

Contents?: true

Size: 1.06 KB

Versions: 1

Compression:

Stored size: 1.06 KB

Contents

require "announce/adapters/base_adapter"

module Announce
  module Adapters
    class InlineAdapter < BaseAdapter
      def self.subscriptions
        @@subscriptions ||= {}
      end

      class BrokerManager < BaseAdapter::BrokerManager
        def configure; end
      end

      class Subscriber < BaseAdapter::Subscriber
        def subscribe(worker_class, subject, actions, options)
          Array(actions).each do |action|
            queue_name = Queue.name_for(subject, action)
            InlineAdapter.subscriptions[queue_name] = worker_class
          end
        end
      end

      class Topic < BaseAdapter::Topic
        def publish(message, options = {})
          queue_name = Queue.name_for(subject, action)
          worker_class = InlineAdapter.subscriptions[queue_name]
          if defined?(::ActiveJob)
            job = worker_class.new(message)
            ::ActiveJob::Base.execute(job.serialize)
          else
            worker_class.new.perform(message)
          end
        end
      end

      class Queue < BaseAdapter::Queue; end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
announce-0.3.0 lib/announce/adapters/inline_adapter.rb