lib/rocket_job/concerns/worker.rb in rocketjob-1.0.0 vs lib/rocket_job/concerns/worker.rb in rocketjob-1.1.0
- old
+ new
@@ -5,13 +5,10 @@
module Concerns
module Worker
def self.included(base)
base.extend ClassMethods
base.class_eval do
- # While working on a slice, the current slice is available via this reader
- attr_reader :rocket_job_slice
-
@rocket_job_defaults = nil
end
end
module ClassMethods
@@ -26,11 +23,11 @@
end
end
# Create a job and process it immediately in-line by this thread
def now(method, *args, &block)
- job = build(method, *args, &block)
+ job = build(method, *args, &block)
worker = RocketJob::Worker.new(name: 'inline')
worker.started
job.start
while job.running? && !job.work(worker)
end
@@ -69,15 +66,65 @@
# Define job defaults
def rocket_job(&block)
@rocket_job_defaults = block
self
end
- end
- def rocket_job_csv_parser
- # TODO Change into an instance variable once CSV handling has been re-worked
- RocketJob::Utility::CSVRow.new
+ # Returns the next job to work on in priority based order
+ # Returns nil if there are currently no queued jobs, or processing batch jobs
+ # with records that require processing
+ #
+ # Parameters
+ # worker_name [String]
+ # Name of the worker that will be processing this job
+ #
+ # skip_job_ids [Array<BSON::ObjectId>]
+ # Job ids to exclude when looking for the next job
+ #
+ # Note:
+ # If a job is in queued state it will be started
+ def next_job(worker_name, skip_job_ids = nil)
+ query = {
+ '$and' => [
+ {
+ '$or' => [
+ {'state' => 'queued'}, # Jobs
+ {'state' => 'running', 'sub_state' => :processing} # Slices
+ ]
+ },
+ {
+ '$or' => [
+ {run_at: {'$exists' => false}},
+ {run_at: {'$lte' => Time.now}}
+ ]
+ }
+ ]
+ }
+ query['_id'] = {'$nin' => skip_job_ids} if skip_job_ids && skip_job_ids.size > 0
+
+ while (doc = find_and_modify(
+ query: query,
+ sort: [['priority', 'asc'], ['created_at', 'asc']],
+ update: {'$set' => {'worker_name' => worker_name, 'state' => 'running'}}
+ ))
+ job = load(doc)
+ if job.running?
+ return job
+ else
+ if job.expired?
+ job.destroy
+ logger.info "Destroyed expired job #{job.class.name}, id:#{job.id}"
+ else
+ # Also update in-memory state and run call-backs
+ job.start
+ job.set(started_at: job.started_at)
+ return job
+ end
+ end
+ end
+ end
+
end
# Works on this job
#
# Returns [true|false] whether this job should be excluded from the next lookup
@@ -85,26 +132,28 @@
# If an exception is thrown the job is marked as failed and the exception
# is set in the job itself.
#
# Thread-safe, can be called by multiple threads at the same time
def work(worker)
- raise 'Job must be started before calling #work' unless running?
+ raise(ArgumentError, 'Job must be started before calling #work') unless running?
begin
# before_perform
call_method(perform_method, arguments, event: :before, log_level: log_level)
# perform
- call_method(perform_method, arguments, log_level: log_level)
+ ret = call_method(perform_method, arguments, log_level: log_level)
if self.collect_output?
- self.output = (result.is_a?(Hash) || result.is_a?(BSON::OrderedHash)) ? result : { result: result }
+ self.result = (ret.is_a?(Hash) || ret.is_a?(BSON::OrderedHash)) ? ret : {result: ret}
end
# after_perform
call_method(perform_method, arguments, event: :after, log_level: log_level)
+
complete!
- rescue Exception => exc
- set_exception(worker.name, exc)
+ rescue StandardError => exc
+ fail!(worker.name, exc) unless failed?
+ logger.error("Exception running #{self.class.name}##{perform_method}", exc)
raise exc if RocketJob::Config.inline_mode
end
false
end
@@ -129,29 +178,30 @@
#
# log_level: [Symbol]
# Log level to apply to silence logging during the call
# Default: nil ( no change )
#
- def call_method(method, arguments, options={})
- options = options.dup
- event = options.delete(:event)
- log_level = options.delete(:log_level)
+ def call_method(method, arguments, options = {})
+ options = options.dup
+ event = options.delete(:event)
+ log_level = options.delete(:log_level)
raise(ArgumentError, "Unknown #{self.class.name}#call_method options: #{options.inspect}") if options.size > 0
the_method = event.nil? ? method : "#{event}_#{method}".to_sym
if respond_to?(the_method)
method_name = "#{self.class.name}##{the_method}"
logger.info "Start #{method_name}"
- logger.benchmark_info("Completed #{method_name}",
+ logger.benchmark_info(
+ "Completed #{method_name}",
metric: "rocketjob/#{self.class.name.underscore}/#{the_method}",
log_exception: :full,
on_exception_level: :error,
silence: log_level
) do
- self.send(the_method, *arguments)
+ send(the_method, *arguments)
end
end
end
end
end
-end
\ No newline at end of file
+end