Sha256: 6e6d46c9a56302035fc191c8af431e84fb2b307970c048e2f46e850f7abcd8c7

Contents?: true

Size: 1.57 KB

Versions: 15

Compression:

Stored size: 1.57 KB

Contents

require "active_publisher/async/redis_adapter/redis_multi_pop_queue"

module ActivePublisher
  module Async
    module RedisAdapter
      class Consumer
        SUPERVISOR_INTERVAL = {
          :execution_interval => 10, # seconds
          :timeout_interval => 5, # seconds
        }

        attr_reader :consumers, :queue, :supervisor

        def initialize(redis_pool)
          @queue = ::ActivePublisher::Async::RedisAdapter::RedisMultiPopQueue.new(redis_pool, ::ActivePublisher::Async::RedisAdapter::REDIS_LIST_KEY)
          @consumers = {}
          create_and_supervise_consumers!
        end

        def create_and_supervise_consumers!
          ::ActivePublisher.configuration.publisher_threads.times do
            consumer_id = ::SecureRandom.uuid
            consumers[consumer_id] = ::ActivePublisher::Async::InMemoryAdapter::ConsumerThread.new(queue)

            supervisor_task = ::Concurrent::TimerTask.new(SUPERVISOR_INTERVAL) do
              consumer = consumers[consumer_id]
              unless consumer.alive?
                consumer.kill rescue nil
                consumers[consumer_id] = ::ActivePublisher::Async::InMemoryAdapter::ConsumerThread.new(queue)
                ::ActiveSupport::Notifications.instrument "async_queue.thread_restart"
              end

              # Notify the current queue size.
              ::ActiveSupport::Notifications.instrument "redis_async_queue_size.active_publisher", queue.size
            end

            supervisor_task.execute
          end
        end

        def size
          queue.size
        end
      end
    end
  end
end

Version data entries

15 entries across 15 versions & 1 rubygems

Version Path
active_publisher-1.5.0.pre lib/active_publisher/async/redis_adapter/consumer.rb
active_publisher-1.4.2-java lib/active_publisher/async/redis_adapter/consumer.rb
active_publisher-1.4.2 lib/active_publisher/async/redis_adapter/consumer.rb
active_publisher-1.4.1-java lib/active_publisher/async/redis_adapter/consumer.rb
active_publisher-1.4.1 lib/active_publisher/async/redis_adapter/consumer.rb
active_publisher-1.4.1.pre-java lib/active_publisher/async/redis_adapter/consumer.rb
active_publisher-1.4.1.pre lib/active_publisher/async/redis_adapter/consumer.rb
active_publisher-1.4.0-java lib/active_publisher/async/redis_adapter/consumer.rb
active_publisher-1.4.0 lib/active_publisher/async/redis_adapter/consumer.rb
active_publisher-1.3.2 lib/active_publisher/async/redis_adapter/consumer.rb
active_publisher-1.3.2-java lib/active_publisher/async/redis_adapter/consumer.rb
active_publisher-1.3.1-java lib/active_publisher/async/redis_adapter/consumer.rb
active_publisher-1.3.1 lib/active_publisher/async/redis_adapter/consumer.rb
active_publisher-1.3.0-java lib/active_publisher/async/redis_adapter/consumer.rb
active_publisher-1.3.0 lib/active_publisher/async/redis_adapter/consumer.rb