Sha256: a0362d56d18031a9a07d5eee5fce1ed40f2c194ce592a5873c9b498b3e5f1b77
Contents?: true
Size: 1.95 KB
Versions: 1
Compression:
Stored size: 1.95 KB
Contents
# frozen_string_literal: true require 'belated/job' require 'belated/logging' require 'belated/job_wrapper' require 'sorted_set' class Belated # Job queues that Belated uses. # queue is the jobs that are currenly # waiting for a worker to start working on them. # future_jobs is a SortedSet of jobs that are going # to be added to queue at some point in the future. class Queue include Logging attr_accessor :future_jobs FILE_NAME = 'belated_dump' def initialize(queue: Thread::Queue.new, future_jobs: SortedSet.new) @queue = queue @mutex = Mutex.new self.future_jobs = future_jobs end def push(job) if job.is_a?(Symbol) || job.at.nil? || job.at <= Time.now.utc @queue.push(job) else @mutex.synchronize do @future_jobs << job end end end def pop @queue.pop end def clear @queue.clear self.future_jobs = [] end def empty? @queue.empty? end def length @queue.length end def load_jobs log "reloading... if file exists #{File.exist?(FILE_NAME)}" return unless File.exist?(FILE_NAME) jobs = YAML.load(File.binread(FILE_NAME)) jobs.each do |job| if job.at && job.at > Time.now.utc future_jobs.push(job) else @queue.push(job) end end File.delete(FILE_NAME) end def save_jobs class_array = [] @queue.length.times do |_i| unless proc_or_shutdown?(klass = @queue.pop) class_array << klass end end future_jobs.each do |_job| unless proc_or_shutdown?(klass = future_jobs.pop) class_array << klass end end pp File.open(FILE_NAME, 'wb') { |f| f.write(YAML.dump(class_array)) } end def connected? true end private def proc_or_shutdown?(job) job.is_a?(Symbol) || job.job.instance_of?(Proc) end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
belated-0.6.7 | lib/belated/queue.rb |