module InstDataShipper
  # Base class for defining a data dump: subclasses enqueue Sidekiq jobs that
  # generate table data and upload it to one or more destinations.
  class Dumper
    include Hooks

    define_hook :initialize_dump_batch
    define_hook :finalize_dump_batch

    # Entry point: builds a dumper, starts the dump batch, and returns its DumpBatch tracker.
    def self.perform_dump(destinations:)
      raise "Must subclass Dumper to use perform_dump" if self == Dumper

      dumper = new(destinations)
      dumper.begin_dump
      dumper.tracker
    end

    protected

    attr_reader :executor

    def initialize(destinations = nil, executor: nil)
      @raw_destinations = destinations
      @executor = executor
    end

    # Subclasses enqueue their dump jobs here; called inside the root Sidekiq batch.
    def enqueue_tasks
      raise NotImplementedError
    end

    # Creates the tracker record, initializes each destination, and starts the root Sidekiq batch.
    def begin_dump
      raise "Dump already begun" unless @raw_destinations.present?

      @tracker = tracker = DumpBatch.create(job_class: self.class.to_s, status: 'in_progress')

      destinations.each do |dest|
        dest.initialize_dump
      end

      context = {}
      run_hook(:initialize_dump_batch, context)

      Sidekiq::Batch.new.tap do |batch|
        batch.description = "HD #{export_genre} Export #{tracker.id} Root"
        batch.context = {
          **context,
          root_bid: batch.bid,
          tracker_id: tracker.id,
          origin_class: batch_context[:origin_class] || self.class.to_s,
          destinations: @raw_destinations,
        }
        batch.on(:success, "#{self.class}#finalize_dump")
        batch.on(:death, "#{self.class}#cleanup_fatal_error!")
        batch.jobs do
          enqueue_tasks
        end
      end

      # TODO: Catch errors in here and clean up as needed
    end

    def upload_data(table_def, extra: nil, &datagen)
      # Allow muxing: a hook may prevent some files from going to certain destinations
      dests = destinations_for_table(table_def)
      dest_groups = dests.group_by do |dest|
        catch(:not_groupable) do
          next dest.group_key
        end
        :not_groupable
      end
      not_groupable = dest_groups.delete(:not_groupable)

      # If multiple destinations will produce the same data, only generate the chunk once...
      dest_groups.each do |_group_key, group_dests|
        group_dests[0].chunk_data(datagen, table: table_def, extra: extra) do |chunk|
          # ...and upload it to each
          group_dests.each do |dest|
            dest.upload_data_chunk(table_def, chunk)
          end
        end
      end

      not_groupable&.each do |dest|
        dest.chunk_data(datagen, table: table_def, extra: extra) do |chunk|
          dest.upload_data_chunk(table_def, chunk)
        end
      end

      # TODO: Consider how to handle errors in this method.
      #   Retriable errors must not be allowed to bubble - if dest 1 succeeds and dest 2 fails, dest 1 must not be retried.
      #   Each destination should handle its own retries.
      #   If a destination errors out, we should continue with the other destinations.
    end

    # Batch success callback: finalizes each destination and marks the tracker completed.
    def finalize_dump(_status, _opts)
      run_hook(:finalize_dump_batch)

      destinations.each do |dest|
        dest.finalize_dump
      end

      DumpBatch.find(batch_context[:tracker_id]).update(status: 'completed')
    end

    # Batch death callback: lets each destination clean up, marks the tracker failed,
    # and tears down the remaining batch jobs.
    def cleanup_fatal_error!(*_args)
      return unless batch.present?

      run_hook(:finalize_dump_batch)

      destinations.each do |dest|
        dest.cleanup_fatal_error
      rescue StandardError # rubocop:disable Lint/SuppressedException
      end

      DumpBatch.find(batch_context[:tracker_id]).update(status: 'failed')

      CanvasSync::JobBatches::Batch.delete_prematurely!(batch_context[:root_bid])
    end

    # Helper Methods

    def table_schemas
      return origin_class::TABLE_SCHEMAS if defined?(origin_class::TABLE_SCHEMAS)

      raise NotImplementedError
    end

    # Enqueues an instance method call as an async job via AsyncCaller.
    def delayed(mthd, *args, **kwargs)
      AsyncCaller.perform_later(self.class.to_s, mthd.to_s, *args, **kwargs)
    end

    def tracker
      @tracker ||= batch_context[:tracker_id].present? ? DumpBatch.find(batch_context[:tracker_id]) : nil
    end

    def export_genre
      self.class.to_s.gsub(/HD|ExportJob/, '')
    end

    def origin_class
      batch_context[:origin_class]&.constantize || self.class
    end

    def working_dir
      executor.working_dir
    end

    # Hook point for muxing: override to route a table to a subset of destinations.
    def destinations_for_table(table_def)
      destinations
    end

    def destinations
      @destinations ||= (@raw_destinations || batch_context[:destinations]).map.with_index do |dest, i|
        dcls = InstDataShipper.resolve_destination(dest)
        dcls.new("#{InstDataShipper.redis_prefix}:dump#{tracker.id}:dest#{i}", dest, self)
      end
    end
  end
end
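
# ---------------------------------------------------------------------------
# Usage sketch (illustrative only, not shipped behavior): a minimal subclass
# wiring based solely on the hooks above. Subclasses implement #enqueue_tasks
# (run inside the root Sidekiq batch by #begin_dump) and provide table schemas
# via a TABLE_SCHEMAS constant or by overriding #table_schemas. The class name,
# schema shape, and row-writer API below (HDUsersExportJob, :warehouse_name,
# `chunk << row`) are assumptions for the sake of the example - the real shapes
# are dictated by the destination classes and the consuming application.
#
#   class HDUsersExportJob < InstDataShipper::Dumper
#     TABLE_SCHEMAS = [
#       { warehouse_name: "users", columns: %i[id name] }, # shape is hypothetical
#     ].freeze
#
#     def enqueue_tasks
#       # Each task becomes a job in the batch; #delayed round-trips through AsyncCaller.
#       delayed(:dump_users)
#     end
#
#     def dump_users
#       table_def = table_schemas.first
#       # #upload_data hands this block to each destination's chunk_data, which
#       # decides how rows are buffered and chunked before upload_data_chunk.
#       upload_data(table_def) do |chunk|
#         User.find_each { |u| chunk << [u.id, u.name] }
#       end
#     end
#   end
#
#   # Kicks off the dump and returns the DumpBatch tracker record:
#   HDUsersExportJob.perform_dump(destinations: ["s3://bucket/prefix"])
# ---------------------------------------------------------------------------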