Sha256: 95d8b6da79b0b66e17b02203e42e7606ccf6aef3eaf75f1ef9d272dae52e1587

Contents?: true

Size: 1.35 KB

Versions: 2

Compression:

Stored size: 1.35 KB

Contents

require "active_publisher/async/redis_adapter/redis_multi_pop_queue"

module ActivePublisher
  module Async
    module RedisAdapter
      class Consumer
        SUPERVISOR_INTERVAL = {
          :execution_interval => 10, # seconds
          :timeout_interval => 5, # seconds
        }

        attr_reader :consumer, :queue, :supervisor

        def initialize(redis_pool)
          @queue = ::ActivePublisher::Async::RedisAdapter::RedisMultiPopQueue.new(redis_pool, ::ActivePublisher::Async::RedisAdapter::REDIS_LIST_KEY)
          create_and_supervise_consumer!
        end

        def create_and_supervise_consumer!
          @consumer = ::ActivePublisher::Async::InMemoryAdapter::ConsumerThread.new(queue)

          supervisor_task = ::Concurrent::TimerTask.new(SUPERVISOR_INTERVAL) do
            # This may also be the place to start additional publishers when we are getting backed up ... ?
            unless consumer.alive?
              consumer.kill rescue nil
              @consumer = ::ActivePublisher::Async::InMemoryAdapter::ConsumerThread.new(queue)
            end

            # Notify the current queue size.
            ::ActiveSupport::Notifications.instrument "redis_async_queue_size.active_publisher", queue.size
          end

          supervisor_task.execute
        end

        def size
          consumer.size
        end
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
active_publisher-1.2.0.pre2 lib/active_publisher/async/redis_adapter/consumer.rb
active_publisher-1.2.0.pre2-java lib/active_publisher/async/redis_adapter/consumer.rb