./lib/sdb_dal/repository.rb in sdb_dal-0.0.4 vs ./lib/sdb_dal/repository.rb in sdb_dal-0.0.5

- old
+ new

@@ -1,368 +1,419 @@ module SdbDal -$KCODE = 'u' -require "aws_sdb" -require "sdb_dal/storage.rb" -require 'ya2yaml' -require File.dirname(__FILE__) +"/memory_repository.rb" -require File.dirname(__FILE__) +"/domain_object_cache_item.rb" -class Repository - attr_accessor :use_cache - attr_accessor :storage - def initialize( - sdb_domain_prefix,#MyDevPayApp - clob_bucket, - aws_key_id, - aws_secret_key, - memcache_servers = nil , - a_storage=nil, - append_table_to_domain=true, - options={} + $KCODE = 'u' + require "aws_sdb" + require "sdb_dal/storage.rb" + require 'ya2yaml' + require File.dirname(__FILE__) +"/memory_repository.rb" + require File.dirname(__FILE__) +"/domain_object_cache_item.rb" + + require File.dirname(__FILE__) +"/sdb_monkey_patch.rb" + class Repository + attr_accessor :use_cache + attr_accessor :storage + def initialize( + sdb_domain_prefix,#MyDevPayApp + clob_bucket, + aws_key_id, + aws_secret_key, + memcache_servers = nil , + a_storage=nil, + append_table_to_domain=true, + options={} - ) + ) - @use_cache=true - @storage=a_storage - @storage||=Storage.new(aws_key_id,aws_secret_key,memcache_servers) - @sdb_domain_prefix = sdb_domain_prefix - @clob_bucket = clob_bucket - @memcache_servers = memcache_servers - @aws_key_id = aws_key_id - @aws_secret_key = aws_secret_key - @append_table_to_domain=append_table_to_domain - @logger = options[:logger] - if !@logger - @logger = Logger.new(STDOUT) - @logger.level = Logger::ERROR - end + @use_cache=true + @storage=a_storage + @storage||=Storage.new(aws_key_id,aws_secret_key,memcache_servers) + @sdb_domain_prefix = sdb_domain_prefix + @clob_bucket = clob_bucket + @memcache_servers = memcache_servers + @aws_key_id = aws_key_id + @aws_secret_key = aws_secret_key + @append_table_to_domain=append_table_to_domain + @logger = options[:logger] + if !@logger + @logger = Logger.new(STDOUT) + @logger.level = Logger::ERROR + end - @sdb=AwsSdb::Service.new(:access_key_id=>aws_key_id,:secret_access_key=>aws_secret_key,:url=>"http://sdb.amazonaws.com",:logger=>@logger) - @session_cache=MemoryRepository.new + @sdb=AwsSdb::Service.new(:access_key_id=>aws_key_id,:secret_access_key=>aws_secret_key,:url=>"http://sdb.amazonaws.com",:logger=>@logger) + @session_cache=MemoryRepository.new - #create_domain() - end + # #create_domain() + end - def clear_session_cache - @session_cache.clear - end - def make_clob_key(table_name,primary_key,clob_name) - return "clobs/#{table_name}/#{CGI.escape(primary_key.to_s)}/#{clob_name}" - end - def make_cache_key(table_name,primary_key) - return "cached_objects/#{table_name}/#{CGI.escape(primary_key.to_s)}" - end - def get_clob(table_name,primary_key,clob_name) - return @storage.get(@clob_bucket,make_clob_key(table_name,primary_key,clob_name)) - - end - def save(table_name, primary_key, attributes) - @session_cache.save(table_name,primary_key,attributes) - formatted_attributes={} - attributes.each do |description,value| - if value || description.value_type==:boolean - if description.is_clob - @storage.put( - @clob_bucket, - make_clob_key(table_name,primary_key,description.name), - value.to_s, - {}) - else - formatted_attributes[description.name]=description.format_for_sdb(value) + def clear_session_cache + @session_cache.clear + end + def flatten_key(key) + if key.is_a?( Array) + flattened_key="" + key.each do |key_part| + flattened_key<<CGI.escape(key_part.to_s)+"/" end + return flattened_key[0..-2] + else + return CGI.escape(key.to_s) end end + def make_cache_key(table_name,primary_key) + + primary_key=flatten_key(primary_key) + primary_key="#{table_name}/#{primary_key}" unless @append_table_to_domain + return primary_key + end + def make_clob_key(table_name,primary_key,clob_name) + return "clobs/#{table_name}/#{flatten_key(primary_key)}/#{clob_name}" + end + def get_clob(table_name,primary_key,clob_name) + return @storage.get(@clob_bucket,make_clob_key(table_name,primary_key,clob_name)) - put_attributes(table_name,primary_key, formatted_attributes ) - # put_into_cache(table_name, primary_key, formatted_attributes) + end + def save(table_name, primary_key, attributes,index_descriptions) + @session_cache.save(table_name,primary_key,attributes,index_descriptions) + formatted_attributes={} + attributes.each do |description,value| + if value || description.value_type==:boolean + if description.is_clob + @storage.put( + @clob_bucket, + make_clob_key(table_name,primary_key,description.name), + value.to_s, + {}) + else + formatted_attributes[description.name]=description.format_for_sdb(value) + end + end + end + if !@append_table_to_domain + formatted_attributes['metadata%%table_name'] = table_name + + end + put_attributes(table_name,primary_key, formatted_attributes ) + # put_into_cache(table_name, primary_key, formatted_attributes) - end -# def put_into_cache(table_name, primary_key, formatted_attributes) -# if @use_cache -# cacheItem=DomainObjectCacheItem.new(table_name, primary_key, formatted_attributes) -# -# yaml=cacheItem.ya2yaml(:syck_compatible => true) -# -# @storage.put( -# @clob_bucket, -# make_cache_key(table_name,primary_key), -# yaml , -# {}) -# end -# end - def create_domain - 20.times do |i| - begin - @sdb.create_domain(@sdb_domain) - return + end + # def put_into_cache(table_name, primary_key, formatted_attributes) + # if @use_cache + # cacheItem=DomainObjectCacheItem.new(table_name, primary_key, formatted_attributes) + # + # yaml=cacheItem.ya2yaml(:syck_compatible => true) + # + # @storage.put( + # @clob_bucket, + # make_cache_key(table_name,primary_key), + # yaml , + # {}) + # end + # end + def create_domain + 20.times do |i| + begin + @sdb.create_domain(@sdb_domain_prefix) + return - rescue => e - s= "#{e.message}\n#{e.backtrace}" - @logger.warn(s) if @logger - sleep(i*i) + rescue => e + s= "#{e.message}\n#{e.backtrace}" + @logger.warn(s) if @logger + sleep(i*i) + end end - end - end - def put_attributes(table_name,primary_key, formatted_attributes,options={}) - 20.times do |i| - begin - @sdb.put_attributes(make_domain_name(table_name),primary_key.to_s, formatted_attributes, true ) - return + end + def put_attributes(table_name,primary_key, formatted_attributes,options={}) + 20.times do |i| + begin + @sdb.put_attributes(make_domain_name(table_name),make_cache_key(table_name,primary_key) , formatted_attributes, true ) + return - rescue Exception => e - s= "#{e.message}\n#{e.backtrace}" - @logger.warn(s) if @logger + rescue Exception => e + s= "#{e.message}\n#{e.backtrace}" + @logger.warn(s) if @logger - sleep(i*i) + sleep(i*i) + end end - end - end - def extend_query(query,new_clause) - if query.length>0 - query << " intersection " end - - query << new_clause - end - def escape_quotes(value) - value.gsub( "'","\\'") - end - def query_ids(table_name,attribute_descriptions,options={}) - - if options.has_key?(:limit) and !options.has_key?(:order_by) - session_cache_result=@session_cache.query_ids(table_name,attribute_descriptions,options) - if options[:limit]==session_cache_result.length - return session_cache_result + def extend_query(query,new_clause) + if query.length>0 + query << " intersection " end + + query << new_clause end - query="" + def escape_quotes(value) + return nil unless value + value.gsub( "'","\\'") + end + def build_query(table_name,attribute_descriptions,options={}) - if options - if options.has_key?(:query) - extend_query(query,"["+options[:query]+"]") + + query="" + params=nil + params=options[:params] if options.has_key?(:params) + if options.has_key?(:map) + params||={} + keys=options[:map][:keys] + values=options[:map][:values] + (0..keys.length-1).each do |i| + key=keys[i] + value=values[i] + params[key]=value + end + end - if options.has_key?(:params) - options[:params].each do |key,value| - got_something=false - if attribute_descriptions.has_key?(key) - if value==:NOT_NULL - got_something=true - extend_query(query," ['#{key}' starts-with '']") + if !@append_table_to_domain + extend_query(query," ['metadata%%table_name' = '#{table_name}']") + + end + if options + if options.has_key?(:query) + extend_query(query,"["+options[:query]+"]") + end + if params + params.each do |key,value| + got_something=false + if attribute_descriptions.has_key?(key) + if value==:NOT_NULL + got_something=true + extend_query(query," ['#{key}' starts-with '']") + end + if value==:NULL + got_something=true + extend_query(query," not ['#{key}' starts-with '']") + end + if value.respond_to?(:less_than) && value.less_than + got_something=true + extend_query(query," ['#{key}' < '#{escape_quotes(attribute_descriptions[key].format_for_sdb_single( value.less_than))}']") + end + if value.respond_to?(:greater_than) && value.greater_than + got_something=true + extend_query(query," ['#{key}' > '#{escape_quotes( attribute_descriptions[key].format_for_sdb_single( value.greater_than))}']") + end + if value.respond_to?(:less_than_or_equal_to) && value.less_than_or_equal_to + got_something=true + extend_query(query,"['#{key}' <= '#{escape_quotes( attribute_descriptions[key].format_for_sdb_single( value.less_than_or_equal_to))}']") + end + if value.respond_to?(:greater_than_or_equal_to) && value.greater_than_or_equal_to + got_something=true + extend_query(query," ['#{key}' >= '#{escape_quotes(attribute_descriptions[key].format_for_sdb_single( value.greater_than_or_equal_to))}']") + end + if value==false + got_something=true + extend_query(query," ['#{key}' != 'true' ]") + end + if !got_something + extend_query(query," ['#{key}' = '#{escape_quotes( attribute_descriptions[key].format_for_sdb_single(value))}']") + end + else + # #it must be formatted already. likely an index + extend_query(query,"['#{key}' = '#{escape_quotes value}']") end - if value==:NULL - got_something=true - extend_query(query," not ['#{key}' starts-with '']") - end - if value.respond_to?(:less_than) && value.less_than - got_something=true - extend_query(query," ['#{key}' < '#{escape_quotes(attribute_descriptions[key].format_for_sdb_single( value.less_than))}']") - end - if value.respond_to?(:greater_than) && value.greater_than - got_something=true - extend_query(query," ['#{key}' > '#{escape_quotes( attribute_descriptions[key].format_for_sdb_single( value.greater_than))}']") - end - if value.respond_to?(:less_than_or_equal_to) && value.less_than_or_equal_to - got_something=true - extend_query(query,"['#{key}' <= '#{escape_quotes( attribute_descriptions[key].format_for_sdb_single( value.less_than_or_equal_to))}']") - end - if value.respond_to?(:greater_than_or_equal_to) && value.greater_than_or_equal_to - got_something=true - extend_query(query," ['#{key}' >= '#{escape_quotes(attribute_descriptions[key].format_for_sdb_single( value.greater_than_or_equal_to))}']") - end - if value==false - got_something=true - extend_query(query," ['#{key}' != 'true' ]") - end - if !got_something - extend_query(query," ['#{key}' = '#{escape_quotes( attribute_descriptions[key].format_for_sdb_single(value))}']") - end - else - #it must be formatted already. likely an index - extend_query(query,"['#{key}' = '#{escape_quotes value}']") end end - end - if options.has_key?(:order_by) && options[:order_by] - clause=" ['#{options[:order_by]}' starts-with ''] sort '#{options[:order_by]}' " - if options.has_key?(:order) and options[:order]==:descending - clause<<" desc " - end - extend_query(query,clause) - end - if options.has_key?(:conditions) - options[:conditions].each do |condition| + if options.has_key?(:order_by) && options[:order_by] + clause=" ['#{options[:order_by]}' starts-with ''] sort '#{options[:order_by]}' " + if options.has_key?(:order) and options[:order]==:descending + clause<<" desc " + end + extend_query(query,clause) + end + if options.has_key?(:conditions) + options[:conditions].each do |condition| - extend_query(query," [ "+condition.to_sdb_query()+"]") + extend_query(query," [ "+condition.to_sdb_query()+"]") + end end - end + end + return query + end - - max=250 - if options[:limit] - max=options[:limit].to_i - - end - page_size=max>250?250:max - sdb_results,token=sdb_query(table_name,query,page_size) - - while !(token.nil? || token.empty? || sdb_results.length>=max) - page_size=max- sdb_results.length - page_size=page_size>250?250:page_size - partial_results,token=sdb_query(table_name,query,page_size,token) - sdb_results.concat( partial_results) - end - sdb_results - - - return sdb_results - end - def query(table_name,attribute_descriptions,options) - ids=query_ids(table_name,attribute_descriptions,options) - result=[] - ids.each do |primary_key| - if primary_key - (0..4).each do |try_count| - this_one=find_one(table_name,primary_key,attribute_descriptions) - if this_one - result<<this_one - break - else - #sdb query just told us it was there so keep trying - sleep(try_count*try_count) - end + # #result will be an array of hashes. each hash is a set of attributes + def query(table_name,attribute_descriptions,options) + if options.has_key?(:limit) and !options.has_key?(:order_by) + session_cache_result=@session_cache.query(table_name,attribute_descriptions,options) + if options[:limit]==session_cache_result.length + return session_cache_result end - else - @logger.error("blank primary key") end - end - if options and options[:order_by] - result.sort! do |a,b| - a_value=a[options[:order_by]] - b_value=b[options[:order_by]] - if options[:order] && options[:order]!=:ascending - if !a_value - 1 - else - if b_value - b_value <=> a_value + + the_query=build_query(table_name, attribute_descriptions, options) + max=250 + if options[:limit] + max=options[:limit].to_i + + end + page_size=max>250?250:max + sdb_result,token=sdb_query_with_attributes(table_name,the_query,page_size) + + while !(token.nil? || token.empty? || sdb_result.length>=max) + page_size=max- sdb_result.length + page_size=page_size>250?250:page_size + partial_results,token=sdb_query_with_attributes(table_name,the_query,page_size,token) + + sdb_result.merge!( partial_results) + end + result=[] + sdb_result.each{|primary_key,sdb_row| + attributes =parse_attributes(attribute_descriptions,sdb_row) + if attributes + # #attributes[:primary_key]=primary_key + + result<<attributes + end + } + if options and options[:order_by] + result.sort! do |a,b| + a_value=a[options[:order_by]] + b_value=b[options[:order_by]] + if options[:order] && options[:order]!=:ascending + if !a_value + 1 else - -1 + if b_value + b_value <=> a_value + else + -1 + end end - end - else - if !b_value - 1 else - if a_value - a_value <=> b_value + if !b_value + 1 else - -1 + if a_value + a_value <=> b_value + else + -1 + end end end end end + if options[:limit] && result.length>options[:limit] + result=result[0..(options[:limit]-1)] + end + return result end - if options[:limit] && result.length>options[:limit] - result=result[0..(options[:limit]-1)] + def find_one(table_name, primary_key,attribute_descriptions)#, non_clob_attribute_names, clob_attribute_names) + session_cache_result=@session_cache.find_one(table_name, make_cache_key(table_name,primary_key),attribute_descriptions) + return session_cache_result if session_cache_result + # if @use_cache + # yaml=@storage.get(@clob_bucket,make_cache_key(table_name,primary_key)) + # if yaml + # result=YAML::load( yaml) + # if result.respond_to?(:non_clob_attributes) && result.non_clob_attributes!=nil + # return parse_attributes(attribute_descriptions, result.non_clob_attributes) + # end + # + # end + # end + attributes=parse_attributes(attribute_descriptions,sdb_get_attributes(table_name,primary_key)) + if attributes + # #attributes[:primary_key]=primary_key #put_into_cache(table_name, + # primary_key, attributes) + end + + attributes end - return result - end - def find_one(table_name, primary_key,attribute_descriptions)#, non_clob_attribute_names, clob_attribute_names) - session_cache_result=@session_cache.find_one(table_name, primary_key,attribute_descriptions) - return session_cache_result if session_cache_result -# if @use_cache -# yaml=@storage.get(@clob_bucket,make_cache_key(table_name,primary_key)) -# if yaml -# result=YAML::load( yaml) -# if result.respond_to?(:non_clob_attributes) && result.non_clob_attributes!=nil -# return parse_attributes(attribute_descriptions, result.non_clob_attributes) -# end -# -# end -# end - attributes=parse_attributes(attribute_descriptions,sdb_get_attributes(table_name,primary_key)) - if attributes - attributes[:primary_key]=primary_key - #put_into_cache(table_name, primary_key, attributes) + def make_domain_name(table_name) + if @append_table_to_domain + @sdb_domain_prefix+"_"+table_name + else + @sdb_domain_prefix + end end - - attributes - end - def make_domain_name(table_name) - if @append_table_to_domain - @sdb_domain_prefix+"_"+table_name - else - @sdb_domain_prefix - end - end - def sdb_get_attributes(table_name,primary_key) + def sdb_get_attributes(table_name,primary_key) - @logger.debug( "SDB get_attributes #{table_name} : #{primary_key}") if @logger + @logger.debug( "SDB get_attributes #{table_name} : #{primary_key}") if @logger - 20.times do |i| - begin - return @sdb.get_attributes(make_domain_name(table_name), primary_key) - rescue Exception => e - s= "#{e.message}\n#{e.backtrace}" - @logger.warn(s) if @logger + 20.times do |i| + begin + return @sdb.get_attributes(make_domain_name(table_name), make_cache_key(table_name,primary_key)) + rescue Exception => e + s= "#{e.message}\n#{e.backtrace}" + @logger.warn(s) if @logger - sleep(i*i) - ensure + sleep(i*i) + ensure + end end - end - end - def sdb_query(table_name,query,max,token=nil) + end + def sdb_query(table_name,query,max,token=nil) @logger.debug( "SDB query:#{table_name}(#{max}) : #{query} #{token}" ) if @logger - puts "#{table_name} #{query} (#{max}) #{token}" - 20.times do |i| - begin - return @sdb.query(make_domain_name(table_name),query,max,token) + # puts "#{table_name} #{query} (#{max}) #{token}" + 20.times do |i| + begin + return @sdb.query(make_domain_name(table_name),query,max,token) - rescue Exception => e - s= "#{e.message}\n#{e.backtrace}" - @logger.error(s) if @logger + rescue Exception => e + s= "#{e.message}\n#{e.backtrace}" + @logger.error(s) if @logger - sleep(i*i) - ensure + sleep(i*i) + ensure + end end - end - end - - def parse_attributes(attribute_descriptions,attributes) - if !attributes || attributes.length==0 - return nil end - parsed_attributes={} - attribute_descriptions.each do |attribute_name,attribute_description| - value=attributes[attribute_name.to_sym] - if !value - value=attributes[attribute_name] + def sdb_query_with_attributes(table_name,query,max,token=nil) + + @logger.debug( "SDB query:#{table_name}(#{max}) : #{query} #{token}" ) if @logger + puts "#{table_name} #{query} (#{max}) #{token}" + 20.times do |i| + begin + return @sdb.query_with_attributes(make_domain_name(table_name),query,max,token) + + rescue Exception => e + s= "#{e.message}\n#{e.backtrace}" + @logger.error(s) if @logger + + sleep(i*i) + ensure + + end end - #sdb attributes are often array of one - if !attribute_description.is_collection && value.respond_to?(:flatten) && value.length==1 - value=value[0] - end - parsed_attributes[attribute_name.to_sym]=attribute_description.parse_from_sdb(value) + end - parsed_attributes - end - def destroy(table_name, primary_key) - @sdb.delete_attributes(make_domain_name(table_name), primary_key) - if @use_cache - @storage.delete(@clob_bucket,make_cache_key(table_name,primary_key)) + def parse_attributes(attribute_descriptions,attributes) + if !attributes || attributes.length==0 + return nil + end + parsed_attributes={} + attribute_descriptions.each do |attribute_name,attribute_description| + value=attributes[attribute_name.to_sym] + if !value + value=attributes[attribute_name] + end + #sdb attributes are often array of one + if !attribute_description.is_collection && value.respond_to?(:flatten) && value.length==1 + value=value[0] + end + parsed_attributes[attribute_name.to_sym]=attribute_description.parse_from_sdb(value) + end + parsed_attributes end - end - #parse date in yyyy-mm-dd format + def destroy(table_name, primary_key) + @sdb.delete_attributes(make_domain_name(table_name),make_cache_key(table_name, primary_key) ) + # if @use_cache + # @storage.delete(@clob_bucket,make_cache_key(table_name,primary_key)) + # end + end + #parse date in yyyy-mm-dd format -def pause - sleep(2) -end -end + def pause + sleep(2) + end + + def clear + @session_cache.clear + end + end end \ No newline at end of file