Sha256: 79c960505422009b401c7baf70bb042aaab872f1c52cad8447446913c9691429

Contents?: true

Size: 1.89 KB

Versions: 1

Compression:

Stored size: 1.89 KB

Contents

# frozen_string_literal: true

require "litequeue"
require "litescheduler"
require_relative "processor"

module Litejob
  # Litejob::Server is responsible for popping job payloads from the SQLite queue.
  # :nocov:
  class Server
    # TODO: make queues use [["default", 1]]
    # TODO: make queue priorities optional
    def initialize(queues)
      @queue = Litequeue.instance
      @scheduler = Litescheduler.instance
      @queues = queues
      # group and order queues according to their priority
      @prioritized_queues = queues.each_with_object({}) do |(name, priority, spawns), memo|
        memo[priority] ||= []
        memo[priority] << [name, spawns == "spawn"]
      end.sort_by do |priority, _|
        -priority
      end
      @running = true
      @sleep_intervals = [0.001, 0.005, 0.025, 0.125, 0.625, 1.0, 2.0]
      run!
    end

    def pop(queue)
      result = @queue.pop(queue: queue)

      return result unless result.is_a?(Array)
      return false if result.empty?

      result
    end

    def run!
      @scheduler.spawn do
        worker_sleep_index = 0
        while @running
          processed = 0
          @prioritized_queues.each do |priority, queues|
            queues.each do |queue, spawns|
              batched = 0
              while (batched < priority) && (payload = pop(queue))
                batched += 1
                processed += 1

                processor = Processor.new(payload)
                processor.process!

                # give other contexts a chance to run here
                @scheduler.switch
              end
            end

            if processed == 0
              sleep @sleep_intervals[worker_sleep_index]
              worker_sleep_index += 1 if worker_sleep_index < @sleep_intervals.length - 1
            else
              worker_sleep_index = 0 # reset the index
            end
          end
        end
      end
    end
  end
  # :nocov:
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
litejob-0.2.1 lib/litejob/server.rb