Sha256: 6e6d46c9a56302035fc191c8af431e84fb2b307970c048e2f46e850f7abcd8c7
Contents?: true
Size: 1.57 KB
Versions: 15
Compression:
Stored size: 1.57 KB
Contents
require "active_publisher/async/redis_adapter/redis_multi_pop_queue" module ActivePublisher module Async module RedisAdapter class Consumer SUPERVISOR_INTERVAL = { :execution_interval => 10, # seconds :timeout_interval => 5, # seconds } attr_reader :consumers, :queue, :supervisor def initialize(redis_pool) @queue = ::ActivePublisher::Async::RedisAdapter::RedisMultiPopQueue.new(redis_pool, ::ActivePublisher::Async::RedisAdapter::REDIS_LIST_KEY) @consumers = {} create_and_supervise_consumers! end def create_and_supervise_consumers! ::ActivePublisher.configuration.publisher_threads.times do consumer_id = ::SecureRandom.uuid consumers[consumer_id] = ::ActivePublisher::Async::InMemoryAdapter::ConsumerThread.new(queue) supervisor_task = ::Concurrent::TimerTask.new(SUPERVISOR_INTERVAL) do consumer = consumers[consumer_id] unless consumer.alive? consumer.kill rescue nil consumers[consumer_id] = ::ActivePublisher::Async::InMemoryAdapter::ConsumerThread.new(queue) ::ActiveSupport::Notifications.instrument "async_queue.thread_restart" end # Notify the current queue size. ::ActiveSupport::Notifications.instrument "redis_async_queue_size.active_publisher", queue.size end supervisor_task.execute end end def size queue.size end end end end end
Version data entries
15 entries across 15 versions & 1 rubygems