require "csv"

module CanvasSync
  module Importers
    class BulkImporter
      # The batch import size can be customized by setting
      # the 'BULK_IMPORTER_BATCH_SIZE' environment variable
      DEFAULT_BATCH_SIZE = 10_000

      # Does a bulk import of a set of models using the activerecord-import gem.
      #
      # @param report_file_path [String] path to the report CSV
      # @param mapping [Hash] a hash of the values to import. The keys are the CSV column names
      #   and the values are the database column names.
      #   {CanvasSync::Processors::ProvisioningReportProcessor::USERS_CSV_MAPPING Example}
      # @param klass [Object] the model to import into, e.g., User
      # @param conflict_target [Symbol] the database column that determines whether a given row
      #   should be updated or inserted, e.g., :canvas_user_id
      # @param exclude_duplicates [Boolean] the import will fail if the file contains rows that
      #   duplicate the conflict target. Set this to true to have the bulk importer filter
      #   them out.
      # @yieldparam [CSV::Row] row if a block is passed in, it will yield the current row from
      #   the CSV. This can be used if you need to filter or massage the data in any way.
      # @yieldreturn [CSV::Row, nil] the (possibly modified) row to import, or nil to skip it
      def self.import(report_file_path, mapping, klass, conflict_target, exclude_duplicates = false)
        csv_column_names = mapping.keys
        database_column_names = mapping.values
        rows = []
        row_ids = {}

        CSV.foreach(report_file_path, headers: true, header_converters: :symbol) do |row|
          row = yield(row) if block_given?
          next if row.nil?

          if exclude_duplicates
            next if row_ids[row[conflict_target]]
            row_ids[row[conflict_target]] = true
          end

          rows << csv_column_names.map { |column| row[column] }

          if rows.length >= batch_size
            perform_import(klass, database_column_names, rows, conflict_target)
            rows = []
          end
        end

        # Import whatever is left over after the last full batch
        perform_import(klass, database_column_names, rows, conflict_target)
      end

      def self.perform_import(klass, columns, rows, conflict_target)
        return if rows.empty?
        # Defensive copy, in case the import call mutates the array
        columns = columns.dup

        klass.import(
          columns,
          rows,
          validate: false,
          on_duplicate_key_update: {
            conflict_target: conflict_target,
            condition: condition_sql(klass, columns),
            columns: columns
          }
        )
      end
      # A bare `private` does not apply to `def self.` methods, so the
      # helpers below are marked private explicitly.
      private_class_method :perform_import

      # This method generates SQL that looks like:
      #   (users.sis_id, users.email) IS DISTINCT FROM (EXCLUDED.sis_id, EXCLUDED.email)
      #
      # This prevents activerecord-import from setting the `updated_at` column for
      # rows that haven't actually changed, which allows you to query for rows that
      # have changed by doing something like:
      #
      #   started_at = Time.now
      #   run_the_users_sync!
      #   changed = User.where("updated_at >= ?", started_at)
      def self.condition_sql(klass, columns)
        columns_str = columns.map { |c| "#{klass.quoted_table_name}.#{c}" }.join(", ")
        excluded_str = columns.map { |c| "EXCLUDED.#{c}" }.join(", ")
        "(#{columns_str}) IS DISTINCT FROM (#{excluded_str})"
      end
      private_class_method :condition_sql

      def self.batch_size
        batch_size = ENV['BULK_IMPORTER_BATCH_SIZE'].to_i
        batch_size > 0 ? batch_size : DEFAULT_BATCH_SIZE
      end
      private_class_method :batch_size
    end
  end
end
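# A minimal usage sketch, assuming a `User` model with `canvas_user_id` and
# `email` columns and a report CSV whose headers symbolize to the mapping keys
# (the mapping and file name below are illustrative, not the gem's
# USERS_CSV_MAPPING):
#
#   mapping = { canvas_user_id: :canvas_user_id, email: :email }
#   CanvasSync::Importers::BulkImporter.import(
#     "users_report.csv",
#     mapping,
#     User,
#     :canvas_user_id
#   ) do |row|
#     row[:email].nil? ? nil : row # returning nil skips the row
#   end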