lib/rocket_job/concerns/worker.rb in rocketjob-1.2.1 vs lib/rocket_job/concerns/worker.rb in rocketjob-1.3.0
- old
+ new
@@ -20,23 +20,11 @@
end
end
# Create a job and process it immediately in-line by this thread
def now(method, *args, &block)
- job = build(method, *args, &block)
- # Call validations
- if job.respond_to?(:validate!)
- job.validate!
- elsif job.invalid?
- raise(MongoMapper::DocumentNotValid, "Validation failed: #{job.errors.messages.join(', ')}")
- end
- worker = RocketJob::Worker.new(name: 'inline')
- worker.started
- job.start
- while job.running? && !job.work(worker)
- end
- job
+ build(method, *args, &block).work_now
end
# Build a Rocket Job instance
#
# Note:
@@ -132,26 +120,49 @@
def work(worker)
raise(ArgumentError, 'Job must be started before calling #work') unless running?
begin
# before_perform
call_method(perform_method, arguments, event: :before, log_level: log_level)
+ # Allow before perform to explicitly fail this job
+ return unless running?
# perform
ret = call_method(perform_method, arguments, log_level: log_level)
if self.collect_output?
self.result = (ret.is_a?(Hash) || ret.is_a?(BSON::OrderedHash)) ? ret : {result: ret}
end
+ # Only run after perform if perform did not explicitly fail the job
+ return unless running?
+
# after_perform
call_method(perform_method, arguments, event: :after, log_level: log_level)
- complete!
+ new_record? ? complete : complete!
rescue StandardError => exc
- fail!(worker.name, exc) unless failed?
+ fail(worker.name, exc) if may_fail?
logger.error("Exception running #{self.class.name}##{perform_method}", exc)
+ save! unless new_record?
raise exc if RocketJob::Config.inline_mode
end
false
+ end
+
+ # Validates and runs the work on this job now in the current thread
+ # Returns this job once it has finished running
+ def work_now
+ # Call validations
+ if respond_to?(:validate!)
+ validate!
+ elsif invalid?
+ raise(MongoMapper::DocumentNotValid, "Validation failed: #{errors.messages.join(', ')}")
+ end
+ worker = RocketJob::Worker.new(name: 'inline')
+ worker.started
+ start if may_start?
+ while running? && !work(worker)
+ end
+ self
end
protected
# Calls a method on this job, if it is defined