lib/rocket_job/batch/worker.rb in rocketjob-5.2.0.beta1 vs lib/rocket_job/batch/worker.rb in rocketjob-5.2.0.beta2

- old
+ new

@@ -127,11 +127,11 @@ end # Process a single slice from Mongo # Once the slice has been successfully processed it will be removed from the input collection # Returns [Integer] the number of records successfully processed - def rocket_job_process_slice(slice) + def rocket_job_process_slice(slice, &block) # TODO: Skip records already processed @rocket_job_record_number = slice.first_record_number || 0 @rocket_job_slice = slice processed_records = 0 @@ -140,11 +140,11 @@ return 0 unless running? RocketJob::Sliced::Writer::Output.collect(self, slice) do |writer| slice.each do |record| SemanticLogger.named_tagged(record: @rocket_job_record_number) do - writer << rocket_job_batch_perform(slice, record) + writer << rocket_job_batch_perform(slice, record, &block) processed_records += 1 end # JRuby thinks self.rocket_job_record_number= is private and cannot be accessed @rocket_job_record_number += 1 end @@ -200,10 +200,10 @@ if may_fail? fail_job = true unless new_record? # Fail job iff no other worker has already finished it # Must set write concern to at least 1 since we need the nModified back - result = self.class.with(write: {w: 1}) do |query| + result = self.class.with(write: {w: 1}) do |query| query. where(id: id, state: :running, sub_state: :processing). update({"$set" => {state: :failed, worker_name: worker_name}}) end fail_job = false unless result.modified_count.positive?