Sha256: 01de9ad340fc9b81f4f78e21d47aa64fa0844e7f23417427ac37ca177eea7792

Contents?: true

Size: 1.98 KB

Versions: 2

Compression:

Stored size: 1.98 KB

Contents

module ActivePublisher
  module Async
    module InMemoryAdapter
      # Bounded in-memory buffer of messages waiting to be published.
      #
      # On construction it creates a ::MultiOpQueue::Queue, starts a
      # ConsumerThread over that queue, and starts a supervisor thread that
      # periodically checks the consumer and replaces it if it is no longer
      # alive.
      class AsyncQueue
        include ::ActivePublisher::Logging

        # Tunables; exposed as accessors so they can be changed after construction.
        attr_accessor :drop_messages_when_queue_full,
          :max_queue_size,
          :supervisor_interval

        attr_reader :consumer, :queue, :supervisor

        # @param drop_messages_when_queue_full [Boolean] when truthy, #push
        #   silently discards messages once the queue is full instead of raising
        # @param max_queue_size [Integer] maximum number of messages the queue may hold
        # @param supervisor_interval [Numeric] value passed to `sleep` between
        #   supervisor checks of the consumer
        def initialize(drop_messages_when_queue_full, max_queue_size, supervisor_interval)
          @drop_messages_when_queue_full = drop_messages_when_queue_full
          @max_queue_size = max_queue_size
          @supervisor_interval = supervisor_interval
          @queue = ::MultiOpQueue::Queue.new
          create_and_supervise_consumer!
        end

        # Enqueues +message+ unless the queue has already reached max_queue_size.
        #
        # NOTE(review): the size check and the push are two separate calls, so
        # concurrent producers may still briefly exceed the limit — confirm
        # whether that matters to callers.
        #
        # @param message [Object] the message to enqueue
        # @return [nil] when the message was dropped because the queue is full;
        #   otherwise whatever `queue.push` returns
        # @raise [::ActivePublisher::Async::InMemoryAdapter::UnableToPersistMessageError]
        #   when the queue is full and drop_messages_when_queue_full is falsy
        def push(message)
          # The limit is presumably 1_000_000 messages by default (configured
          # outside this file — TODO confirm).
          # `>=` so the queue never grows past max_queue_size; the previous `>`
          # admitted one extra message before the limit took effect.
          if queue.size >= max_queue_size
            # Drop messages if the queue is full and we were configured to do so
            return if drop_messages_when_queue_full

            # By default we will raise an error to push the responsibility onto the caller
            raise ::ActivePublisher::Async::InMemoryAdapter::UnableToPersistMessageError, "Queue is full, messages will be dropped."
          end

          queue.push(message)
        end

        # @return [Integer] the size currently reported by the underlying queue
        def size
          queue.size
        end

        private

        # Starts the consumer over `queue` and a supervisor thread that loops
        # forever: whenever the consumer is not alive, the messages it reports
        # via `current_messages` are appended back onto the queue, the old
        # consumer is killed, and a fresh ConsumerThread is started. The
        # supervisor sleeps supervisor_interval between checks.
        def create_and_supervise_consumer!
          @consumer = ::ActivePublisher::Async::InMemoryAdapter::ConsumerThread.new(queue)
          @supervisor = ::Thread.new do
            loop do
              unless consumer.alive?
                # We might need to requeue the last messages popped
                current_consumer_messages = consumer.current_messages
                queue.concat(current_consumer_messages) unless current_consumer_messages.empty?
                consumer.kill
                @consumer = ::ActivePublisher::Async::InMemoryAdapter::ConsumerThread.new(queue)
              end

              # Pause before checking the consumer again.
              sleep supervisor_interval
            end
          end
        end
      end

    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygem

Version Path
active_publisher-0.2.0.pre-java lib/active_publisher/async/in_memory_adapter/async_queue.rb
active_publisher-0.2.0.pre lib/active_publisher/async/in_memory_adapter/async_queue.rb