Sha256: 8a04d6fc53744e08c87b5d2585542c02992e87850d6757ec3cc9b01b7fe50fa4

Contents?: true

Size: 629 Bytes

Versions: 1

Compression:

Stored size: 629 Bytes

Contents

# frozen_string_literal: true

require 'singleton'
require 'async/container'
require 'async/redis'
require_relative 'worker'

module Quiq
  class Server < Async::Container::Controller
    include Singleton

    # Called by Server.instance.run
    def setup(container)
      @queues = Quiq.queues.map { |q| "queue:#{q}" }

      container.async do
        loop do
          job = fetch_one
          Worker.new(job).run
        end
      ensure
        Quiq.redis.close
      end
    end

    def fetch_one
      # BRPOP returns a tuple made of the queue name then the args
      Quiq.redis.brpop(*@queues).last
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
quiq-0.1.0 lib/quiq/server.rb