lib/rocket_job/batch/worker.rb in rocketjob-5.4.1 vs lib/rocket_job/batch/worker.rb in rocketjob-6.0.0.rc1
- old
+ new
@@ -21,13 +21,10 @@
#
# Returns [true|false] whether any work was performed.
#
# Slices are destroyed after their records are successfully processed
#
- # Results are stored in the output collection if `collect_output?`
- # `nil` results from workers are kept if `collect_nil_output`
- #
# If an exception was thrown the entire slice of records is marked as failed.
#
# Thread-safe, can be called by multiple threads at the same time
def rocket_job_work(worker, re_raise_exceptions = false)
raise "Job must be started before calling #rocket_job_work" unless running?
@@ -38,11 +35,12 @@
return false unless running?
end
SemanticLogger.named_tagged(job: id.to_s) do
until worker.shutdown?
- if slice = input.next_slice(worker.name)
+ slice = input.next_slice(worker.name)
+ if slice
# Grab a slice before checking the throttle to reduce concurrency race condition.
return true if slice.fail_on_exception!(re_raise_exceptions) { rocket_job_batch_throttled?(slice, worker) }
next if slice.failed?
slice.fail_on_exception!(re_raise_exceptions) { rocket_job_process_slice(slice) }
@@ -95,11 +93,11 @@
# Returns [Array<ActiveWorker>] All workers actively working on this job
def rocket_job_active_workers(server_name = nil)
servers = []
case sub_state
when :before, :after
- unless server_name && !worker_on_server?(server_name)
+ if running? && (server_name.nil? || worker_on_server?(server_name))
servers << ActiveWorker.new(worker_name, started_at, self) if running?
end
when :processing
query = input.running
query = query.where(worker_name: /\A#{server_name}/) if server_name
@@ -141,23 +139,27 @@
count
end
# Perform individual slice without callbacks
def rocket_job_perform_slice(slice, &block)
- count = 0
- RocketJob::Sliced::Writer::Output.collect(self, slice) do |writer|
- records = slice.records
+ slice.processing_record_number ||= 0
+ records = []
+ append = false
- # Skip records already processed, if any.
- # slice.processing_record_number ||= 0
- # TODO: Must append to existing output slices before this can be enabled.
- # if !collect_output && (slice.processing_record_number > 1)
- # records = records[slice.processing_record_number - 1..-1]
- # end
- # Until the changes above have been implemented, reprocess all records in the slice.
+ # Skip processed records in this slice if it has no output categories.
+ if slice.processing_record_number > 1
+ records = slice.records[slice.processing_record_number - 1..-1]
+ append = true
+ logger.info("Resuming previously incomplete slice from record number #{slice.processing_record_number}")
+ else
+ # Reprocess all records in this slice.
slice.processing_record_number = 0
+ records = slice.records
+ end
+ count = 0
+ RocketJob::Sliced::Writer::Output.collect(self, input_slice: slice, append: append) do |writer|
records.each do |record|
slice.processing_record_number += 1
SemanticLogger.named_tagged(record: slice.current_record_number) do
writer << rocket_job_batch_perform(slice, record, &block)
count += 1
@@ -172,25 +174,25 @@
@rocket_job_record_number = slice.current_record_number
return block_given? ? yield(record) : perform(record) if _perform_callbacks.empty?
# @rocket_job_input and @rocket_job_output can be modified by before/around callbacks
- @rocket_job_input = record
- @rocket_job_output = nil
+ @rocket_job_input = record
+ @rocket_job_output = nil
run_callbacks(:perform) do
@rocket_job_output =
if block_given?
yield(@rocket_job_input)
else
perform(@rocket_job_input)
end
end
- @rocket_job_input = nil
- result = @rocket_job_output
- @rocket_job_output = nil
+ @rocket_job_input = nil
+ result = @rocket_job_output
+ @rocket_job_output = nil
result
end
# Checks for completion and runs after_batch if defined
# Returns true if the job is now complete/aborted/failed
@@ -242,11 +244,11 @@
fail_job = true
unless new_record?
# Fail job iff no other worker has already finished it
# Must set write concern to at least 1 since we need the nModified back
- result = self.class.with(write: {w: 1}) do |query|
+ result = self.class.with(write: {w: 1}) do |query|
query.
where(id: id, state: :running, sub_state: :processing).
update({"$set" => {state: :failed, worker_name: worker_name}})
end
fail_job = false unless result.modified_count.positive?
@@ -303,14 +305,15 @@
end
# Run Batch before and after callbacks
def rocket_job_batch_callbacks(worker)
# If this is the first worker to pickup this job
- if sub_state == :before
+ case sub_state
+ when :before
rocket_job_batch_run_before_callbacks
# Check for 0 record jobs
rocket_job_batch_complete?(worker.name) if running?
- elsif sub_state == :after
+ when sub_state == :after
rocket_job_batch_run_after_callbacks
end
end
end
end