lib/rocket_job/batch/worker.rb in rocketjob-5.1.1 vs lib/rocket_job/batch/worker.rb in rocketjob-5.2.0.beta1
--- lib/rocket_job/batch/worker.rb (old: rocketjob-5.1.1)
+++ lib/rocket_job/batch/worker.rb (new: rocketjob-5.2.0.beta1)
@@ -1,6 +1,6 @@
-require 'active_support/concern'
+require "active_support/concern"
module RocketJob
module Batch
module Worker
extend ActiveSupport::Concern
@@ -14,14 +14,14 @@
attr_writer :rocket_job_slice, :rocket_job_record_number
end
# Processes records in each available slice for this job. Slices are processed
# one at a time to allow for concurrent calls to this method to increase
- # throughput. Processing will continue until there are no more jobs available
+ # throughput. Processing will continue until there are no more slices available
# for this job.
#
- # Returns [true|false] whether this job should be excluded from the next lookup
+ # Returns [true|false] whether any work was performed.
#
# Slices are destroyed after their records are successfully processed
#
# Results are stored in the output collection if `collect_output?`
# `nil` results from workers are kept if `collect_nil_output`
@@ -31,40 +31,38 @@
# If the mongo_ha gem has been loaded, then the connection to mongo is
# automatically re-established and the job will resume anytime a
# Mongo connection failure occurs.
#
# Thread-safe, can be called by multiple threads at the same time
- def rocket_job_work(worker, re_raise_exceptions = false, filter = {})
- raise 'Job must be started before calling #rocket_job_work' unless running?
+ def rocket_job_work(worker, re_raise_exceptions = false)
+ raise "Job must be started before calling #rocket_job_work" unless running?
+
start_time = Time.now
if sub_state != :processing
- rocket_job_handle_callbacks(worker, re_raise_exceptions)
+ fail_on_exception!(re_raise_exceptions) { rocket_job_batch_callbacks(worker) }
return false unless running?
end
- while !worker.shutdown?
- if slice = input.next_slice(worker.name)
- # Grab a slice before checking the throttle to reduce concurrency race condition.
- if new_filter = rocket_job_batch_evaluate_throttles(slice)
- # Restore retrieved slice so that other workers can process it later.
- slice.set(worker_name: nil, state: :queued, started_at: nil)
- self.class.send(:rocket_job_merge_filter, filter, new_filter)
+ SemanticLogger.named_tagged(job: id.to_s) do
+ until worker.shutdown?
+ if slice = input.next_slice(worker.name)
+ # Grab a slice before checking the throttle to reduce concurrency race condition.
+ return true if slice.fail_on_exception!(re_raise_exceptions) { rocket_job_batch_throttled?(slice, worker) }
+ next if slice.failed?
+
+ slice.fail_on_exception!(re_raise_exceptions) { rocket_job_process_slice(slice) }
+ elsif record_count && rocket_job_batch_complete?(worker.name)
+ return false
+ else
+ logger.debug "No more work available for this job"
+ worker.add_to_current_filter(throttle_filter_id)
return true
end
- SemanticLogger.named_tagged(slice: slice.id.to_s) do
- rocket_job_process_slice(slice, re_raise_exceptions)
- end
- else
- break if record_count && rocket_job_batch_complete?(worker.name)
- logger.debug 'No more work available for this job'
- self.class.send(:rocket_job_merge_filter, filter, throttle_filter_id)
- return true
+ # Allow new jobs with a higher priority to interrupt this job
+ break if (Time.now - start_time) >= Config.re_check_seconds
end
-
- # Allow new jobs with a higher priority to interrupt this job
- break if (Time.now - start_time) >= Config.re_check_seconds
end
false
end
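
The doc comment above references `collect_output?`: when output collection is enabled on the job class, each non-nil value returned from #perform is written to the output collection for its slice. A hedged sketch of such a job for the 5.x line (ReverseJob is an illustrative name; the exact `collect_output` spelling is an assumption, since v6 moves this to output categories):

    require "rocket_job" # gem "rocketjob"

    class ReverseJob < RocketJob::Job
      include RocketJob::Batch

      # Assumed 5.x attributes: keep the job after completion and collect results.
      self.collect_output      = true
      self.destroy_on_complete = false

      def perform(record)
        record.to_s.reverse
      end
    end

    # Typical flow (requires a RocketJob server and MongoDB):
    #   job = ReverseJob.new
    #   job.upload do |records|
    #     records << "abc"
    #     records << "def"
    #   end
    #   job.save!
    #   # ... once workers have drained all slices ...
    #   job.download("results.txt")

Note that in 5.2.0 `rocket_job_work` returns true when any work was performed, and throttle filters are handed to the worker via `add_to_current_filter` instead of being merged into a caller-supplied Hash.
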
# Prior to a job being made available for processing it can be processed one
@@ -74,31 +72,30 @@
#
# Returns [Integer] the number of records processed in the slice
#
# Note: The slice will be removed from processing when this method completes
def work_first_slice(&block)
- raise '#work_first_slice can only be called from within before_batch callbacks' unless sub_state == :before
- # TODO Make these settings configurable
+ raise "#work_first_slice can only be called from within before_batch callbacks" unless sub_state == :before
+
+ # TODO: Make these settings configurable
count = 0
wait_seconds = 5
- while (slice = input.first).nil?
+ while input.first.nil?
break if count > 10
+
logger.info "First slice has not arrived yet, sleeping for #{wait_seconds} seconds"
sleep wait_seconds
count += 1
end
- if slice = input.first
- SemanticLogger.named_tagged(slice: slice.id.to_s) do
- # TODO Persist that the first slice is being processed by this worker
- slice.start
- rocket_job_process_slice(slice, true, &block)
- end
- else
- # No records processed
- 0
- end
+ slice = input.first
+ # No records processed
+ return 0 unless slice
+
+ # TODO: Persist that the first slice is being processed by this worker
+ slice.start
+ rocket_job_process_slice(slice, &block)
end
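
`work_first_slice` is designed to be called from a `before_batch` callback, for example to consume a header row before regular processing starts. A usage sketch (HeaderJob and parse_header are illustrative names; before_batch and work_first_slice are the gem's API):

    class HeaderJob < RocketJob::Job
      include RocketJob::Batch

      before_batch :parse_header

      def perform(row)
        row.upcase
      end

      private

      # Runs while sub_state == :before, the only state the guard above permits.
      # The block receives each record in the first slice; the return value is the
      # number of records processed, and the slice is destroyed on completion.
      def parse_header
        processed = work_first_slice do |row|
          logger.info("header row: #{row}")
        end
        logger.info("consumed #{processed} header record(s)")
      end
    end
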
# Returns [Array<ActiveWorker>] All workers actively working on this job
def rocket_job_active_workers(server_name = nil)
servers = []
@@ -117,79 +114,102 @@
servers
end
private
+ def rocket_job_batch_throttled?(slice, worker)
+ filter = self.class.rocket_job_batch_throttles.matching_filter(self, slice)
+ return false unless filter
+
+ # Restore retrieved slice so that other workers can process it later.
+ slice.set(worker_name: nil, state: :queued, started_at: nil)
+ worker.add_to_current_filter(filter)
+ true
+ end
+
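
`rocket_job_batch_throttles` is populated by the gem's batch throttle plugins. When a throttle matches, the slice that was just grabbed is put back on the queue and the matching filter is remembered on the worker, so the worker stops asking this job for slices for a while. A sketch of a job that would exercise this path (the `throttle_running_workers` attribute comes from the ThrottleRunningWorkers plugin; treat the exact spelling as an assumption for your version):

    class ThrottledJob < RocketJob::Job
      include RocketJob::Batch

      # At most 5 workers may process slices of one instance of this job at a time.
      # A 6th worker that grabs a slice lands in rocket_job_batch_throttled?: the
      # slice is re-queued (worker_name/state/started_at reset) and the throttle's
      # filter is added to the worker's current filter.
      self.throttle_running_workers = 5

      def perform(record)
        record
      end
    end
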
# Process a single slice from Mongo
# Once the slice has been successfully processed it will be removed from the input collection
# Returns [Integer] the number of records successfully processed
- def rocket_job_process_slice(slice, re_raise_exceptions)
- slice_record_number = 0
+ def rocket_job_process_slice(slice)
+ # TODO: Skip records already processed
@rocket_job_record_number = slice.first_record_number || 0
@rocket_job_slice = slice
- run_callbacks :slice do
+
+ processed_records = 0
+ run_callbacks(:slice) do
+ # Allow before_slice callbacks to fail, complete or abort this slice.
+ return 0 unless running?
+
RocketJob::Sliced::Writer::Output.collect(self, slice) do |writer|
slice.each do |record|
- slice_record_number += 1
SemanticLogger.named_tagged(record: @rocket_job_record_number) do
- if _perform_callbacks.empty?
- @rocket_job_output = block_given? ? yield(record) : perform(record)
- else
- # Allows @rocket_job_input to be modified by before/around callbacks
- @rocket_job_input = record
- # Allow callbacks to fail, complete or abort the job
- if running?
- if block_given?
- run_callbacks(:perform) { @rocket_job_output = yield(@rocket_job_input) }
- else
- # Allows @rocket_job_output to be modified by after/around callbacks
- run_callbacks(:perform) { @rocket_job_output = perform(@rocket_job_input) }
- end
- end
- end
- writer << @rocket_job_output
+ writer << rocket_job_batch_perform(slice, record)
+ processed_records += 1
end
- # JRuby says self.rocket_job_record_number= is private and cannot be accessed
+ # JRuby thinks self.rocket_job_record_number= is private and cannot be accessed
@rocket_job_record_number += 1
end
end
- @rocket_job_input = @rocket_job_slice = @rocket_job_output = nil
+ @rocket_job_slice = nil
+ @rocket_job_record_number = nil
end
# On successful completion remove the slice from the input queue
- # TODO Option to complete slice instead of destroying it to retain input data
+ # TODO: Add option to complete slice instead of destroying it to retain input data.
slice.destroy
- slice_record_number
- rescue Exception => exc
- slice.fail!(exc, slice_record_number)
- raise exc if re_raise_exceptions
- slice_record_number > 0 ? slice_record_number - 1 : 0
+ processed_records
end
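
The new `return 0 unless running?` guard means a slice callback can halt processing before any record in the slice runs. A sketch using an around-slice callback (the :slice chain is what run_callbacks(:slice) above executes; AuditedJob and time_slice are illustrative names, and the around_slice macro spelling is an assumption):

    class AuditedJob < RocketJob::Job
      include RocketJob::Batch

      around_slice :time_slice

      def perform(record)
        record
      end

      private

      # Wraps the run_callbacks(:slice) body above. Not yielding, or transitioning
      # the job out of :running, skips the slice's records entirely.
      def time_slice
        started = Time.now
        yield
        logger.info("slice processed in #{(Time.now - started).round(3)}s")
      end
    end
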
+ # Perform a single record within the current slice.
+ def rocket_job_batch_perform(slice, record)
+ slice.processing_record_number ||= 0
+ slice.processing_record_number += 1
+
+ return block_given? ? yield(record) : perform(record) if _perform_callbacks.empty?
+
+ # @rocket_job_input and @rocket_job_output can be modified by before/around callbacks
+ @rocket_job_input = record
+ @rocket_job_output = nil
+
+ run_callbacks(:perform) do
+ @rocket_job_output =
+ if block_given?
+ yield(@rocket_job_input)
+ else
+ perform(@rocket_job_input)
+ end
+ end
+
+ @rocket_job_input = nil
+ result = @rocket_job_output
+ @rocket_job_output = nil
+ result
+ end
+
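
`rocket_job_batch_perform` routes every record through the ActiveSupport :perform callback chain, exposing the record as @rocket_job_input and the result as @rocket_job_output so around callbacks can rewrite either side. A self-contained model of that plumbing, outside RocketJob (ToyJob is a stand-in, not gem code):

    require "active_support/callbacks"

    class ToyJob
      include ActiveSupport::Callbacks
      define_callbacks :perform

      # Around callback that rewrites the input before, and the output after,
      # the actual perform runs.
      set_callback :perform, :around do |_job, chain|
        @rocket_job_input = @rocket_job_input.strip
        chain.call
        @rocket_job_output = "#{@rocket_job_output}!"
      end

      def perform(record)
        record.upcase
      end

      # Mirrors rocket_job_batch_perform: stash the input, run callbacks, hand back
      # the (possibly rewritten) output.
      def perform_with_callbacks(record)
        @rocket_job_input = record
        run_callbacks(:perform) { @rocket_job_output = perform(@rocket_job_input) }
        @rocket_job_output
      end
    end

    puts ToyJob.new.perform_with_callbacks("  hello ") # => HELLO!
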
# Checks for completion and runs after_batch if defined
# Returns true if the job is now complete/aborted/failed
def rocket_job_batch_complete?(worker_name)
return true unless running?
return false unless record_count
# Only failed slices left?
input_count = input.count
failed_count = input.failed.count
- if (failed_count > 0) && (input_count == failed_count)
+ if failed_count.positive? && (input_count == failed_count)
# Reload to pull in any counters or other data that was modified.
reload unless new_record?
if may_fail?
fail_job = true
unless new_record?
# Fail job iff no other worker has already finished it
# Must set write concern to at least 1 since we need the nModified back
result = self.class.with(write: {w: 1}) do |query|
query.
where(id: id, state: :running, sub_state: :processing).
- update({'$set' => {state: :failed, worker_name: worker_name}})
+ update({"$set" => {state: :failed, worker_name: worker_name}})
end
- fail_job = false unless result.modified_count > 0
+ fail_job = false unless result.modified_count.positive?
end
if fail_job
message = "#{failed_count} slices failed to process"
self.exception = JobException.new(message: message)
fail!(worker_name, message)
@@ -197,11 +217,11 @@
end
return true
end
# Any work left?
- return false if input_count > 0
+ return false if input_count.positive?
# If the job was not saved to the queue, do not save any changes
if new_record?
rocket_job_batch_run_after_callbacks(false)
return true
@@ -210,16 +230,16 @@
# Complete job iff no other worker has already completed it
# Must set write concern to at least 1 since we need the nModified back
result = self.class.with(write: {w: 1}) do |query|
query.
where(id: id, state: :running, sub_state: :processing).
- update('$set' => {sub_state: :after, worker_name: worker_name})
+ update("$set" => {sub_state: :after, worker_name: worker_name})
end
# Reload to pull in any counters or other data that was modified.
reload
- if result.modified_count > 0
+ if result.modified_count.positive?
rocket_job_batch_run_after_callbacks(false)
else
# Repeat cleanup in case this worker was still running when the job was aborted
cleanup! if aborted?
end
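
The `with(write: {w: 1})` plus filtered update plus `modified_count` check above is a compare-and-set: of all the workers that notice the batch is finished at roughly the same time, only the one whose update actually matched transitions the job and runs the after_batch callbacks. The same idea modelled with an in-memory store instead of Mongo (purely illustrative, plain Ruby):

    store = { state: :running, sub_state: :processing, worker_name: nil }
    lock  = Mutex.new

    # Conditional update: apply the changes only if the document still matches the
    # filter, and report how many documents were modified, like Mongo's result.
    update_if = lambda do |filter, changes|
      lock.synchronize do
        next 0 unless filter.all? { |key, value| store[key] == value }

        store.merge!(changes)
        1
      end
    end

    winners = Array.new(4) do |i|
      Thread.new do
        update_if.call(
          { state: :running, sub_state: :processing },
          { sub_state: :after, worker_name: "worker-#{i}" }
        ).positive?
      end
    end.map(&:value)

    puts "workers that won the transition: #{winners.count(true)}" # always 1
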
@@ -231,11 +251,11 @@
def rocket_job_batch_run_before_callbacks
unless _before_batch_callbacks.empty?
self.sub_state = :before
save! unless new_record? || destroyed?
logger.measure_info(
- 'before_batch',
+ "before_batch",
metric: "#{self.class.name}/before_batch",
log_exception: :full,
on_exception_level: :error,
silence: log_level
) do
@@ -251,11 +271,11 @@
def rocket_job_batch_run_after_callbacks(save_before = true)
unless _after_batch_callbacks.empty?
self.sub_state = :after
save! if save_before && !new_record? && !destroyed?
logger.measure_info(
- 'after_batch',
+ "after_batch",
metric: "#{self.class.name}/after_batch",
log_exception: :full,
on_exception_level: :error,
silence: log_level
) do
@@ -267,22 +287,19 @@
else
may_complete? ? complete! : save!
end
end
- # Handle before and after callbacks
- def rocket_job_handle_callbacks(worker, re_raise_exceptions)
- rocket_job_fail_on_exception!(worker.name, re_raise_exceptions) do
- # If this is the first worker to pickup this job
- if sub_state == :before
- rocket_job_batch_run_before_callbacks
- # Check for 0 record jobs
- rocket_job_batch_complete?(worker.name) if running?
- elsif sub_state == :after
- rocket_job_batch_run_after_callbacks
- end
+ # Run Batch before and after callbacks
+ def rocket_job_batch_callbacks(worker)
+ # If this is the first worker to pickup this job
+ if sub_state == :before
+ rocket_job_batch_run_before_callbacks
+ # Check for 0 record jobs
+ rocket_job_batch_complete?(worker.name) if running?
+ elsif sub_state == :after
+ rocket_job_batch_run_after_callbacks
end
end
-
end
end
end
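
For reference, the :before and :after sub_states driven by rocket_job_batch_callbacks map onto the job-level DSL as sketched below (NightlyExtractJob and the announce_* methods are illustrative names; before_batch and after_batch are the gem's macros):

    class NightlyExtractJob < RocketJob::Job
      include RocketJob::Batch

      # Runs once, in sub_state :before, on the first worker to pick the job up
      # (rocket_job_batch_run_before_callbacks).
      before_batch :announce_start

      # Runs once, in sub_state :after, on the worker that detects completion
      # (rocket_job_batch_run_after_callbacks).
      after_batch :announce_finish

      def perform(record)
        record.to_s.strip
      end

      private

      def announce_start
        logger.info("starting batch of #{record_count.to_i} records")
      end

      def announce_finish
        logger.info("batch finished")
      end
    end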