lib/belated/queue.rb in belated-0.7.0 vs lib/belated/queue.rb in belated-0.8.0
- old
+ new
@@ -2,36 +2,55 @@
require 'belated/job'
require 'belated/logging'
require 'belated/job_wrapper'
require 'sorted_set'
-
+require 'pstore'
class Belated
# Job queues that Belated uses.
# queue is the jobs that are currenly
# waiting for a worker to start working on them.
# future_jobs is a SortedSet of jobs that are going
# to be added to queue at some point in the future.
class Queue
include Logging
- attr_accessor :future_jobs
+ attr_accessor :future_jobs, :future_jobs_db
FILE_NAME = 'belated_dump'
def initialize(queue: Thread::Queue.new, future_jobs: SortedSet.new)
@queue = queue
@mutex = Mutex.new
self.future_jobs = future_jobs
+ self.future_jobs_db = PStore.new('future_jobs.pstore', true) # pass true for thread safety
end
+ def enqueue_future_jobs
+ loop do
+ job = future_jobs.min
+ if job.nil?
+ sleep Belated.heartbeat
+ next
+ end
+ if job.at <= Time.now.to_f
+ delete_job(job)
+ push(job)
+ end
+ rescue DRb::DRbConnError
+ error 'DRb connection error!!!!!!'
+ log stats
+ end
+ end
+
def push(job)
if job.is_a?(Symbol) || job.at.nil? ||
job.at <= Time.now.to_f
@queue.push(job)
else
@mutex.synchronize do
@future_jobs << job
+ insert_into_future_jobs_db(job) unless job.proc_klass
end
end
end
def pop
@@ -50,20 +69,20 @@
def length
@queue.length
end
def load_jobs
- log "reloading... if file exists #{File.exist?(FILE_NAME)}"
+ future_jobs_db.transaction(true) do
+ future_jobs_db.roots.each do |id|
+ future_jobs << future_jobs_db[id]
+ end
+ end
return unless File.exist?(FILE_NAME)
jobs = YAML.load(File.binread(FILE_NAME))
jobs.each do |job|
- if job.at && job.at > Time.now.to_f
- future_jobs.push(job)
- else
- @queue.push(job)
- end
+ @queue.push(job)
end
File.delete(FILE_NAME)
end
def save_jobs
@@ -71,16 +90,10 @@
@queue.length.times do |_i|
unless proc_or_shutdown?(klass = @queue.pop)
class_array << klass
end
end
- future_jobs.each do |_job|
- unless proc_or_shutdown?(klass = future_jobs.pop)
- class_array << klass
- end
- end
-
pp File.open(FILE_NAME, 'wb') { |f| f.write(YAML.dump(class_array)) }
end
def connected?
true
@@ -88,8 +101,21 @@
private
def proc_or_shutdown?(job)
job.is_a?(Symbol) || job.job.instance_of?(Proc)
+ end
+
+ def delete_job(job)
+ log "Deleting #{future_jobs.delete(job)} from future jobs"
+ future_jobs_db.transaction do
+ future_jobs_db.delete(job.id)
+ end
+ end
+
+ def insert_into_future_jobs_db(job)
+ future_jobs_db.transaction do
+ future_jobs_db[job.id] = job
+ end
end
end
end