lib/dynamoid/adapter.rb in dynamoid-0.5.0 vs lib/dynamoid/adapter.rb in dynamoid-0.6.0

- old
+ new

@@ -43,20 +43,21 @@ # Write an object to the adapter. Partition it to a randomly selected key first if necessary. # # @param [String] table the name of the table to write the object to # @param [Object] object the object itself + # @param [Hash] options Options that are passed to the put_item call # # @return [Object] the persisted object # # @since 0.2.0 - def write(table, object) + def write(table, object, options = nil) if Dynamoid::Config.partitioning? && object[:id] object[:id] = "#{object[:id]}.#{Random.rand(Dynamoid::Config.partition_size)}" object[:updated_at] = Time.now.to_f end - put_item(table, object) + put_item(table, object, options) end # Read one or many keys from the selected table. This method intelligently calls batch_get or get on the underlying adapter depending on # whether ids is a range or a single key: additionally, if partitioning is enabled, it batch_gets all keys in the partition space # automatically. Finally, if a range key is present, it will also interpolate that into the ids so that the batch get will acquire the @@ -71,37 +72,53 @@ range_key = options[:range_key] if ids.respond_to?(:each) ids = ids.collect{|id| range_key ? [id, range_key] : id} if Dynamoid::Config.partitioning? results = batch_get_item(table => id_with_partitions(ids)) - {table => result_for_partition(results[table])} + {table => result_for_partition(results[table],table)} else batch_get_item(table => ids) end else if Dynamoid::Config.partitioning? ids = range_key ? [[ids, range_key]] : ids results = batch_get_item(table => id_with_partitions(ids)) - result_for_partition(results[table]).first + result_for_partition(results[table],table).first else get_item(table, ids, options) end end end # Delete an item from a table. If partitioning is turned on, deletes all partitioned keys as well. # # @param [String] table the name of the table to write the object to - # @param [String] id the id of the record - # @param [Number] range_key the range key of the record + # @param [Array] ids to delete, can also be a string of just one id + # @param [Array] range_key of the record to delete, can also be a string of just one range_key # - # @since 0.2.0 - def delete(table, id, options = {}) - if Dynamoid::Config.partitioning? - id_with_partitions(id).each {|i| delete_item(table, i, options)} + def delete(table, ids, options = {}) + range_key = options[:range_key] #array of range keys that matches the ids passed in + if ids.respond_to?(:each) + if range_key.respond_to?(:each) + #turn ids into array of arrays each element being hash_key, range_key + ids = ids.each_with_index.map{|id,i| [id,range_key[i]]} + else + ids = range_key ? [[ids, range_key]] : ids + end + + if Dynamoid::Config.partitioning? + batch_delete_item(table => id_with_partitions(ids)) + else + batch_delete_item(table => ids) + end else - delete_item(table, id, options) + if Dynamoid::Config.partitioning? + ids = range_key ? [[ids, range_key]] : ids + batch_delete_item(table => id_with_partitions(ids)) + else + delete_item(table, ids, options) + end end end # Scans a table. Generally quite slow; try to avoid using scan if at all possible. # @@ -110,11 +127,11 @@ # # @since 0.2.0 def scan(table, query, opts = {}) if Dynamoid::Config.partitioning? results = benchmark('Scan', table, query) {adapter.scan(table, query, opts)} - result_for_partition(results) + result_for_partition(results,table) else benchmark('Scan', table, query) {adapter.scan(table, query, opts)} end end @@ -139,36 +156,111 @@ # # @since 0.2.0 def id_with_partitions(ids) Array(ids).collect {|id| (0...Dynamoid::Config.partition_size).collect{|n| id.is_a?(Array) ? ["#{id.first}.#{n}", id.last] : "#{id}.#{n}"}}.flatten(1) end + + #Get original id (hash_key) and partiton number from a hash_key + # + # @param [String] id the id or hash_key of a record, ex. xxxxx.13 + # + # @return [String,String] original_id and the partition number, ex original_id = xxxxx partition = 13 + def get_original_id_and_partition id + partition = id.split('.').last + id = id.split(".#{partition}").first - # Takes an array of results that are partitioned, find the most recently updated one, and return only it. Compares each result by + return id, partition + end + + # Takes an array of query results that are partitioned, find the most recently updated ones that share an id and range_key, and return only the most recently updated. Compares each result by # their id and updated_at attributes; if the updated_at is the greatest, then it must be the correct result. # # @param [Array] returned partitioned results from a query + # @param [String] table_name the name of the table # # @since 0.2.0 - def result_for_partition(results) - {}.tap do |hash| - Array(results).each do |result| - next if result.nil? - id = result[:id].split('.').first - if !hash[id] || (result[:updated_at] > hash[id][:updated_at]) - result[:id] = id - hash[id] = result + def result_for_partition(results, table_name) + table = Dynamoid::Adapter::AwsSdk.get_table(table_name) + + if table.range_key + range_key_name = table.range_key.name.to_sym + + final_hash = {} + + results.each do |record| + test_record = final_hash[record[range_key_name]] + + if test_record.nil? || ((record[range_key_name] == test_record[range_key_name]) && (record[:updated_at] > test_record[:updated_at])) + #get ride of our partition and put it in the array with the range key + record[:id], partition = get_original_id_and_partition record[:id] + final_hash[record[range_key_name]] = record end end - end.values + + return final_hash.values + else + {}.tap do |hash| + Array(results).each do |result| + next if result.nil? + #Need to find the value of id with out the . and partition number + id, partition = get_original_id_and_partition result[:id] + + if !hash[id] || (result[:updated_at] > hash[id][:updated_at]) + result[:id] = id + hash[id] = result + end + end + end.values + end end # Delegate all methods that aren't defind here to the underlying adapter. # # @since 0.2.0 def method_missing(method, *args, &block) return benchmark(method, *args) {adapter.send(method, *args, &block)} if @adapter.respond_to?(method) super end + + # Query the DynamoDB table. This employs DynamoDB's indexes so is generally faster than scanning, but is + # only really useful for range queries, since it can only find by one hash key at once. Only provide + # one range key to the hash. If paritioning is on, will run a query for every parition and join the results + # + # @param [String] table_name the name of the table + # @param [Hash] opts the options to query the table with + # @option opts [String] :hash_value the value of the hash key to find + # @option opts [Range] :range_value find the range key within this range + # @option opts [Number] :range_greater_than find range keys greater than this + # @option opts [Number] :range_less_than find range keys less than this + # @option opts [Number] :range_gte find range keys greater than or equal to this + # @option opts [Number] :range_lte find range keys less than or equal to this + # + # @return [Array] an array of all matching items + # + def query(table_name, opts = {}) + + unless Dynamoid::Config.partitioning? + #no paritioning? just pass to the standard query method + Dynamoid::Adapter::AwsSdk.query(table_name, opts) + else + #get all the hash_values that could be possible + ids = id_with_partitions(opts[:hash_value]) - end + #lets not overwrite with the original options + modified_options = opts.clone + results = [] + + #loop and query on each of the partition ids + ids.each do |id| + modified_options[:hash_value] = id + query_result = Dynamoid::Adapter::AwsSdk.query(table_name, modified_options) + query_result = [query_result] if !query_result.is_a?(Array) + + results = results + query_result unless query_result.nil? + end + + result_for_partition results, table_name + end + end + end end