lib/dynamoid/adapter.rb in dynamoid-0.7.1 vs lib/dynamoid/adapter.rb in dynamoid-1.0.0

- old
+ new

@@ -1,30 +1,44 @@ +# require only 'concurrent/atom' once this issue is resolved: +# https://github.com/ruby-concurrency/concurrent-ruby/pull/377 +require 'concurrent' + # encoding: utf-8 module Dynamoid - # Adapter provides a generic, write-through class that abstracts variations in the underlying connections to provide a uniform response - # to Dynamoid. - module Adapter - extend self - attr_accessor :tables + # Adapter's value-add: + # 1) For the rest of Dynamoid, the gateway to DynamoDB. + # 2) Allows switching `config.adapter` to ease development of a new adapter. + # 3) Caches the list of tables Dynamoid knows about. + class Adapter + def initialize + @adapter_ = Concurrent::Atom.new(nil) + @tables_ = Concurrent::Atom.new(nil) + end - # The actual adapter currently in use: presently AwsSdk. + def tables + if !@tables_.value + @tables_.swap{|value, args| benchmark('Cache Tables') {list_tables}} + end + @tables_.value + end + + # The actual adapter currently in use. # # @since 0.2.0 def adapter - reconnect! unless @adapter - @adapter + if !@adapter_.value + adapter = self.class.adapter_plugin_class.new + adapter.connect! if adapter.respond_to?(:connect!) + @adapter_.compare_and_set(nil, adapter) + clear_cache! + end + @adapter_.value end - # Establishes a connection to the underyling adapter and caches all its tables for speedier future lookups. Issued when the adapter is first called. - # - # @since 0.2.0 - def reconnect! - require "dynamoid/adapter/#{Dynamoid::Config.adapter}" unless Dynamoid::Adapter.const_defined?(Dynamoid::Config.adapter.camelcase) - @adapter = Dynamoid::Adapter.const_get(Dynamoid::Config.adapter.camelcase) - @adapter.connect! if @adapter.respond_to?(:connect!) - self.tables = benchmark('Cache Tables') {list_tables} + def clear_cache! + @tables_.swap{|value, args| nil} end # Shows how long it takes a method to run on the adapter. Useful for generating logged output. # # @param [Symbol] method the name of the method to appear in the log @@ -39,62 +53,48 @@ result = yield Dynamoid.logger.info "(#{((Time.now - start) * 1000.0).round(2)} ms) #{method.to_s.split('_').collect(&:upcase).join(' ')}#{ " - #{args.inspect}" unless args.nil? || args.empty? }" return result end - # Write an object to the adapter. Partition it to a randomly selected key first if necessary. + # Write an object to the adapter. # # @param [String] table the name of the table to write the object to # @param [Object] object the object itself # @param [Hash] options Options that are passed to the put_item call # # @return [Object] the persisted object # # @since 0.2.0 def write(table, object, options = nil) - if Dynamoid::Config.partitioning? && object[:id] - object[:id] = "#{object[:id]}.#{Random.rand(Dynamoid::Config.partition_size)}" - object[:updated_at] = Time.now.to_f - end put_item(table, object, options) end - # Read one or many keys from the selected table. This method intelligently calls batch_get or get on the underlying adapter depending on - # whether ids is a range or a single key: additionally, if partitioning is enabled, it batch_gets all keys in the partition space - # automatically. Finally, if a range key is present, it will also interpolate that into the ids so that the batch get will acquire the - # correct record. + # Read one or many keys from the selected table. + # This method intelligently calls batch_get or get on the underlying adapter + # depending on whether ids is a range or a single key. + # If a range key is present, it will also interpolate that into the ids so + # that the batch get will acquire the correct record. # # @param [String] table the name of the table to write the object to # @param [Array] ids to fetch, can also be a string of just one id # @param [Hash] options: Passed to the underlying query. The :range_key option is required whenever the table has a range key, - # unless multiple ids are passed in and Dynamoid::Config.partitioning? is turned off. + # unless multiple ids are passed in. # # @since 0.2.0 def read(table, ids, options = {}) range_key = options.delete(:range_key) if ids.respond_to?(:each) ids = ids.collect{|id| range_key ? [id, range_key] : id} - if Dynamoid::Config.partitioning? - results = batch_get_item({table => id_with_partitions(ids)}, options) - {table => result_for_partition(results[table],table)} - else - batch_get_item({table => ids}, options) - end + batch_get_item({table => ids}, options) else - if Dynamoid::Config.partitioning? - ids = range_key ? [[ids, range_key]] : ids - results = batch_get_item({table => id_with_partitions(ids)}, options) - result_for_partition(results[table],table).first - else - options[:range_key] = range_key if range_key - get_item(table, ids, options) - end + options[:range_key] = range_key if range_key + get_item(table, ids, options) end end - # Delete an item from a table. If partitioning is turned on, deletes all partitioned keys as well. + # Delete an item from a table. # # @param [String] table the name of the table to write the object to # @param [Array] ids to delete, can also be a string of just one id # @param [Array] range_key of the record to delete, can also be a string of just one range_key # @@ -105,130 +105,54 @@ #turn ids into array of arrays each element being hash_key, range_key ids = ids.each_with_index.map{|id,i| [id,range_key[i]]} else ids = range_key ? [[ids, range_key]] : ids end - - if Dynamoid::Config.partitioning? - batch_delete_item(table => id_with_partitions(ids)) - else - batch_delete_item(table => ids) - end + + batch_delete_item(table => ids) else - if Dynamoid::Config.partitioning? - ids = range_key ? [[ids, range_key]] : ids - batch_delete_item(table => id_with_partitions(ids)) - else - delete_item(table, ids, options) - end + delete_item(table, ids, options) end end # Scans a table. Generally quite slow; try to avoid using scan if at all possible. # # @param [String] table the name of the table to write the object to # @param [Hash] scan_hash a hash of attributes: matching records will be returned by the scan # # @since 0.2.0 def scan(table, query, opts = {}) - if Dynamoid::Config.partitioning? - results = benchmark('Scan', table, query) {adapter.scan(table, query, opts)} - result_for_partition(results,table) - else - benchmark('Scan', table, query) {adapter.scan(table, query, opts)} + benchmark('Scan', table, query) {adapter.scan(table, query, opts)} + end + + def create_table(table_name, key, options = {}) + if !tables.include?(table_name) + benchmark('Create Table') { adapter.create_table(table_name, key, options) } + tables << table_name end end - [:batch_get_item, :create_table, :delete_item, :delete_table, :get_item, :list_tables, :put_item].each do |m| + [:batch_get_item, :delete_item, :delete_table, :get_item, :list_tables, :put_item].each do |m| # Method delegation with benchmark to the underlying adapter. Faster than relying on method_missing. # # @since 0.2.0 define_method(m) do |*args| benchmark("#{m.to_s}", args) {adapter.send(m, *args)} end end - # Takes a list of ids and returns them with partitioning added. If an array of arrays is passed, we assume the second key is the range key - # and pass it in unchanged. - # - # @example Partition id 1 - # Dynamoid::Adapter.id_with_partitions(['1']) # ['1.0', '1.1', '1.2', ..., '1.199'] - # @example Partition id 1 and range_key 1.0 - # Dynamoid::Adapter.id_with_partitions([['1', 1.0]]) # [['1.0', 1.0], ['1.1', 1.0], ['1.2', 1.0], ..., ['1.199', 1.0]] - # - # @param [Array] ids array of ids to partition - # - # @since 0.2.0 - def id_with_partitions(ids) - Array(ids).collect {|id| (0...Dynamoid::Config.partition_size).collect{|n| id.is_a?(Array) ? ["#{id.first}.#{n}", id.last] : "#{id}.#{n}"}}.flatten(1) - end - - #Get original id (hash_key) and partiton number from a hash_key - # - # @param [String] id the id or hash_key of a record, ex. xxxxx.13 - # - # @return [String,String] original_id and the partition number, ex original_id = xxxxx partition = 13 - def get_original_id_and_partition id - partition = id.split('.').last - id = id.split(".#{partition}").first - - return id, partition - end - - # Takes an array of query results that are partitioned, find the most recently updated ones that share an id and range_key, and return only the most recently updated. Compares each result by - # their id and updated_at attributes; if the updated_at is the greatest, then it must be the correct result. - # - # @param [Array] returned partitioned results from a query - # @param [String] table_name the name of the table - # - # @since 0.2.0 - def result_for_partition(results, table_name) - table = Dynamoid::Adapter::AwsSdk.get_table(table_name) - - if table.range_key - range_key_name = table.range_key.name.to_sym - - final_hash = {} - - results.each do |record| - test_record = final_hash[record[range_key_name]] - - if test_record.nil? || ((record[range_key_name] == test_record[range_key_name]) && (record[:updated_at] > test_record[:updated_at])) - #get ride of our partition and put it in the array with the range key - record[:id], partition = get_original_id_and_partition record[:id] - final_hash[record[range_key_name]] = record - end - end - - return final_hash.values - else - {}.tap do |hash| - Array(results).each do |result| - next if result.nil? - #Need to find the value of id with out the . and partition number - id, partition = get_original_id_and_partition result[:id] - - if !hash[id] || (result[:updated_at] > hash[id][:updated_at]) - result[:id] = id - hash[id] = result - end - end - end.values - end - end - # Delegate all methods that aren't defind here to the underlying adapter. # # @since 0.2.0 def method_missing(method, *args, &block) - return benchmark(method, *args) {adapter.send(method, *args, &block)} if @adapter.respond_to?(method) + return benchmark(method, *args) {adapter.send(method, *args, &block)} if adapter.respond_to?(method) super end - + # Query the DynamoDB table. This employs DynamoDB's indexes so is generally faster than scanning, but is # only really useful for range queries, since it can only find by one hash key at once. Only provide - # one range key to the hash. If paritioning is on, will run a query for every parition and join the results + # one range key to the hash. # # @param [String] table_name the name of the table # @param [Hash] opts the options to query the table with # @option opts [String] :hash_value the value of the hash key to find # @option opts [Range] :range_value find the range key within this range @@ -238,30 +162,20 @@ # @option opts [Number] :range_lte find range keys less than or equal to this # # @return [Array] an array of all matching items # def query(table_name, opts = {}) - - unless Dynamoid::Config.partitioning? - #no paritioning? just pass to the standard query method - Dynamoid::Adapter::AwsSdk.query(table_name, opts) - else - #get all the hash_values that could be possible - ids = id_with_partitions(opts[:hash_value]) + adapter.query(table_name, opts) + end - #lets not overwrite with the original options - modified_options = opts.clone - results = [] - - #loop and query on each of the partition ids - ids.each do |id| - modified_options[:hash_value] = id + private - query_result = Dynamoid::Adapter::AwsSdk.query(table_name, modified_options) - results += query_result.inject([]){|array, result| array += [result]} if query_result.any? - end - - result_for_partition results, table_name + def self.adapter_plugin_class + unless Dynamoid.const_defined?(:AdapterPlugin) && Dynamoid::AdapterPlugin.const_defined?(Dynamoid::Config.adapter.camelcase) + require "dynamoid/adapter_plugin/#{Dynamoid::Config.adapter}" end + + Dynamoid::AdapterPlugin.const_get(Dynamoid::Config.adapter.camelcase) end + end end