Sha256: 01de9ad340fc9b81f4f78e21d47aa64fa0844e7f23417427ac37ca177eea7792
Contents?: true
Size: 1.98 KB
Versions: 2
Compression:
Stored size: 1.98 KB
Contents
module ActivePublisher
  module Async
    module InMemoryAdapter
      # Bounded in-memory buffer of messages, paired with a consumer and a
      # supervisor thread that replaces the consumer whenever it is found dead.
      #
      # NOTE(review): ConsumerThread and ::MultiOpQueue::Queue are defined
      # outside this file; presumably the consumer drains `queue` and publishes
      # each message -- confirm against their definitions.
      class AsyncQueue
        include ::ActivePublisher::Logging

        attr_accessor :drop_messages_when_queue_full,
                      :max_queue_size,
                      :supervisor_interval

        attr_reader :consumer, :queue, :supervisor

        # Builds the backing queue and immediately starts the consumer and its
        # supervisor thread.
        #
        # @param drop_messages_when_queue_full [Boolean] when truthy, #push
        #   silently discards messages once the queue is full instead of raising
        # @param max_queue_size [Integer] maximum number of messages the queue
        #   may hold
        # @param supervisor_interval [Numeric] seconds the supervisor sleeps
        #   between consumer liveness checks
        def initialize(drop_messages_when_queue_full, max_queue_size, supervisor_interval)
          @drop_messages_when_queue_full = drop_messages_when_queue_full
          @max_queue_size = max_queue_size
          @supervisor_interval = supervisor_interval
          @queue = ::MultiOpQueue::Queue.new
          create_and_supervise_consumer!
        end

        # Enqueues a message unless the queue already holds `max_queue_size`
        # messages (default of 1_000_000 messages, configured by the caller).
        #
        # @param message [Object] the message to enqueue
        # @return [Object, nil] the result of `queue.push`, or nil when the
        #   message was dropped because the queue is full
        # @raise [UnableToPersistMessageError] when the queue is full and
        #   `drop_messages_when_queue_full` is falsy
        def push(message)
          # `>=` (not `>`) so the queue never grows beyond max_queue_size.
          if queue.size >= max_queue_size
            # Drop messages if the queue is full and we were configured to do so
            return if drop_messages_when_queue_full

            # By default we will raise an error to push the responsibility onto the caller
            raise ::ActivePublisher::Async::InMemoryAdapter::UnableToPersistMessageError,
                  "Queue is full, messages will be dropped."
          end

          queue.push(message)
        end

        # @return [Integer] number of messages currently held in the queue
        def size
          queue.size
        end

        private

        # Starts the consumer, then spawns a supervisor thread that loops
        # forever: whenever the consumer reports it is no longer alive, its
        # in-flight messages are put back on the queue and a fresh consumer is
        # created; the supervisor then sleeps `supervisor_interval` seconds.
        def create_and_supervise_consumer!
          @consumer = ::ActivePublisher::Async::InMemoryAdapter::ConsumerThread.new(queue)

          @supervisor = ::Thread.new do
            loop do
              unless consumer.alive?
                # We might need to requeue the last messages popped
                current_consumer_messages = consumer.current_messages
                queue.concat(current_consumer_messages) unless current_consumer_messages.empty?

                # Explicitly kill the old consumer before replacing it.
                consumer.kill
                @consumer = ::ActivePublisher::Async::InMemoryAdapter::ConsumerThread.new(queue)
              end

              # Pause before checking the consumer again.
              sleep supervisor_interval
            end
          end
        end
      end
    end
  end
end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
active_publisher-0.2.0.pre-java | lib/active_publisher/async/in_memory_adapter/async_queue.rb |
active_publisher-0.2.0.pre | lib/active_publisher/async/in_memory_adapter/async_queue.rb |