Sha256: eb5d0aa094ada3c2f80398b9fe147ea8455f20374567e6b34be7f8662dec2331
Contents?: true
Size: 1.3 KB
Versions: 4
Compression:
Stored size: 1.3 KB
Contents
require "active_publisher/async/redis_adapter/redis_multi_pop_queue"

module ActivePublisher
  module Async
    module RedisAdapter
      # Pairs a consumer thread built on a redis-backed queue with a periodic
      # supervisor task. On every tick the supervisor replaces the consumer if
      # it is no longer alive and publishes the current queue size as an
      # ActiveSupport notification.
      class Consumer
        # Options handed to ::Concurrent::TimerTask when building the supervisor.
        SUPERVISOR_INTERVAL = {
          :execution_interval => 10, # seconds
          :timeout_interval => 5, # seconds
        }.freeze

        # consumer   - the current ConsumerThread (swapped out by the supervisor).
        # queue      - the RedisMultiPopQueue shared by every consumer instance.
        # supervisor - the ::Concurrent::TimerTask watching the consumer.
        attr_reader :consumer, :queue, :supervisor

        # Builds the queue on top of +redis_pool+ and immediately starts the
        # consumer together with its supervisor.
        #
        # redis_pool - passed straight through to RedisMultiPopQueue.new
        #              (presumably a connection pool of redis clients; verify
        #              against the queue implementation).
        def initialize(redis_pool)
          @queue = ::ActivePublisher::Async::RedisAdapter::RedisMultiPopQueue.new(
            redis_pool,
            ::ActivePublisher::Async::RedisAdapter::REDIS_LIST_KEY
          )
          create_and_supervise_consumer!
        end

        # Starts a fresh consumer and a timer task that supervises it.
        #
        # The timer task is stored in @supervisor so that the +supervisor+
        # reader exposes it (e.g. for shutdown); previously it was kept only in
        # a local variable and the reader always returned nil.
        #
        # Returns the result of TimerTask#execute.
        def create_and_supervise_consumer!
          @consumer = build_consumer

          @supervisor = ::Concurrent::TimerTask.new(SUPERVISOR_INTERVAL) do
            # This may also be the place to start additional publishers when we are getting backed up ... ?
            replace_consumer unless consumer.alive?

            # Notify the current queue size.
            ::ActiveSupport::Notifications.instrument "redis_async_queue_size.active_publisher", queue.size
          end

          supervisor.execute
        end

        private

        # Creates a new consumer thread bound to the shared queue.
        def build_consumer
          ::ActivePublisher::Async::InMemoryAdapter::ConsumerThread.new(queue)
        end

        # Best-effort kill of the dead consumer, then swap in a new one.
        def replace_consumer
          begin
            consumer.kill
          rescue ::StandardError
            # Deliberately ignored: the consumer is already reported as not
            # alive, so a failing kill must not prevent the replacement below.
            nil
          end

          @consumer = build_consumer
        end
      end
    end
  end
end
Version data entries
4 entries across 4 versions & 1 rubygems