module InstDataShipper
  # This class ends up filling two roles: Schema and Mapping.
  # It makes for a clean API, but it's a little less canonical since (e.g.) the
  # S3 destination doesn't need column type annotations.
  #
  # `SchemaBuilder.build { ... }` evaluates the block against a fresh builder (so
  # `version`, `table` and `extend_table_builder` are callable directly inside it)
  # and returns the resulting schema Hash: `{ tables: [<table def>, ...], version: ... }`.
  class SchemaBuilder
    attr_reader :schema

    def initialize
      @schema = {
        tables: [],
      }
    end

    # Evaluates +block+ in the context of a new builder and returns the schema Hash.
    # Without a block, an empty schema (`{ tables: [] }`) is returned instead of raising.
    def self.build(&block)
      builder = new
      builder.instance_exec(&block) if block
      builder.schema
    end

    # Records a version marker on the schema.
    def version(version)
      @schema[:version] = version
    end

    # Customizes the builder used by subsequent `table` calls: a lazily created
    # subclass of TableSchemaBuilder is reopened with +block+ (class_eval) and/or
    # extended with +modul+.
    #
    # NOTE(review): `extend` adds +modul+'s methods as *class* methods of the builder,
    # while table blocks run via instance_exec on a builder *instance*; confirm
    # whether `include` was intended.
    def extend_table_builder(modul = nil, &block)
      @table_builder_class ||= Class.new(TableSchemaBuilder)
      @table_builder_class.class_eval(&block) if block.present?
      @table_builder_class.extend(modul) if modul.present?
    end

    # Declares a table and appends its definition to the schema.
    #
    # @param model_or_name [ActiveRecord::Relation, Class, Object]
    #   - a Relation: used as the query; its model and the model's table_name are used
    #   - an ActiveRecord::Base subclass: used as the model; its table_name is the warehouse name
    #   - anything else: used as the warehouse name as-is
    # @param description [String, nil]
    # @param model [Class, nil] explicit model (not allowed together with a Relation)
    # @param query [Object, nil] explicit query (not allowed together with a Relation)
    # @param extra [Hash] additional keys merged into the table definition
    # @yield optional DSL block evaluated against a TableSchemaBuilder
    # @return [Hash] the table definition (also appended to schema[:tables])
    # @raise [RuntimeError] if a Relation is combined with model: or query:
    def table(model_or_name, description = nil, model: nil, query: nil, **extra, &block)
      tdef = {
        warehouse_name: nil,
        description: description,
        columns: [],
        model: model,
        query: query,
        **extra,
      }

      if model_or_name.is_a?(ActiveRecord::Relation)
        raise "model specified twice" if model.present?
        raise "query specified twice" if query.present?

        tdef[:query] = model_or_name
        tdef[:model] = model_or_name.model
        tdef[:warehouse_name] = model_or_name.model.table_name
      elsif model_or_name.is_a?(Class) && model_or_name < ActiveRecord::Base
        tdef[:warehouse_name] = model_or_name.table_name
        tdef[:model] = model_or_name
      else
        tdef[:warehouse_name] = model_or_name
      end

      (@table_builder_class || TableSchemaBuilder).build(tdef, &block)

      @schema[:tables] << tdef

      tdef
    end

    # DSL context for a single `table` block. Every method mutates the table
    # definition Hash the builder was constructed with.
    class TableSchemaBuilder
      attr_reader :options, :columns

      def initialize(table)
        @options = table
        @columns = table[:columns]
      end

      # Evaluates +block+ against a builder for +tdef+ and returns +tdef+ (mutated).
      # Without a block, +tdef+ is returned unchanged instead of raising.
      def self.build(tdef, &block)
        builder = new(tdef)
        builder.instance_exec(&block) if block
        builder.options
      end

      # Stores an arbitrary key/value pair on the table definition.
      def annotate(key, value)
        options[key] = value
      end

      # Records a version marker on the table definition.
      def version(version)
        options[:version] = version
      end

      # Marks the table as incremental, storing `{ on:, scope:, if: }` under :incremental.
      #
      # @param scope [Object, nil] stored as-is under :scope
      # @param kwargs [Hash] only :on (wrapped with Array()) and :if (stored as-is) are accepted
      # @raise [ArgumentError] for any other keyword
      def incremental(scope = nil, **kwargs)
        if (extras = kwargs.keys - %i[on if]).present?
          raise ArgumentError, "Unsupported options: #{extras.inspect}"
        end

        options[:incremental] = {
          on: Array(kwargs[:on]),
          scope: scope,
          if: kwargs[:if],
        }
      end

      # Sets how the table's data is obtained; may only be called once per table.
      #
      # A Symbol is turned into a one-argument lambda that calls
      # `import_<source>(model, schema_name: <warehouse name>, **kwargs)` on its own
      # `self`, where model is +override_model+, else the table's model, else its
      # warehouse name. Presumably the consumer instance_execs it against an object
      # that defines the import_* methods (TODO confirm). Any other value is stored as-is.
      #
      # @raise [RuntimeError] if a source was already set
      def source(source, override_model = nil, **kwargs)
        raise "Source already set" if options[:sourcer].present?

        if source.is_a?(Symbol)
          mthd = :"import_#{source}"
          # Capture the table definition now: inside the lambda `options` would
          # resolve against whatever object it ends up being instance_exec'd on.
          table_opts = options
          source = ->(_table_def) {
            send(
              mthd,
              override_model || table_opts[:model] || table_opts[:warehouse_name],
              schema_name: table_opts[:warehouse_name],
              **kwargs,
            )
          }
        end

        options[:sourcer] = source
      end

      # Declares a column and appends its definition to the table.
      #
      # @param name [#to_s] warehouse column name
      # @param args optional leading Symbol (stored as :type), then optional String (:description)
      # @param refs [String, Array<String>, nil] explicit "Class#column" references
      # @param from [Symbol, String, Proc, nil] how to read the value from a row:
      #   Symbol → `row.send(from)`, String → `row[from]`, Proc → called with row.
      #   Defaults to `name.to_s`.
      # @param extra [Hash] additional keys merged into the column definition
      # @yield [value, row] optional post-processing of the extracted value
      # @return [Hash] the column definition (also appended to columns)
      # @raise [ArgumentError] on unexpected positional arguments or an invalid `from`
      def column(name, *args, refs: [], from: nil, **extra, &block)
        from ||= name.to_s

        cdef = {
          warehouse_name: name.to_s,
          from: from,
          **extra,
        }

        cdef[:type] = args.shift if args[0].is_a?(Symbol)
        cdef[:description] = args.shift if args[0].is_a?(String)

        raise ArgumentError, "Received unexpected arguments: #{args.inspect}" if args.present?

        # Kernel#Array returns an Array argument itself; copy it so the inferred
        # reference appended below never mutates an Array owned by the caller.
        cdef[:references] = Array(refs).dup

        # Infer a reference for `<association>_id` columns from the model's
        # reflections. Use an explicit :local_name if given, else a String/Symbol
        # `from` (the attribute actually read from the row).
        local_name = cdef[:local_name] || (case from when String, Symbol then from end)
        if options[:model].is_a?(Class) && local_name.to_s.end_with?("_id")
          refl = options[:model].reflections[local_name.to_s[0...-3]]
          if refl.present? && !refl.polymorphic?
            ref = "#{refl.klass}##{refl.options[:primary_key] || 'id'}"
            cdef[:references] << ref unless cdef[:references].include?(ref)
          end
        end

        compiled_from = compile_transformer(from)

        cdef[:block] = ->(row) {
          value = instance_exec(row, &compiled_from)
          value = instance_exec(value, row, &block) if block.present?
          value
        }

        @columns << cdef

        cdef
      end

      protected

      # Turns a `from` spec into a one-argument callable that extracts a value from a row.
      # A blank spec yields the identity function.
      # @raise [ArgumentError] for unsupported spec types
      def compile_transformer(from)
        return ->(row) { row } unless from.present?

        case from
        when Symbol then ->(row) { row.send(from) }
        when Proc then from
        when String then ->(row) { row[from] }
        else raise ArgumentError, "Invalid transformer: #{from.inspect}"
        end
      end
    end
  end
end