Sha256: ca6e45ecb9ecdb64208928aa992ed60761c237153f0c971810cbdf119bfebfc1
Contents?: true
Size: 1.35 KB
Versions: 26
Compression:
Stored size: 1.35 KB
Contents
require "active_publisher/async/redis_adapter/redis_multi_pop_queue" module ActivePublisher module Async module RedisAdapter class Consumer SUPERVISOR_INTERVAL = { :execution_interval => 10, # seconds :timeout_interval => 5, # seconds } attr_reader :consumer, :queue, :supervisor def initialize(redis_pool) @queue = ::ActivePublisher::Async::RedisAdapter::RedisMultiPopQueue.new(redis_pool, ::ActivePublisher::Async::RedisAdapter::REDIS_LIST_KEY) create_and_supervise_consumer! end def create_and_supervise_consumer! @consumer = ::ActivePublisher::Async::InMemoryAdapter::ConsumerThread.new(queue) supervisor_task = ::Concurrent::TimerTask.new(SUPERVISOR_INTERVAL) do # This may also be the place to start additional publishers when we are getting backed up ... ? unless consumer.alive? consumer.kill rescue nil @consumer = ::ActivePublisher::Async::InMemoryAdapter::ConsumerThread.new(queue) end # Notify the current queue size. ::ActiveSupport::Notifications.instrument "redis_async_queue_size.active_publisher", queue.size end supervisor_task.execute end def size queue.size end end end end end
Version data entries
26 entries across 26 versions & 1 rubygems