Sha256: ca6e45ecb9ecdb64208928aa992ed60761c237153f0c971810cbdf119bfebfc1

Contents?: true

Size: 1.35 KB

Versions: 26

Compression:

Stored size: 1.35 KB

Contents

require "active_publisher/async/redis_adapter/redis_multi_pop_queue"

module ActivePublisher
  module Async
    module RedisAdapter
      class Consumer
        SUPERVISOR_INTERVAL = {
          :execution_interval => 10, # seconds
          :timeout_interval => 5, # seconds
        }

        attr_reader :consumer, :queue, :supervisor

        def initialize(redis_pool)
          @queue = ::ActivePublisher::Async::RedisAdapter::RedisMultiPopQueue.new(redis_pool, ::ActivePublisher::Async::RedisAdapter::REDIS_LIST_KEY)
          create_and_supervise_consumer!
        end

        def create_and_supervise_consumer!
          @consumer = ::ActivePublisher::Async::InMemoryAdapter::ConsumerThread.new(queue)

          supervisor_task = ::Concurrent::TimerTask.new(SUPERVISOR_INTERVAL) do
            # This may also be the place to start additional publishers when we are getting backed up ... ?
            unless consumer.alive?
              consumer.kill rescue nil
              @consumer = ::ActivePublisher::Async::InMemoryAdapter::ConsumerThread.new(queue)
            end

            # Notify the current queue size.
            ::ActiveSupport::Notifications.instrument "redis_async_queue_size.active_publisher", queue.size
          end

          supervisor_task.execute
        end

        def size
          queue.size
        end
      end
    end
  end
end

Version data entries

26 entries across 26 versions & 1 rubygems

Version Path
active_publisher-1.2.6-java lib/active_publisher/async/redis_adapter/consumer.rb
active_publisher-1.2.6 lib/active_publisher/async/redis_adapter/consumer.rb
active_publisher-1.2.5-java lib/active_publisher/async/redis_adapter/consumer.rb
active_publisher-1.2.5 lib/active_publisher/async/redis_adapter/consumer.rb
active_publisher-1.2.4-java lib/active_publisher/async/redis_adapter/consumer.rb
active_publisher-1.2.4 lib/active_publisher/async/redis_adapter/consumer.rb
active_publisher-1.2.3-java lib/active_publisher/async/redis_adapter/consumer.rb
active_publisher-1.2.3 lib/active_publisher/async/redis_adapter/consumer.rb
active_publisher-1.2.2-java lib/active_publisher/async/redis_adapter/consumer.rb
active_publisher-1.2.2 lib/active_publisher/async/redis_adapter/consumer.rb
active_publisher-1.2.1-java lib/active_publisher/async/redis_adapter/consumer.rb
active_publisher-1.2.1 lib/active_publisher/async/redis_adapter/consumer.rb
active_publisher-1.3.0.pre0-java lib/active_publisher/async/redis_adapter/consumer.rb
active_publisher-1.3.0.pre0 lib/active_publisher/async/redis_adapter/consumer.rb
active_publisher-1.2.0-java lib/active_publisher/async/redis_adapter/consumer.rb
active_publisher-1.2.0 lib/active_publisher/async/redis_adapter/consumer.rb
active_publisher-1.2.0.pre7-java lib/active_publisher/async/redis_adapter/consumer.rb
active_publisher-1.2.0.pre7 lib/active_publisher/async/redis_adapter/consumer.rb
active_publisher-1.2.0.pre6-java lib/active_publisher/async/redis_adapter/consumer.rb
active_publisher-1.2.0.pre6 lib/active_publisher/async/redis_adapter/consumer.rb