lib/rocket_job/batch/worker.rb in rocketjob-5.4.1 vs lib/rocket_job/batch/worker.rb in rocketjob-6.0.0.rc1

- old
+ new

@@ -21,13 +21,10 @@ # # Returns [true|false] whether any work was performed. # # Slices are destroyed after their records are successfully processed # - # Results are stored in the output collection if `collect_output?` - # `nil` results from workers are kept if `collect_nil_output` - # # If an exception was thrown the entire slice of records is marked as failed. # # Thread-safe, can be called by multiple threads at the same time def rocket_job_work(worker, re_raise_exceptions = false) raise "Job must be started before calling #rocket_job_work" unless running? @@ -38,11 +35,12 @@ return false unless running? end SemanticLogger.named_tagged(job: id.to_s) do until worker.shutdown? - if slice = input.next_slice(worker.name) + slice = input.next_slice(worker.name) + if slice # Grab a slice before checking the throttle to reduce concurrency race condition. return true if slice.fail_on_exception!(re_raise_exceptions) { rocket_job_batch_throttled?(slice, worker) } next if slice.failed? slice.fail_on_exception!(re_raise_exceptions) { rocket_job_process_slice(slice) } @@ -95,11 +93,11 @@ # Returns [Array<ActiveWorker>] All workers actively working on this job def rocket_job_active_workers(server_name = nil) servers = [] case sub_state when :before, :after - unless server_name && !worker_on_server?(server_name) + if running? && (server_name.nil? || worker_on_server?(server_name)) servers << ActiveWorker.new(worker_name, started_at, self) if running? end when :processing query = input.running query = query.where(worker_name: /\A#{server_name}/) if server_name @@ -141,23 +139,27 @@ count end # Perform individual slice without callbacks def rocket_job_perform_slice(slice, &block) - count = 0 - RocketJob::Sliced::Writer::Output.collect(self, slice) do |writer| - records = slice.records + slice.processing_record_number ||= 0 + records = [] + append = false - # Skip records already processed, if any. - # slice.processing_record_number ||= 0 - # TODO: Must append to existing output slices before this can be enabled. - # if !collect_output && (slice.processing_record_number > 1) - # records = records[slice.processing_record_number - 1..-1] - # end - # Until the changes above have been implemented, reprocess all records in the slice. + # Skip processed records in this slice if it has no output categpries. + if slice.processing_record_number > 1 + records = slice.records[slice.processing_record_number - 1..-1] + append = true + logger.info("Resuming previously incomplete slice from record number #{slice.processing_record_number}") + else + # Reprocess all records in this slice. slice.processing_record_number = 0 + records = slice.records + end + count = 0 + RocketJob::Sliced::Writer::Output.collect(self, input_slice: slice, append: append) do |writer| records.each do |record| slice.processing_record_number += 1 SemanticLogger.named_tagged(record: slice.current_record_number) do writer << rocket_job_batch_perform(slice, record, &block) count += 1 @@ -172,25 +174,25 @@ @rocket_job_record_number = slice.current_record_number return block_given? ? yield(record) : perform(record) if _perform_callbacks.empty? # @rocket_job_input and @rocket_job_output can be modified by before/around callbacks - @rocket_job_input = record - @rocket_job_output = nil + @rocket_job_input = record + @rocket_job_output = nil run_callbacks(:perform) do @rocket_job_output = if block_given? yield(@rocket_job_input) else perform(@rocket_job_input) end end - @rocket_job_input = nil - result = @rocket_job_output - @rocket_job_output = nil + @rocket_job_input = nil + result = @rocket_job_output + @rocket_job_output = nil result end # Checks for completion and runs after_batch if defined # Returns true if the job is now complete/aborted/failed @@ -242,11 +244,11 @@ fail_job = true unless new_record? # Fail job iff no other worker has already finished it # Must set write concern to at least 1 since we need the nModified back - result = self.class.with(write: {w: 1}) do |query| + result = self.class.with(write: {w: 1}) do |query| query. where(id: id, state: :running, sub_state: :processing). update({"$set" => {state: :failed, worker_name: worker_name}}) end fail_job = false unless result.modified_count.positive? @@ -303,14 +305,15 @@ end # Run Batch before and after callbacks def rocket_job_batch_callbacks(worker) # If this is the first worker to pickup this job - if sub_state == :before + case sub_state + when :before rocket_job_batch_run_before_callbacks # Check for 0 record jobs rocket_job_batch_complete?(worker.name) if running? - elsif sub_state == :after + when sub_state == :after rocket_job_batch_run_after_callbacks end end end end