lib/rocket_job/batch/statistics.rb in rocketjob-5.4.1 vs lib/rocket_job/batch/statistics.rb in rocketjob-6.0.0.rc1

- old
+ new

@@ -1,10 +1,14 @@ require "active_support/concern" module RocketJob module Batch - # Allow statistics to be gathered while a batch job is running + # Allow statistics to be gathered while a batch job is running. + # + # Notes: + # - Statistics for successfully processed records within a slice are saved. + # - Statistics gathered during a perform that then results in an exception are discarded. module Statistics extend ActiveSupport::Concern class Stats attr_reader :stats, :in_memory @@ -43,42 +47,60 @@ def inc_in_memory(key, increment) paths = key.to_s.split(".") last = paths.pop return unless last - target = paths.inject(in_memory) { |target, key| target.key?(key) ? target[key] : target[key] = Hash.new(0) } - target[last] += increment + last_target = paths.inject(in_memory) do |target, sub_key| + target.key?(sub_key) ? target[sub_key] : target[sub_key] = Hash.new(0) + end + last_target[last] += increment end end included do field :statistics, type: Hash, default: -> { Hash.new(0) } - around_slice :statistics_capture + around_slice :rocket_job_statistics_capture + after_perform :rocket_job_statistics_commit end # Increment a statistic def statistics_inc(key, increment = 1) return if key.nil? || key == "" - # Being called within tests outside of a perform - @slice_statistics ||= Stats.new(new_record? ? statistics : nil) - key.is_a?(Hash) ? @slice_statistics.inc(key) : @slice_statistics.inc_key(key, increment) + (@rocket_job_perform_statistics ||= []) << (key.is_a?(Hash) ? key : [key, increment]) end private - # Capture the number of successful and failed tradelines - # as well as those with notices and alerts. - def statistics_capture - @slice_statistics = Stats.new(new_record? ? statistics : nil) + def rocket_job_statistics_capture + @rocket_job_perform_statistics = nil + @rocket_job_slice_statistics = nil yield - collection.update_one({_id: id}, {"$inc" => @slice_statistics.stats}) unless @slice_statistics.empty? + ensure + if @rocket_job_slice_statistics && !@rocket_job_slice_statistics.empty? + collection.update_one({_id: id}, {"$inc" => @rocket_job_slice_statistics.stats}) + end end + def rocket_job_slice_statistics + @rocket_job_slice_statistics ||= Stats.new(new_record? ? statistics : nil) + end + + # Apply stats gathered during the perform to the slice level stats + def rocket_job_statistics_commit + return unless @rocket_job_perform_statistics + + @rocket_job_perform_statistics.each do |key| + key.is_a?(Hash) ? rocket_job_slice_statistics.inc(key) : rocket_job_slice_statistics.inc_key(*key) + end + + @rocket_job_perform_statistics = nil + end + # Overrides RocketJob::Batch::Logger#rocket_job_batch_log_payload def rocket_job_batch_log_payload - h = { + h = { from: aasm.from_state, to: aasm.to_state, event: aasm.current_event } h[:statistics] = statistics.dup if statistics.present? && (completed? || failed?)