lib/remi/data_subjects/s3_file.rb in remi-0.3.2 vs lib/remi/data_subjects/s3_file.rb in remi-0.3.3
- old
+ new
@@ -1,15 +1,61 @@
module Remi
+ module DataSubject::S3File
+ attr_accessor :region
+ attr_accessor :aws_credentials
+
+ def init_aws_credentials(credentials)
+ @aws_credentials = Aws::Credentials.new(
+ credentials.fetch(:aws_access_key_id, ENV['AWS_ACCESS_KEY_ID']),
+ credentials.fetch(:aws_secret_access_key, ENV['AWS_SECRET_ACCESS_KEY'])
+ )
+ end
+
+ def s3
+ @s3 ||= Aws::S3::Resource.new(
+ credentials: aws_credentials,
+ region: region
+ )
+ end
+
+ def encrypt_args
+ @kms_args || {}
+ end
+
+ def init_kms(opt)
+ return nil unless opt
+
+ kms = Aws::KMS::Client.new(
+ region: @region,
+ credentials: @aws_credentials
+ )
+
+ ciphertext = opt.fetch(:ciphertext)
+ algorithm = opt.fetch(:algorithm, 'AES256')
+ key = kms.decrypt(ciphertext_blob: Base64.decode64(ciphertext)).plaintext
+
+ @kms_args = {
+ sse_customer_algorithm: algorithm,
+ sse_customer_key: key
+ }
+ end
+ end
+
# S3 File extractor
# Used to extract files from Amazon S3
#
- # @example
+ # @example Standard use
#
# class MyJob < Remi::Job
# source :some_file do
# extractor Remi::Extractor::S3File.new(
+ # credentials: {
+ # aws_access_key_id: ENV['AWS_ACCESS_KEY_ID'],
+ # aws_secret_access_key: ENV['AWS_SECRET_ACCESS_KEY'],
+ # region: 'us-west-2'
+ # },
# bucket: 'my-awesome-bucket',
# remote_path: 'some_file-',
# most_recent_only: true
# )
# parser Remi::Parser::CsvFile.new(
@@ -26,25 +72,58 @@
# # =>#<Daru::DataFrame:70153153438500 @name = 4c59cfdd-7de7-4264-8666-83153f46a9e4 @size = 3>
# # id name
# # 0 1 Albert
# # 1 2 Betsy
# # 2 3 Camu
+ #
+ # @example Using AWS KMS
+ # To use AWS KMS, supply a :ciphertext and optional :algorithm (default is AES256).
+ # The encrypted key stored in the ciphertext must be the same as that used when the file was written.
+ #
+ # class MyJob < Remi::Job
+ # source :some_file do
+ # extractor Remi::Extractor::S3File.new(
+ # credentials: {
+ # aws_access_key_id: ENV['AWS_ACCESS_KEY_ID'],
+ # aws_secret_access_key: ENV['AWS_SECRET_ACCESS_KEY'],
+ # region: 'us-west-2'
+ # },
+ # bucket: 'my-awesome-bucket',
+ # remote_path: 'some_file-',
+ # most_recent_only: true,
+ # kms_opt: {
+ # ciphertext: '<base64-encoded ciphertext>'
+ # }
+ # )
+ # parser Remi::Parser::CsvFile.new(
+ # csv_options: {
+ # headers: true,
+ # col_sep: '|'
+ # }
+ # )
+ # end
+ # end
class Extractor::S3File < Extractor::FileSystem
+ include Remi::DataSubject::S3File
- # @param bucket_name [String] S3 bucket containing the files
+ # @param bucket [String] Name of S3 bucket containing the files
+ # @param kms_opt [Hash] Hash containing AWS KMS options
+ # @param credentials [Hash] Hash containing AWS credentials (must contain :aws_access_key_id, :aws_secret_access_key, :region)
def initialize(*args, **kargs, &block)
super
init_s3_file(*args, **kargs, &block)
end
# Called to extract files from the source filesystem.
# @return [Array<String>] An array of paths to a local copy of the files extacted
def extract
+ init_kms(@kms_opt)
+
entries.map do |entry|
local_file = File.join(@local_path, entry.name)
logger.info "Downloading #{entry.pathname} from S3 to #{local_file}"
- File.open(local_file, 'wb') { |file| entry.raw.get(response_target: file) }
+ File.open(local_file, 'wb') { |file| entry.raw.get({ response_target: file }.merge(encrypt_args)) }
local_file
end
end
# @return [Array<Extractor::FileSystemEntry>] (Memoized) list of objects in the bucket/prefix
@@ -53,32 +132,140 @@
end
# @return [Array<Extractor::FileSystemEntry>] List of objects in the bucket/prefix
def all_entries!
# S3 does not track anything like a create time, so use last modified for both
- bucket.objects(prefix: @remote_path.to_s).map do |entry|
+ s3.bucket(@bucket_name).objects(prefix: @remote_path.to_s).map do |entry|
Extractor::FileSystemEntry.new(
pathname: entry.key,
create_time: entry.last_modified,
modified_time: entry.last_modified,
raw: entry
)
end
end
- # @return [Aws::S3::Client] The S3 client used
- def s3_client
- @s3_client ||= Aws::S3::Client.new
- end
-
private
- def init_s3_file(*args, bucket:, **kargs)
+ def init_s3_file(*args, credentials: {}, bucket:, kms_opt: nil, **kargs)
+ @region = credentials.fetch(:region, 'us-west-2')
+ @kms_opt = kms_opt
+ init_aws_credentials(credentials)
+
@bucket_name = bucket
end
+ end
- def bucket
- @bucket ||= Aws::S3::Bucket.new(@bucket_name, client: s3_client)
+
+
+ # S3 File loader
+ # Used to post files to Amazon S3
+ #
+ # @example Standard use
+ #
+ # class MyJob < Remi::Job
+ # target :some_file do
+ # encoder Remi::Encoder::CsvFile.new
+ # loader Remi::Loader::S3File.new(
+ # credentials: {
+ # aws_access_key_id: ENV['AWS_ACCESS_KEY_ID'],
+ # aws_secret_access_key: ENV['AWS_SECRET_ACCESS_KEY'],
+ # region: 'us-west-2'
+ # },
+ # bucket: 'itk-de-archive',
+ # remote_path: 'awesome.csv'
+ # )
+ # end
+ # end
+ #
+ # job = MyJob.new
+ # job.some_file.df = Daru::DataFrame.new(
+ # {
+ # numbers: [1,2,3],
+ # words: ['one', 'two', 'three']
+ # }
+ # )
+ # job.some_file.load
+ #
+ # @example Using AWS KMS
+ # To use AWS KMS, supply a :ciphertext and optional :algorithm (default is AES256).
+ # The encrypted key stored in the ciphertext must be the same as that used for reading the file.
+ #
+ # class MyJob < Remi::Job
+ # target :some_file do
+ # encoder Remi::Encoder::CsvFile.new
+ # loader Remi::Loader::S3File.new(
+ # credentials: {
+ # aws_access_key_id: ENV['AWS_ACCESS_KEY_ID'],
+ # aws_secret_access_key: ENV['AWS_SECRET_ACCESS_KEY'],
+ # region: 'us-west-2'
+ # },
+ # bucket: 'itk-de-archive',
+ # remote_path: 'awesome.csv',
+ # kms_opt: {
+ # ciphertext: '<base64-encoded ciphertext>'
+ # }
+ # )
+ # end
+ # end
+ #
+ # @example Generating a ciphertext
+ # A ciphertext can be generated using the AWS SDK
+ #
+ # require 'aws-sdk'
+ # require 'base64'
+ #
+ # aws_credentials = Aws::Credentials.new(
+ # ENV['AWS_ACCESS_KEY_ID'],
+ # ENV['AWS_SECRET_ACCESS_KEY']
+ # )
+ #
+ # kms = Aws::KMS::Client.new(
+ # region: 'us-west-2',
+ # credentials: aws_credentials
+ # )
+ #
+ # # See AWS docs for creating keys: http://docs.aws.amazon.com/kms/latest/developerguide/create-keys.html
+ # data_key = kms.generate_data_key(
+ # key_id: 'alias/alias-of-kms-key',
+ # key_spec: 'AES_256'
+ # )
+ #
+ # ciphertext = Base64.strict_encode64(data_key.ciphertext_blob)
+ # #=> "AQIDAHjmmRVcBAdMHsA9VUoJKgbW8niK2qL1qPcQ2OWEUlh5XAFw0vfl+QIgawB8cbAZ2OqXAAAAfjB8BgkqhkiG9w0BBwagbzBtAgEAMGgGCSqGSIb3DQEHATAeBglghkgBZQMEAS4wEQQMIUIFFh++2w4d9al7AgEQgDvSRXQCOPLSMOjRS/lM5uxuyRV47qInlKKBIezIaYzXuFu1sRU+L46HqRyS0XqR4flFJ/fc8yEj3pU1UA=="
+ class Loader::S3File < Loader
+ include Remi::DataSubject::S3File
+
+ # @param bucket [String] Name of S3 bucket containing the files
+ # @param kms_opt [Hash] Hash containing AWS KMS options
+ # @param credentials [Hash] Hash containing AWS credentials (must contain :aws_access_key_id, :aws_secret_access_key, :region)
+ def initialize(*args, **kargs, &block)
+ super
+ init_s3_loader(*args, **kargs, &block)
end
+ attr_reader :remote_path
+ attr_reader :bucket_name
+
+ # Copies data to S3
+ # @param data [Object] The path to the file in the temporary work location
+ # @return [true] On success
+ def load(data)
+ init_kms(@kms_opt)
+
+ @logger.info "Writing file #{data} to S3 #{@bucket_name} as #{@remote_path}"
+ s3.bucket(@bucket_name).object(@remote_path).upload_file(data, encrypt_args)
+ true
+ end
+
+ private
+
+ def init_s3_loader(*args, credentials:{}, bucket:, remote_path:, kms_opt: nil, **kargs, &block)
+ @region = credentials.fetch(:region, 'us-west-2')
+ @kms_opt = kms_opt
+ init_aws_credentials(credentials)
+
+ @bucket_name = bucket
+ @remote_path = remote_path
+ end
end
end