lib/rocket_job/plugins/job/worker.rb in rocketjob-2.1.3 vs lib/rocket_job/plugins/job/worker.rb in rocketjob-3.0.0.alpha
- old
+ new
@@ -6,19 +6,19 @@
module Plugins
module Job
module Worker
extend ActiveSupport::Concern
- included do
+ module ClassMethods
# Run this job later
#
# Saves it to the database for processing later by workers
- def self.perform_later(*args, &block)
+ def perform_later(args, &block)
if RocketJob::Config.inline_mode
- perform_now(*args, &block)
+ perform_now(args, &block)
else
- job = new(arguments: args)
+ job = new(args)
block.call(job) if block
job.save!
job
end
end
@@ -26,12 +26,12 @@
# Run this job now.
#
# The job is not saved to the database since it is processed entriely in memory
# As a result before_save and before_destroy callbacks will not be called.
# Validations are still called however prior to calling #perform
- def self.perform_now(*args, &block)
- job = new(arguments: args)
+ def perform_now(args, &block)
+ job = new(args)
block.call(job) if block
job.perform_now
job
end
@@ -46,11 +46,11 @@
# skip_job_ids [Array<BSON::ObjectId>]
# Job ids to exclude when looking for the next job
#
# Note:
# If a job is in queued state it will be started
- def self.rocket_job_next_job(worker_name, skip_job_ids = nil)
+ def rocket_job_next_job(worker_name, skip_job_ids = nil)
while (job = rocket_job_retrieve(worker_name, skip_job_ids))
case
when job.running?
# Batch Job
return job
@@ -65,20 +65,17 @@
return job if job.running?
end
end
end
- # Requeues all jobs that were running on worker that died
- def self.requeue_dead_worker(worker_name)
- # TODO Need to requeue paused, failed since user may have transitioned job before it finished
- running.each do |job|
- job.requeue!(worker_name) if job.may_requeue?(worker_name)
+ # Requeues all jobs that were running on a server that died
+ def requeue_dead_server(server_name)
+ # Need to requeue paused, failed since user may have transitioned job before it finished
+ where(:state.in => [:running, :paused, :faled]).each do |job|
+ job.requeue!(server_name) if job.may_requeue?(server_name)
end
end
-
- # Turn off embedded callbacks. Slow and not used for Jobs
- embedded_callbacks_off
end
# Runs the job now in the current thread.
#
# Validations are called prior to running the job.
@@ -89,18 +86,13 @@
# * before_create
# * after_create
#
# Exceptions are _not_ suppressed and should be handled by the caller.
def perform_now
- # Call validations
- if respond_to?(:validate!)
- validate!
- elsif invalid?
- raise(MongoMapper::DocumentNotValid, self)
- end
- worker = RocketJob::Worker.new(name: 'inline')
- worker.started
+ raise(Mongoid::Errors::Validations, self) unless valid?
+
+ worker = RocketJob::Worker.new(inline: true)
start if may_start?
# Re-Raise exceptions
rocket_job_work(worker, true) if running?
result
end
@@ -112,11 +104,11 @@
# Fail this job in the event of an exception.
#
# The job is automatically saved only if an exception is raised in the supplied block.
#
# worker_name: [String]
- # Name of the worker on which the exception has occurred
+ # Name of the server on which the exception has occurred
#
# re_raise_exceptions: [true|false]
# Re-raise the exception after updating the job
# Default: false
def rocket_job_fail_on_exception!(worker_name, re_raise_exceptions = false)
@@ -148,23 +140,29 @@
raise(ArgumentError, 'Job must be started before calling #rocket_job_work') unless running?
rocket_job_fail_on_exception!(worker.name, re_raise_exceptions) do
run_callbacks :perform do
# Allow callbacks to fail, complete or abort the job
if running?
- ret = perform(*arguments)
+ ret = perform
if collect_output?
# Result must be a Hash, if not put it in a Hash
- self.result = (ret.is_a?(Hash) || ret.is_a?(BSON::OrderedHash)) ? ret : {result: ret}
+ self.result = ret.is_a?(Hash) ? ret : {'result' => ret}
end
end
end
if new_record? || destroyed?
complete if may_complete?
else
may_complete? ? complete! : save!
end
end
false
+ end
+
+ # Returns [Hash<String:[Array<ActiveWorker>]>] All servers actively working on this job
+ def rocket_job_active_servers
+ return {} unless running?
+ {worker_name => [ActiveServer.new(worker_name, started_at, self)]}
end
end
end
end