lib/belated/queue.rb in belated-0.7.0 vs lib/belated/queue.rb in belated-0.8.0

- old
+ new

@@ -2,36 +2,55 @@ require 'belated/job' require 'belated/logging' require 'belated/job_wrapper' require 'sorted_set' - +require 'pstore' class Belated # Job queues that Belated uses. # queue is the jobs that are currenly # waiting for a worker to start working on them. # future_jobs is a SortedSet of jobs that are going # to be added to queue at some point in the future. class Queue include Logging - attr_accessor :future_jobs + attr_accessor :future_jobs, :future_jobs_db FILE_NAME = 'belated_dump' def initialize(queue: Thread::Queue.new, future_jobs: SortedSet.new) @queue = queue @mutex = Mutex.new self.future_jobs = future_jobs + self.future_jobs_db = PStore.new('future_jobs.pstore', true) # pass true for thread safety end + def enqueue_future_jobs + loop do + job = future_jobs.min + if job.nil? + sleep Belated.heartbeat + next + end + if job.at <= Time.now.to_f + delete_job(job) + push(job) + end + rescue DRb::DRbConnError + error 'DRb connection error!!!!!!' + log stats + end + end + def push(job) if job.is_a?(Symbol) || job.at.nil? || job.at <= Time.now.to_f @queue.push(job) else @mutex.synchronize do @future_jobs << job + insert_into_future_jobs_db(job) unless job.proc_klass end end end def pop @@ -50,20 +69,20 @@ def length @queue.length end def load_jobs - log "reloading... if file exists #{File.exist?(FILE_NAME)}" + future_jobs_db.transaction(true) do + future_jobs_db.roots.each do |id| + future_jobs << future_jobs_db[id] + end + end return unless File.exist?(FILE_NAME) jobs = YAML.load(File.binread(FILE_NAME)) jobs.each do |job| - if job.at && job.at > Time.now.to_f - future_jobs.push(job) - else - @queue.push(job) - end + @queue.push(job) end File.delete(FILE_NAME) end def save_jobs @@ -71,16 +90,10 @@ @queue.length.times do |_i| unless proc_or_shutdown?(klass = @queue.pop) class_array << klass end end - future_jobs.each do |_job| - unless proc_or_shutdown?(klass = future_jobs.pop) - class_array << klass - end - end - pp File.open(FILE_NAME, 'wb') { |f| f.write(YAML.dump(class_array)) } end def connected? true @@ -88,8 +101,21 @@ private def proc_or_shutdown?(job) job.is_a?(Symbol) || job.job.instance_of?(Proc) + end + + def delete_job(job) + log "Deleting #{future_jobs.delete(job)} from future jobs" + future_jobs_db.transaction do + future_jobs_db.delete(job.id) + end + end + + def insert_into_future_jobs_db(job) + future_jobs_db.transaction do + future_jobs_db[job.id] = job + end end end end