lib/resque/plugins/multi_step_task.rb in resque-multi-step-1.1.0 vs lib/resque/plugins/multi_step_task.rb in resque-multi-step-1.1.1
- old
+ new
@@ -2,22 +2,23 @@
require 'redis-namespace'
require 'resque/plugins/multi_step_task/assure_finalization'
require 'resque/plugins/multi_step_task/finalization_job'
require 'resque/plugins/multi_step_task/constantization'
require 'resque/plugins/multi_step_task/atomic_counters'
+require 'logger'
+require 'yajl'
module Resque
module Plugins
+ # @attr_reader normal_job_count
+ # @attr_reader finalize_job_count
+ # @attr_reader completed_count
+ # @attr_reader failed_count
class MultiStepTask
class NoSuchMultiStepTask < StandardError; end
class NotReadyForFinalization < StandardError; end
class FinalizationAlreadyBegun < StandardError; end
- class StdOutLogger
- def warn(*args); puts args; end
- def info(*args); puts args; end
- def debug(*args); puts args; end
- end
class << self
include Constantization
NONCE_CHARS = ('a'..'z').to_a + ('A'..'Z').to_a + ('0'..'9').to_a
@@ -60,11 +61,11 @@
mst = new(task_id)
mst.nuke
redis.sadd("active-tasks", task_id)
redis.sismember("active-tasks", task_id)
if block_given?
- yield pjg
+ yield mst
mst.finalizable!
end
mst
end
@@ -92,18 +93,17 @@
end
def perform_without_maybe_finalize(task_id, job_module_name, *args)
task = MultiStepTask.find(task_id)
begin
- job_start_key = "#{task_id}_#{job_module_name}_#{args}-start-time-#{nonce}"
- task.redis.set(job_start_key, Time.now.to_i)
- logger.debug("[Resque Multi-Step-Task] Executing #{job_module_name} job for #{task_id} at #{Time.now} (args: #{args})")
+ start_time = Time.now
+ logger.debug("[Resque Multi-Step-Task] Executing #{job_module_name} job for #{task_id} at #{start_time} (args: #{args})")
# perform the task
constantize(job_module_name).perform(*args)
- logger.debug("[Resque Multi-Step-Task] Finished executing #{job_module_name} job for #{task_id} at #{Time.now}, taking #{(Time.now - task.redis.get(job_start_key).to_i).to_i} seconds.")
+ logger.debug("[Resque Multi-Step-Task] Finished executing #{job_module_name} job for #{task_id} at #{Time.now}, taking #{(Time.now - start_time)} seconds.")
rescue Exception => e
logger.error("[Resque Multi-Step-Task] #{job_module_name} job failed for #{task_id} at #{Time.now} (args: #{args})")
task.increment_failed_count
raise
end
@@ -118,11 +118,11 @@
def logger=(logger)
@@logger = logger
end
def logger
- @@logger ||= RAILS_DEFAULT_LOGGER || StdOutLogger.new
+ @@logger ||= Logger.new(STDERR)
end
# Normally jobs that are part of a multi-step task are run
# asynchronously by putting them on a queue. However, it is
# often more convenient to just run the jobs synchronously as
@@ -152,12 +152,14 @@
attr_accessor :logger
extend AtomicCounters
counter :normal_job_count
+
counter :finalize_job_count
counter :completed_count
+
counter :failed_count
# Initialize a newly instantiated parallel job group.
#