module SdbDal $KCODE = 'u' require "aws_sdb" require "sdb_dal/storage.rb" require 'ya2yaml' require File.dirname(__FILE__) +"/memory_repository.rb" require File.dirname(__FILE__) +"/domain_object_cache_item.rb" require File.dirname(__FILE__) +"/sdb_monkey_patch.rb" class Repository attr_accessor :use_cache attr_accessor :storage def initialize( sdb_domain_prefix,#MyDevPayApp clob_bucket, aws_key_id, aws_secret_key, memcache_servers = nil , a_storage=nil, append_table_to_domain=true, options={} ) @use_cache=true @storage=a_storage @storage||=Storage.new(aws_key_id,aws_secret_key,memcache_servers) @sdb_domain_prefix = sdb_domain_prefix @clob_bucket = clob_bucket @memcache_servers = memcache_servers @aws_key_id = aws_key_id @aws_secret_key = aws_secret_key @append_table_to_domain=append_table_to_domain @logger = options[:logger] if !@logger @logger = Logger.new(STDOUT) @logger.level = Logger::ERROR end @sdb=AwsSdb::Service.new(:access_key_id=>aws_key_id,:secret_access_key=>aws_secret_key,:url=>"http://sdb.amazonaws.com",:logger=>@logger) @session_cache=MemoryRepository.new # #create_domain() end def clear_session_cache @session_cache.clear end def flatten_key(key) if key.is_a?( Array) flattened_key="" key.each do |key_part| flattened_key< true) # # @storage.put( # @clob_bucket, # make_cache_key(table_name,primary_key), # yaml , # {}) # end # end def create_domain 20.times do |i| begin @sdb.create_domain(@sdb_domain_prefix) return rescue => e s= "#{e.message}\n#{e.backtrace}" @logger.warn(s) if @logger sleep(i*i) end end end def put_attributes(table_name,primary_key, formatted_attributes,options={}) 20.times do |i| begin @sdb.put_attributes(make_domain_name(table_name),make_cache_key(table_name,primary_key) , formatted_attributes, true ) return rescue Exception => e s= "#{e.message}\n#{e.backtrace}" @logger.warn(s) if @logger sleep(i*i) end end end def extend_query(query,new_clause) if query.length>0 query << " intersection " end query << new_clause end def escape_quotes(value) return nil unless value value.gsub( "'","\\'") end def build_query(table_name,attribute_descriptions,options={}) query="" params=nil params=options[:params] if options.has_key?(:params) if options.has_key?(:map) params||={} keys=options[:map][:keys] values=options[:map][:values] (0..keys.length-1).each do |i| key=keys[i] value=values[i] params[key]=value end end if !@append_table_to_domain extend_query(query," ['metadata%%table_name' = '#{table_name}']") end if options if options.has_key?(:query) extend_query(query,"["+options[:query]+"]") end if params params.each do |key,value| got_something=false if attribute_descriptions.has_key?(key) if value==:NOT_NULL got_something=true extend_query(query," ['#{key}' starts-with '']") end if value==:NULL got_something=true extend_query(query," not ['#{key}' starts-with '']") end if value.respond_to?(:less_than) && value.less_than got_something=true extend_query(query," ['#{key}' < '#{escape_quotes(attribute_descriptions[key].format_for_sdb_single( value.less_than))}']") end if value.respond_to?(:greater_than) && value.greater_than got_something=true extend_query(query," ['#{key}' > '#{escape_quotes( attribute_descriptions[key].format_for_sdb_single( value.greater_than))}']") end if value.respond_to?(:less_than_or_equal_to) && value.less_than_or_equal_to got_something=true extend_query(query,"['#{key}' <= '#{escape_quotes( attribute_descriptions[key].format_for_sdb_single( value.less_than_or_equal_to))}']") end if value.respond_to?(:greater_than_or_equal_to) && value.greater_than_or_equal_to got_something=true extend_query(query," ['#{key}' >= '#{escape_quotes(attribute_descriptions[key].format_for_sdb_single( value.greater_than_or_equal_to))}']") end if value==false got_something=true extend_query(query," ['#{key}' != 'true' ]") end if !got_something extend_query(query," ['#{key}' = '#{escape_quotes( attribute_descriptions[key].format_for_sdb_single(value))}']") end else # #it must be formatted already. likely an index extend_query(query,"['#{key}' = '#{escape_quotes value}']") end end end if options.has_key?(:order_by) && options[:order_by] clause=" ['#{options[:order_by]}' starts-with ''] sort '#{options[:order_by]}' " if options.has_key?(:order) and options[:order]==:descending clause<<" desc " end extend_query(query,clause) end if options.has_key?(:conditions) options[:conditions].each do |condition| extend_query(query," [ "+condition.to_sdb_query()+"]") end end end return query end # #result will be an array of hashes. each hash is a set of attributes def query(table_name,attribute_descriptions,options) if options.has_key?(:limit) and !options.has_key?(:order_by) session_cache_result=@session_cache.query(table_name,attribute_descriptions,options) if options[:limit]==session_cache_result.length return session_cache_result end end the_query=build_query(table_name, attribute_descriptions, options) max=250 if options[:limit] max=options[:limit].to_i end page_size=max>250?250:max sdb_result,token=sdb_query_with_attributes(table_name,the_query,page_size) while !(token.nil? || token.empty? || sdb_result.length>=max) page_size=max- sdb_result.length page_size=page_size>250?250:page_size partial_results,token=sdb_query_with_attributes(table_name,the_query,page_size,token) sdb_result.merge!( partial_results) end result=[] sdb_result.each{|primary_key,sdb_row| attributes =parse_attributes(attribute_descriptions,sdb_row) if attributes # #attributes[:primary_key]=primary_key result< a_value else -1 end end else if !b_value 1 else if a_value a_value <=> b_value else -1 end end end end end if options[:limit] && result.length>options[:limit] result=result[0..(options[:limit]-1)] end return result end def find_one(table_name, primary_key,attribute_descriptions)#, non_clob_attribute_names, clob_attribute_names) session_cache_result=@session_cache.find_one(table_name, make_cache_key(table_name,primary_key),attribute_descriptions) return session_cache_result if session_cache_result # if @use_cache # yaml=@storage.get(@clob_bucket,make_cache_key(table_name,primary_key)) # if yaml # result=YAML::load( yaml) # if result.respond_to?(:non_clob_attributes) && result.non_clob_attributes!=nil # return parse_attributes(attribute_descriptions, result.non_clob_attributes) # end # # end # end attributes=parse_attributes(attribute_descriptions,sdb_get_attributes(table_name,primary_key)) if attributes # #attributes[:primary_key]=primary_key #put_into_cache(table_name, # primary_key, attributes) end attributes end def make_domain_name(table_name) if @append_table_to_domain @sdb_domain_prefix+"_"+table_name else @sdb_domain_prefix end end def sdb_get_attributes(table_name,primary_key) @logger.debug( "SDB get_attributes #{table_name} : #{primary_key}") if @logger 20.times do |i| begin return @sdb.get_attributes(make_domain_name(table_name), make_cache_key(table_name,primary_key)) rescue Exception => e s= "#{e.message}\n#{e.backtrace}" @logger.warn(s) if @logger sleep(i*i) ensure end end end def sdb_query(table_name,query,max,token=nil) @logger.debug( "SDB query:#{table_name}(#{max}) : #{query} #{token}" ) if @logger # puts "#{table_name} #{query} (#{max}) #{token}" 20.times do |i| begin return @sdb.query(make_domain_name(table_name),query,max,token) rescue Exception => e s= "#{e.message}\n#{e.backtrace}" @logger.error(s) if @logger sleep(i*i) ensure end end end def sdb_query_with_attributes(table_name,query,max,token=nil) @logger.debug( "SDB query:#{table_name}(#{max}) : #{query} #{token}" ) if @logger puts "#{table_name} #{query} (#{max}) #{token}" 20.times do |i| begin return @sdb.query_with_attributes(make_domain_name(table_name),query,max,token) rescue Exception => e s= "#{e.message}\n#{e.backtrace}" @logger.error(s) if @logger sleep(i*i) ensure end end end def parse_attributes(attribute_descriptions,attributes) if !attributes || attributes.length==0 return nil end parsed_attributes={} attribute_descriptions.each do |attribute_name,attribute_description| value=attributes[attribute_name.to_sym] if !value value=attributes[attribute_name] end #sdb attributes are often array of one if !attribute_description.is_collection && value.respond_to?(:flatten) && value.length==1 value=value[0] end parsed_attributes[attribute_name.to_sym]=attribute_description.parse_from_sdb(value) end parsed_attributes end def destroy(table_name, primary_key) @sdb.delete_attributes(make_domain_name(table_name),make_cache_key(table_name, primary_key) ) # if @use_cache # @storage.delete(@clob_bucket,make_cache_key(table_name,primary_key)) # end end #parse date in yyyy-mm-dd format def pause sleep(2) end def clear @session_cache.clear end end end