lib/simpledb_adapter.rb in dm-adapter-simpledb-1.0.0 vs lib/simpledb_adapter.rb in dm-adapter-simpledb-1.1.0
- old
+ new
@@ -1,469 +1,2 @@
-require 'rubygems'
-require 'dm-core'
-require 'digest/sha1'
-require 'dm-aggregates'
-require 'right_aws'
-require 'uuidtools'
-require File.expand_path('simpledb_adapter/sdb_array', File.dirname(__FILE__))
-
-module DataMapper
-
- module Migrations
- #integrated from http://github.com/edward/dm-simpledb/tree/master
- module SimpledbAdapter
-
- module ClassMethods
-
- end
-
- def self.included(other)
- other.extend ClassMethods
-
- DataMapper.extend(::DataMapper::Migrations::SingletonMethods)
-
- [ :Repository, :Model ].each do |name|
- ::DataMapper.const_get(name).send(:include, Migrations.const_get(name))
- end
- end
-
- # Returns whether the storage_name exists.
- # @param storage_name<String> a String defining the name of a domain
- # @return <Boolean> true if the storage exists
- def storage_exists?(storage_name)
- domains = sdb.list_domains[:domains]
- domains.detect {|d| d == storage_name }!=nil
- end
-
- def create_model_storage(model)
- sdb.create_domain(@sdb_options[:domain])
- end
-
- #On SimpleDB you probably don't want to destroy the whole domain
- #if you are just adding fields it is automatically supported
- #default to non destructive migrate, to destroy run
- #rake db:automigrate destroy=true
- def destroy_model_storage(model)
- if ENV['destroy']!=nil && ENV['destroy']=='true'
- sdb.delete_domain(@sdb_options[:domain])
- end
- end
-
- end # module Migration
- end # module Migration
-
- module Adapters
- class SimpleDBAdapter < AbstractAdapter
-
- attr_reader :sdb_options
-
- # For testing purposes ONLY. Seriously, don't enable this for production
- # code.
- attr_accessor :consistency_policy
-
- def initialize(name, normalised_options)
- super
- @sdb_options = {}
- @sdb_options[:access_key] = options.fetch(:access_key) {
- options[:user]
- }
- @sdb_options[:secret_key] = options.fetch(:secret_key) {
- options[:password]
- }
- @sdb_options[:logger] = options.fetch(:logger) { DataMapper.logger }
- @sdb_options[:server] = options.fetch(:host) { 'sdb.amazonaws.com' }
- @sdb_options[:port] = options[:port] || 443 # port may be set but nil
- @sdb_options[:domain] = options.fetch(:domain) {
- options[:path].to_s.gsub(%r{(^/+)|(/+$)},"") # remove slashes
- }
- @consistency_policy =
- normalised_options.fetch(:wait_for_consistency) { false }
- end
-
- def create(resources)
- created = 0
- time = Benchmark.realtime do
- resources.each do |resource|
- uuid = UUIDTools::UUID.timestamp_create
- initialize_serial(resource, uuid.to_i)
- item_name = item_name_for_resource(resource)
- sdb_type = simpledb_type(resource.model)
- attributes = resource.attributes.merge(:simpledb_type => sdb_type)
- attributes = adjust_to_sdb_attributes(attributes)
- attributes.reject!{|name, value| value.nil?}
- sdb.put_attributes(domain, item_name, attributes)
- created += 1
- end
- end
- DataMapper.logger.debug(format_log_entry("(#{created}) INSERT #{resources.inspect}", time))
- modified!
- created
- end
-
- def delete(collection)
- deleted = 0
- time = Benchmark.realtime do
- collection.each do |resource|
- item_name = item_name_for_resource(resource)
- sdb.delete_attributes(domain, item_name)
- deleted += 1
- end
- raise NotImplementedError.new('Only :eql on delete at the moment') if not_eql_query?(collection.query)
- end; DataMapper.logger.debug(format_log_entry("(#{deleted}) DELETE #{collection.query.conditions.inspect}", time))
- modified!
- deleted
- end
-
- def read(query)
- maybe_wait_for_consistency
- sdb_type = simpledb_type(query.model)
-
- conditions, order, unsupported_conditions =
- set_conditions_and_sort_order(query, sdb_type)
- results = get_results(query, conditions, order)
- proto_resources = results.map do |result|
- name, attributes = *result.to_a.first
- proto_resource = query.fields.inject({}) do |proto_resource, property|
- value = attributes[property.field.to_s]
- if value != nil
- if value.size > 1
- if property.type == String
- value = chunks_to_string(value)
- else
- value = value.map {|v| property.typecast(v) }
- end
- else
- value = property.typecast(value.first)
- end
- else
- value = property.typecast(nil)
- end
- proto_resource[property.name.to_s] = value
- proto_resource
- end
- proto_resource
- end
- query.conditions.operands.reject!{ |op|
- !unsupported_conditions.include?(op)
- }
- records = query.filter_records(proto_resources)
-
- records
- end
-
- def update(attributes, collection)
- updated = 0
- attrs_to_update, attrs_to_delete = prepare_attributes(attributes)
- time = Benchmark.realtime do
- collection.each do |resource|
- item_name = item_name_for_resource(resource)
- unless attrs_to_update.empty?
- sdb.put_attributes(domain, item_name, attrs_to_update, :replace)
- end
- unless attrs_to_delete.empty?
- sdb.delete_attributes(domain, item_name, attrs_to_delete)
- end
- updated += 1
- end
- raise NotImplementedError.new('Only :eql on delete at the moment') if not_eql_query?(collection.query)
- end
- DataMapper.logger.debug(format_log_entry("UPDATE #{collection.query.conditions.inspect} (#{updated} times)", time))
- modified!
- updated
- end
-
- def query(query_call, query_limit = 999999999)
- select(query_call, query_limit).collect{|x| x.values[0]}
- end
-
- def aggregate(query)
- raise ArgumentError.new("Only count is supported") unless (query.fields.first.operator == :count)
- sdb_type = simpledb_type(query.model)
- conditions, order, unsupported_conditions = set_conditions_and_sort_order(query, sdb_type)
-
- query_call = "SELECT count(*) FROM #{domain} "
- query_call << "WHERE #{conditions.compact.join(' AND ')}" if conditions.length > 0
- results = nil
- time = Benchmark.realtime do
- results = sdb.select(query_call)
- end; DataMapper.logger.debug(format_log_entry(query_call, time))
- [results[:items][0].values.first["Count"].first.to_i]
- end
-
- # For testing purposes only.
- def wait_for_consistency
- return unless @current_consistency_token
- token = :none
- begin
- results = sdb.get_attributes(domain, '__dm_consistency_token', '__dm_consistency_token')
- tokens = results[:attributes]['__dm_consistency_token']
- end until tokens.include?(@current_consistency_token)
- end
-
- private
-
- # hack for converting and storing strings longer than 1024 one thing to
- # note if you use string longer than 1019 chars you will loose the ability
- # to do full text matching on queries as the string can be broken at any
- # place during chunking
- def adjust_to_sdb_attributes(attrs)
- attrs.each_pair do |key, value|
- if value.kind_of?(String)
- # Strings need to be inside arrays in order to prevent RightAws from
- # inadvertantly splitting them on newlines when it calls
- # Array(value).
- attrs[key] = [value]
- end
- if value.is_a?(String) && value.length > 1019
- chunked = string_to_chunks(value)
- attrs[key] = chunked
- end
- end
- attrs
- end
-
- def string_to_chunks(value)
- chunks = value.to_s.scan(%r/.{1,1019}/) # 1024 - '1024:'.size
- i = -1
- fmt = '%04d:'
- chunks.map!{|chunk| [(fmt % (i += 1)), chunk].join}
- raise ArgumentError, 'that is just too big yo!' if chunks.size >= 256
- chunks
- end
-
- def chunks_to_string(value)
- begin
- chunks =
- Array(value).flatten.map do |chunk|
- index, text = chunk.split(%r/:/, 2)
- [Float(index).to_i, text]
- end
- chunks.replace chunks.sort_by{|index, text| index}
- string_result = chunks.map!{|index, text| text}.join
- string_result
- rescue ArgumentError, TypeError
- #return original value, they could have put strings in the system not using the adapter or previous versions
- #that are larger than chunk size, but less than 1024
- value
- end
- end
-
- # Returns the domain for the model
- def domain
- @sdb_options[:domain]
- end
-
- #sets the conditions and order for the SDB query
- def set_conditions_and_sort_order(query, sdb_type)
- unsupported_conditions = []
- conditions = ["simpledb_type = '#{sdb_type}'"]
- # look for query.order.first and insure in conditions
- # raise if order if greater than 1
-
- if query.order && query.order.length > 0
- query_object = query.order[0]
- #anything sorted on must be a condition for SDB
- conditions << "#{query_object.target.name} IS NOT NULL"
- order = "ORDER BY #{query_object.target.name} #{query_object.operator}"
- else
- order = ""
- end
- query.conditions.each do |op|
- case op.slug
- when :regexp
- unsupported_conditions << op
- when :eql
- conditions << if op.value.nil?
- "#{op.subject.name} IS NULL"
- else
- "#{op.subject.name} = '#{op.value}'"
- end
- when :not then
- comp = op.operands.first
- if comp.slug == :like
- conditions << "#{comp.subject.name} not like '#{comp.value}'"
- next
- end
- case comp.value
- when Range, Set, Array, Regexp
- unsupported_conditions << op
- when nil
- conditions << "#{comp.subject.name} IS NOT NULL"
- else
- conditions << "#{comp.subject.name} != '#{comp.value}'"
- end
- when :gt then conditions << "#{op.subject.name} > '#{op.value}'"
- when :gte then conditions << "#{op.subject.name} >= '#{op.value}'"
- when :lt then conditions << "#{op.subject.name} < '#{op.value}'"
- when :lte then conditions << "#{op.subject.name} <= '#{op.value}'"
- when :like then conditions << "#{op.subject.name} like '#{op.value}'"
- when :in
- case op.value
- when Array, Set
- values = op.value.collect{|v| "'#{v}'"}.join(',')
- values = "'__NULL__'" if values.empty?
- conditions << "#{op.subject.name} IN (#{values})"
- when Range
- if op.value.exclude_end?
- unsupported_conditions << op
- else
- conditions << "#{op.subject.name} between '#{op.value.first}' and '#{op.value.last}'"
- end
- else
- raise ArgumentError, "Unsupported inclusion op: #{op.value.inspect}"
- end
- else raise "Invalid query op: #{op.inspect}"
- end
- end
- [conditions,order,unsupported_conditions]
- end
-
- def select(query_call, query_limit)
- items = []
- time = Benchmark.realtime do
- sdb_continuation_key = nil
- while (results = sdb.select(query_call, sdb_continuation_key)) do
- sdb_continuation_key = results[:next_token]
- items += results[:items]
- break if items.length > query_limit
- break if sdb_continuation_key.nil?
- end
- end; DataMapper.logger.debug(format_log_entry(query_call, time))
- items[0...query_limit]
- end
-
- #gets all results or proper number of results depending on the :limit
- def get_results(query, conditions, order)
- output_list = query.fields.map{|f| f.field}.join(', ')
- query_call = "SELECT #{output_list} FROM #{domain} "
- query_call << "WHERE #{conditions.compact.join(' AND ')}" if conditions.length > 0
- query_call << " #{order}"
- if query.limit!=nil
- query_limit = query.limit
- query_call << " LIMIT #{query.limit}"
- else
- #on large items force the max limit
- query_limit = 999999999 #TODO hack for query.limit being nil
- #query_call << " limit 2500" #this doesn't work with continuation keys as it halts at the limit passed not just a limit per query.
- end
- records = select(query_call, query_limit)
- end
-
- # Creates an item name for a query
- def item_name_for_query(query)
- sdb_type = simpledb_type(query.model)
-
- item_name = "#{sdb_type}+"
- keys = keys_for_model(query.model)
- conditions = query.conditions.sort {|a,b| a[1].name.to_s <=> b[1].name.to_s }
- item_name += conditions.map do |property|
- property[2].to_s
- end.join('-')
- Digest::SHA1.hexdigest(item_name)
- end
-
- # Creates an item name for a resource
- def item_name_for_resource(resource)
- sdb_type = simpledb_type(resource.model)
-
- item_name = "#{sdb_type}+"
- keys = keys_for_model(resource.model)
- item_name += keys.map do |property|
- property.get(resource)
- end.join('-')
-
- Digest::SHA1.hexdigest(item_name)
- end
-
- # Returns the keys for model sorted in alphabetical order
- def keys_for_model(model)
- model.key(self.name).sort {|a,b| a.name.to_s <=> b.name.to_s }
- end
-
- def not_eql_query?(query)
- # Curosity check to make sure we are only dealing with a delete
- conditions = query.conditions.map {|c| c.slug }.uniq
- selectors = [ :gt, :gte, :lt, :lte, :not, :like, :in ]
- return (selectors - conditions).size != selectors.size
- end
-
- # Returns an SimpleDB instance to work with
- def sdb
- access_key = @sdb_options[:access_key]
- secret_key = @sdb_options[:secret_key]
- @sdb ||= RightAws::SdbInterface.new(access_key,secret_key,@sdb_options)
- @sdb
- end
-
- # Returns a string so we know what type of
- def simpledb_type(model)
- model.storage_name(model.repository.name)
- end
-
- def format_log_entry(query, ms = 0)
- 'SDB (%.1fs) %s' % [ms, query.squeeze(' ')]
- end
-
- def prepare_attributes(attributes)
- attributes = attributes.to_a.map {|a| [a.first.name.to_s, a.last]}.to_hash
- attributes = adjust_to_sdb_attributes(attributes)
- updates, deletes = attributes.partition{|name,value|
- !value.nil? && !(value.respond_to?(:to_ary) && value.to_ary.empty?)
- }
- attrs_to_update = Hash[updates]
- attrs_to_delete = Hash[deletes].keys
- [attrs_to_update, attrs_to_delete]
- end
-
- def update_consistency_token
- @current_consistency_token = UUIDTools::UUID.timestamp_create.to_s
- sdb.put_attributes(
- domain,
- '__dm_consistency_token',
- {'__dm_consistency_token' => [@current_consistency_token]})
- end
-
- def maybe_wait_for_consistency
- if consistency_policy == :automatic && @current_consistency_token
- wait_for_consistency
- end
- end
-
- # SimpleDB supports "eventual consistency", which mean your data will be
- # there... eventually. Obviously this can make tests a little flaky. One
- # option is to just wait a fixed amount of time after every write, but
- # this can quickly add up to a lot of waiting. The strategy implemented
- # here is based on the theory that while consistency is only eventual,
- # chances are writes will at least be linear. That is, once the results of
- # write #2 show up we can probably assume that the results of write #1 are
- # in as well.
- #
- # When a consistency policy is enabled, the adapter writes a new unique
- # "consistency token" to the database after every write (i.e. every
- # create, update, or delete). If the policy is :manual, it only writes the
- # consistency token. If the policy is :automatic, writes will not return
- # until the token has been successfully read back.
- #
- # When waiting for the consistency token to show up, we use progressively
- # longer timeouts until finally giving up and raising an exception.
- def modified!
- case @consistency_policy
- when :manual, :automatic then
- update_consistency_token
- when false then
- # do nothing
- else
- raise "Invalid :wait_for_consistency option: #{@consistency_policy.inspect}"
- end
- end
-
- end # class SimpleDBAdapter
-
- # Required naming scheme.
- SimpledbAdapter = SimpleDBAdapter
-
- const_added(:SimpledbAdapter)
-
- end # module Adapters
-
-
-end # module DataMapper
+# This file is for backwards compatibility
+require 'dm-adapter-simpledb'