Sha256: eb5d0aa094ada3c2f80398b9fe147ea8455f20374567e6b34be7f8662dec2331

Contents?: true

Size: 1.3 KB

Versions: 4

Compression:

Stored size: 1.3 KB

Contents

require "active_publisher/async/redis_adapter/redis_multi_pop_queue"

module ActivePublisher
  module Async
    module RedisAdapter
      class Consumer
        SUPERVISOR_INTERVAL = {
          :execution_interval => 10, # seconds
          :timeout_interval => 5, # seconds
        }

        attr_reader :consumer, :queue, :supervisor

        def initialize(redis_pool)
          @queue = ::ActivePublisher::Async::RedisAdapter::RedisMultiPopQueue.new(redis_pool, ::ActivePublisher::Async::RedisAdapter::REDIS_LIST_KEY)
          create_and_supervise_consumer!
        end

        def create_and_supervise_consumer!
          @consumer = ::ActivePublisher::Async::InMemoryAdapter::ConsumerThread.new(queue)

          supervisor_task = ::Concurrent::TimerTask.new(SUPERVISOR_INTERVAL) do
            # This may also be the place to start additional publishers when we are getting backed up ... ?
            unless consumer.alive?
              consumer.kill rescue nil
              @consumer = ::ActivePublisher::Async::InMemoryAdapter::ConsumerThread.new(queue)
            end

            # Notify the current queue size.
            ::ActiveSupport::Notifications.instrument "redis_async_queue_size.active_publisher", queue.size
          end

          supervisor_task.execute
        end
      end

    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
active_publisher-1.2.0.pre1-java lib/active_publisher/async/redis_adapter/consumer.rb
active_publisher-1.2.0.pre1 lib/active_publisher/async/redis_adapter/consumer.rb
active_publisher-1.2.0.pre-java lib/active_publisher/async/redis_adapter/consumer.rb
active_publisher-1.2.0.pre lib/active_publisher/async/redis_adapter/consumer.rb