lib/dynamoid/adapter.rb in dynamoid-0.1.1 vs lib/dynamoid/adapter.rb in dynamoid-0.1.2
- old
+ new
@@ -1,29 +1,102 @@
# encoding: utf-8
module Dynamoid #:nodoc:
module Adapter
extend self
+ attr_accessor :tables
def adapter
reconnect! unless @adapter
@adapter
end
def reconnect!
require "dynamoid/adapter/#{Dynamoid::Config.adapter}" unless Dynamoid::Adapter.const_defined?(Dynamoid::Config.adapter.camelcase)
@adapter = Dynamoid::Adapter.const_get(Dynamoid::Config.adapter.camelcase)
@adapter.connect! if @adapter.respond_to?(:connect!)
+ self.tables = benchmark('Cache Tables') {list_tables}
end
- def method_missing(method, *args)
- if @adapter.respond_to?(method)
- start = Time.now
- result = @adapter.send(method, *args)
- Dynamoid.logger.info "(#{((Time.now - start) * 1000.0).round(2)} ms) #{method.to_s.split('_').collect(&:upcase).join(' ')}#{ " - #{args.join(',')}" unless args.empty? }"
- return result
+ def benchmark(method, *args)
+ start = Time.now
+ result = yield
+ Dynamoid.logger.info "(#{((Time.now - start) * 1000.0).round(2)} ms) #{method.to_s.split('_').collect(&:upcase).join(' ')}#{ " - #{args.inspect}" unless args.nil? || args.empty? }"
+ return result
+ end
+
+ def write(table, object)
+ if Dynamoid::Config.partitioning? && object[:id]
+ object[:id] = "#{object[:id]}.#{Random.rand(Dynamoid::Config.partition_size)}"
+ object[:updated_at] = Time.now.to_f
end
+ benchmark('Put Item', object) {put_item(table, object)}
+ end
+
+ def read(table, ids)
+ if ids.respond_to?(:each)
+ if Dynamoid::Config.partitioning?
+ results = benchmark('Partitioned Batch Get Item', ids) {batch_get_item(table => id_with_partitions(ids))}
+ {table => result_for_partition(results[table])}
+ else
+ benchmark('Batch Get Item', ids) {batch_get_item(table => ids)}
+ end
+ else
+ if Dynamoid::Config.partitioning?
+ results = benchmark('Partitioned Get Item', ids) {batch_get_item(table => id_with_partitions(ids))}
+ result_for_partition(results[table]).first
+ else
+ benchmark('Get Item', ids) {get_item(table, ids)}
+ end
+ end
+ end
+
+ def delete(table, id)
+ if Dynamoid::Config.partitioning?
+ benchmark('Delete Item', id) do
+ id_with_partitions(id).each {|i| delete_item(table, i)}
+ end
+ else
+ benchmark('Delete Item', id) {delete_item(table, id)}
+ end
+ end
+
+ def scan(table, query)
+ if Dynamoid::Config.partitioning?
+ results = benchmark('Scan', table, query) {adapter.scan(table, query)}
+ result_for_partition(results)
+ else
+ adapter.scan(table, query)
+ end
+ end
+
+ [:batch_get_item, :create_table, :delete_item, :delete_table, :get_item, :list_tables, :put_item, :query].each do |m|
+ define_method(m) do |*args|
+ benchmark("#{m.to_s}", args) {adapter.send(m, *args)}
+ end
+ end
+
+ def id_with_partitions(ids)
+ Array(ids).collect {|id| (0...Dynamoid::Config.partition_size).collect{|n| "#{id}.#{n}"}}.flatten
+ end
+
+ def result_for_partition(results)
+ Hash.new.tap do |hash|
+ Array(results).each do |result|
+ next if result.nil?
+ id = result[:id].split('.').first
+ if !hash[id] || (result[:updated_at] > hash[id][:updated_at])
+ result[:id] = id
+ hash[id] = result
+ end
+ end
+ end.values
+ end
+
+ def method_missing(method, *args)
+ return benchmark(method, *args) {adapter.send(method, *args)} if @adapter.respond_to?(method)
super
end
+
end
end
\ No newline at end of file