Sha256: 0f401c4c6cfa770ad8ee4646a95637af156078528391d89a89adfd4993825fd7
Contents?: true
Size: 2 KB
Versions: 8
Stored size: 2 KB
Contents
require 'csv'
require 'aws-sdk-s3'

module InstDataShipper
  module Destinations
    class S3 < Base
      include Concerns::Chunking

      # Writes each batch of rows to a temporary CSV file (header row first),
      # yields the file path to the caller, then deletes the file.
      def chunk_data(generator, table:, extra: nil)
        warehouse_name = table[:warehouse_name]

        super(generator) do |batch, idx|
          bits = [warehouse_name, extra, idx].compact
          temp_file = "#{working_dir}/#{bits.join('.')}.csv"

          CSV.open(temp_file, 'w', headers: false) do |csv|
            csv << table[:columns].map { |c| c[:warehouse_name] }
            batch.each do |batch_row|
              csv << batch_row
            end
          end

          yield temp_file

          File.delete(temp_file)
        end
      end

      # Uploads one chunk file to S3 under a timestamped directory key.
      def upload_data_chunk(table_def, chunk)
        s3 = Aws::S3::Resource.new(client: aws_client)

        dir_key = tracker.created_at.strftime("%Y-%m-%dT%H:%M") + "_#{tracker.id}"

        bucket = s3.bucket(config[:bucket])
        subpath = config[:path].presence || "/"
        subpath = subpath[1..-1] if subpath.starts_with?("/")
        subpath = "instructure" unless subpath.present?
        # Build the object key from the normalized subpath, not the raw config value.
        obj_path = File.join(subpath, dir_key, File.basename(chunk))
        object = bucket.object(obj_path)

        File.open(chunk, 'rb') do |file|
          object.put(body: file)
        end
      end

      protected

      def aws_client
        @aws_client ||= Aws::S3::Client.new(
          region: config[:region],
          credentials: Aws::Credentials.new(
            config[:access_key_id],
            config[:access_key_secret],
          ),
        )
      end

      # Expected URI format:
      #   s3://<access_key_id>:<access_key_secret>@<region>/<bucket>/<path>
      def parse_configuration(uri)
        super do |parsed_uri, cfg|
          # Drop the leading "/" so the first segment is the bucket name.
          split_path = parsed_uri.path.delete_prefix('/').split('/')

          cfg.merge!(
            region: parsed_uri.host,
            bucket: split_path[0],
            access_key_id: parsed_uri.user,
            access_key_secret: parsed_uri.password,
            path: split_path[1..-1].join('/').presence,
          )
        end
      end
    end
  end
end
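For illustration, here is a minimal sketch of how a destination URI in the format documented in parse_configuration decomposes into the config hash consumed by the rest of the class. The URI, credentials, and bucket name are invented placeholders, and the sketch uses plain-Ruby URI parsing; the gem's actual parsing is funneled through the Base class and ActiveSupport helpers such as #presence, which this only approximates.

    require 'uri'

    # Hypothetical destination URI following the documented scheme;
    # all credential and bucket values are placeholders.
    uri = URI.parse("s3://AKIAEXAMPLE:examplesecret@us-east-1/my-bucket/exports/daily")

    # Mirror the mapping performed inside parse_configuration:
    split_path = uri.path.delete_prefix('/').split('/')
    config = {
      region:            uri.host,                     # "us-east-1"
      bucket:            split_path[0],                # "my-bucket"
      access_key_id:     uri.user,                     # "AKIAEXAMPLE"
      access_key_secret: uri.password,                 # "examplesecret"
      path:              split_path[1..-1].join('/'),  # "exports/daily"
    }

With this configuration, upload_data_chunk would place each chunk at a key such as exports/daily/2024-05-01T12:00_<tracker id>/<chunk file name>, since dir_key combines the tracker's creation timestamp with its id (the date shown is just an example).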
Version data entries
8 entries across 8 versions & 1 rubygem