app/jobs/bulkrax/create_relationships_job.rb in bulkrax-8.1.0 vs app/jobs/bulkrax/create_relationships_job.rb in bulkrax-8.2.0

- old
+ new

@@ -36,15 +36,17 @@ # # In config/initializers/bulkrax.rb # Bulkrax::CreateRelationshipsJob.update_child_records_works_file_sets = true # # @see https://github.com/scientist-softserv/louisville-hyku/commit/128a9ef class_attribute :update_child_records_works_file_sets, default: false + class_attribute :max_failure_count, default: 5 include DynamicRecordLookup queue_as Bulkrax.config.ingest_queue_name + attr_accessor :user, :importer_run, :errors ## # @param parent_identifier [String] Work/Collection ID or Bulkrax::Entry source_identifiers # @param importer_run [Bulkrax::ImporterRun] current importer run (needed to properly update counters) # # The entry_identifier is used to lookup the @base_entry for the job (a.k.a. the entry the job was called from). @@ -52,13 +54,14 @@ # Whether the @base_entry is the parent or the child in the relationship is determined by the presence of a # parent_identifier or child_identifier param. For example, if a parent_identifier is passed, we know @base_entry # is the child in the relationship, and vice versa if a child_identifier is passed. # # rubocop:disable Metrics/MethodLength - def perform(parent_identifier:, importer_run_id:) # rubocop:disable Metrics/AbcSize - @importer_run = Bulkrax::ImporterRun.find(importer_run_id) - ability = Ability.new(importer_run.user) + def perform(parent_identifier:, importer_run_id: nil, run_user: nil, failure_count: 0) # rubocop:disable Metrics/AbcSize + importer_run = Bulkrax::ImporterRun.find(importer_run_id) if importer_run_id + user = run_user || importer_run&.user + ability = Ability.new(user) parent_entry, parent_record = find_record(parent_identifier, importer_run_id) number_of_successes = 0 number_of_failures = 0 @@ -67,23 +70,25 @@ @child_members_added = [] if parent_record conditionally_acquire_lock_for(parent_record.id) do ActiveRecord::Base.uncached do - Bulkrax::PendingRelationship.where(parent_id: parent_identifier, importer_run_id: importer_run_id) + Bulkrax::PendingRelationship.where(parent_id: parent_identifier) .ordered.find_each do |rel| process(relationship: rel, importer_run_id: importer_run_id, parent_record: parent_record, ability: ability) number_of_successes += 1 + @parent_record_members_added = true rescue => e number_of_failures += 1 + rel.set_status_info(e, importer_run) errors << e end end # save record if members were added if @parent_record_members_added - Bulkrax.object_factory.save!(resource: parent_record, user: importer_run.user) + Bulkrax.object_factory.save!(resource: parent_record, user: user) Bulkrax.object_factory.publish(event: 'object.membership.updated', object: parent_record) Bulkrax.object_factory.update_index(resources: @child_members_added) end end else @@ -102,24 +107,29 @@ # rubocop:disable Rails/SkipsModelValidations ImporterRun.update_counters(importer_run_id, failed_relationships: number_of_failures) # rubocop:enable Rails/SkipsModelValidations parent_entry&.set_status_info(errors.last, importer_run) + failure_count += 1 - # TODO: This can create an infinite job cycle, consider a time to live tracker. - reschedule(parent_identifier: parent_identifier, importer_run_id: importer_run_id) - return false # stop current job from continuing to run after rescheduling + if failure_count < max_failure_count + reschedule( + parent_identifier: parent_identifier, + importer_run_id: importer_run_id, + run_user: run_user, + failure_count: failure_count + ) + end + return errors # stop current job from continuing to run after rescheduling else # rubocop:disable Rails/SkipsModelValidations ImporterRun.update_counters(importer_run_id, processed_relationships: number_of_successes) # rubocop:enable Rails/SkipsModelValidations end end # rubocop:enable Metrics/MethodLength - attr_reader :importer_run - private ## # We can use Hyrax's lock manager when we have one available. if defined?(::Hyrax) @@ -168,11 +178,11 @@ def add_to_collection(child_record, parent_record) Bulkrax.object_factory.add_resource_to_collection( collection: parent_record, resource: child_record, - user: importer_run.user + user: user ) end def add_to_work(child_record, parent_record) # NOTE: The .add_child_to_parent_work should not persist changes to the @@ -181,13 +191,10 @@ parent: parent_record, child: child_record ) end - def reschedule(parent_identifier:, importer_run_id:) - CreateRelationshipsJob.set(wait: 10.minutes).perform_later( - parent_identifier: parent_identifier, - importer_run_id: importer_run_id - ) + def reschedule(**kargs) + CreateRelationshipsJob.set(wait: 10.minutes).perform_later(**kargs) end end end