module InstDataShipper
  # Orchestrates a single data export ("dump").
  #
  # `begin_dump` creates a DumpBatch tracker record, lets every destination and the
  # `initialize_dump_batch` hook prepare, then enqueues the subclass's `enqueue_tasks`
  # inside a root CanvasSync::JobBatches batch. That batch's success callback is
  # `finalize_dump`; its death callback is `cleanup_fatal_error!`.
  #
  # Later jobs rebuild a Dumper from the current batch's context (see `Dumper.current`),
  # so the tracker id, origin class and destinations are carried in that context hash.
  class Dumper
    include Hooks

    define_hook :initialize_dump_batch
    define_hook :finalize_dump_batch

    # Starts a dump using this Dumper subclass.
    #
    # @param destinations [Array, Object] destination specs, resolved through
    #   InstDataShipper.resolve_destination
    # @param force_full_tables [Array<String>, nil] warehouse names of tables that must be
    #   dumped in full even if they could otherwise be dumped incrementally
    # @return [DumpBatch] the tracker record of the new dump
    # @raise [RuntimeError] when called on Dumper itself rather than a subclass
    def self.perform_dump(destinations, force_full_tables: nil)
      raise "Must subclass Dumper to use perform_dump" if self == Dumper

      dumper = new(destinations)
      dumper.begin_dump(force_full_tables: force_full_tables)

      dumper.tracker
    end

    # Builds an anonymous Dumper subclass.
    #
    # @param include [Array<Module>] modules mixed into the new class
    # @param schema [Hash] schema definition with a `:tables` array
    # @yield body of `enqueue_tasks`, run on the dumper instance. When omitted, every
    #   schema table that has a `:sourcer` is enqueued automatically.
    # @return [Class] the new Dumper subclass
    # @raise [ArgumentError] when there is no block and no table defines a `:sourcer`
    def self.define(include: [], schema:, &blk)
      Class.new(self) do
        include(*include) if include.present?

        if blk.nil? && schema[:tables].any? { |t| t[:sourcer].present? }
          blk = -> { auto_enqueue_from_schema }
        elsif blk.nil?
          raise ArgumentError, "Must provide a block or a schema with source definitions"
        end

        define_method(:enqueue_tasks, &blk)
        define_method(:schema) { schema }
      end
    end

    # Rebuilds the Dumper that owns the batch running on the current thread.
    #
    # @param executor [Object, nil] the currently executing job; `working_dir` is delegated to it
    # @return [Dumper, nil] nil when the current batch context lacks `:origin_class` or `:tracker_id`
    def self.current(executor: nil)
      cur_batch = Thread.current[CanvasSync::JobBatches::CURRENT_BATCH_THREAD_KEY]
      ctx = cur_batch&.context || {}
      return nil unless ctx[:origin_class].present? && ctx[:tracker_id].present?

      clazz = ctx[:origin_class]
      clazz = clazz.constantize if clazz.is_a?(String)
      clazz.new(executor: executor)
    end

    if defined?(Rails) && Rails.env.test?
      # Test-only helper: points the dumper at a single "speccable://nil" destination and a
      # tracker built with `DumpBatch.new`, and exposes that destination as `spec_destination`.
      #
      # @return [self]
      def for_specs!
        @raw_destinations = ["speccable://nil"]
        # NOTE(review): the right-hand side of this assignment was lost from this copy of the
        # file; `Jobs::AsyncCaller.new` is a best guess. Confirm against upstream.
        @executor = InstDataShipper::Jobs::AsyncCaller.new
        @tracker = DumpBatch.new(job_class: self.class.to_s, genre: export_genre, status: 'in_progress')
        define_singleton_method(:spec_destination) { destinations.first }
        self
      end
    end

    public

    # Starts the dump.
    #
    # Creates the DumpBatch tracker, runs `preinitialize_dump`/`initialize_dump` on every
    # destination plus the `initialize_dump_batch` hook, then enqueues `enqueue_tasks` in a
    # new root batch wired to `finalize_dump` (success) and `cleanup_fatal_error!` (death).
    #
    # @param force_full_tables [Array<String>, nil] warehouse names of tables to dump in full
    # @raise [RuntimeError] when this instance has no raw destinations (e.g. it was rebuilt
    #   by `Dumper.current`)
    # @raise re-raises any error from initialization or batch setup after scheduling cleanup
    #   and marking the tracker failed
    def begin_dump(force_full_tables: nil)
      raise "Dump already begun" unless @raw_destinations.present?

      @tracker = tracker = DumpBatch.create(job_class: self.class.to_s, genre: export_genre, status: 'in_progress')

      @batch_context = context = {
        # TODO Consider behavior if last is still running
        incremental_since: last_successful_tracker&.created_at,
        force_full_tables: force_full_tables || [],
      }

      destinations.each do |dest|
        dest.preinitialize_dump(context)
      end

      begin
        begin
          destinations.each do |dest|
            dest.initialize_dump(context)
          end

          run_hook(:initialize_dump_batch, context)
        ensure
          # Drop the override so `batch_context` below reads the enclosing batch's context
          # (if any) instead of the hash being built here.
          @batch_context = nil
          context[:tracker_id] = tracker.id
          context[:origin_class] = batch_context[:origin_class] || self.class.to_s
          context[:destinations] = @raw_destinations
        end

        context.delete(:force_full_tables) if context[:force_full_tables].empty?

        CanvasSync::JobBatches::Batch.new.tap do |root_batch|
          context[:root_bid] = root_batch.bid
          tracker.update(batch_id: root_batch.bid)

          root_batch.description = "HD #{export_genre} Export #{tracker.id} Root"
          root_batch.context = context
          root_batch.on(:success, "#{self.class}#finalize_dump")
          root_batch.on(:death, "#{self.class}#cleanup_fatal_error!")
          root_batch.jobs do
            enqueue_tasks
          rescue => ex
            # Enqueueing failed: schedule cleanup inside the batch, report the error, and mark
            # the dump failed instead of letting the error escape.
            delayed :cleanup_fatal_error!
            InstDataShipper.handle_suppressed_error(ex)
            tracker.update(status: 'failed', exception: ex.message, backtrace: ex.backtrace.join("\n"))
          end
        end
      rescue => ex
        # `context` is always populated here (the ensure above ran), so the cleanup job can
        # locate the tracker and destinations.
        CanvasSync::JobBatches::Batch.new.tap do |cleanup_batch|
          cleanup_batch.description = "HD #{export_genre} Export #{tracker.id} Early Failure Cleanup"
          cleanup_batch.context = context
          cleanup_batch.jobs do
            delayed :cleanup_fatal_error!
          end
        end
        tracker.update(status: 'failed', exception: ex.message, backtrace: ex.backtrace.join("\n"))
        raise ex
      end
    end

    # @return [DumpBatch, nil] this dump's tracker; looked up by the batch context's
    #   `:tracker_id` when this instance did not create it
    def tracker
      @tracker ||= batch_context[:tracker_id].present? ? DumpBatch.find(batch_context[:tracker_id]) : nil
    end

    # @return [DumpBatch, nil] newest 'completed' DumpBatch with this job class and genre
    def last_successful_tracker
      @last_successful_tracker ||= DumpBatch.where(job_class: self.class.to_s, genre: export_genre, status: 'completed').order(created_at: :desc).first
    end

    # Genre recorded on (and used to look up) DumpBatch records; defaults to the class name.
    def export_genre
      self.class.to_s
    end

    # The class named by the batch context's `:origin_class`, or this instance's class.
    def origin_class
      batch_context[:origin_class]&.constantize || self.class
    end

    # @return [Hash] `origin_class::SCHEMA` (classes built by `Dumper.define` override this)
    # @raise [NotImplementedError] when no SCHEMA constant is defined
    def schema
      return origin_class::SCHEMA if defined?(origin_class::SCHEMA)
      raise NotImplementedError
    end

    # @return [String] first 8 hex characters of the MD5 of the schema's JSON
    def schema_digest
      Digest::MD5.hexdigest(schema.to_json)[0...8]
    end

    # Builds the metadata hash whose keys `table_schema_compatible?` compares against.
    def table_schema_metadata(table_def)
      meta = {
        table_warehouse_name: table_def[:warehouse_name],
        table_schema_hash: table_schema_hash(table_def),
      }
      meta[:table_schema_version] = table_def[:version] if table_def[:version].present?
      meta
    end

    # Whether a table's previous dump metadata is compatible with its current definition.
    def table_schema_compatible?(table_def, meta_hash)
      # Force full-table-upload if:
      # - The table is not present in the last dump
      return false unless meta_hash

      # - The table's explicitly-set versions do not match
      return false if meta_hash[:table_schema_version] != table_def[:version]

      # - The table does not have an explicitly-set version and the schema hash does not match
      return false if !table_def[:version].present? && meta_hash[:table_schema_hash] != table_schema_hash(table_def)

      true
    end

    # @return [String] first 8 hex characters of the MD5 of the table definition's JSON
    def table_schema_hash(table_def)
      Digest::MD5.hexdigest(table_def.to_json)[0...8]
    end

    # Whether `table_def` should be dumped incrementally in this dump.
    #
    # False when there is no `incremental_since` (no earlier completed dump), when the table
    # is in `force_full_tables`, or when it has no `:incremental` config. Otherwise the
    # optional `incremental[:if]` decides: a Symbol names a method on this dumper, a Proc is
    # evaluated with `instance_exec`, and any other value is used for its truthiness.
    #
    # @param table_def [Hash, String] table schema, or its warehouse name
    def table_is_incremental?(table_def)
      return false unless incremental_since.present?

      table_def = lookup_table_schema!(table_def) if table_def.is_a?(String)
      return false if batch_context[:force_full_tables]&.include?(table_def[:warehouse_name])

      # TODO Return false if table's schema changes
      inc = table_def[:incremental]
      return false unless inc.present?

      differ = inc[:if]
      case differ
      when nil
        true
      when Symbol
        # `Symbol#to_proc` needs a receiver argument, which a bare `instance_exec` does not
        # supply (it raises "no receiver given"), so call the named method directly.
        !!send(differ)
      when Proc
        !!instance_exec(&differ)
      else
        !!differ
      end
    end

    # Timestamp of the last completed dump, as recorded in the batch context.
    def incremental_since
      batch_context[:incremental_since]
    end

    # Finds the first schema table matching any of `identifiers`, tried in order.
    #
    # A Hash identifier `{ key => value(s) }` matches tables whose `key` equals one of the
    # values; any other identifier (or array of them) matches `:warehouse_name`.
    # nil identifiers are skipped.
    #
    # @return [Hash, nil]
    def lookup_table_schema(*identifiers)
      identifiers.compact.each do |ident|
        if ident.is_a?(Hash)
          key = ident.keys.first
          value = ident.values.first
        else
          key = :warehouse_name
          value = ident
        end

        value = Array(value).compact

        schema[:tables].each do |ts|
          return ts if value.include?(ts[key])
        end
      end

      nil
    end

    # Like `lookup_table_schema`, but raises when nothing matches.
    #
    # @raise [RuntimeError] when no table matches
    def lookup_table_schema!(*identifiers)
      lookup_table_schema(*identifiers) || raise("No table schema found for #{identifiers.inspect}")
    end

    protected

    attr_reader :executor

    # @param destinations [Array, Object, nil] destination specs; empty when rebuilt from a batch
    # @param executor [Object, nil] job instance that `working_dir` is delegated to
    def initialize(destinations = nil, executor: nil)
      @raw_destinations = Array(destinations)
      @executor = executor
    end

    # Enqueues the dump's work; provided by subclasses or `Dumper.define`.
    def enqueue_tasks
      raise NotImplementedError
    end

    # Runs a table's `:sourcer` proc on this dumper, passing the table definition.
    #
    # @param table_def [Hash, String] table schema, or its warehouse name
    def enqueue_table_from_schema(table_def)
      table_def = lookup_table_schema!(table_def) if table_def.is_a?(String)
      instance_exec(table_def, &table_def[:sourcer])
    end

    # Enqueues every schema table that defines a `:sourcer`.
    def auto_enqueue_from_schema
      schema[:tables].each do |table_def|
        src = table_def[:sourcer]
        next unless src.present?
        enqueue_table_from_schema(table_def)
      end
    end

    # Generates a table's data with `datagen` and uploads the resulting chunks to each
    # destination returned by `destinations_for_table`.
    #
    # @param table_def [Hash] table schema
    # @param extra [Object, nil] passed through to each destination's `chunk_data`
    def upload_data(table_def, extra: nil, &datagen)
      # Allow muxing, allowing a hook to prevent some files going to certain destinations
      dests = destinations_for_table(table_def)

      # Destinations reporting the same `group_key` share one chunking pass; a destination
      # opts out by throwing :not_groupable from `group_key`.
      dest_groups = dests.group_by do |dest|
        key = :not_groupable
        # A `next` inside the catch block would only leave that inner block, so assign the
        # key instead; it stays :not_groupable when `group_key` throws.
        catch(:not_groupable) { key = dest.group_key }
        key
      end
      not_groupable = dest_groups.delete(:not_groupable)

      # If multiple destinations will produce the same data, only generate the chunk once...
      dest_groups.each_value do |group_dests|
        group_dests.first.chunk_data(datagen, table: table_def, extra: extra) do |chunk|
          # ...and upload it to each
          group_dests.each do |dest|
            dest.upload_data_chunk(table_def, chunk)
          end
        end
      end

      not_groupable&.each do |dest|
        dest.chunk_data(datagen, table: table_def, extra: extra) do |chunk|
          dest.upload_data_chunk(table_def, chunk)
        end
      end

      # TODO Consider how to handle errors in this method.
      #   Retriable errors must not be allowed to bubble - if dest 1 succeeds and dest 2 fails, dest 1 must not be retried
      #   Each destination should handle its own retries
      #   If a destination errors-out, we should continue with the other destinations
    end

    # Root batch success callback: runs the `finalize_dump_batch` hook, finalizes every
    # destination, and marks the tracker 'completed'.
    def finalize_dump(_status, _opts)
      run_hook(:finalize_dump_batch)

      destinations.each do |dest|
        dest.finalize_dump
      end

      DumpBatch.find(batch_context[:tracker_id]).update(status: 'completed')
    end

    # Root batch death callback (also enqueued on early failures). Does nothing outside a
    # batch. Otherwise runs the `finalize_dump_batch` hook, cleans up each destination
    # (reporting and suppressing per-destination errors), marks the tracker 'failed', and
    # deletes the root batch when its bid is known.
    def cleanup_fatal_error!(*_args)
      return unless batch.present?

      run_hook(:finalize_dump_batch)

      destinations.each do |dest|
        dest.cleanup_fatal_error
      rescue => ex
        InstDataShipper.handle_suppressed_error(ex)
      end

      DumpBatch.find(batch_context[:tracker_id]).update(status: 'failed')

      CanvasSync::JobBatches::Batch.delete_prematurely!(batch_context[:root_bid]) if batch_context[:root_bid].present?
    end

    # Helper Methods

    # Enqueues an asynchronous call of `mthd` for this dumper class via Jobs::AsyncCaller.
    def delayed(mthd, *args, **kwargs)
      Jobs::AsyncCaller.perform_later(self.class.to_s, mthd.to_s, *args, **kwargs)
    end

    # Like `delayed`, but adds the call to a JobBatches Pool (a Pool or its pid string).
    def call_in_pool(pool, mthd, *args, **kwargs)
      pool = CanvasSync::JobBatches::Pool.from_pid(pool) if pool.is_a?(String)
      Jobs::AsyncCaller.call_from_pool(pool, self.class, mthd, *args, **kwargs)
    end

    delegate :working_dir, to: :executor

    # The CanvasSync JobBatches batch active on the current thread, if any.
    def batch
      Thread.current[CanvasSync::JobBatches::CURRENT_BATCH_THREAD_KEY]
    end

    # The context hash being built by `begin_dump`, else the current batch's context, else {}.
    def batch_context
      @batch_context || batch&.context || {}
    end

    # Destinations that receive `table_def`'s data; all of them unless overridden.
    def destinations_for_table(table_def)
      destinations
    end

    # Destination instances, built once from the raw specs (or the batch context's
    # `:destinations`). Each gets a key prefix unique to this dump and its position.
    def destinations
      @destinations ||= (@raw_destinations.presence || batch_context[:destinations] || []).map.with_index do |dest, i|
        dcls = InstDataShipper.resolve_destination(dest)
        dcls.new("#{InstDataShipper.redis_prefix}:dump#{tracker.id}:dest#{i}", dest, self)
      end
    end
  end
end