Sha256: 616647cdcd8ebea7e4f165e391290d22cbe2ed3d8bcf75f1e933802dd7ab4bdd

Contents?: true

Size: 1.8 KB

Versions: 12

Compression:

Stored size: 1.8 KB

Contents

require "active_publisher/message"
require "active_publisher/async/in_memory_adapter/async_queue"
require "active_publisher/async/in_memory_adapter/consumer_thread"
require "multi_op_queue"

module ActivePublisher
  module Async
    module InMemoryAdapter

      # Lets callers write InMemoryAdapter.new(...) on the module itself;
      # every positional argument is handed unchanged to Adapter.new and the
      # resulting Adapter instance is returned.
      def self.new(*args)
        adapter_class = ::ActivePublisher::Async::InMemoryAdapter::Adapter
        adapter_class.new(*args)
      end

      # Error type for failures to persist a message.
      # NOTE(review): nothing in this file raises it; presumably it is raised
      # by the queue/consumer code required above -- confirm against callers.
      class UnableToPersistMessageError < ::StandardError
      end

      # Async publisher adapter that buffers outgoing messages in an in-process
      # AsyncQueue. #publish wraps its arguments in an ActivePublisher::Message
      # and pushes it onto the queue; #shutdown! waits (up to a configured
      # limit) for that queue to empty.
      class Adapter
        include ::ActivePublisher::Logging

        # The AsyncQueue instance built in #initialize.
        attr_reader :async_queue

        # Builds the backing AsyncQueue. All three arguments are forwarded
        # positionally to AsyncQueue.new; their exact semantics are defined
        # there (names suggest an overflow policy, a capacity, and a
        # supervisor polling period -- confirm against AsyncQueue).
        #
        # @param drop_messages_when_queue_full [Boolean] forwarded to AsyncQueue.new
        # @param max_queue_size [Integer] forwarded to AsyncQueue.new
        # @param supervisor_interval [Numeric] forwarded to AsyncQueue.new
        def initialize(drop_messages_when_queue_full = false, max_queue_size = 1_000_000, supervisor_interval = 0.2)
          logger.info "Starting in-memory publisher adapter"

          @async_queue = ::ActivePublisher::Async::InMemoryAdapter::AsyncQueue.new(
            drop_messages_when_queue_full,
            max_queue_size,
            supervisor_interval
          )
        end

        # Wraps the arguments in an ActivePublisher::Message and pushes it onto
        # the async queue.
        #
        # @param route passed through to Message.new
        # @param payload passed through to Message.new
        # @param exchange_name passed through to Message.new
        # @param options [Hash] passed through to Message.new
        # @return [nil] always; the result of the push is deliberately discarded
        def publish(route, payload, exchange_name, options = {})
          message = ::ActivePublisher::Message.new(route, payload, exchange_name, options)
          async_queue.push(message)
          nil
        end

        # Blocks until the async queue reports a size of zero, or until
        # ActivePublisher.configuration.seconds_to_wait_for_graceful_shutdown
        # seconds have elapsed, whichever comes first. The queue size is
        # re-checked every 0.1 seconds. This method only waits and logs; it
        # does not itself remove anything from the queue.
        #
        # @return [nil]
        def shutdown!
          max_wait_time = ::ActivePublisher.configuration.seconds_to_wait_for_graceful_shutdown
          # Elapsed time is measured on the monotonic clock so that wall-clock
          # adjustments (NTP steps, manual changes) cannot shorten or extend
          # the graceful shutdown period.
          started_shutting_down_at = monotonic_now

          logger.info "Draining async publisher in-memory adapter queue before shutdown. current queue size: #{async_queue.size}."
          while async_queue.size > 0
            if (monotonic_now - started_shutting_down_at) > max_wait_time
              logger.info "Forcing async publisher adapter shutdown because graceful shutdown period of #{max_wait_time} seconds was exceeded. Current queue size: #{async_queue.size}."
              break
            end

            sleep 0.1
          end
        end

        private

        # Current reading of the monotonic clock, in (float) seconds. Only
        # differences between two readings are meaningful.
        def monotonic_now
          ::Process.clock_gettime(::Process::CLOCK_MONOTONIC)
        end

      end
    end
  end
end

Version data entries

12 entries across 12 versions & 1 rubygem

Version Path
active_publisher-0.4.0 lib/active_publisher/async/in_memory_adapter.rb
active_publisher-0.4.0-java lib/active_publisher/async/in_memory_adapter.rb
active_publisher-0.4.0.pre1-java lib/active_publisher/async/in_memory_adapter.rb
active_publisher-0.4.0.pre1 lib/active_publisher/async/in_memory_adapter.rb
active_publisher-0.3.0-java lib/active_publisher/async/in_memory_adapter.rb
active_publisher-0.3.0 lib/active_publisher/async/in_memory_adapter.rb
active_publisher-0.2.1-java lib/active_publisher/async/in_memory_adapter.rb
active_publisher-0.2.1 lib/active_publisher/async/in_memory_adapter.rb
active_publisher-0.2.1.pre.pre1-java lib/active_publisher/async/in_memory_adapter.rb
active_publisher-0.2.1.pre.pre1 lib/active_publisher/async/in_memory_adapter.rb
active_publisher-0.2.0.pre-java lib/active_publisher/async/in_memory_adapter.rb
active_publisher-0.2.0.pre lib/active_publisher/async/in_memory_adapter.rb