require 'rubygems'
require 'dm-core'
require 'digest/sha1'
require 'dm-aggregates'
require 'right_aws'
require 'uuidtools'
require File.expand_path('simpledb_adapter/sdb_array', File.dirname(__FILE__))

module DataMapper
  module Migrations
    # Integrated from http://github.com/edward/dm-simpledb/tree/master
    module SimpledbAdapter

      module ClassMethods

      end

      def self.included(other)
        other.extend ClassMethods

        DataMapper.extend(::DataMapper::Migrations::SingletonMethods)
        [ :Repository, :Model ].each do |name|
          ::DataMapper.const_get(name).send(:include, Migrations.const_get(name))
        end
      end

      # Returns whether the given storage_name exists.
      # @param storage_name a String naming a SimpleDB domain
      # @return true if the storage exists
      def storage_exists?(storage_name)
        domains = sdb.list_domains[:domains]
        domains.detect {|d| d == storage_name } != nil
      end

      def create_model_storage(model)
        sdb.create_domain(@sdb_options[:domain])
      end

      # On SimpleDB you probably don't want to destroy the whole domain;
      # if you are just adding fields it is automatically supported. We
      # default to a non-destructive migrate; to destroy, run
      #   rake db:automigrate destroy=true
      def destroy_model_storage(model)
        if ENV['destroy'] != nil && ENV['destroy'] == 'true'
          sdb.delete_domain(@sdb_options[:domain])
        end
      end

    end # module SimpledbAdapter
  end # module Migrations

  module Adapters
    class SimpleDBAdapter < AbstractAdapter

      attr_reader :sdb_options

      # For testing purposes ONLY. Seriously, don't enable this for
      # production code.
      attr_accessor :consistency_policy

      def initialize(name, normalised_options)
        super
        @sdb_options = {}
        @sdb_options[:access_key] = options.fetch(:access_key) { options[:user] }
        @sdb_options[:secret_key] = options.fetch(:secret_key) { options[:password] }
        @sdb_options[:logger] = options.fetch(:logger) { DataMapper.logger }
        @sdb_options[:server] = options.fetch(:host) { 'sdb.amazonaws.com' }
        @sdb_options[:port]   = options[:port] || 443 # port may be set but nil
        @sdb_options[:domain] = options.fetch(:domain) {
          options[:path].to_s.gsub(%r{(^/+)|(/+$)}, "") # remove slashes
        }
        @consistency_policy =
          normalised_options.fetch(:wait_for_consistency) { false }
      end
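
      # A minimal connection sketch, shown here because the option handling
      # above is the whole configuration surface. The credential values and
      # domain name are placeholders, and the domain is assumed to already
      # exist (see Migrations#create_model_storage). The 'simpledb' adapter
      # name resolves via the naming-scheme alias at the bottom of this file:
      #
      #   DataMapper.setup(:default,
      #     :adapter    => 'simpledb',
      #     :access_key => 'YOUR_AWS_ACCESS_KEY',
      #     :secret_key => 'YOUR_AWS_SECRET_KEY',
      #     :domain     => 'my_app_data')
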
      def create(resources)
        created = 0
        time = Benchmark.realtime do
          resources.each do |resource|
            uuid = UUIDTools::UUID.timestamp_create
            initialize_serial(resource, uuid.to_i)
            item_name  = item_name_for_resource(resource)
            sdb_type   = simpledb_type(resource.model)
            attributes = resource.attributes.merge(:simpledb_type => sdb_type)
            attributes = adjust_to_sdb_attributes(attributes)
            attributes.reject! {|name, value| value.nil? }
            sdb.put_attributes(domain, item_name, attributes)
            created += 1
          end
        end
        DataMapper.logger.debug(format_log_entry("(#{created}) INSERT #{resources.inspect}", time))
        modified!
        created
      end

      def delete(collection)
        deleted = 0
        time = Benchmark.realtime do
          collection.each do |resource|
            item_name = item_name_for_resource(resource)
            sdb.delete_attributes(domain, item_name)
            deleted += 1
          end
          raise NotImplementedError.new('Only :eql on delete at the moment') if not_eql_query?(collection.query)
        end
        DataMapper.logger.debug(format_log_entry("(#{deleted}) DELETE #{collection.query.conditions.inspect}", time))
        modified!
        deleted
      end

      def read(query)
        maybe_wait_for_consistency
        sdb_type = simpledb_type(query.model)
        conditions, order, unsupported_conditions =
          set_conditions_and_sort_order(query, sdb_type)
        results = get_results(query, conditions, order)

        proto_resources = results.map do |result|
          name, attributes = *result.to_a.first
          proto_resource = query.fields.inject({}) do |proto_resource, property|
            value = attributes[property.field.to_s]
            if value != nil
              if value.size > 1
                if property.type == String
                  value = chunks_to_string(value)
                else
                  value = value.map {|v| property.typecast(v) }
                end
              else
                value = property.typecast(value.first)
              end
            else
              value = property.typecast(nil)
            end
            proto_resource[property.name.to_s] = value
            proto_resource
          end
          proto_resource
        end
        # Drop the conditions SimpleDB has already applied, leaving only the
        # unsupported ones to be filtered in memory.
        query.conditions.operands.reject! { |op|
          !unsupported_conditions.include?(op)
        }
        records = query.filter_records(proto_resources)
        records
      end

      def update(attributes, collection)
        updated = 0
        attrs_to_update, attrs_to_delete = prepare_attributes(attributes)
        time = Benchmark.realtime do
          collection.each do |resource|
            item_name = item_name_for_resource(resource)
            unless attrs_to_update.empty?
              sdb.put_attributes(domain, item_name, attrs_to_update, :replace)
            end
            unless attrs_to_delete.empty?
              sdb.delete_attributes(domain, item_name, attrs_to_delete)
            end
            updated += 1
          end
          raise NotImplementedError.new('Only :eql on update at the moment') if not_eql_query?(collection.query)
        end
        DataMapper.logger.debug(format_log_entry("UPDATE #{collection.query.conditions.inspect} (#{updated} times)", time))
        modified!
        updated
      end

      def query(query_call, query_limit = 999999999)
        select(query_call, query_limit).collect {|x| x.values[0] }
      end

      def aggregate(query)
        raise ArgumentError.new("Only count is supported") unless (query.fields.first.operator == :count)
        sdb_type = simpledb_type(query.model)
        conditions, order, unsupported_conditions =
          set_conditions_and_sort_order(query, sdb_type)

        query_call = "SELECT count(*) FROM #{domain} "
        query_call << "WHERE #{conditions.compact.join(' AND ')}" if conditions.length > 0
        results = nil
        time = Benchmark.realtime do
          results = sdb.select(query_call)
        end
        DataMapper.logger.debug(format_log_entry(query_call, time))
        [results[:items][0].values.first["Count"].first.to_i]
      end

      # For testing purposes only.
      def wait_for_consistency
        return unless @current_consistency_token
        begin
          results = sdb.get_attributes(domain, '__dm_consistency_token', '__dm_consistency_token')
          tokens  = results[:attributes]['__dm_consistency_token']
        end until tokens.include?(@current_consistency_token)
      end

      private

      # Hack for converting and storing strings longer than 1024 bytes. One
      # thing to note: if you use strings longer than 1019 characters you
      # will lose the ability to do full-text matching in queries, as the
      # string can be broken at any place during chunking.
      def adjust_to_sdb_attributes(attrs)
        attrs.each_pair do |key, value|
          if value.kind_of?(String)
            # Strings need to be inside arrays in order to prevent RightAws
            # from inadvertently splitting them on newlines when it calls
            # Array(value).
            attrs[key] = [value]
          end
          if value.is_a?(String) && value.length > 1019
            chunked = string_to_chunks(value)
            attrs[key] = chunked
          end
        end
        attrs
      end

      def string_to_chunks(value)
        chunks = value.to_s.scan(%r/.{1,1019}/) # 1024 - '1024:'.size
        i = -1
        fmt = '%04d:'
        chunks.map! {|chunk| [(fmt % (i += 1)), chunk].join }
        raise ArgumentError, 'that is just too big yo!' if chunks.size >= 256
        chunks
      end
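
      # An illustrative sketch of the chunk format produced above (the input
      # is hypothetical): a 2500-character string yields three chunks, each
      # prefixed with a zero-padded, colon-delimited index.
      #
      #   chunks = string_to_chunks('a' * 2500)
      #   chunks.size     #=> 3
      #   chunks[0][0, 5] #=> "0000:"
      #   chunks[1][0, 5] #=> "0001:"
      #
      # chunks_to_string below reverses this: strip indexes, sort, rejoin.
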
      def chunks_to_string(value)
        begin
          chunks = Array(value).flatten.map do |chunk|
            index, text = chunk.split(%r/:/, 2)
            [Float(index).to_i, text]
          end
          chunks.replace chunks.sort_by {|index, text| index }
          string_result = chunks.map! {|index, text| text }.join
          string_result
        rescue ArgumentError, TypeError
          # Return the original value: strings larger than the chunk size but
          # shorter than 1024 bytes may have been put into the system without
          # this adapter (or by previous versions) and were never chunked.
          value
        end
      end

      # Returns the domain for the model
      def domain
        @sdb_options[:domain]
      end

      # Sets the conditions and sort order for the SDB query
      def set_conditions_and_sort_order(query, sdb_type)
        unsupported_conditions = []
        conditions = ["simpledb_type = '#{sdb_type}'"]
        # Look at query.order.first and ensure it appears in the conditions;
        # only a single sort attribute is supported.
        if query.order && query.order.length > 0
          query_object = query.order[0]
          # Anything sorted on must also be a condition for SDB
          conditions << "#{query_object.target.name} IS NOT NULL"
          order = "ORDER BY #{query_object.target.name} #{query_object.operator}"
        else
          order = ""
        end
        query.conditions.each do |op|
          case op.slug
          when :regexp
            unsupported_conditions << op
          when :eql
            conditions << if op.value.nil?
              "#{op.subject.name} IS NULL"
            else
              "#{op.subject.name} = '#{op.value}'"
            end
          when :not then
            comp = op.operands.first
            if comp.slug == :like
              conditions << "#{comp.subject.name} not like '#{comp.value}'"
              next
            end
            case comp.value
            when Range, Set, Array, Regexp
              unsupported_conditions << op
            when nil
              conditions << "#{comp.subject.name} IS NOT NULL"
            else
              conditions << "#{comp.subject.name} != '#{comp.value}'"
            end
          when :gt   then conditions << "#{op.subject.name} > '#{op.value}'"
          when :gte  then conditions << "#{op.subject.name} >= '#{op.value}'"
          when :lt   then conditions << "#{op.subject.name} < '#{op.value}'"
          when :lte  then conditions << "#{op.subject.name} <= '#{op.value}'"
          when :like then conditions << "#{op.subject.name} like '#{op.value}'"
          when :in
            case op.value
            when Array, Set
              values = op.value.collect {|v| "'#{v}'" }.join(',')
              values = "'__NULL__'" if values.empty?
              conditions << "#{op.subject.name} IN (#{values})"
            when Range
              if op.value.exclude_end?
                unsupported_conditions << op
              else
                conditions << "#{op.subject.name} between '#{op.value.first}' and '#{op.value.last}'"
              end
            else
              raise ArgumentError, "Unsupported inclusion op: #{op.value.inspect}"
            end
          else
            raise "Invalid query op: #{op.inspect}"
          end
        end
        [conditions, order, unsupported_conditions]
      end

      def select(query_call, query_limit)
        items = []
        time = Benchmark.realtime do
          sdb_continuation_key = nil
          while (results = sdb.select(query_call, sdb_continuation_key)) do
            sdb_continuation_key = results[:next_token]
            items += results[:items]
            break if items.length > query_limit
            break if sdb_continuation_key.nil?
          end
        end
        DataMapper.logger.debug(format_log_entry(query_call, time))
        items[0...query_limit]
      end

      # Gets all results, or the proper number of results, depending on :limit
      def get_results(query, conditions, order)
        output_list = query.fields.map {|f| f.field }.join(', ')
        query_call  = "SELECT #{output_list} FROM #{domain} "
        query_call << "WHERE #{conditions.compact.join(' AND ')}" if conditions.length > 0
        query_call << " #{order}"
        if query.limit != nil
          query_limit = query.limit
          query_call << " LIMIT #{query.limit}"
        else
          # On large result sets, force the max limit
          query_limit = 999999999 # TODO: hack for query.limit being nil
          # query_call << " limit 2500" # Doesn't work with continuation keys,
          # as it halts at the limit passed rather than applying it per query.
        end
        records = select(query_call, query_limit)
      end
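
      # A sketch of the select expression the two methods above build for a
      # hypothetical model with storage name 'users', queried on
      # :name => 'fred' and ordered by :name descending:
      #
      #   SELECT name, id FROM my_app_data
      #   WHERE simpledb_type = 'users' AND name = 'fred' AND name IS NOT NULL
      #   ORDER BY name desc
      #
      # Operators SimpleDB cannot express (e.g. :regexp) are omitted here and
      # applied in memory by read via query.filter_records.
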
      # Creates an item name for a query
      def item_name_for_query(query)
        sdb_type = simpledb_type(query.model)

        item_name = "#{sdb_type}+"
        keys = keys_for_model(query.model)
        conditions = query.conditions.sort {|a, b| a[1].name.to_s <=> b[1].name.to_s }
        item_name += conditions.map do |property|
          property[2].to_s
        end.join('-')
        Digest::SHA1.hexdigest(item_name)
      end

      # Creates an item name for a resource
      def item_name_for_resource(resource)
        sdb_type = simpledb_type(resource.model)

        item_name = "#{sdb_type}+"
        keys = keys_for_model(resource.model)
        item_name += keys.map do |property|
          property.get(resource)
        end.join('-')
        Digest::SHA1.hexdigest(item_name)
      end

      # Returns the keys for the model sorted in alphabetical order
      def keys_for_model(model)
        model.key(self.name).sort {|a, b| a.name.to_s <=> b.name.to_s }
      end

      def not_eql_query?(query)
        # Cursory check to make sure we are only dealing with a delete
        conditions = query.conditions.map {|c| c.slug }.uniq
        selectors = [ :gt, :gte, :lt, :lte, :not, :like, :in ]
        (selectors - conditions).size != selectors.size
      end

      # Returns a SimpleDB interface instance to work with
      def sdb
        access_key = @sdb_options[:access_key]
        secret_key = @sdb_options[:secret_key]
        @sdb ||= RightAws::SdbInterface.new(access_key, secret_key, @sdb_options)
        @sdb
      end

      # Returns a string so we know what type of model an item belongs to
      def simpledb_type(model)
        model.storage_name(model.repository.name)
      end

      def format_log_entry(query, seconds = 0)
        'SDB (%.1fs) %s' % [seconds, query.squeeze(' ')]
      end

      def prepare_attributes(attributes)
        attributes = attributes.to_a.map {|a| [a.first.name.to_s, a.last] }.to_hash
        attributes = adjust_to_sdb_attributes(attributes)
        updates, deletes = attributes.partition {|name, value|
          !value.nil? && !(value.respond_to?(:to_ary) && value.to_ary.empty?)
        }
        attrs_to_update = Hash[updates]
        attrs_to_delete = Hash[deletes].keys
        [attrs_to_update, attrs_to_delete]
      end

      def update_consistency_token
        @current_consistency_token = UUIDTools::UUID.timestamp_create.to_s
        sdb.put_attributes(
          domain,
          '__dm_consistency_token',
          {'__dm_consistency_token' => [@current_consistency_token]})
      end

      def maybe_wait_for_consistency
        if consistency_policy == :automatic && @current_consistency_token
          wait_for_consistency
        end
      end

      # SimpleDB supports "eventual consistency", which means your data will
      # be there... eventually. Obviously this can make tests a little flaky.
      # One option is to just wait a fixed amount of time after every write,
      # but this can quickly add up to a lot of waiting. The strategy
      # implemented here is based on the theory that while consistency is
      # only eventual, chances are writes will at least be linear. That is,
      # once the results of write #2 show up we can probably assume that the
      # results of write #1 are in as well.
      #
      # When a consistency policy is enabled, the adapter writes a new unique
      # "consistency token" to the database after every write (i.e. every
      # create, update, or delete). If the policy is :manual, it only writes
      # the consistency token. If the policy is :automatic, reads will not
      # proceed until the token from the most recent write has been
      # successfully read back.
      #
      # When waiting for the consistency token to show up, we poll the token
      # item repeatedly until it appears.
      def modified!
        case @consistency_policy
        when :manual, :automatic then
          update_consistency_token
        when false then
          # do nothing
        else
          raise "Invalid :wait_for_consistency option: #{@consistency_policy.inspect}"
        end
      end

    end # class SimpleDBAdapter
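
    # A configuration sketch for the consistency policy described in
    # SimpleDBAdapter#modified! (credentials and domain are placeholders).
    # :automatic makes reads wait until the most recent write's token is
    # visible and is intended for tests; :manual only writes the token;
    # false (the default) disables the feature entirely:
    #
    #   DataMapper.setup(:default,
    #     :adapter              => 'simpledb',
    #     :access_key           => 'YOUR_AWS_ACCESS_KEY',
    #     :secret_key           => 'YOUR_AWS_SECRET_KEY',
    #     :domain               => 'my_app_data',
    #     :wait_for_consistency => :automatic)
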
    # Required naming scheme.
    SimpledbAdapter = SimpleDBAdapter

    const_added(:SimpledbAdapter)

  end # module Adapters
end # module DataMapper
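
# An end-to-end usage sketch, assuming the adapter has been set up as shown
# above and the 'my_app_data' domain exists. The Post model and its
# properties are hypothetical:
#
#   class Post
#     include DataMapper::Resource
#     property :id,    String, :key => true
#     property :title, String
#     property :body,  String  # chunked transparently past 1019 characters
#   end
#
#   Post.create(:id => '1', :title => 'Hello', :body => 'Hi ' * 1000)
#   Post.all(:title => 'Hello').first  # => #<Post @id="1" ...>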