require "csv"

module CanvasSync
  module Importers
    class BulkImporter
      # The batch import size can be customized by setting
      # the 'BULK_IMPORTER_BATCH_SIZE' environment variable
      DEFAULT_BATCH_SIZE = 10_000

      # Does a bulk import of a set of models using the activerecord-import gem.
      #
      # @param report_file_path [String] path to the report CSV
      # @param mapping [Hash] a hash of the values to import. See `model_mappings.yml`
      #   for a format example
      # @param klass [Object] e.g., User
      # @param conflict_target [Symbol] the CSV column name that maps to the database
      #   column used to decide whether to update or insert a given row, e.g. canvas_user_id
      # @param import_args [Hash] any arguments passed here will be passed through to the
      #   activerecord-import `import` call. Note: passing the key [:on_duplicate_key_ignore]
      #   will override the default behavior of [:on_duplicate_key_update]
      # @yieldparam [CSV::Row] row if a block is passed in, it will yield the current row
      #   from the CSV. This can be used if you need to filter or massage the data in any
      #   way; return nil from the block to skip the row.
      def self.import(report_file_path, mapping, klass, conflict_target, import_args: {})
        csv_column_names = mapping.keys
        database_column_names = mapping.values.map { |value| value[:database_column_name] }
        database_conflict_column_name = mapping[conflict_target][:database_column_name]
        rows = []
        row_ids = {}

        CSV.foreach(report_file_path, headers: true, header_converters: :symbol) do |row|
          row = yield(row) if block_given?
          next if row.nil?

          # Dedupe on the conflict target so a single batch never contains two rows
          # for the same record (Postgres rejects upserts that affect a row twice).
          next if row_ids[row[conflict_target]]

          row_ids[row[conflict_target]] = true
          rows << csv_column_names.map do |column|
            if mapping[column][:type].to_sym == :datetime
              # TODO: add some timezone config to the mapping. In cases where the
              # timestamp or date doesn't include a timezone, you should be able
              # to specify one.
              DateTime.parse(row[column]).utc rescue nil
            else
              row[column]
            end
          end

          # Flush a full batch and start collecting the next one.
          if rows.length >= batch_size
            perform_import(klass, database_column_names, rows, database_conflict_column_name, import_args)
            rows = []
            row_ids = {}
          end
        end

        # Flush whatever remains after the final (partial) batch.
        perform_import(klass, database_column_names, rows, database_conflict_column_name, import_args)
      end

      def self.perform_import(klass, columns, rows, conflict_target, import_args = {})
        return if rows.empty?

        columns = columns.dup
        options = {
          validate: false,
          on_duplicate_key_update: {
            conflict_target: conflict_target,
            condition: condition_sql(klass, columns),
            columns: columns,
          },
        }.merge(import_args)
        options.delete(:on_duplicate_key_update) if options.key?(:on_duplicate_key_ignore)
        klass.import(columns, rows, options)
      end

      # This method generates SQL that looks like:
      #
      #   (users.sis_id, users.email) IS DISTINCT FROM (EXCLUDED.sis_id, EXCLUDED.email)
      #
      # This prevents activerecord-import from setting the `updated_at` column for
      # rows that haven't actually changed, which lets you query for rows that have
      # changed by doing something like:
      #
      #   started_at = Time.now
      #   run_the_users_sync!
      #   changed = User.where("updated_at >= ?", started_at)
      def self.condition_sql(klass, columns)
        columns_str = columns.map { |c| "#{klass.quoted_table_name}.#{c}" }.join(", ")
        excluded_str = columns.map { |c| "EXCLUDED.#{c}" }.join(", ")
        "(#{columns_str}) IS DISTINCT FROM (#{excluded_str})"
      end

      def self.batch_size
        batch_size = ENV['BULK_IMPORTER_BATCH_SIZE'].to_i
        batch_size > 0 ? batch_size : DEFAULT_BATCH_SIZE
      end

      # A bare `private` has no effect on `def self.` methods, so the helpers
      # are marked private explicitly.
      private_class_method :perform_import, :condition_sql, :batch_size
    end
  end
end
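
# A minimal usage sketch (illustrative, not part of the gem): it assumes a
# `User` model with `canvas_user_id` and `email` columns and a report CSV whose
# headers match the mapping keys. The mapping hash and file path below are
# hypothetical stand-ins for what `model_mappings.yml` would normally provide.
#
#   mapping = {
#     canvas_user_id: { database_column_name: :canvas_user_id, type: :integer },
#     email:          { database_column_name: :email,          type: :string },
#     created_at:     { database_column_name: :created_at,     type: :datetime },
#   }
#
#   CanvasSync::Importers::BulkImporter.import(
#     "tmp/provisioning_users.csv", mapping, User, :canvas_user_id
#   ) do |row|
#     row[:email] ? row : nil # returning nil skips the row
#   end
#
# Rows are upserted on the `canvas_user_id` column, and `updated_at` is only
# touched for rows whose values actually changed (see `condition_sql`).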