lib/resque/plugins/multi_step_task.rb in resque-multi-step-1.1.0 vs lib/resque/plugins/multi_step_task.rb in resque-multi-step-1.1.1

- old
+ new

@@ -2,22 +2,23 @@ require 'redis-namespace' require 'resque/plugins/multi_step_task/assure_finalization' require 'resque/plugins/multi_step_task/finalization_job' require 'resque/plugins/multi_step_task/constantization' require 'resque/plugins/multi_step_task/atomic_counters' +require 'logger' +require 'yajl' module Resque module Plugins + # @attr_reader normal_job_count + # @attr_reader finalize_job_count + # @attr_reader completed_count + # @attr_reader failed_count class MultiStepTask class NoSuchMultiStepTask < StandardError; end class NotReadyForFinalization < StandardError; end class FinalizationAlreadyBegun < StandardError; end - class StdOutLogger - def warn(*args); puts args; end - def info(*args); puts args; end - def debug(*args); puts args; end - end class << self include Constantization NONCE_CHARS = ('a'..'z').to_a + ('A'..'Z').to_a + ('0'..'9').to_a @@ -60,11 +61,11 @@ mst = new(task_id) mst.nuke redis.sadd("active-tasks", task_id) redis.sismember("active-tasks", task_id) if block_given? - yield pjg + yield mst mst.finalizable! end mst end @@ -92,18 +93,17 @@ end def perform_without_maybe_finalize(task_id, job_module_name, *args) task = MultiStepTask.find(task_id) begin - job_start_key = "#{task_id}_#{job_module_name}_#{args}-start-time-#{nonce}" - task.redis.set(job_start_key, Time.now.to_i) - logger.debug("[Resque Multi-Step-Task] Executing #{job_module_name} job for #{task_id} at #{Time.now} (args: #{args})") + start_time = Time.now + logger.debug("[Resque Multi-Step-Task] Executing #{job_module_name} job for #{task_id} at #{start_time} (args: #{args})") # perform the task constantize(job_module_name).perform(*args) - logger.debug("[Resque Multi-Step-Task] Finished executing #{job_module_name} job for #{task_id} at #{Time.now}, taking #{(Time.now - task.redis.get(job_start_key).to_i).to_i} seconds.") + logger.debug("[Resque Multi-Step-Task] Finished executing #{job_module_name} job for #{task_id} at #{Time.now}, taking #{(Time.now - start_time)} seconds.") rescue Exception => e logger.error("[Resque Multi-Step-Task] #{job_module_name} job failed for #{task_id} at #{Time.now} (args: #{args})") task.increment_failed_count raise end @@ -118,11 +118,11 @@ def logger=(logger) @@logger = logger end def logger - @@logger ||= RAILS_DEFAULT_LOGGER || StdOutLogger.new + @@logger ||= Logger.new(STDERR) end # Normally jobs that are part of a multi-step task are run # asynchronously by putting them on a queue. However, it is # often more convenient to just run the jobs synchronously as @@ -152,12 +152,14 @@ attr_accessor :logger extend AtomicCounters counter :normal_job_count + counter :finalize_job_count counter :completed_count + counter :failed_count # Initialize a newly instantiated parallel job group. #