Sha256: 09bc1f685ea8efd74ca6945ad6b6e3d63f0edd89c588936aafa256729f92f600

Contents?: true

Size: 1.96 KB

Versions: 1

Compression:

Stored size: 1.96 KB

Contents

# frozen_string_literal: true

require "litequeue"
require "litescheduler"
require_relative "processor"

module Litejob
  # Litejob::Server is responsible for popping job payloads from the SQLite queue.
  class Server
    def initialize(queues = ["default"])
      @queue = Litequeue.instance
      @scheduler = Litescheduler.instance
      @queues = queues
      # group and order queues according to their priority
      @prioritized_queues = queues.each_with_object({}) do |(name, priority, spawns), memo|
        priority ||= 5
        memo[priority] ||= []
        memo[priority] << [name, spawns == "spawn"]
      end.sort_by do |priority, _|
        -priority
      end
      @running = true
      @sleep_intervals = [0.001, 0.005, 0.025, 0.125, 0.625, 1.0, 2.0]
      run!
    end

    def pop(queue)
      result = @queue.pop(queue: queue)

      return result unless result.is_a?(Array)
      return false if result.empty?

      result
    end

    def run!
      @scheduler.spawn do
        Litejob.logger.info("[litejob]:[RUN] id=#{@scheduler.context.object_id}")
        worker_sleep_index = 0
        while @running
          processed = 0
          @prioritized_queues.each do |priority, queues|
            queues.each do |queue, spawns|
              batched = 0
              while (batched < priority) && (payload = pop(queue))
                batched += 1
                processed += 1

                id, serialized_job = payload
                processor = Processor.new(queue, id, serialized_job)
                processor.process!

                # give other contexts a chance to run here
                @scheduler.switch
              end
            end

            if processed == 0
              sleep @sleep_intervals[worker_sleep_index]
              worker_sleep_index += 1 if worker_sleep_index < @sleep_intervals.length - 1
            else
              worker_sleep_index = 0 # reset the index
            end
          end
        end
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
litejob-0.2.3 lib/litejob/server.rb