Sha256: fad2c80594efe6231a513846227cc964cf8fbf662b63e33f225990e15ec6955b
Contents?: true
Size: 1.1 KB
Versions: 1
Compression:
Stored size: 1.1 KB
Contents
# frozen_string_literal: true

require 'json'
require 'singleton'

module Quiq
  # Polls the Redis sorted set stored at SCHEDULER_KEY and moves jobs whose
  # scheduled time has been reached into their destination queue.
  class Scheduler
    include Singleton

    # Redis key of the sorted set holding scheduled jobs (member: raw job
    # string, score: the scheduled_at value given to .enqueue_at).
    SCHEDULER_KEY = 'quiq:schedule'

    # Starts the polling loop inside an Async block. Every 0.2 seconds the
    # first entry of the sorted set is read and enqueued if its score is not
    # in the future. The Redis connection is closed when the block exits.
    def start
      # Set the process name
      Process.setproctitle('quiq scheduler')

      Async do
        loop do
          sleep 0.2

          # TODO: use ZRANGEBYSCORE instead to batch enqueuing
          job, scheduled_at = Quiq.redis.zrange(
            SCHEDULER_KEY, 0, 0, with_scores: true
          )
          enqueue(job) if job && scheduled_at.to_f <= Time.now.to_f
        end
      ensure
        Quiq.redis.close
      end
    end

    # Registers a job in the schedule.
    #
    # @param job the raw job payload; #enqueue later parses it as JSON
    # @param scheduled_at the score stored with the job; #start compares it
    #   (via to_f) against Time.now.to_f
    def self.enqueue_at(job, scheduled_at)
      Quiq.redis.zadd(SCHEDULER_KEY, scheduled_at, job)
    end

    private

    # Push the job in its queue and remove from scheduler_queue.
    #
    # A payload that is not valid JSON is logged, sent to the dead-letter
    # queue and removed from the schedule; it is never pushed to a queue.
    #
    # @param job [String] raw job payload read from the schedule
    def enqueue(job)
      begin
        payload = JSON.parse(job)
      rescue JSON::ParserError => e
        Quiq.logger.warn("Invalid format: #{e}")
        Queue.send_to_dlq(job)
        # Drop the malformed entry: leaving it in place would keep it at the
        # head of the schedule, where it would be re-read on every poll and
        # block every job scheduled after it.
        Quiq.redis.zrem(SCHEDULER_KEY, job)
        return
      end

      # TODO: wrap those 2 calls in a transaction
      Queue.push(payload['queue_name'], job)
      Quiq.redis.zrem(SCHEDULER_KEY, job)
    end
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
quiq-0.2.0 | lib/quiq/scheduler.rb |