app/jobs/bulkrax/create_relationships_job.rb in bulkrax-8.1.0 vs app/jobs/bulkrax/create_relationships_job.rb in bulkrax-8.2.0
- old
+ new
@@ -36,15 +36,17 @@
# # In config/initializers/bulkrax.rb
# Bulkrax::CreateRelationshipsJob.update_child_records_works_file_sets = true
#
# @see https://github.com/scientist-softserv/louisville-hyku/commit/128a9ef
class_attribute :update_child_records_works_file_sets, default: false
+ class_attribute :max_failure_count, default: 5
include DynamicRecordLookup
queue_as Bulkrax.config.ingest_queue_name
+ attr_accessor :user, :importer_run, :errors
##
# @param parent_identifier [String] Work/Collection ID or Bulkrax::Entry source_identifiers
# @param importer_run_id [Integer] id of the current Bulkrax::ImporterRun (needed to properly update counters)
#
# The entry_identifier is used to lookup the @base_entry for the job (a.k.a. the entry the job was called from).
@@ -52,13 +54,14 @@
# Whether the @base_entry is the parent or the child in the relationship is determined by the presence of a
# parent_identifier or child_identifier param. For example, if a parent_identifier is passed, we know @base_entry
# is the child in the relationship, and vice versa if a child_identifier is passed.
#
# rubocop:disable Metrics/MethodLength
- def perform(parent_identifier:, importer_run_id:) # rubocop:disable Metrics/AbcSize
- @importer_run = Bulkrax::ImporterRun.find(importer_run_id)
- ability = Ability.new(importer_run.user)
+ def perform(parent_identifier:, importer_run_id: nil, run_user: nil, failure_count: 0) # rubocop:disable Metrics/AbcSize
+ importer_run = Bulkrax::ImporterRun.find(importer_run_id) if importer_run_id
+ user = run_user || importer_run&.user
+ ability = Ability.new(user)
parent_entry, parent_record = find_record(parent_identifier, importer_run_id)
number_of_successes = 0
number_of_failures = 0
@@ -67,23 +70,25 @@
@child_members_added = []
if parent_record
conditionally_acquire_lock_for(parent_record.id) do
ActiveRecord::Base.uncached do
- Bulkrax::PendingRelationship.where(parent_id: parent_identifier, importer_run_id: importer_run_id)
+ Bulkrax::PendingRelationship.where(parent_id: parent_identifier)
.ordered.find_each do |rel|
process(relationship: rel, importer_run_id: importer_run_id, parent_record: parent_record, ability: ability)
number_of_successes += 1
+ @parent_record_members_added = true
rescue => e
number_of_failures += 1
+ rel.set_status_info(e, importer_run)
errors << e
end
end
# save record if members were added
if @parent_record_members_added
- Bulkrax.object_factory.save!(resource: parent_record, user: importer_run.user)
+ Bulkrax.object_factory.save!(resource: parent_record, user: user)
Bulkrax.object_factory.publish(event: 'object.membership.updated', object: parent_record)
Bulkrax.object_factory.update_index(resources: @child_members_added)
end
end
else
@@ -102,24 +107,29 @@
# rubocop:disable Rails/SkipsModelValidations
ImporterRun.update_counters(importer_run_id, failed_relationships: number_of_failures)
# rubocop:enable Rails/SkipsModelValidations
parent_entry&.set_status_info(errors.last, importer_run)
+ failure_count += 1
- # TODO: This can create an infinite job cycle, consider a time to live tracker.
- reschedule(parent_identifier: parent_identifier, importer_run_id: importer_run_id)
- return false # stop current job from continuing to run after rescheduling
+ if failure_count < max_failure_count
+ reschedule(
+ parent_identifier: parent_identifier,
+ importer_run_id: importer_run_id,
+ run_user: run_user,
+ failure_count: failure_count
+ )
+ end
+ return errors # stop current job from continuing to run after rescheduling
else
# rubocop:disable Rails/SkipsModelValidations
ImporterRun.update_counters(importer_run_id, processed_relationships: number_of_successes)
# rubocop:enable Rails/SkipsModelValidations
end
end
# rubocop:enable Metrics/MethodLength
- attr_reader :importer_run
-
private
##
# We can use Hyrax's lock manager when we have one available.
if defined?(::Hyrax)
@@ -168,11 +178,11 @@
##
# Adds child_record to the parent_record collection by delegating to
# Bulkrax.object_factory.add_resource_to_collection.
#
# @param child_record the resource being added to the collection
# @param parent_record the collection that receives the resource
#
# NOTE(review): the new side reads `user` through the class-level attr_accessor, but
# the visible part of #perform assigns `user = ...` as a method-local variable, which
# does not invoke the accessor's writer. Confirm the accessor is populated elsewhere;
# otherwise nil is what gets passed as `user:` here.
def add_to_collection(child_record, parent_record)
Bulkrax.object_factory.add_resource_to_collection(
collection: parent_record,
resource: child_record,
- user: importer_run.user
+ user: user
)
end
def add_to_work(child_record, parent_record)
# NOTE: The .add_child_to_parent_work should not persist changes to the
@@ -181,13 +191,10 @@
parent: parent_record,
child: child_record
)
end
##
# Enqueues another CreateRelationshipsJob to run after a 10 minute wait.
#
# Old side: accepts exactly parent_identifier and importer_run_id and passes both on.
# New side: accepts arbitrary keyword arguments and forwards them unchanged to
# perform_later, so the caller decides what the next run receives.
#
# NOTE(review): the visible #perform call site also forwards run_user; presumably that
# value has to be serializable by the job backend — confirm.
- def reschedule(parent_identifier:, importer_run_id:)
- CreateRelationshipsJob.set(wait: 10.minutes).perform_later(
- parent_identifier: parent_identifier,
- importer_run_id: importer_run_id
- )
+ def reschedule(**kargs)
+ CreateRelationshipsJob.set(wait: 10.minutes).perform_later(**kargs)
end
end
end