lib/rocket_job/concerns/worker.rb in rocketjob-1.0.0 vs lib/rocket_job/concerns/worker.rb in rocketjob-1.1.0

- old
+ new

@@ -5,13 +5,10 @@ module Concerns module Worker def self.included(base) base.extend ClassMethods base.class_eval do - # While working on a slice, the current slice is available via this reader - attr_reader :rocket_job_slice - @rocket_job_defaults = nil end end module ClassMethods @@ -26,11 +23,11 @@ end end # Create a job and process it immediately in-line by this thread def now(method, *args, &block) - job = build(method, *args, &block) + job = build(method, *args, &block) worker = RocketJob::Worker.new(name: 'inline') worker.started job.start while job.running? && !job.work(worker) end @@ -69,15 +66,65 @@ # Define job defaults def rocket_job(&block) @rocket_job_defaults = block self end - end - def rocket_job_csv_parser - # TODO Change into an instance variable once CSV handling has been re-worked - RocketJob::Utility::CSVRow.new + # Returns the next job to work on in priority based order + # Returns nil if there are currently no queued jobs, or processing batch jobs + # with records that require processing + # + # Parameters + # worker_name [String] + # Name of the worker that will be processing this job + # + # skip_job_ids [Array<BSON::ObjectId>] + # Job ids to exclude when looking for the next job + # + # Note: + # If a job is in queued state it will be started + def next_job(worker_name, skip_job_ids = nil) + query = { + '$and' => [ + { + '$or' => [ + {'state' => 'queued'}, # Jobs + {'state' => 'running', 'sub_state' => :processing} # Slices + ] + }, + { + '$or' => [ + {run_at: {'$exists' => false}}, + {run_at: {'$lte' => Time.now}} + ] + } + ] + } + query['_id'] = {'$nin' => skip_job_ids} if skip_job_ids && skip_job_ids.size > 0 + + while (doc = find_and_modify( + query: query, + sort: [['priority', 'asc'], ['created_at', 'asc']], + update: {'$set' => {'worker_name' => worker_name, 'state' => 'running'}} + )) + job = load(doc) + if job.running? + return job + else + if job.expired? + job.destroy + logger.info "Destroyed expired job #{job.class.name}, id:#{job.id}" + else + # Also update in-memory state and run call-backs + job.start + job.set(started_at: job.started_at) + return job + end + end + end + end + end # Works on this job # # Returns [true|false] whether this job should be excluded from the next lookup @@ -85,26 +132,28 @@ # If an exception is thrown the job is marked as failed and the exception # is set in the job itself. # # Thread-safe, can be called by multiple threads at the same time def work(worker) - raise 'Job must be started before calling #work' unless running? + raise(ArgumentError, 'Job must be started before calling #work') unless running? begin # before_perform call_method(perform_method, arguments, event: :before, log_level: log_level) # perform - call_method(perform_method, arguments, log_level: log_level) + ret = call_method(perform_method, arguments, log_level: log_level) if self.collect_output? - self.output = (result.is_a?(Hash) || result.is_a?(BSON::OrderedHash)) ? result : { result: result } + self.result = (ret.is_a?(Hash) || ret.is_a?(BSON::OrderedHash)) ? ret : {result: ret} end # after_perform call_method(perform_method, arguments, event: :after, log_level: log_level) + complete! - rescue Exception => exc - set_exception(worker.name, exc) + rescue StandardError => exc + fail!(worker.name, exc) unless failed? + logger.error("Exception running #{self.class.name}##{perform_method}", exc) raise exc if RocketJob::Config.inline_mode end false end @@ -129,29 +178,30 @@ # # log_level: [Symbol] # Log level to apply to silence logging during the call # Default: nil ( no change ) # - def call_method(method, arguments, options={}) - options = options.dup - event = options.delete(:event) - log_level = options.delete(:log_level) + def call_method(method, arguments, options = {}) + options = options.dup + event = options.delete(:event) + log_level = options.delete(:log_level) raise(ArgumentError, "Unknown #{self.class.name}#call_method options: #{options.inspect}") if options.size > 0 the_method = event.nil? ? method : "#{event}_#{method}".to_sym if respond_to?(the_method) method_name = "#{self.class.name}##{the_method}" logger.info "Start #{method_name}" - logger.benchmark_info("Completed #{method_name}", + logger.benchmark_info( + "Completed #{method_name}", metric: "rocketjob/#{self.class.name.underscore}/#{the_method}", log_exception: :full, on_exception_level: :error, silence: log_level ) do - self.send(the_method, *arguments) + send(the_method, *arguments) end end end end end -end \ No newline at end of file +end