Sha256: 0f401c4c6cfa770ad8ee4646a95637af156078528391d89a89adfd4993825fd7

Contents?: true

Size: 2 KB

Versions: 8

Compression:

Stored size: 2 KB

Contents

module InstDataShipper
  module Destinations
    class S3 < Base
      include Concerns::Chunking

      def chunk_data(generator, table:, extra: nil)
        warehouse_name = table[:warehouse_name]

        super(generator) do |batch, idx|
          bits = [warehouse_name, extra, idx].compact
          temp_file = "#{working_dir}/#{bits.join('.')}.csv"

          CSV.open(temp_file, 'w', headers: false) do |row|
            row << table[:columns].map { |c| c[:warehouse_name] }
            batch.each do |batch_row|
              row << batch_row
            end
          end

          yield temp_file

          File.delete(temp_file)
        end
      end

      def upload_data_chunk(table_def, chunk)
        s3 = Aws::S3::Resource.new(client: aws_client)
        dir_key = tracker.created_at.strftime("%Y-%m-%dT%H:%M") + "_#{tracker.id}"
        bucket = s3.bucket(config[:bucket])

        subpath = config[:path].presence || "/"
        subpath = subpath[1..-1] if subpath.starts_with?("/")
        subpath = "instructure" unless subpath.present?

        obj_path = File.join(config[:path], dir_key, File.basename(chunk))
        object = bucket.object(obj_path)

        File.open(chunk, 'rb') do |file|
          object.put(body: file)
        end
      end

      protected

      def aws_client
        @aws_client ||= Aws::S3::Client.new(
          region: config[:region],
          credentials: Aws::Credentials.new(
            config[:access_key_id],
            config[:access_key_secret],
          )
        )
      end

      def parse_configuration(uri)
        # s3://<access_key_id>:<access_key_secret>@<region>/<bucket>/<path>
        super do |parsed_uri, cfg|
          split_path = parsed_uri.path.split('/')

          cfg.merge!({
            region: parsed_uri.host,
            bucket: split_path[0],
            access_key_id: parsed_uri.user,
            access_key_secret: parsed_uri.password,
            path: split_path[1..-1].join('/').presence,
          })
        end
      end

    end
  end
end

Version data entries

8 entries across 8 versions & 1 rubygems

Version Path
inst_data_shipper-0.2.6 lib/inst_data_shipper/destinations/s3.rb
inst_data_shipper-0.2.5 lib/inst_data_shipper/destinations/s3.rb
inst_data_shipper-0.2.4 lib/inst_data_shipper/destinations/s3.rb
inst_data_shipper-0.2.3 lib/inst_data_shipper/destinations/s3.rb
inst_data_shipper-0.2.2 lib/inst_data_shipper/destinations/s3.rb
inst_data_shipper-0.2.1 lib/inst_data_shipper/destinations/s3.rb
inst_data_shipper-0.2.0 lib/inst_data_shipper/destinations/s3.rb
inst_data_shipper-0.1.0.beta2 lib/inst_data_shipper/destinations/s3.rb