Sha256: d01ae5f633fe96b245b53406386b360ca4c1a8bde9444d708e1d6a371b6597cb
Contents?: true
Size: 1.77 KB
Versions: 1
Compression:
Stored size: 1.77 KB
Contents
# frozen_string_literal: true

require "litequeue"
require "litescheduler"

module Litejob
  # Litejob::Server is responsible for popping job payloads from the SQLite queue.
  #
  # Constructing a Server immediately spawns a worker through the scheduler.
  # The worker sweeps the configured queues, highest priority first, and hands
  # every popped payload to a Processor.
  # :nocov:
  class Server
    # Idle back-off schedule, in seconds. The worker steps through it while
    # consecutive sweeps find no work and stays on the last entry once reached.
    SLEEP_INTERVALS = [0.001, 0.005, 0.025, 0.125, 0.625, 1.0, 2.0].freeze

    # Builds the priority table and starts the worker right away.
    #
    # @param queues [Array<Array>] queue definitions shaped like
    #   [name, priority, spawns]. The priority is negated for sorting and is
    #   also used as the per-sweep batch size for the queue; the third element
    #   is compared against the string "spawn".
    def initialize(queues)
      @queue = Litequeue.instance
      @scheduler = Litescheduler.instance
      @queues = queues
      @prioritized_queues = prioritize(queues)
      @running = true
      @sleep_intervals = SLEEP_INTERVALS
      run!
    end

    # Pops from the named queue and normalises the result.
    #
    # NOTE(review): the result of Litequeue#pop is treated as array-like
    # (it must respond to #length, #empty? and #[]) - confirm against litequeue.
    #
    # @param queue [Object] queue name forwarded as the `queue:` keyword
    # @return [Object, false] the sole element when exactly one was returned,
    #   false when nothing was returned, otherwise the result unchanged
    def pop(queue)
      result = @queue.pop(queue: queue)
      return false if result.empty?

      (result.length == 1) ? result[0] : result
    end

    # Spawns the worker loop on the scheduler. The loop keeps running for as
    # long as @running is truthy.
    #
    # @return [Object] whatever the scheduler's #spawn returns
    def run!
      @scheduler.spawn do
        worker_sleep_index = 0

        while @running
          processed = 0

          @prioritized_queues.each do |priority, queues|
            # The boolean spawn flag stored next to each name is not consulted here.
            queues.each do |queue, _spawns|
              processed += drain(queue, priority)
            end
          end

          # Back off only after a complete sweep over every priority level.
          # Sleeping per level would delay lower-priority queues whenever a
          # higher level is empty, and an empty queue list would never sleep
          # at all, leaving this loop spinning.
          if processed.zero?
            sleep @sleep_intervals[worker_sleep_index]
            worker_sleep_index += 1 if worker_sleep_index < @sleep_intervals.length - 1
          else
            worker_sleep_index = 0 # work was found, restart the back-off
          end
        end
      end
    end

    private

    # Groups queue definitions by priority and orders the groups from the
    # highest priority to the lowest.
    #
    # @param queues [Array<Array>] see #initialize
    # @return [Array<Array>] pairs of [priority, [[name, spawn_flag], ...]]
    def prioritize(queues)
      grouped = queues.each_with_object({}) do |(name, priority, spawns), memo|
        (memo[priority] ||= []) << [name, spawns == "spawn"]
      end

      grouped.sort_by { |priority, _| -priority }
    end

    # Processes up to `limit` payloads from a single queue.
    #
    # NOTE(review): nothing here rescues errors raised by Processor#process!,
    # so one would propagate out of the spawned block - confirm that this is
    # the intended failure mode.
    #
    # @param queue [Object] queue name
    # @param limit [Numeric] maximum number of payloads handled in this pass
    # @return [Integer] number of payloads handed to a Processor
    def drain(queue, limit)
      batched = 0

      while (batched < limit) && (payload = pop(queue))
        batched += 1
        Processor.new(payload).process!
        # give other contexts a chance to run here
        @scheduler.switch
      end

      batched
    end
  end
  # :nocov:
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
litejob-0.2.0 | lib/litejob/server.rb |