lib/resque/scheduler.rb in resque-scheduler-2.0.0.a vs lib/resque/scheduler.rb in resque-scheduler-2.0.0.b

- old
+ new

@@ -60,10 +60,13 @@ end # Pulls the schedule from Resque.schedule and loads it into the # rufus scheduler instance def load_schedule! + # Need to load the schedule from redis for the first time if dynamic + Resque.reload_schedule! if dynamic + log! "Schedule empty! Set Resque.schedule" if Resque.schedule.empty? @@scheduled_jobs = {} Resque.schedule.each do |name, config| @@ -127,18 +130,11 @@ item = nil begin handle_shutdown do if item = Resque.next_item_for_timestamp(timestamp) log "queuing #{item['class']} [delayed]" - queue = item['queue'] || Resque.queue_from_class(constantize(item['class'])) - # Support custom job classes like job with status - if (job_klass = item['custom_job_class']) && (job_klass != 'Resque::Job') - # custom job classes not supporting the same API calls must implement the #schedule method - constantize(job_klass).scheduled(queue, item['class'], *item['args']) - else - Resque::Job.create(queue, item['class'], *item['args']) - end + enqueue_from_config(item) end end # continue processing until there are no more ready items in this timestamp end while !item.nil? end @@ -148,22 +144,31 @@ yield exit if @shutdown end # Enqueues a job based on a config hash - def enqueue_from_config(config) - args = config['args'] || config[:args] - klass_name = config['class'] || config[:class] + def enqueue_from_config(job_config) + args = job_config['args'] || job_config[:args] + klass_name = job_config['class'] || job_config[:class] params = args.is_a?(Hash) ? [args] : Array(args) - queue = config['queue'] || config[:queue] || Resque.queue_from_class(constantize(klass_name)) - # Support custom job classes like job with status - if (job_klass = config['custom_job_class']) && (job_klass != 'Resque::Job') - # custom job classes not supporting the same API calls must implement the #schedule method - constantize(job_klass).scheduled(queue, klass_name, *params) + queue = job_config['queue'] || job_config[:queue] || Resque.queue_from_class(constantize(klass_name)) + # Support custom job classes like those that inherit from Resque::JobWithStatus (resque-status) + if (job_klass = job_config['custom_job_class']) && (job_klass != 'Resque::Job') + # The custom job class API must offer a static "scheduled" method. If the custom + # job class can not be constantized (via a requeue call from the web perhaps), fall + # back to enqueing normally via Resque::Job.create. + begin + constantize(job_klass).scheduled(queue, klass_name, *params) + rescue NameError + # Note that the custom job class (job_config['custom_job_class']) is the one enqueued + Resque::Job.create(queue, job_klass, *params) + end else Resque::Job.create(queue, klass_name, *params) end + rescue + log! "Failed to enqueue #{klass_name}:\n #{$!}" end def rufus_scheduler @rufus_scheduler ||= Rufus::Scheduler.start_new end @@ -178,11 +183,10 @@ end def reload_schedule! procline "Reloading Schedule" clear_schedule! - Resque.reload_schedule! load_schedule! end def update_schedule if Resque.redis.scard(:schedules_changed) > 0 @@ -230,11 +234,11 @@ # add "verbose" logic later log!(msg) if verbose end def procline(string) + log! string $0 = "resque-scheduler-#{ResqueScheduler::Version}: #{string}" - log! $0 end end end