module CanvasSync module Importers class BulkImporter # The batch import size can be customized by setting # the 'BULK_IMPORTER_BATCH_SIZE' environment variable DEFAULT_BATCH_SIZE = 10_000 # Does a bulk import of a set of models using the activerecord-import gem. # # @param report_file_path [String] path to the report CSV # @param mapping [Hash] a hash of the values to import. See `model_mappings.yml` for a # format example # @param klass [Object] e.g., User # @param conflict_target [Symbol] the csv column name that maps to the database column # that will determine if we need to update or insert a given row. e.g.,: canvas_user_id # @param import_args [Hash] Any arguments passed here will be passed through to ActiveRecord::BulkImport. # Note: passing the key [:on_duplicate_key_ignore] will override the default behavior of [:on_duplicate_key_update] # @yieldparam [Array] row if a block is passed in it will yield the current row from the CSV. # This can be used if you need to filter or massage the data in any way. def self.import(report_file_path, mapping, klass, conflict_target, import_args: {}, &blk) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity, Metrics/LineLength ClassCallbackExecutor.run_if_defined(klass, :sync_import) do perform_in_batches(report_file_path, mapping, klass, conflict_target, import_args: import_args, &blk) end end def self.perform_in_batches(report_file_path, raw_mapping, klass, conflict_target, import_args: {}) mapping = {}.with_indifferent_access raw_mapping.each do |db_col, opts| next if opts[:deprecated] && !klass.column_names.include?(db_col.to_s) mapping[db_col] = opts end csv_column_names = mapping.values.map { |value| value[:report_column].to_s } database_column_names = mapping.keys conflict_target = Array(conflict_target).map(&:to_s) conflict_target_indices = conflict_target.map{|ct| database_column_names.index(ct) } row_ids = {} batcher = CanvasSync::BatchProcessor.new(of: batch_size) do |batch| row_ids = {} perform_import(klass, database_column_names, batch, conflict_target, import_args) end row_buffer_out = ->(row) { formatted_row = mapping.map do |db_col, col_def| value = nil value = row[col_def[:report_column]] if col_def[:report_column] if col_def[:type] if col_def[:type].to_sym == :datetime # TODO: add some timezone config to the mapping. # In cases where the timestamp or date doesn't include a timezone, you should be able to specify one value = DateTime.parse(value).utc rescue nil # rubocop:disable Style/RescueModifier end end value = col_def[:transform].call(value, row) if col_def[:transform] value end if conflict_target.present? key = conflict_target_indices.map{|ct| formatted_row[ct] } next if row_ids[key] row_ids[key] = true end batcher << formatted_row } row_buffer = nil if defined?(User) && klass == User && csv_column_names.include?('user_id') row_buffer = UserRowBuffer.new(&row_buffer_out) else row_buffer = NullRowBuffer.new(&row_buffer_out) end CSV.foreach(report_file_path, headers: true, header_converters: :symbol) do |row| row = yield(row) if block_given? next if row.nil? row_buffer << row end row_buffer.flush batcher.flush end def self.perform_import(klass, columns, rows, conflict_target, import_args={}) return if rows.length.zero? columns = columns.dup update_conditions = { condition: condition_sql(klass, columns, import_args[:sync_start_time]), columns: columns } update_conditions[:conflict_target] = conflict_target if conflict_target.present? options = { validate: false, on_duplicate_key_update: update_conditions }.merge(import_args) options.delete(:on_duplicate_key_update) if options.key?(:on_duplicate_key_ignore) result = klass.import(columns, rows, options) global_updates = { canvas_synced_at: DateTime.now, canvas_sync_batch_id: JobBatches::Batch.current_context[:sync_batch_id], } global_updates.slice!(*klass.column_names.map(&:to_sym)) if global_updates.present? && result.ids.present? klass.where(id: result.ids).update_all(global_updates) end result end # This method generates SQL that looks like: # (users.sis_id, users.email) IS DISTINCT FROM (EXCLUDED.sis_id, EXCLUDED.email) # # This prevents activerecord-import from setting the `updated_at` column for # rows that haven't actually changed. This allows you to query for rows that have changed # by doing something like: # # started_at = Time.now # run_the_users_sync! # changed = User.where("updated_at >= ?", started_at) def self.condition_sql(klass, columns, report_start = nil) columns_str = columns.map { |c| "#{klass.quoted_table_name}.#{c}" }.join(", ") excluded_str = columns.map { |c| "EXCLUDED.#{c}" }.join(", ") condition_sql = "(#{columns_str}) IS DISTINCT FROM (#{excluded_str})" if klass.column_names.include?("canvas_synced_at") && report_start condition_sql += " AND #{klass.quoted_table_name}.canvas_synced_at < '#{report_start}'" elsif klass.column_names.include?("updated_at") && report_start condition_sql += " AND #{klass.quoted_table_name}.updated_at < '#{report_start}'" end condition_sql end def self.batch_size batch_size = ENV["BULK_IMPORTER_BATCH_SIZE"].to_i batch_size > 0 ? batch_size : DEFAULT_BATCH_SIZE end class RowBuffer def initialize(&block) @flush_out = block @buffered_rows = [] end def <<(v) @buffered_rows << v end def flush(value = @buffered_rows) if value.is_a?(Array) value.each do |v| @flush_out.call(v) end else @flush_out.call(value) end @buffered_rows = [] end end class NullRowBuffer def initialize(&block) @flush_out = block end def <<(v) @flush_out.call(v) end def flush; end end # Ensures that, if a User has multiple rows, one with a SIS ID is preferred. # This is mainly to fix issues in legacy apps - the suggested approach for new apps # is to sync and use the Pseudonymes table class UserRowBuffer < RowBuffer def <<(v) flush if @buffered_rows[0] && @buffered_rows[0][:canvas_user_id] != v[:canvas_user_id] super end def flush row = @buffered_rows.find{|r| r[:user_id].present? } || @buffered_rows.last super(row.present? ? [row] : []) end end end end end