module InstDataShipper
  module Destinations
    # Destination that ships dump data to a "hosted data" HTTP service through
    # its `api/v1/custom_dumps/` endpoints. The remote dump id is kept in a
    # Redis hash (key `rk(:state)`, field `dump_id`) between the lifecycle steps.
    class HostedData < Base
      include Concerns::Chunking

      # Creates the remote custom dump (sending the converted schema) and
      # stores the returned dump id in Redis. The state key expires after
      # 30 days.
      def initialize_dump
        dump = hosted_data_client.post(
          'api/v1/custom_dumps/',
          reference_id: tracker.id,
          schema: convert_schema,
        ).body.with_indifferent_access

        redis.hset(rk(:state), :dump_id, dump[:id])
        redis.expire(rk(:state), 30.days.to_i)
      end

      # Writes each batch produced by `super` to a gzip-compressed,
      # tab-separated temp file under `working_dir` and yields the file path.
      #
      # @param generator forwarded unchanged to `super`
      # @param table table definition; only `table[:warehouse_name]` is read here
      # @param extra optional extra file-name segment (omitted when nil)
      # @yield [String] path of the temp file for the current batch
      def chunk_data(generator, table:, extra: nil)
        warehouse_name = table[:warehouse_name]

        super(generator) do |batch, idx|
          bits = [warehouse_name, extra, idx].compact
          temp_file = "#{working_dir}/#{bits.join('.')}.tsv.gz"

          begin
            Zlib::GzipWriter.open(temp_file) do |gz|
              batch.each do |row|
                # Array rows become one tab-separated line; anything else is written as-is.
                row = row.join("\t") if row.is_a?(Array)
                gz.puts(row)
              end
            end

            yield temp_file
          ensure
            # Remove the temp file even when the consumer block raises. The
            # existence guard keeps a failed open from masking the original error.
            File.delete(temp_file) if File.exist?(temp_file)
          end
        end
      end

      # Uploads one chunk file as an artifact of the remote dump, keyed by the
      # table's warehouse name.
      #
      # @param table_def table definition; `table_def[:warehouse_name]` is the artifact key
      # @param chunk passed to Faraday::UploadIO as the upload source
      def upload_data_chunk(table_def, chunk)
        hosted_data_client.put("api/v1/custom_dumps/#{hd_dump_id}/", artifacts: {
          table_def[:warehouse_name] => [Faraday::UploadIO.new(chunk, 'application/gzip')],
        })
      end

      # Asks the service to start importing the dump (when a dump id is known)
      # and clears the Redis state.
      def finalize_dump
        hosted_data_client.put("api/v1/custom_dumps/#{hd_dump_id}/", start_import: true) if hd_dump_id.present?
        redis.del(rk(:state))
      end

      # Deletes the remote dump (when a dump id is known) and clears the Redis
      # state.
      def cleanup_fatal_error
        hosted_data_client.delete("api/v1/custom_dumps/#{hd_dump_id}/", reason: 'Failure during extraction or transformation') if hd_dump_id.present?
        redis.del(rk(:state))
      end

      # TODO: Support/allow single-table fatal errors?

      protected

      # Dump id stored by #initialize_dump, read from Redis and memoized once a
      # truthy value has been read.
      def hd_dump_id
        @hd_dump_id ||= redis.hget(rk(:state), :dump_id)
      end

      # Builds the schema payload sent when the dump is created.
      #
      # @return [Hash] `{ version:, definition: }`. `definition` is keyed by each
      #   table's warehouse name; `version` is the downcased export genre plus the
      #   first 6 hex characters of the MD5 of the definition's JSON.
      def convert_schema
        table_prefix = config[:table_prefix]
        table_prefix = table_prefix.present? ? "#{table_prefix}_" : nil

        definitions = {}
        table_schemas.each do |ts|
          # Only `tableName` carries the prefix; the definition key does not.
          table_name = ts[:warehouse_name]
          table_name = "#{table_prefix}#{table_name}" if table_prefix.present?

          # Key order is kept stable because the version digest below is
          # computed from this hash's JSON.
          definitions[ts[:warehouse_name]] = {
            dw_type: 'dimension',
            description: ts[:description],
            incremental: !!ts[:incremental],
            # `true` only flags the table as incremental; any other truthy value is passed through.
            incremental_on: ts[:incremental] && ts[:incremental] != true ? ts[:incremental] : nil,
            # indexed_columns
            tableName: table_name,
            columns: ts[:columns].map do |col|
              {
                name: col[:warehouse_name],
                description: col[:description],
                # Fall back to the model's column SQL type when none is given.
                type: col[:type] || ts[:model].column_for_attribute(col[:local_name]).sql_type,
              }
            end,
          }
        end

        {
          version: "#{dumper.export_genre.downcase}-#{Digest::MD5.hexdigest(definitions.to_json)[0...6]}",
          definition: definitions,
        }
      end

      # Memoized Faraday connection sending `Authorization: Bearer <token>`.
      # When `config[:host]` is blank, the host is read from the token's `host`
      # claim; the token is decoded without signature verification for this.
      def hosted_data_client
        @hosted_data_client ||= begin
          token = config[:token]

          host = config[:host]
          unless host.present?
            tok_content = JWT.decode(token, nil, false).first
            host = tok_content['host']
          end

          Faraday.new(url: host) do |faraday|
            faraday.request :multipart
            faraday.request :json
            faraday.response :raise_error
            faraday.response :follow_redirects
            faraday.response :json, content_type: /\bjson$/
            faraday.headers[:accept] = 'application/json'
            faraday.headers[:authorization] = "Bearer #{token}"
            faraday.adapter Faraday.default_adapter
          end
        end
      end

      # Extracts the token (and optionally the host) from the destination URI.
      #
      # NOTE(review): Ruby's stdlib URI objects expose `user`, not `username`;
      # confirm the type of object `super` yields here.
      def parse_configuration(uri)
        super do |parsed_uri, cfg|
          if parsed_uri.username.present?
            # User part present: it carries the token and the URI host is the service host.
            cfg[:token] = parsed_uri.username
            cfg[:host] = parsed_uri.host
          else
            # No user part: the URI host position carries the token. Presumably
            # the service host is then taken from the token by #hosted_data_client.
            cfg[:token] = parsed_uri.host
          end
        end
      end
    end
  end
end