module CanvasSync
  module Importers
    # Streams a report CSV and upserts its rows into a model in batches,
    # using the activerecord-import gem (`klass.import`).
    class BulkImporter
      # The batch import size can be customized by setting
      # the 'BULK_IMPORTER_BATCH_SIZE' environment variable
      DEFAULT_BATCH_SIZE = 10_000

      # Does a bulk import of a set of models using the activerecord-import gem.
      #
      # @param report_file_path [String] path to the report CSV
      # @param mapping [Hash] a hash of the values to import. See `model_mappings.yml` for a
      #   format example. Each value is read for `:database_column_name` and `:type`; a column
      #   whose `:type` is missing is imported as-is (no datetime parsing).
      # @param klass [Object] e.g., User
      # @param conflict_target [Symbol] the csv column name that maps to the database column
      #   that will determine if we need to update or insert a given row. e.g.,: canvas_user_id
      # @param import_args [Hash] Any arguments passed here will be passed through to ActiveRecord::BulkImport.
      #   Note: passing the key [:on_duplicate_key_ignore] will override the default behavior of [:on_duplicate_key_update]
      # @yieldparam [Array] row if a block is passed in it will yield the current row from the CSV.
      #   This can be used if you need to filter or massage the data in any way.
      #   Returning nil from the block skips the row.
      # @return [Object, nil] whatever the final `perform_import` call returns
      #   (nil when the last batch is empty).
      def self.import(report_file_path, mapping, klass, conflict_target, import_args: {}) # rubocop:disable Metrics/AbcSize, Metrics/CyclomaticComplexity, Metrics/PerceivedComplexity, Metrics/LineLength
        csv_column_names = mapping.keys
        database_column_names = mapping.values.map { |value| value[:database_column_name] }
        database_conflict_column_name = conflict_target ? mapping[conflict_target][:database_column_name] : nil

        # Both of these are invariant for the whole file, so work them out once
        # instead of once per row/column. `to_s` keeps String and Symbol types
        # equivalent and tolerates a mapping entry that has no :type at all.
        datetime_column = csv_column_names.each_with_object({}) do |column, flags|
          flags[column] = mapping[column][:type].to_s == "datetime"
        end
        rows_per_batch = batch_size

        rows = []
        row_ids = {}

        CSV.foreach(report_file_path, headers: true, header_converters: :symbol) do |row|
          row = yield(row) if block_given?
          next if row.nil?

          if conflict_target
            # Only the first row seen for a given conflict value is kept.
            # row_ids is reset with every flushed batch, so this de-duplicates
            # within a batch, not across the whole file.
            next if row_ids[row[conflict_target]]

            row_ids[row[conflict_target]] = true
          end

          values = csv_column_names.map do |column|
            datetime_column[column] ? parse_datetime(row[column]) : row[column]
          end
          rows << values

          next if rows.length < rows_per_batch

          perform_import(klass, database_column_names, rows, database_conflict_column_name, import_args)
          rows = []
          row_ids = {}
        end

        # Flush whatever is left over after the last full batch.
        perform_import(klass, database_column_names, rows, database_conflict_column_name, import_args)
      end

      # Imports one batch of rows through `klass.import`.
      #
      # @param klass [Object] the model class; must respond to `import` and `quoted_table_name`
      # @param columns [Array] database column names, in the same order as each row's values
      # @param rows [Array<Array>] the row values to import
      # @param conflict_target [Object, nil] database column used as the conflict target, if any
      # @param import_args [Hash] merged over the default options; the presence of the
      #   :on_duplicate_key_ignore key removes the default :on_duplicate_key_update option
      # @return [Object, nil] the result of `klass.import`, or nil when `rows` is empty
      def self.perform_import(klass, columns, rows, conflict_target, import_args = {})
        return if rows.empty?

        # Work on a copy so the caller's array is never shared with the options hash.
        columns = columns.dup

        update_conditions = {
          condition: condition_sql(klass, columns),
          columns: columns
        }
        update_conditions[:conflict_target] = conflict_target if conflict_target

        options = { validate: false, on_duplicate_key_update: update_conditions }.merge(import_args)
        options.delete(:on_duplicate_key_update) if options.key?(:on_duplicate_key_ignore)
        klass.import(columns, rows, options)
      end

      # This method generates SQL that looks like:
      #   (users.sis_id, users.email) IS DISTINCT FROM (EXCLUDED.sis_id, EXCLUDED.email)
      #
      # This prevents activerecord-import from setting the `updated_at` column for
      # rows that haven't actually changed. This allows you to query for rows that have changed
      # by doing something like:
      #
      #   started_at = Time.now
      #   run_the_users_sync!
      #   changed = User.where("updated_at >= ?", started_at)
      #
      # The table prefix is `klass.quoted_table_name`; column names are interpolated as given.
      #
      # @param klass [Object] the model class; must respond to `quoted_table_name`
      # @param columns [Array] database column names
      # @return [String] the condition SQL fragment
      def self.condition_sql(klass, columns)
        table_name = klass.quoted_table_name
        columns_str = columns.map { |c| "#{table_name}.#{c}" }.join(", ")
        excluded_str = columns.map { |c| "EXCLUDED.#{c}" }.join(", ")
        "(#{columns_str}) IS DISTINCT FROM (#{excluded_str})"
      end

      # Number of rows sent per `perform_import` call.
      #
      # @return [Integer] the value of ENV["BULK_IMPORTER_BATCH_SIZE"] when it parses to a
      #   positive integer, otherwise DEFAULT_BATCH_SIZE
      def self.batch_size
        configured = ENV["BULK_IMPORTER_BATCH_SIZE"].to_i
        configured.positive? ? configured : DEFAULT_BATCH_SIZE
      end

      # Best-effort conversion of a CSV cell to a UTC timestamp.
      #
      # TODO: add some timezone config to the mapping.
      # In cases where the timestamp or date doesn't include a timezone, you should be able to specify one
      #
      # NOTE(review): `DateTime#utc` is not defined by Ruby's standard library; presumably it is
      # supplied by ActiveSupport in the host application - confirm.
      #
      # @param value [String, nil] raw cell value
      # @return [Object, nil] the parsed value converted with `utc`, or nil when the cell is
      #   blank or cannot be parsed
      def self.parse_datetime(value)
        # Blank cells are common; skip them without going through raise/rescue.
        return nil if value.nil? || value == ""

        DateTime.parse(value).utc
      rescue StandardError
        # Deliberately lenient: an unparseable timestamp is imported as NULL
        # rather than aborting the whole import.
        nil
      end
      private_class_method :parse_datetime
    end
  end
end