require "faraday_middleware"

module InstDataShipper
  module Destinations
    # Destination that ships dumps to HostedData through its
    # `api/v1/custom_dumps` HTTP API.
    #
    # Flow as implemented below:
    # - #initialize_dump creates a custom dump record.
    # - #chunk_data / #upload_data_chunk write gzipped TSV chunks and attach
    #   them as artifacts.
    # - #finalize_dump starts the import. On failure, #cleanup_fatal_error
    #   deletes the dump instead.
    #
    # The HostedData dump id is stored in Redis under rk(:state), with a
    # 30-day expiry, and read back lazily by #hd_dump_id.
    class HostedData < Base
      include Concerns::Chunking

      # Reconciles an incremental dump with the last successfully imported
      # HostedData dump of the same genre. Does nothing unless
      # context[:incremental_since] is present.
      #
      # Depending on that last dump, this may:
      # - move context[:incremental_since] back to the last dump's created_at,
      #   when that dump is older;
      # - append tables whose schema is incompatible with the last dump to
      #   context[:force_full_tables];
      # - clear context[:incremental_since], when no previous dump exists, or
      #   when it carries no schema and was tagged with a different schema
      #   digest.
      #
      # @param context [Hash] mutable dump context; modified in place
      def preinitialize_dump(context)
        return unless context[:incremental_since].present?

        last_dump = hosted_data_client.get("api/v1/custom_dumps/last", {
          status: 'imported',
          tags: [
            # We could also include app in the filter, but each app should already have a distinct key in HD
            "ids-genre=#{dumper.export_genre}",
          ],
          include: ["schema"],
        }).body.with_indifferent_access

        # NOTE(review): created_at comes from the JSON response (presumably a
        # timestamp string). This comparison assumes incremental_since is of a
        # comparable type; confirm against whatever populates the context.
        last_created_at = last_dump[:created_at]
        if last_created_at.present? && last_created_at < context[:incremental_since]
          InstDataShipper.logger.info("Last successful HostedData dump is older than incremental_since - bumping back incremental_since")
          context[:incremental_since] = last_created_at
        end

        if (hd_tables = last_dump[:schema]).present?
          mark_incompatible_tables_for_full_dump(context, hd_tables)
        elsif !Array(last_dump[:tags]).include?("ids-schema=#{dumper.schema_digest}")
          InstDataShipper.logger.info("Last successful HostedData dump of the same genre has a different schema - not using incremental_since")
          context[:incremental_since] = nil
        end
      rescue Faraday::ResourceNotFound
        # The :raise_error middleware turns a 404 into ResourceNotFound: there
        # is no previous imported dump to be incremental against.
        InstDataShipper.logger.info("No Last successful HostedData dump of the same genre - not using incremental_since")
        context[:incremental_since] = nil
      end

      # Creates the HostedData custom dump record and stores its id in Redis.
      #
      # The dump is tagged with the schema digest and export genre. It is also
      # tagged with the Rails app name when Rails is loaded, and with the
      # schema version when one is set.
      #
      # @param context [Hash] dump context (not read here)
      def initialize_dump(context)
        tags = [
          "ids-schema=#{dumper.schema_digest}",
          "ids-genre=#{dumper.export_genre}",
        ]
        tags << "ids-app=#{Rails.application.class.name.gsub(/::Application$/, '')}" if defined?(Rails) && Rails.application
        tags << "ids-schema-version=#{schema[:version]}" if schema[:version].present?

        dump = hosted_data_client.post(
          'api/v1/custom_dumps/',
          reference_id: tracker.id,
          schema: convert_schema,
          tags: tags,
        ).body.with_indifferent_access

        redis.hset(rk(:state), :dump_id, dump[:id])
        redis.expire(rk(:state), 30.days.to_i)
      end

      # Splits the generator's rows into batches, via Concerns::Chunking, and
      # writes each batch to a gzipped TSV file in working_dir.
      #
      # Array rows are tab-joined. Any other row is written with `puts` as-is.
      # Each file path is yielded to the caller's block. The file is always
      # removed afterwards, even when writing or the block raises.
      #
      # @param generator [Object] row source passed through to Chunking
      # @param table [Hash] table schema; :warehouse_name names the files
      # @param extra [Object, nil] optional extra file-name component
      # @yieldparam temp_file [String] path of the gzipped chunk
      def chunk_data(generator, table:, extra: nil)
        warehouse_name = table[:warehouse_name]

        super(generator) do |batch, idx|
          bits = [warehouse_name, extra, idx].compact
          temp_file = "#{working_dir}/#{bits.join('.')}.tsv.gz"

          begin
            Zlib::GzipWriter.open(temp_file) do |gz|
              batch.each do |row|
                row = row.join("\t") if row.is_a?(Array)
                gz.puts(row)
              end
            end

            yield temp_file
          ensure
            # Clean up on every exit path (normal, exception, or break) so
            # failed runs do not leave chunk files behind.
            File.delete(temp_file) if File.exist?(temp_file)
          end
        end
      end

      # Attaches one gzipped chunk file to the current HostedData dump. The
      # artifact is keyed by the (optionally prefixed) table name.
      #
      # @param table_def [Hash] table schema
      # @param chunk [String] path to the gzipped chunk file
      def upload_data_chunk(table_def, chunk)
        hosted_data_client.put("api/v1/custom_dumps/#{hd_dump_id}/", artifacts: {
          table_name(table_def) => [Faraday::UploadIO.new(chunk, 'application/gzip')],
        })
      end

      # Asks HostedData to start importing the dump, when one was created.
      # The Redis state is cleared either way.
      def finalize_dump
        hosted_data_client.put("api/v1/custom_dumps/#{hd_dump_id}/", start_import: true) if hd_dump_id.present?
        redis.del(rk(:state))
      end

      # Deletes the HostedData dump with a failure reason, when one was
      # created. The Redis state is cleared either way.
      def cleanup_fatal_error
        hosted_data_client.delete("api/v1/custom_dumps/#{hd_dump_id}/", reason: 'Failure during extraction or transformation') if hd_dump_id.present?
        redis.del(rk(:state))
      end

      # TODO: Support/allow single-table fatal errors?

      protected

      # HostedData dump id saved by #initialize_dump, memoized once read from
      # Redis. Returns nil when no dump has been created.
      def hd_dump_id
        @hd_dump_id ||= redis.hget(rk(:state), :dump_id)
      end

      # Appends every local table whose schema is incompatible with the
      # `ids_meta` HostedData recorded for it to context[:force_full_tables].
      #
      # @param context [Hash] mutable dump context
      # @param hd_tables [Hash] the `schema` section of the last HostedData dump
      def mark_incompatible_tables_for_full_dump(context, hd_tables)
        metadatas = hd_tables.values
          .map { |t| t[:ids_meta] }
          .compact
          .map { |meta| [meta[:table_warehouse_name], meta] }
          .to_h

        schema[:tables].each do |ts|
          hd_meta = metadatas[ts[:warehouse_name]]
          next if dumper.table_schema_compatible?(ts, hd_meta)

          InstDataShipper.logger.info("Last successful HostedData dump of #{ts[:warehouse_name]} has a different schema - forcing full table")
          (context[:force_full_tables] ||= []) << ts[:warehouse_name]
        end
      end

      # Builds the schema payload sent to HostedData when creating a dump.
      #
      # The version combines the dumper's schema digest with the first 6 hex
      # characters of an MD5 over the JSON-encoded definitions. Key order
      # below is therefore significant: reordering it would change the version.
      #
      # @return [Hash] { version:, definition: { table_name => table_def } }
      def convert_schema
        definitions = schema[:tables].each_with_object({}) do |ts, defs|
          # Shallow copy; presumably guards schema[:tables] entries against
          # mutation by the dumper helpers below. TODO confirm if still needed.
          ts = ts.dup
          tname = table_name(ts)

          defs[tname] = {
            dw_type: 'dimension',
            description: ts[:description],
            incremental: dumper.table_is_incremental?(ts),
            incremental_on: ts.dig(:incremental, :on),
            # indexed_columns
            tableName: tname,
            columns: ts[:columns].map { |col| convert_column_schema(ts, col) },
            ids_meta: dumper.table_schema_metadata(ts),
          }
        end

        {
          version: "#{dumper.schema_digest}-#{Digest::MD5.hexdigest(definitions.to_json)[0...6]}",
          definition: definitions,
        }
      end

      # Converts one column definition to HostedData's column format.
      #
      # When no explicit :type is given and :from is a String, the SQL type is
      # read from the table's model via column_for_attribute.
      def convert_column_schema(ts, col)
        coltype = col[:type]
        coltype ||= ts[:model].column_for_attribute(col[:from]).sql_type if col[:from].is_a?(String)

        {
          name: col[:warehouse_name],
          description: col[:description],
          type: coltype,
        }
      end

      # HostedData table name for a table definition: its :warehouse_name,
      # prefixed with "<table_prefix>_" when config[:table_prefix] is present.
      def table_name(table_def)
        prefix = config[:table_prefix]
        warehouse_name = table_def[:warehouse_name]
        prefix.present? ? "#{prefix}_#{warehouse_name}" : warehouse_name
      end

      # Memoized Faraday client for the HostedData API.
      #
      # The host comes from config[:host]. Otherwise it is read from the
      # `host` claim of the token, which is decoded WITHOUT signature
      # verification (only used to locate the server). "https://" is added
      # when no scheme is given. Non-2xx responses raise (:raise_error);
      # JSON responses are parsed.
      #
      # @raise [ArgumentError] if no host can be determined
      def hosted_data_client
        @hosted_data_client ||= begin
          token = config[:token]
          host = config[:host]
          host = JWT.decode(token, nil, false).first['host'] unless host.present?
          raise ArgumentError, "HostedData host could not be determined from configuration or token" unless host.present?

          host = "https://#{host}" unless host.include?('://')

          Faraday.new(url: host) do |faraday|
            faraday.request :multipart
            faraday.request :json
            faraday.response :raise_error
            faraday.response :follow_redirects
            faraday.response :json, :content_type => /\bjson$/
            faraday.headers[:accept] = 'application/json'
            faraday.headers[:authorization] = "Bearer #{token}"
            faraday.adapter Faraday.default_adapter
          end
        end
      end

      # Extracts the token, optional host and optional table prefix from the
      # destination URI.
      def parse_configuration(uri)
        super do |parsed_uri, cfg|
          if parsed_uri.user.present?
            # hosted-data://<token>@<host>
            cfg[:token] = parsed_uri.user
            cfg[:host] = parsed_uri.host
          else
            # hosted-data://<token>  (host is then taken from the token)
            cfg[:token] = parsed_uri.host
          end

          cfg[:table_prefix] = parsed_uri.params[:table_prefix]
        end
      end
    end
  end
end