lib/resque/scheduler.rb in resque-scheduler-2.0.0.a vs lib/resque/scheduler.rb in resque-scheduler-2.0.0.b
- old
+ new
@@ -60,10 +60,13 @@
end
# Pulls the schedule from Resque.schedule and loads it into the
# rufus scheduler instance
def load_schedule!
+ # Need to load the schedule from redis for the first time if dynamic
+ Resque.reload_schedule! if dynamic
+
log! "Schedule empty! Set Resque.schedule" if Resque.schedule.empty?
@@scheduled_jobs = {}
Resque.schedule.each do |name, config|
@@ -127,18 +130,11 @@
item = nil
begin
handle_shutdown do
if item = Resque.next_item_for_timestamp(timestamp)
log "queuing #{item['class']} [delayed]"
- queue = item['queue'] || Resque.queue_from_class(constantize(item['class']))
- # Support custom job classes like job with status
- if (job_klass = item['custom_job_class']) && (job_klass != 'Resque::Job')
- # custom job classes not supporting the same API calls must implement the #schedule method
- constantize(job_klass).scheduled(queue, item['class'], *item['args'])
- else
- Resque::Job.create(queue, item['class'], *item['args'])
- end
+ enqueue_from_config(item)
end
end
# continue processing until there are no more ready items in this timestamp
end while !item.nil?
end
@@ -148,22 +144,31 @@
yield
exit if @shutdown
end
# Enqueues a job based on a config hash
- def enqueue_from_config(config)
- args = config['args'] || config[:args]
- klass_name = config['class'] || config[:class]
+ def enqueue_from_config(job_config)
+ args = job_config['args'] || job_config[:args]
+ klass_name = job_config['class'] || job_config[:class]
params = args.is_a?(Hash) ? [args] : Array(args)
- queue = config['queue'] || config[:queue] || Resque.queue_from_class(constantize(klass_name))
- # Support custom job classes like job with status
- if (job_klass = config['custom_job_class']) && (job_klass != 'Resque::Job')
- # custom job classes not supporting the same API calls must implement the #schedule method
- constantize(job_klass).scheduled(queue, klass_name, *params)
+ queue = job_config['queue'] || job_config[:queue] || Resque.queue_from_class(constantize(klass_name))
+ # Support custom job classes like those that inherit from Resque::JobWithStatus (resque-status)
+ if (job_klass = job_config['custom_job_class']) && (job_klass != 'Resque::Job')
+ # The custom job class API must offer a static "scheduled" method. If the custom
+ # job class can not be constantized (via a requeue call from the web perhaps), fall
+ # back to enqueing normally via Resque::Job.create.
+ begin
+ constantize(job_klass).scheduled(queue, klass_name, *params)
+ rescue NameError
+ # Note that the custom job class (job_config['custom_job_class']) is the one enqueued
+ Resque::Job.create(queue, job_klass, *params)
+ end
else
Resque::Job.create(queue, klass_name, *params)
end
+ rescue
+ log! "Failed to enqueue #{klass_name}:\n #{$!}"
end
def rufus_scheduler
@rufus_scheduler ||= Rufus::Scheduler.start_new
end
@@ -178,11 +183,10 @@
end
def reload_schedule!
procline "Reloading Schedule"
clear_schedule!
- Resque.reload_schedule!
load_schedule!
end
def update_schedule
if Resque.redis.scard(:schedules_changed) > 0
@@ -230,11 +234,11 @@
# add "verbose" logic later
log!(msg) if verbose
end
def procline(string)
+ log! string
$0 = "resque-scheduler-#{ResqueScheduler::Version}: #{string}"
- log! $0
end
end
end