module InstDataShipper
  class Dumper
    include Hooks

    define_hook :initialize_dump_batch
    define_hook :finalize_dump_batch

    def self.perform_dump(destinations)
      raise "Must subclass Dumper to use perform_dump" if self == Dumper

      dumper = new(destinations)
      dumper.begin_dump

      dumper.tracker
    end

    def self.define(include: [], schema:, &blk)
      Class.new(self) do
        include(*include) if include.present?

        if blk.nil? && schema[:tables].any? { |t| t[:sourcer].present? }
          blk = -> { auto_enqueue_from_schema }
        elsif blk.nil?
          raise ArgumentError, "Must provide a block or a schema with source definitions"
        end

        define_method(:enqueue_tasks, &blk)
        define_method(:schema) { schema }
      end
    end

    # Reconstructs the Dumper that owns the currently-executing batch, if any.
    def self.current(executor: nil)
      cur_batch = Thread.current[CanvasSync::JobBatches::CURRENT_BATCH_THREAD_KEY]
      ctx = cur_batch&.context || {}
      return nil unless ctx[:origin_class].present? && ctx[:tracker_id].present?

      clazz = ctx[:origin_class]
      clazz = clazz.constantize if clazz.is_a?(String)
      clazz.new(executor: executor)
    end

    if defined?(Rails) && Rails.env.test?
      def for_specs!
        @raw_destinations = ["speccable://nil"]
        @executor = InstDataShipper::Jobs::AsyncCaller.new
        @tracker = DumpBatch.new(job_class: self.class.to_s, genre: export_genre, status: 'in_progress')
        define_singleton_method(:spec_destination) { destinations.first }
        self
      end
    end

    public

    def begin_dump
      raise "Dump already begun" unless @raw_destinations.present?

      @tracker = tracker = DumpBatch.create(job_class: self.class.to_s, genre: export_genre, status: 'in_progress')

      @batch_context = context = {
        # TODO: Consider behavior if the last dump is still running
        incremental_since: last_successful_tracker&.created_at,
      }

      destinations.each do |dest|
        dest.preinitialize_dump(context)
      end

      begin
        begin
          destinations.each do |dest|
            dest.initialize_dump(context)
          end

          run_hook(:initialize_dump_batch, context)
        ensure
          @batch_context = nil
          context[:tracker_id] = tracker.id
          context[:origin_class] = batch_context[:origin_class] || self.class.to_s
          context[:destinations] = @raw_destinations
        end

        Sidekiq::Batch.new.tap do |batch|
          context[:root_bid] = batch.bid
          tracker.update(batch_id: batch.bid)

          batch.description = "HD #{export_genre} Export #{tracker.id} Root"
          batch.context = context
          batch.on(:success, "#{self.class}#finalize_dump")
          batch.on(:death, "#{self.class}#cleanup_fatal_error!")
          batch.jobs do
            enqueue_tasks
          rescue => ex
            delayed :cleanup_fatal_error!
            InstDataShipper.handle_suppressed_error(ex)
            tracker.update(status: 'failed', exception: ex.message, backtrace: ex.backtrace.join("\n"))
          end
        end
      rescue => ex
        if context
          batch ||= Sidekiq::Batch.new.tap do |batch|
            batch.description = "HD #{export_genre} Export #{tracker.id} Early Failure Cleanup"
            batch.context = context
            batch.jobs do
              delayed :cleanup_fatal_error!
            end
          end
        end
        tracker.update(status: 'failed', exception: ex.message, backtrace: ex.backtrace.join("\n"))
        raise ex
      end
    end

    def tracker
      @tracker ||= batch_context[:tracker_id].present? ? DumpBatch.find(batch_context[:tracker_id]) : nil
    end

    def last_successful_tracker
      @last_successful_tracker ||= DumpBatch.where(
        job_class: self.class.to_s,
        genre: export_genre,
        status: 'completed',
      ).order(created_at: :desc).first
    end

    def export_genre
      self.class.to_s
    end

    def origin_class
      batch_context[:origin_class]&.constantize || self.class
    end

    def schema
      return origin_class::SCHEMA if defined?(origin_class::SCHEMA)

      raise NotImplementedError
    end

    def schema_digest
      Digest::MD5.hexdigest(schema.to_json)[0...8]
    end

    def table_is_incremental?(table_def)
      return false unless incremental_since.present?
      # TODO: Return false if the table's schema changes

      if (inc = table_def[:incremental]).present?
        differ = inc[:if]
        return !!incremental_since if differ.nil?

        # A Symbol names an instance method; a Proc is evaluated in this Dumper's context
        differ = send(differ) if differ.is_a?(Symbol)
        differ = instance_exec(&differ) if differ.is_a?(Proc)
        return !!differ
      end

      false
    end

    def incremental_since
      batch_context[:incremental_since]
    end

    def lookup_table_schema(*identifiers)
      identifiers.compact.each do |ident|
        if ident.is_a?(Hash)
          key = ident.keys.first
          value = ident.values.first
        else
          key = :warehouse_name
          value = ident
        end
        value = Array(value).compact

        schema[:tables].each do |ts|
          return ts if value.include?(ts[key])
        end
      end

      nil
    end

    def lookup_table_schema!(*identifiers)
      lookup_table_schema(*identifiers) || raise("No table schema found for #{identifiers.inspect}")
    end

    protected

    attr_reader :executor

    def initialize(destinations = nil, executor: nil)
      @raw_destinations = Array(destinations)
      @executor = executor
    end

    def enqueue_tasks
      raise NotImplementedError
    end

    def auto_enqueue_from_schema
      schema[:tables].each do |table_def|
        src = table_def[:sourcer]
        next unless src.present?

        instance_exec(table_def, &src)
      end
    end

    def upload_data(table_def, extra: nil, &datagen)
      # Allow muxing - a hook may prevent some files from going to certain destinations
      dests = destinations_for_table(table_def)
      dest_groups = dests.group_by do |dest|
        catch(:not_groupable) do
          next dest.group_key
        end
        :not_groupable
      end
      not_groupable = dest_groups.delete(:not_groupable)

      # If multiple destinations will produce the same data, only generate the chunk once...
      dest_groups.each do |_group_key, dests|
        dests[0].chunk_data(datagen, table: table_def, extra: extra) do |chunk|
          # ...and upload it to each
          dests.each do |dest|
            dest.upload_data_chunk(table_def, chunk)
          end
        end
      end

      not_groupable&.each do |dest|
        dest.chunk_data(datagen, table: table_def, extra: extra) do |chunk|
          dest.upload_data_chunk(table_def, chunk)
        end
      end

      # TODO: Consider how to handle errors in this method.
      # Retriable errors must not be allowed to bubble - if dest 1 succeeds and dest 2 fails, dest 1 must not be retried.
      # Each destination should handle its own retries.
      # If a destination errors-out, we should continue with the other destinations.
    end

    def finalize_dump(_status, _opts)
      run_hook(:finalize_dump_batch)

      destinations.each do |dest|
        dest.finalize_dump
      end

      DumpBatch.find(batch_context[:tracker_id]).update(status: 'completed')
    end

    def cleanup_fatal_error!(*_args)
      return unless batch.present?

      run_hook(:finalize_dump_batch)

      destinations.each do |dest|
        dest.cleanup_fatal_error
      rescue => ex
        InstDataShipper.handle_suppressed_error(ex)
      end

      DumpBatch.find(batch_context[:tracker_id]).update(status: 'failed')

      CanvasSync::JobBatches::Batch.delete_prematurely!(batch_context[:root_bid]) if batch_context[:root_bid].present?
    end

    # Helper Methods

    def delayed(mthd, *args, **kwargs)
      Jobs::AsyncCaller.perform_later(self.class.to_s, mthd.to_s, *args, **kwargs)
    end

    delegate :working_dir, to: :executor

    def batch
      Thread.current[CanvasSync::JobBatches::CURRENT_BATCH_THREAD_KEY]
    end

    def batch_context
      @batch_context || batch&.context || {}
    end

    def destinations_for_table(table_def)
      destinations
    end

    def destinations
      @destinations ||= (@raw_destinations.presence || batch_context[:destinations] || []).map.with_index do |dest, i|
        dcls = InstDataShipper.resolve_destination(dest)
        dcls.new("#{InstDataShipper.redis_prefix}:dump#{tracker.id}:dest#{i}", dest, self)
      end
    end
  end
end
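
# Example usage (an illustrative sketch, not part of the library: the
# MY_SCHEMA constant, its table sourcers, and the "s3://..." destination
# string below are hypothetical and depend on how the host application
# configures InstDataShipper and its destinations):
#
#   MyDumper = InstDataShipper::Dumper.define(schema: MY_SCHEMA) do
#     # This block becomes enqueue_tasks; it should enqueue jobs that
#     # eventually call upload_data for each table. Omitting the block
#     # auto-enqueues from the schema's sourcer definitions instead.
#     auto_enqueue_from_schema
#   end
#
#   tracker = MyDumper.perform_dump(["s3://my-bucket/exports"])
#   tracker.status # => 'in_progress' until the Sidekiq batch completes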