lib/rocket_job/plugins/job/worker.rb in rocketjob-2.1.3 vs lib/rocket_job/plugins/job/worker.rb in rocketjob-3.0.0.alpha

- old
+ new

@@ -6,19 +6,19 @@ module Plugins module Job module Worker extend ActiveSupport::Concern - included do + module ClassMethods # Run this job later # # Saves it to the database for processing later by workers - def self.perform_later(*args, &block) + def perform_later(args, &block) if RocketJob::Config.inline_mode - perform_now(*args, &block) + perform_now(args, &block) else - job = new(arguments: args) + job = new(args) block.call(job) if block job.save! job end end @@ -26,12 +26,12 @@ # Run this job now. # # The job is not saved to the database since it is processed entriely in memory # As a result before_save and before_destroy callbacks will not be called. # Validations are still called however prior to calling #perform - def self.perform_now(*args, &block) - job = new(arguments: args) + def perform_now(args, &block) + job = new(args) block.call(job) if block job.perform_now job end @@ -46,11 +46,11 @@ # skip_job_ids [Array<BSON::ObjectId>] # Job ids to exclude when looking for the next job # # Note: # If a job is in queued state it will be started - def self.rocket_job_next_job(worker_name, skip_job_ids = nil) + def rocket_job_next_job(worker_name, skip_job_ids = nil) while (job = rocket_job_retrieve(worker_name, skip_job_ids)) case when job.running? # Batch Job return job @@ -65,20 +65,17 @@ return job if job.running? end end end - # Requeues all jobs that were running on worker that died - def self.requeue_dead_worker(worker_name) - # TODO Need to requeue paused, failed since user may have transitioned job before it finished - running.each do |job| - job.requeue!(worker_name) if job.may_requeue?(worker_name) + # Requeues all jobs that were running on a server that died + def requeue_dead_server(server_name) + # Need to requeue paused, failed since user may have transitioned job before it finished + where(:state.in => [:running, :paused, :faled]).each do |job| + job.requeue!(server_name) if job.may_requeue?(server_name) end end - - # Turn off embedded callbacks. Slow and not used for Jobs - embedded_callbacks_off end # Runs the job now in the current thread. # # Validations are called prior to running the job. @@ -89,18 +86,13 @@ # * before_create # * after_create # # Exceptions are _not_ suppressed and should be handled by the caller. def perform_now - # Call validations - if respond_to?(:validate!) - validate! - elsif invalid? - raise(MongoMapper::DocumentNotValid, self) - end - worker = RocketJob::Worker.new(name: 'inline') - worker.started + raise(Mongoid::Errors::Validations, self) unless valid? + + worker = RocketJob::Worker.new(inline: true) start if may_start? # Re-Raise exceptions rocket_job_work(worker, true) if running? result end @@ -112,11 +104,11 @@ # Fail this job in the event of an exception. # # The job is automatically saved only if an exception is raised in the supplied block. # # worker_name: [String] - # Name of the worker on which the exception has occurred + # Name of the server on which the exception has occurred # # re_raise_exceptions: [true|false] # Re-raise the exception after updating the job # Default: false def rocket_job_fail_on_exception!(worker_name, re_raise_exceptions = false) @@ -148,23 +140,29 @@ raise(ArgumentError, 'Job must be started before calling #rocket_job_work') unless running? rocket_job_fail_on_exception!(worker.name, re_raise_exceptions) do run_callbacks :perform do # Allow callbacks to fail, complete or abort the job if running? - ret = perform(*arguments) + ret = perform if collect_output? # Result must be a Hash, if not put it in a Hash - self.result = (ret.is_a?(Hash) || ret.is_a?(BSON::OrderedHash)) ? ret : {result: ret} + self.result = ret.is_a?(Hash) ? ret : {'result' => ret} end end end if new_record? || destroyed? complete if may_complete? else may_complete? ? complete! : save! end end false + end + + # Returns [Hash<String:[Array<ActiveWorker>]>] All servers actively working on this job + def rocket_job_active_servers + return {} unless running? + {worker_name => [ActiveServer.new(worker_name, started_at, self)]} end end end end