# frozen_string_literal: true

require_relative 'aws_sdk_v3/query'
require_relative 'aws_sdk_v3/scan'
require_relative 'aws_sdk_v3/create_table'
require_relative 'aws_sdk_v3/batch_get_item'
require_relative 'aws_sdk_v3/item_updater'
require_relative 'aws_sdk_v3/table'
require_relative 'aws_sdk_v3/until_past_table_status'

module Dynamoid
  module AdapterPlugin
    # The AwsSdkV3 adapter provides support for the aws-sdk version 3 for ruby.
    class AwsSdkV3
      EQ = 'EQ'

      # Maps range-key query options to DynamoDB comparison operators.
      RANGE_MAP = {
        range_greater_than: 'GT',
        range_less_than: 'LT',
        range_gte: 'GE',
        range_lte: 'LE',
        range_begins_with: 'BEGINS_WITH',
        range_between: 'BETWEEN',
        range_eq: 'EQ'
      }.freeze

      # Maps field condition names to DynamoDB comparison operators.
      FIELD_MAP = {
        eq: 'EQ',
        ne: 'NE',
        gt: 'GT',
        lt: 'LT',
        gte: 'GE',
        lte: 'LE',
        begins_with: 'BEGINS_WITH',
        between: 'BETWEEN',
        in: 'IN',
        contains: 'CONTAINS',
        not_contains: 'NOT_CONTAINS',
        null: 'NULL',
        not_null: 'NOT_NULL'
      }.freeze

      HASH_KEY = 'HASH'
      RANGE_KEY = 'RANGE'
      STRING_TYPE = 'S'
      NUM_TYPE = 'N'
      BINARY_TYPE = 'B'

      TABLE_STATUSES = {
        creating: 'CREATING',
        updating: 'UPDATING',
        deleting: 'DELETING',
        active: 'ACTIVE'
      }.freeze

      # Extracts the table status from a response.
      #
      # lookup is :table for the describe_table API and
      # :table_description for the create_table / delete_table APIs.
      PARSE_TABLE_STATUS = lambda { |resp, lookup = :table|
        resp.send(lookup).table_status
      }

      # Maximum number of put/delete requests sent in a single batch_write_item call.
      BATCH_WRITE_ITEM_REQUESTS_LIMIT = 25

      # Dynamoid::Config settings that are forwarded verbatim to the AWS client.
      CONNECTION_CONFIG_OPTIONS = %i[endpoint region http_continue_timeout http_idle_timeout http_open_timeout http_read_timeout].freeze

      attr_reader :table_cache

      # Establish the connection to DynamoDB and reset the table cache.
      #
      # The client is stored and available through {#client}.
      #
      # @return [Hash] the freshly reset (empty) table cache, which is the last
      #   expression evaluated
      def connect!
        @client = Aws::DynamoDB::Client.new(connection_config)
        @table_cache = {}
      end

      # Builds the options hash passed to Aws::DynamoDB::Client.new from
      # Dynamoid::Config.
      #
      # @return [Hash] connection options (also stored in @connection_hash)
      def connection_config
        @connection_hash = {}

        # Only forward whitelisted options that have a non-nil value configured.
        (Dynamoid::Config.settings.compact.keys & CONNECTION_CONFIG_OPTIONS).each do |option|
          @connection_hash[option] = Dynamoid::Config.send(option)
        end

        if Dynamoid::Config.access_key?
          @connection_hash[:access_key_id] = Dynamoid::Config.access_key
        end

        if Dynamoid::Config.secret_key?
          @connection_hash[:secret_access_key] = Dynamoid::Config.secret_key
        end

        # https://github.com/aws/aws-sdk-ruby/blob/master/gems/aws-sdk-core/lib/aws-sdk-core/plugins/logging.rb
        # https://github.com/aws/aws-sdk-ruby/blob/master/gems/aws-sdk-core/lib/aws-sdk-core/log/formatter.rb
        formatter = Aws::Log::Formatter.new(':operation | Request :http_request_body | Response :http_response_body')
        @connection_hash[:logger] = Dynamoid::Config.logger
        @connection_hash[:log_level] = :debug
        @connection_hash[:log_formatter] = formatter

        @connection_hash
      end

      # Return the client object.
      #
      # @since 1.0.0
      attr_reader :client

      # Puts multiple items in one table
      #
      # If optional block is passed it will be called for each written batch of items, meaning once per batch.
      # Block receives boolean flag which is true if there are some unprocessed items, otherwise false.
      #
      # @example Saves several items to the table testtable
      #   Dynamoid::AdapterPlugin::AwsSdkV3.batch_write_item('table1', [{ id: '1', name: 'a' }, { id: '2', name: 'b'}])
      #
      # @example Pass block
      #   Dynamoid::AdapterPlugin::AwsSdkV3.batch_write_item('table1', items) do |bool|
      #     if bool
      #       puts 'there are unprocessed items'
      #     end
      #   end
      #
      # @param [String] table_name the name of the table
      # @param [Array] objects the items to be written
      # @param [Hash] options additional options merged into the batch_write_item request
      # @yield [Boolean] optional block, called once per batch
      #
      # See:
      # * http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchWriteItem.html
      # * http://docs.aws.amazon.com/sdkforruby/api/Aws/DynamoDB/Client.html#batch_write_item-instance_method
      def batch_write_item(table_name, objects, options = {})
        items = objects.map { |o| sanitize_item(o) }

        begin
          while items.present?
            batch = items.shift(BATCH_WRITE_ITEM_REQUESTS_LIMIT)
            requests = batch.map { |item| { put_request: { item: item } } }

            response = client.batch_write_item(
              {
                request_items: {
                  table_name => requests
                },
                return_consumed_capacity: 'TOTAL',
                return_item_collection_metrics: 'SIZE'
              }.merge(options)
            )

            yield(response.unprocessed_items.present?) if block_given?

            # Unprocessed items are queued again so the loop retries them.
            if response.unprocessed_items.present?
              items += response.unprocessed_items[table_name].map { |r| r.put_request.item }
            end
          end
        rescue Aws::DynamoDB::Errors::ConditionalCheckFailedException => e
          raise Dynamoid::Errors::ConditionalCheckFailedException, e
        end
      end

      # Get many items at once from DynamoDB. More efficient than getting each item individually.
      #
      # If optional block is passed `nil` will be returned and the block will be called for each read batch of items,
      # meaning once per batch.
      #
      # Block receives parameters:
      # * hash with items like `{ table_name: [items]}`
      # * and boolean flag is true if there are some unprocessed keys, otherwise false.
      #
      # @example Retrieve IDs 1 and 2 from the table testtable
      #   Dynamoid::AdapterPlugin::AwsSdkV3.batch_get_item('table1' => ['1', '2'])
      #
      # @example Pass block to receive each batch
      #   Dynamoid::AdapterPlugin::AwsSdkV3.batch_get_item('table1' => ids) do |hash, bool|
      #     puts hash['table1']
      #
      #     if bool
      #       puts 'there are unprocessed keys'
      #     end
      #   end
      #
      # @param [Hash] table_names_with_ids the hash of tables and IDs to retrieve
      # @param [Hash] options to be passed to underlying BatchGet call
      # @param [Proc] block optional block can be passed to handle each batch of items
      #
      # @return [Hash] a hash where keys are the table names and the values are the retrieved items
      #
      # See:
      # * http://docs.aws.amazon.com/sdkforruby/api/Aws/DynamoDB/Client.html#batch_get_item-instance_method
      #
      # @since 1.0.0
      #
      # @todo: Provide support for passing options to underlying batch_get_item
      def batch_get_item(table_names_with_ids, options = {}, &block)
        # Replace each table name with its (cached) table description object.
        tables_with_ids = table_names_with_ids.transform_keys do |name|
          describe_table(name)
        end

        BatchGetItem.new(client, tables_with_ids, options).call(&block)
      end

      # Delete many items at once from DynamoDB. More efficient than delete each item individually.
      #
      # @example Delete IDs 1 and 2 from the table testtable
      #   Dynamoid::AdapterPlugin::AwsSdkV3.batch_delete_item('table1' => ['1', '2'])
      # or
      #   Dynamoid::AdapterPlugin::AwsSdkV3.batch_delete_item('table1' => [['hk1', 'rk2'], ['hk1', 'rk2']])
      #
      # @param [Hash] options the hash of tables and IDs to delete
      #
      # See:
      # * http://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_BatchWriteItem.html
      # * http://docs.aws.amazon.com/sdkforruby/api/Aws/DynamoDB/Client.html#batch_write_item-instance_method
      #
      # TODO handle rejections because of internal processing failures
      def batch_delete_item(options)
        requests = []

        options.each_pair do |table_name, ids|
          table = describe_table(table_name)

          ids.each_slice(BATCH_WRITE_ITEM_REQUESTS_LIMIT) do |sliced_ids|
            delete_requests = sliced_ids.map do |id|
              # id is either a bare hash key or a [hash_key, range_key] pair.
              { delete_request: { key: key_stanza(table, *id) } }
            end

            requests << { table_name => delete_requests }
          end
        end

        begin
          requests.map do |request_items|
            client.batch_write_item(
              request_items: request_items,
              return_consumed_capacity: 'TOTAL',
              return_item_collection_metrics: 'SIZE'
            )
          end
        rescue Aws::DynamoDB::Errors::ConditionalCheckFailedException => e
          raise Dynamoid::Errors::ConditionalCheckFailedException, e
        end
      end

      # Create a table on DynamoDB. This usually takes a long time to complete.
      #
      # A ResourceInUseException from AWS is logged and swallowed.
      #
      # @param [String] table_name the name of the table to create
      # @param [Symbol] key the table's primary key (defaults to :id)
      # @param [Hash] options provide a range key here if the table has a composite key
      # @option options [Array<Dynamoid::Indexes::Index>] local_secondary_indexes
      # @option options [Array<Dynamoid::Indexes::Index>] global_secondary_indexes
      # @option options [Symbol] hash_key_type The type of the hash key
      # @option options [Boolean] sync Wait for table status to be ACTIVE?
      # @since 1.0.0
      def create_table(table_name, key = :id, options = {})
        Dynamoid.logger.info "Creating #{table_name} table. This could take a while."
        CreateTable.new(client, table_name, key, options).call
      rescue Aws::DynamoDB::Errors::ResourceInUseException
        Dynamoid.logger.error "Table #{table_name} cannot be created as it already exists"
      end

      # Create a table on DynamoDB *synchronously*.
      # This usually takes a long time to complete.
      # CreateTable is normally an asynchronous operation.
      # You can optionally define secondary indexes on the new table,
      #   as part of the CreateTable operation.
      # If you want to create multiple tables with secondary indexes on them,
      #   you must create the tables sequentially.
      # Only one table with secondary indexes can be
      #   in the CREATING state at any given time.
      # See: http://docs.aws.amazon.com/sdkforruby/api/Aws/DynamoDB/Client.html#create_table-instance_method
      #
      # @param [String] table_name the name of the table to create
      # @param [Symbol] key the table's primary key (defaults to :id)
      # @param [Hash] options provide a range key here if the table has a composite key
      # @option options [Array<Dynamoid::Indexes::Index>] local_secondary_indexes
      # @option options [Array<Dynamoid::Indexes::Index>] global_secondary_indexes
      # @option options [Symbol] hash_key_type The type of the hash key
      # @since 1.2.0
      def create_table_synchronously(table_name, key = :id, options = {})
        create_table(table_name, key, options.merge(sync: true))
      end

      # Removes an item from DynamoDB.
      #
      # @param [String] table_name the name of the table
      # @param [String] key the hash key of the item to delete
      # @param [Hash] options provide a range key here if the table has a composite key
      # @option options [Object] :range_key the range key of the item
      # @option options [Hash] :conditions conditions for {#expected_stanza}
      #
      # @raise [Dynamoid::Errors::ConditionalCheckFailedException] when the conditions are not met
      #
      # @since 1.0.0
      #
      # @todo: Provide support for various options http://docs.aws.amazon.com/sdkforruby/api/Aws/DynamoDB/Client.html#delete_item-instance_method
      def delete_item(table_name, key, options = {})
        options ||= {}
        range_key = options[:range_key]
        conditions = options[:conditions]
        table = describe_table(table_name)

        client.delete_item(
          table_name: table_name,
          key: key_stanza(table, key, range_key),
          # expected_stanza deletes keys from its argument; pass a copy so the
          # caller's conditions hash is left untouched.
          expected: expected_stanza(conditions && conditions.dup)
        )
      rescue Aws::DynamoDB::Errors::ConditionalCheckFailedException => e
        raise Dynamoid::Errors::ConditionalCheckFailedException, e
      end

      # Deletes an entire table from DynamoDB.
      #
      # The table is also removed from the local table cache. A
      # ResourceInUseException is logged and re-raised.
      #
      # @param [String] table_name the name of the table to destroy
      # @option options [Boolean] sync Wait for table status check to raise ResourceNotFoundException
      #
      # @since 1.0.0
      def delete_table(table_name, options = {})
        resp = client.delete_table(table_name: table_name)

        if options[:sync]
          status = PARSE_TABLE_STATUS.call(resp, :table_description)

          if status == TABLE_STATUSES[:deleting]
            UntilPastTableStatus.new(client, table_name, :deleting).call
          end
        end

        table_cache.delete(table_name)
      rescue Aws::DynamoDB::Errors::ResourceInUseException
        Dynamoid.logger.error "Table #{table_name} cannot be deleted as it is in use"
        raise
      end

      # Deletes a table and waits until it is past the DELETING status.
      #
      # @param [String] table_name the name of the table to destroy
      # @param [Hash] options forwarded to {#delete_table} with sync: true added
      def delete_table_synchronously(table_name, options = {})
        delete_table(table_name, options.merge(sync: true))
      end

      # @todo Add a DescribeTable method.

      # Fetches an item from DynamoDB.
      #
      # @param [String] table_name the name of the table
      # @param [String] key the hash key of the item to find
      # @param [Hash] options provide a range key here if the table has a composite key
      # @option options [Object] :range_key the range key of the item
      # @option options [Boolean] :consistent_read passed through to get_item
      #
      # @return [Hash, nil] a hash representing the raw item in DynamoDB, or nil when nothing was found
      #
      # @since 1.0.0
      #
      # @todo Provide support for various options http://docs.aws.amazon.com/sdkforruby/api/Aws/DynamoDB/Client.html#get_item-instance_method
      def get_item(table_name, key, options = {})
        # Work on a copy (or an empty hash for nil) so the caller's hash is not modified.
        options = options ? options.dup : {}
        table = describe_table(table_name)
        range_key = options.delete(:range_key)
        consistent_read = options.delete(:consistent_read)

        item = client.get_item(table_name: table_name,
                               key: key_stanza(table, key, range_key),
                               consistent_read: consistent_read)[:item]
        item ? result_item_to_hash(item) : nil
      end

      # Edits an existing item's attributes, or adds a new item to the table if it does not already exist. You can put, delete, or add attribute values
      #
      # @param [String] table_name the name of the table
      # @param [String] key the hash key of the item to find
      # @param [Hash] options provide a range key here if the table has a composite key
      # @option options [Object] :range_key the range key of the item
      # @option options [Hash] :conditions conditions for {#expected_stanza}
      # @yield [ItemUpdater] the updater on which the caller declares the attribute changes
      #
      # @return new attributes for the record
      #
      # @raise [RuntimeError] when options contains keys other than :range_key and :conditions
      # @raise [Dynamoid::Errors::ConditionalCheckFailedException] when the conditions are not met
      #
      # @todo Provide support for various options http://docs.aws.amazon.com/sdkforruby/api/Aws/DynamoDB/Client.html#update_item-instance_method
      def update_item(table_name, key, options = {})
        options = options ? options.dup : {}
        range_key = options.delete(:range_key)
        conditions = options.delete(:conditions)
        table = describe_table(table_name)

        item_updater = ItemUpdater.new(table, key, range_key)
        yield(item_updater)

        raise "non-empty options: #{options}" unless options.empty?

        begin
          result = client.update_item(table_name: table_name,
                                      key: key_stanza(table, key, range_key),
                                      attribute_updates: item_updater.to_h,
                                      # pass a copy: expected_stanza deletes keys from its argument
                                      expected: expected_stanza(conditions && conditions.dup),
                                      return_values: 'ALL_NEW')
          result_item_to_hash(result[:attributes])
        rescue Aws::DynamoDB::Errors::ConditionalCheckFailedException => e
          raise Dynamoid::Errors::ConditionalCheckFailedException, e
        end
      end

      # List all tables on DynamoDB.
      #
      # Follows last_evaluated_table_name until every page has been read.
      #
      # @return [Array] the table names
      #
      # @since 1.0.0
      def list_tables
        [].tap do |result|
          start_table_name = nil
          loop do
            result_page = client.list_tables exclusive_start_table_name: start_table_name
            start_table_name = result_page.last_evaluated_table_name
            result.concat result_page.table_names
            break unless start_table_name
          end
        end
      end

      # Persists an item on DynamoDB.
      #
      # @param [String] table_name the name of the table
      # @param [Object] object a hash or Dynamoid object to persist
      # @param [Hash] options conditions (:if, :if_exists, :unless_exists) plus any
      #   further parameters to merge into the put_item request
      #
      # @raise [Dynamoid::Errors::ConditionalCheckFailedException] when the conditions are not met
      #
      # @since 1.0.0
      #
      # See: http://docs.aws.amazon.com/sdkforruby/api/Aws/DynamoDB/Client.html#put_item-instance_method
      def put_item(table_name, object, options = {})
        # Copy first: expected_stanza strips the condition keys from the hash it
        # receives, and what is left is merged into the request below. Without
        # the copy the caller's hash would be modified.
        options = options ? options.dup : {}
        item = sanitize_item(object)

        begin
          client.put_item(
            {
              table_name: table_name,
              item: item,
              expected: expected_stanza(options)
            }.merge!(options)
          )
        rescue Aws::DynamoDB::Errors::ConditionalCheckFailedException => e
          raise Dynamoid::Errors::ConditionalCheckFailedException, e
        end
      end

      # Query the DynamoDB table. This employs DynamoDB's indexes so is generally faster than scanning, but is
      # only really useful for range queries, since it can only find by one hash key at once. Only provide
      # one range key to the hash.
      #
      # @param [String] table_name the name of the table
      # @param [Hash] options the options to query the table with
      # @option options [String] :hash_value the value of the hash key to find
      # @option options [Number, Number] :range_between find the range key within this range
      # @option options [Number] :range_greater_than find range keys greater than this
      # @option options [Number] :range_less_than find range keys less than this
      # @option options [Number] :range_gte find range keys greater than or equal to this
      # @option options [Number] :range_lte find range keys less than or equal to this
      #
      # @return [Enumerable] matching items, yielded page by page together with
      #   the page's last_evaluated_key
      #
      # @since 1.0.0
      #
      # @todo Provide support for various other options http://docs.aws.amazon.com/sdkforruby/api/Aws/DynamoDB/Client.html#query-instance_method
      def query(table_name, options = {})
        Enumerator.new do |yielder|
          table = describe_table(table_name)

          Query.new(client, table, options).call.each do |page|
            yielder.yield(
              page.items.map { |row| result_item_to_hash(row) },
              last_evaluated_key: page.last_evaluated_key
            )
          end
        end
      end

      # Counts the items matching a query.
      #
      # @param [String] table_name the name of the table
      # @param [Hash] options the options to query the table with (not modified)
      #
      # @return [Integer] sum of the per-page counts
      def query_count(table_name, options = {})
        table = describe_table(table_name)
        # merge instead of assigning, so the caller's hash is not modified
        count_options = options.merge(select: 'COUNT')

        Query.new(client, table, count_options).call.sum(&:count)
      end

      # Scan the DynamoDB table. This is usually a very slow operation as it naively filters all data on
      # the DynamoDB servers.
      #
      # @param [String] table_name the name of the table
      # @param [Hash] conditions a hash of attributes: matching records will be returned by the scan
      # @param [Hash] options passed to the underlying Scan
      #
      # @return [Enumerable] matching items, yielded page by page together with
      #   the page's last_evaluated_key
      #
      # @since 1.0.0
      #
      # @todo: Provide support for various options http://docs.aws.amazon.com/sdkforruby/api/Aws/DynamoDB/Client.html#scan-instance_method
      def scan(table_name, conditions = {}, options = {})
        Enumerator.new do |yielder|
          table = describe_table(table_name)

          Scan.new(client, table, conditions, options).call.each do |page|
            yielder.yield(
              page.items.map { |row| result_item_to_hash(row) },
              last_evaluated_key: page.last_evaluated_key
            )
          end
        end
      end

      # Counts the items matching a scan.
      #
      # @param [String] table_name the name of the table
      # @param [Hash] conditions a hash of attributes to filter by
      # @param [Hash] options passed to the underlying Scan (not modified)
      #
      # @return [Integer] sum of the per-page counts
      def scan_count(table_name, conditions = {}, options = {})
        table = describe_table(table_name)
        # merge instead of assigning, so the caller's hash is not modified
        count_options = options.merge(select: 'COUNT')

        Scan.new(client, table, conditions, count_options).call.sum(&:count)
      end

      #
      # Truncates all records in the given table
      #
      # @param [String] table_name the name of the table
      #
      # @since 1.0.0
      def truncate(table_name)
        table = describe_table(table_name)
        hk    = table.hash_key
        rk    = table.range_key

        # scan yields one page of items at a time; flat_map turns the pages
        # into a single list of items. Do not change the block signature here.
        scan(table_name, {}, {}).flat_map { |i| i }.each do |attributes|
          opts = {}
          opts[:range_key] = attributes[rk.to_sym] if rk
          delete_item(table_name, attributes[hk], opts)
        end
      end

      # Returns the item count reported by a freshly reloaded table description.
      #
      # @param [String] table_name the name of the table
      def count(table_name)
        describe_table(table_name, true).item_count
      end

      protected

      #
      # The key hash passed on get_item, put_item, delete_item, update_item, etc
      #
      # @param table the table description (responds to hash_key / range_key)
      # @param hash_key the hash key value
      # @param range_key the range key value; omitted from the result when falsy
      # @return [Hash] key name (String) => value
      def key_stanza(table, hash_key, range_key = nil)
        key = { table.hash_key.to_s => hash_key }
        key[table.range_key.to_s] = range_key if range_key
        key
      end

      #
      # NOTE: the :unless_exists, :if_exists and :if keys are deleted from the
      # given hash; pass a copy if the original must stay intact.
      #
      # @param [Hash] conditions Conditions to enforce on operation (e.g. { :if => { :count => 5 }, :unless_exists => ['id']})
      # @return an Expected stanza for the given conditions hash
      #
      def expected_stanza(conditions = nil)
        expected = Hash.new { |h, k| h[k] = {} }
        return expected unless conditions

        conditions.delete(:unless_exists).try(:each) do |col|
          expected[col.to_s][:exists] = false
        end

        conditions.delete(:if_exists).try(:each) do |col, val|
          expected[col.to_s][:exists] = true
          expected[col.to_s][:value] = val
        end

        conditions.delete(:if).try(:each) do |col, val|
          expected[col.to_s][:value] = val
        end

        expected
      end

      #
      # New, semi-arbitrary API to get data on the table
      #
      # @param [String] table_name the name of the table
      # @param [Boolean] reload when true the cache is bypassed and refreshed
      # @return [Table] the cached or freshly fetched table description
      def describe_table(table_name, reload = false)
        cached = reload ? nil : table_cache[table_name]
        return cached if cached

        table_cache[table_name] = Table.new(client.describe_table(table_name: table_name).data)
      end

      #
      # Converts a hash returned by get_item, scan, etc. into a key-value hash
      #
      # @return [Hash] the same pairs with keys converted via to_sym
      def result_item_to_hash(item)
        item.each_with_object({}) { |(k, v), r| r[k.to_sym] = v }
      end

      # Build an array of values for Condition
      # Is used in ScanFilter and QueryFilter
      # https://docs.aws.amazon.com/amazondynamodb/latest/APIReference/API_Condition.html
      #
      # Note: this is a class method, so the `protected` marker above does not apply to it.
      #
      # @params [String] operator: value of RANGE_MAP or FIELD_MAP hash, e.g. "EQ", "LT" etc
      # @params [Object] value: scalar value or array/set
      def self.attribute_value_list(operator, value)
        # For BETWEEN and IN operators we should keep value as is (it should be already an array)
        # NULL and NOT_NULL require absence of attribute list
        # For all the other operators we wrap the value with array
        # https://docs.aws.amazon.com/en_us/amazondynamodb/latest/developerguide/LegacyConditionalParameters.Conditions.html
        if %w[BETWEEN IN].include?(operator)
          [value].flatten
        elsif %w[NULL NOT_NULL].include?(operator)
          nil
        else
          [value]
        end
      end

      # Prepares an attributes hash for writing: drops empty Sets and Strings,
      # drops nil values unless Dynamoid.config.store_attribute_with_nil_value
      # is truthy, and stringifies the keys of Hash values.
      #
      # @param [Hash] attributes the raw attributes
      # @return [Hash] a new, sanitized hash
      def sanitize_item(attributes)
        store_attribute_with_nil_value = !!Dynamoid.config.store_attribute_with_nil_value

        attributes.reject do |_, v|
          ((v.is_a?(Set) || v.is_a?(String)) && v.empty?) ||
            (!store_attribute_with_nil_value && v.nil?)
        end.transform_values do |v|
          v.is_a?(Hash) ? v.stringify_keys : v
        end
      end
    end
  end
end