module InstDataShipper
  # Base class for dump jobs. Subclasses (e.g. those created with Dumper.define)
  # provide a table schema and an enqueue_tasks implementation; the Dumper handles
  # batch bookkeeping, destination fan-out, and failure cleanup.
  class Dumper
    include Hooks

    define_hook :initialize_dump_batch
    define_hook :finalize_dump_batch

    def self.perform_dump(destinations)
      raise "Must subclass Dumper to use perform_dump" if self == Dumper

      dumper = new(destinations)
      dumper.begin_dump

      dumper.tracker
    end

    # Builds an anonymous Dumper subclass from a schema and an enqueue_tasks block.
    def self.define(include: [], schema:, &blk)
      Class.new(self) do
        include(*include)

        define_method(:enqueue_tasks, &blk)
        define_method(:table_schemas) { schema }
      end
    end

    public

    # Creates the DumpBatch record, initializes each destination, and enqueues
    # the root Sidekiq batch that will run the export tasks.
    def begin_dump
      raise "Dump already begun" unless @raw_destinations.present?

      @tracker = tracker = DumpBatch.create(job_class: self.class.to_s, genre: export_genre, status: 'in_progress')

      @batch_context = context = {
        # TODO: Allow this to be hooked by Destinations (likely via initialize_dump_batch and batch_context) so that, if an earlier destination failed, we can resend its data
        # TODO: Consider behavior if the last dump is still running
        incremental_since: DumpBatch.where(genre: export_genre, status: 'completed').order(created_at: :desc).first&.created_at,
      }

      begin
        begin
          destinations.each do |dest|
            dest.initialize_dump()
          end

          run_hook(:initialize_dump_batch, context)
        ensure
          @batch_context = nil
          context[:tracker_id] = tracker.id
          context[:origin_class] = batch_context[:origin_class] || self.class.to_s
          context[:destinations] = @raw_destinations
        end

        Sidekiq::Batch.new.tap do |batch|
          context[:root_bid] = batch.bid

          batch.description = "HD #{export_genre} Export #{tracker.id} Root"
          batch.context = context
          batch.on(:success, "#{self.class}#finalize_dump")
          batch.on(:death, "#{self.class}#cleanup_fatal_error!")
          batch.jobs do
            enqueue_tasks
          rescue => ex
            delayed :cleanup_fatal_error!
            InstDataShipper.handle_suppressed_error(ex)
          end
        end
      rescue => ex
        # If setup fails after the context was built, enqueue a cleanup batch so the
        # tracker and destinations are still torn down, then re-raise.
        if context
          batch ||= Sidekiq::Batch.new.tap do |batch|
            batch.description = "HD #{export_genre} Export #{tracker.id} Early Failure Cleanup"
            batch.context = context
            batch.jobs do
              delayed :cleanup_fatal_error!
            end
          end
        end
        raise ex
      end
    end

    def tracker
      @tracker ||= batch_context[:tracker_id].present? ? DumpBatch.find(batch_context[:tracker_id]) : nil
    end

    # Derives the export genre from the class name by stripping "HD" and "ExportJob".
    def export_genre
      self.class.to_s.gsub(/HD|ExportJob/, '')
    end

    def origin_class
      batch_context[:origin_class]&.constantize || self.class
    end

    # A table marked :incremental is dumped incrementally when a previous completed
    # batch exists, unless an :if condition (Symbol or Proc) decides otherwise.
    def table_is_incremental?(table_def)
      if (inc = table_def[:incremental]).present?
        differ = inc[:if]
        return !!incremental_since if differ.nil?

        differ = :"#{differ}".to_proc if differ.is_a?(Symbol)
        differ = instance_exec(&differ) if differ.is_a?(Proc)

        return !!differ
      end

      false
    end

    def incremental_since
      batch_context[:incremental_since]
    end

    # Identifiers may be plain values (matched against :warehouse_name) or single-entry
    # Hashes of { schema_key => value(s) }. Returns the first matching table schema.
    def lookup_table_schema(*identifiers)
      identifiers.compact.each do |ident|
        if ident.is_a?(Hash)
          key = ident.keys.first
          value = ident.values.first
        else
          key = :warehouse_name
          value = ident
        end

        value = Array(value).compact

        table_schemas.each do |ts|
          return ts if value.include?(ts[key])
        end
      end

      nil
    end

    def lookup_table_schema!(*identifiers)
      lookup_table_schema(*identifiers) || raise("No table schema found for #{identifiers.inspect}")
    end

    protected

    attr_reader :executor

    def initialize(destinations = nil, executor: nil)
      @raw_destinations = Array(destinations)
      @executor = executor
    end

    def enqueue_tasks
      raise NotImplementedError
    end

    def upload_data(table_def, extra: nil, &datagen)
      # Allow muxing, allowing a hook to prevent some files going to certain destinations
      dests = destinations_for_table(table_def)
      dest_groups = dests.group_by do |dest|
        catch(:not_groupable) do
          next dest.group_key
        end
        :not_groupable
      end

      not_groupable = dest_groups.delete(:not_groupable)

      # If multiple destinations will produce the same data, only generate the chunk once...
      dest_groups.each do |group_key, dests|
        dests[0].chunk_data(datagen, table: table_def, extra: extra) do |chunk|
          # ...and upload it to each
          dests.each do |dest|
            dest.upload_data_chunk(table_def, chunk)
          end
        end
      end

      not_groupable&.each do |dest|
        dest.chunk_data(datagen, table: table_def, extra: extra) do |chunk|
          dest.upload_data_chunk(table_def, chunk)
        end
      end

      # TODO: Consider how to handle errors in this method:
      #   - Retriable errors must not be allowed to bubble - if dest 1 succeeds and dest 2 fails, dest 1 must not be retried
      #   - Each destination should handle its own retries
      #   - If a destination errors out, we should continue with the other destinations
    end

    def finalize_dump(_status, _opts)
      run_hook(:finalize_dump_batch)

      destinations.each do |dest|
        dest.finalize_dump
      end

      DumpBatch.find(batch_context[:tracker_id]).update(status: 'completed')
    end

    # Marks the DumpBatch as failed, lets each destination clean up, and deletes the
    # root Sidekiq batch so no further jobs from this dump run.
    def cleanup_fatal_error!(*_args)
      return unless batch.present?

      run_hook(:finalize_dump_batch)

      destinations.each do |dest|
        dest.cleanup_fatal_error
      rescue => ex
        InstDataShipper.handle_suppressed_error(ex)
      end

      DumpBatch.find(batch_context[:tracker_id]).update(status: 'failed')

      CanvasSync::JobBatches::Batch.delete_prematurely!(batch_context[:root_bid]) if batch_context[:root_bid].present?
    end

    # Helper Methods

    def table_schemas
      return origin_class::TABLE_SCHEMAS if defined?(origin_class::TABLE_SCHEMAS)

      raise NotImplementedError
    end

    def delayed(mthd, *args, **kwargs)
      Jobs::AsyncCaller.perform_later(self.class.to_s, mthd.to_s, *args, **kwargs)
    end

    delegate :working_dir, to: :executor

    def batch
      Thread.current[CanvasSync::JobBatches::CURRENT_BATCH_THREAD_KEY]
    end

    def batch_context
      @batch_context || batch&.context || {}
    end

    def destinations_for_table(table_def)
      destinations
    end

    # Instantiates Destination objects from the strings passed to perform_dump or,
    # when running inside a batch job, from the batch context.
    def destinations
      @destinations ||= (@raw_destinations.presence || batch_context[:destinations]).map.with_index do |dest, i|
        dcls = InstDataShipper.resolve_destination(dest)
        dcls.new("#{InstDataShipper.redis_prefix}:dump#{tracker.id}:dest#{i}", dest, self)
      end
    end
  end
end
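
# Usage sketch (illustrative only; kept as a comment so it does not execute when
# this file is loaded). MY_SCHEMA, SomeDataSourceMixin, and the destination string
# below are hypothetical placeholders - the only APIs relied on are Dumper.define
# and Dumper.perform_dump above, and the accepted destination formats are whatever
# InstDataShipper.resolve_destination supports in your installation.
#
#   MyDumper = InstDataShipper::Dumper.define(schema: MY_SCHEMA, include: [SomeDataSourceMixin]) do
#     # enqueue_tasks body: enqueue the jobs that generate data and call upload_data
#   end
#
#   tracker = MyDumper.perform_dump(["s3://my-bucket/exports"])  # returns the DumpBatch record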