lib/rocket_job/batch/worker.rb in rocketjob-5.2.0.beta1 vs lib/rocket_job/batch/worker.rb in rocketjob-5.2.0.beta2
- old
+ new
@@ -127,11 +127,11 @@
end
# Process a single slice from Mongo
# Once the slice has been successfully processed it will be removed from the input collection
# Returns [Integer] the number of records successfully processed
- def rocket_job_process_slice(slice)
+ def rocket_job_process_slice(slice, &block)
# TODO: Skip records already processed
@rocket_job_record_number = slice.first_record_number || 0
@rocket_job_slice = slice
processed_records = 0
@@ -140,11 +140,11 @@
return 0 unless running?
RocketJob::Sliced::Writer::Output.collect(self, slice) do |writer|
slice.each do |record|
SemanticLogger.named_tagged(record: @rocket_job_record_number) do
- writer << rocket_job_batch_perform(slice, record)
+ writer << rocket_job_batch_perform(slice, record, &block)
processed_records += 1
end
# JRuby thinks self.rocket_job_record_number= is private and cannot be accessed
@rocket_job_record_number += 1
end
@@ -200,10 +200,10 @@
if may_fail?
fail_job = true
unless new_record?
# Fail job iff no other worker has already finished it
# Must set write concern to at least 1 since we need the nModified back
- result = self.class.with(write: {w: 1}) do |query|
+ result = self.class.with(write: {w: 1}) do |query|
query.
where(id: id, state: :running, sub_state: :processing).
update({"$set" => {state: :failed, worker_name: worker_name}})
end
fail_job = false unless result.modified_count.positive?