./lib/sdb_dal/repository.rb in sdb_dal-0.0.4 vs ./lib/sdb_dal/repository.rb in sdb_dal-0.0.5
- old
+ new
@@ -1,368 +1,419 @@
module SdbDal
-$KCODE = 'u'
-require "aws_sdb"
-require "sdb_dal/storage.rb"
-require 'ya2yaml'
-require File.dirname(__FILE__) +"/memory_repository.rb"
-require File.dirname(__FILE__) +"/domain_object_cache_item.rb"
-class Repository
- attr_accessor :use_cache
- attr_accessor :storage
- def initialize(
- sdb_domain_prefix,#MyDevPayApp
- clob_bucket,
- aws_key_id,
- aws_secret_key,
- memcache_servers = nil ,
- a_storage=nil,
- append_table_to_domain=true,
- options={}
+ $KCODE = 'u'
+ require "aws_sdb"
+ require "sdb_dal/storage.rb"
+ require 'ya2yaml'
+ require File.dirname(__FILE__) +"/memory_repository.rb"
+ require File.dirname(__FILE__) +"/domain_object_cache_item.rb"
+
+ require File.dirname(__FILE__) +"/sdb_monkey_patch.rb"
+ class Repository
+ attr_accessor :use_cache
+ attr_accessor :storage
+ def initialize(
+ sdb_domain_prefix,#MyDevPayApp
+ clob_bucket,
+ aws_key_id,
+ aws_secret_key,
+ memcache_servers = nil ,
+ a_storage=nil,
+ append_table_to_domain=true,
+ options={}
- )
+ )
- @use_cache=true
- @storage=a_storage
- @storage||=Storage.new(aws_key_id,aws_secret_key,memcache_servers)
- @sdb_domain_prefix = sdb_domain_prefix
- @clob_bucket = clob_bucket
- @memcache_servers = memcache_servers
- @aws_key_id = aws_key_id
- @aws_secret_key = aws_secret_key
- @append_table_to_domain=append_table_to_domain
- @logger = options[:logger]
- if !@logger
- @logger = Logger.new(STDOUT)
- @logger.level = Logger::ERROR
- end
+ @use_cache=true
+ @storage=a_storage
+ @storage||=Storage.new(aws_key_id,aws_secret_key,memcache_servers)
+ @sdb_domain_prefix = sdb_domain_prefix
+ @clob_bucket = clob_bucket
+ @memcache_servers = memcache_servers
+ @aws_key_id = aws_key_id
+ @aws_secret_key = aws_secret_key
+ @append_table_to_domain=append_table_to_domain
+ @logger = options[:logger]
+ if !@logger
+ @logger = Logger.new(STDOUT)
+ @logger.level = Logger::ERROR
+ end
- @sdb=AwsSdb::Service.new(:access_key_id=>aws_key_id,:secret_access_key=>aws_secret_key,:url=>"http://sdb.amazonaws.com",:logger=>@logger)
- @session_cache=MemoryRepository.new
+ @sdb=AwsSdb::Service.new(:access_key_id=>aws_key_id,:secret_access_key=>aws_secret_key,:url=>"http://sdb.amazonaws.com",:logger=>@logger)
+ @session_cache=MemoryRepository.new
- #create_domain()
- end
+ # #create_domain()
+ end
- def clear_session_cache
- @session_cache.clear
- end
- def make_clob_key(table_name,primary_key,clob_name)
- return "clobs/#{table_name}/#{CGI.escape(primary_key.to_s)}/#{clob_name}"
- end
- def make_cache_key(table_name,primary_key)
- return "cached_objects/#{table_name}/#{CGI.escape(primary_key.to_s)}"
- end
- def get_clob(table_name,primary_key,clob_name)
- return @storage.get(@clob_bucket,make_clob_key(table_name,primary_key,clob_name))
-
- end
- def save(table_name, primary_key, attributes)
- @session_cache.save(table_name,primary_key,attributes)
- formatted_attributes={}
- attributes.each do |description,value|
- if value || description.value_type==:boolean
- if description.is_clob
- @storage.put(
- @clob_bucket,
- make_clob_key(table_name,primary_key,description.name),
- value.to_s,
- {})
- else
- formatted_attributes[description.name]=description.format_for_sdb(value)
+ def clear_session_cache
+ @session_cache.clear
+ end
+ def flatten_key(key)
+ if key.is_a?( Array)
+ flattened_key=""
+ key.each do |key_part|
+ flattened_key<<CGI.escape(key_part.to_s)+"/"
end
+ return flattened_key[0..-2]
+ else
+ return CGI.escape(key.to_s)
end
end
+ def make_cache_key(table_name,primary_key)
+
+ primary_key=flatten_key(primary_key)
+ primary_key="#{table_name}/#{primary_key}" unless @append_table_to_domain
+ return primary_key
+ end
+ def make_clob_key(table_name,primary_key,clob_name)
+ return "clobs/#{table_name}/#{flatten_key(primary_key)}/#{clob_name}"
+ end
+ def get_clob(table_name,primary_key,clob_name)
+ return @storage.get(@clob_bucket,make_clob_key(table_name,primary_key,clob_name))
- put_attributes(table_name,primary_key, formatted_attributes )
- # put_into_cache(table_name, primary_key, formatted_attributes)
+ end
+ def save(table_name, primary_key, attributes,index_descriptions)
+ @session_cache.save(table_name,primary_key,attributes,index_descriptions)
+ formatted_attributes={}
+ attributes.each do |description,value|
+ if value || description.value_type==:boolean
+ if description.is_clob
+ @storage.put(
+ @clob_bucket,
+ make_clob_key(table_name,primary_key,description.name),
+ value.to_s,
+ {})
+ else
+ formatted_attributes[description.name]=description.format_for_sdb(value)
+ end
+ end
+ end
+ if !@append_table_to_domain
+ formatted_attributes['metadata%%table_name'] = table_name
+
+ end
+ put_attributes(table_name,primary_key, formatted_attributes )
+ # put_into_cache(table_name, primary_key, formatted_attributes)
- end
-# def put_into_cache(table_name, primary_key, formatted_attributes)
-# if @use_cache
-# cacheItem=DomainObjectCacheItem.new(table_name, primary_key, formatted_attributes)
-#
-# yaml=cacheItem.ya2yaml(:syck_compatible => true)
-#
-# @storage.put(
-# @clob_bucket,
-# make_cache_key(table_name,primary_key),
-# yaml ,
-# {})
-# end
-# end
- def create_domain
- 20.times do |i|
- begin
- @sdb.create_domain(@sdb_domain)
- return
+ end
+ # def put_into_cache(table_name, primary_key, formatted_attributes)
+ # if @use_cache
+ # cacheItem=DomainObjectCacheItem.new(table_name, primary_key, formatted_attributes)
+ #
+ # yaml=cacheItem.ya2yaml(:syck_compatible => true)
+ #
+ # @storage.put(
+ # @clob_bucket,
+ # make_cache_key(table_name,primary_key),
+ # yaml ,
+ # {})
+ # end
+ # end
+ def create_domain
+ 20.times do |i|
+ begin
+ @sdb.create_domain(@sdb_domain_prefix)
+ return
- rescue => e
- s= "#{e.message}\n#{e.backtrace}"
- @logger.warn(s) if @logger
- sleep(i*i)
+ rescue => e
+ s= "#{e.message}\n#{e.backtrace}"
+ @logger.warn(s) if @logger
+ sleep(i*i)
+ end
end
- end
- end
- def put_attributes(table_name,primary_key, formatted_attributes,options={})
- 20.times do |i|
- begin
- @sdb.put_attributes(make_domain_name(table_name),primary_key.to_s, formatted_attributes, true )
- return
+ end
+ def put_attributes(table_name,primary_key, formatted_attributes,options={})
+ 20.times do |i|
+ begin
+ @sdb.put_attributes(make_domain_name(table_name),make_cache_key(table_name,primary_key) , formatted_attributes, true )
+ return
- rescue Exception => e
- s= "#{e.message}\n#{e.backtrace}"
- @logger.warn(s) if @logger
+ rescue Exception => e
+ s= "#{e.message}\n#{e.backtrace}"
+ @logger.warn(s) if @logger
- sleep(i*i)
+ sleep(i*i)
+ end
end
- end
- end
- def extend_query(query,new_clause)
- if query.length>0
- query << " intersection "
end
-
- query << new_clause
- end
- def escape_quotes(value)
- value.gsub( "'","\\'")
- end
- def query_ids(table_name,attribute_descriptions,options={})
-
- if options.has_key?(:limit) and !options.has_key?(:order_by)
- session_cache_result=@session_cache.query_ids(table_name,attribute_descriptions,options)
- if options[:limit]==session_cache_result.length
- return session_cache_result
+ def extend_query(query,new_clause)
+ if query.length>0
+ query << " intersection "
end
+
+ query << new_clause
end
- query=""
+ def escape_quotes(value)
+ return nil unless value
+ value.gsub( "'","\\'")
+ end
+ def build_query(table_name,attribute_descriptions,options={})
- if options
- if options.has_key?(:query)
- extend_query(query,"["+options[:query]+"]")
+
+ query=""
+ params=nil
+ params=options[:params] if options.has_key?(:params)
+ if options.has_key?(:map)
+ params||={}
+ keys=options[:map][:keys]
+ values=options[:map][:values]
+ (0..keys.length-1).each do |i|
+ key=keys[i]
+ value=values[i]
+ params[key]=value
+ end
+
end
- if options.has_key?(:params)
- options[:params].each do |key,value|
- got_something=false
- if attribute_descriptions.has_key?(key)
- if value==:NOT_NULL
- got_something=true
- extend_query(query," ['#{key}' starts-with '']")
+ if !@append_table_to_domain
+ extend_query(query," ['metadata%%table_name' = '#{table_name}']")
+
+ end
+ if options
+ if options.has_key?(:query)
+ extend_query(query,"["+options[:query]+"]")
+ end
+ if params
+ params.each do |key,value|
+ got_something=false
+ if attribute_descriptions.has_key?(key)
+ if value==:NOT_NULL
+ got_something=true
+ extend_query(query," ['#{key}' starts-with '']")
+ end
+ if value==:NULL
+ got_something=true
+ extend_query(query," not ['#{key}' starts-with '']")
+ end
+ if value.respond_to?(:less_than) && value.less_than
+ got_something=true
+ extend_query(query," ['#{key}' < '#{escape_quotes(attribute_descriptions[key].format_for_sdb_single( value.less_than))}']")
+ end
+ if value.respond_to?(:greater_than) && value.greater_than
+ got_something=true
+ extend_query(query," ['#{key}' > '#{escape_quotes( attribute_descriptions[key].format_for_sdb_single( value.greater_than))}']")
+ end
+ if value.respond_to?(:less_than_or_equal_to) && value.less_than_or_equal_to
+ got_something=true
+ extend_query(query,"['#{key}' <= '#{escape_quotes( attribute_descriptions[key].format_for_sdb_single( value.less_than_or_equal_to))}']")
+ end
+ if value.respond_to?(:greater_than_or_equal_to) && value.greater_than_or_equal_to
+ got_something=true
+ extend_query(query," ['#{key}' >= '#{escape_quotes(attribute_descriptions[key].format_for_sdb_single( value.greater_than_or_equal_to))}']")
+ end
+ if value==false
+ got_something=true
+ extend_query(query," ['#{key}' != 'true' ]")
+ end
+ if !got_something
+ extend_query(query," ['#{key}' = '#{escape_quotes( attribute_descriptions[key].format_for_sdb_single(value))}']")
+ end
+ else
+ # #it must be formatted already. likely an index
+ extend_query(query,"['#{key}' = '#{escape_quotes value}']")
end
- if value==:NULL
- got_something=true
- extend_query(query," not ['#{key}' starts-with '']")
- end
- if value.respond_to?(:less_than) && value.less_than
- got_something=true
- extend_query(query," ['#{key}' < '#{escape_quotes(attribute_descriptions[key].format_for_sdb_single( value.less_than))}']")
- end
- if value.respond_to?(:greater_than) && value.greater_than
- got_something=true
- extend_query(query," ['#{key}' > '#{escape_quotes( attribute_descriptions[key].format_for_sdb_single( value.greater_than))}']")
- end
- if value.respond_to?(:less_than_or_equal_to) && value.less_than_or_equal_to
- got_something=true
- extend_query(query,"['#{key}' <= '#{escape_quotes( attribute_descriptions[key].format_for_sdb_single( value.less_than_or_equal_to))}']")
- end
- if value.respond_to?(:greater_than_or_equal_to) && value.greater_than_or_equal_to
- got_something=true
- extend_query(query," ['#{key}' >= '#{escape_quotes(attribute_descriptions[key].format_for_sdb_single( value.greater_than_or_equal_to))}']")
- end
- if value==false
- got_something=true
- extend_query(query," ['#{key}' != 'true' ]")
- end
- if !got_something
- extend_query(query," ['#{key}' = '#{escape_quotes( attribute_descriptions[key].format_for_sdb_single(value))}']")
- end
- else
- #it must be formatted already. likely an index
- extend_query(query,"['#{key}' = '#{escape_quotes value}']")
end
end
- end
- if options.has_key?(:order_by) && options[:order_by]
- clause=" ['#{options[:order_by]}' starts-with ''] sort '#{options[:order_by]}' "
- if options.has_key?(:order) and options[:order]==:descending
- clause<<" desc "
- end
- extend_query(query,clause)
- end
- if options.has_key?(:conditions)
- options[:conditions].each do |condition|
+ if options.has_key?(:order_by) && options[:order_by]
+ clause=" ['#{options[:order_by]}' starts-with ''] sort '#{options[:order_by]}' "
+ if options.has_key?(:order) and options[:order]==:descending
+ clause<<" desc "
+ end
+ extend_query(query,clause)
+ end
+ if options.has_key?(:conditions)
+ options[:conditions].each do |condition|
- extend_query(query," [ "+condition.to_sdb_query()+"]")
+ extend_query(query," [ "+condition.to_sdb_query()+"]")
+ end
end
- end
+ end
+ return query
+
end
-
- max=250
- if options[:limit]
- max=options[:limit].to_i
-
- end
- page_size=max>250?250:max
- sdb_results,token=sdb_query(table_name,query,page_size)
-
- while !(token.nil? || token.empty? || sdb_results.length>=max)
- page_size=max- sdb_results.length
- page_size=page_size>250?250:page_size
- partial_results,token=sdb_query(table_name,query,page_size,token)
- sdb_results.concat( partial_results)
- end
- sdb_results
-
-
- return sdb_results
- end
- def query(table_name,attribute_descriptions,options)
- ids=query_ids(table_name,attribute_descriptions,options)
- result=[]
- ids.each do |primary_key|
- if primary_key
- (0..4).each do |try_count|
- this_one=find_one(table_name,primary_key,attribute_descriptions)
- if this_one
- result<<this_one
- break
- else
- #sdb query just told us it was there so keep trying
- sleep(try_count*try_count)
- end
+ # #result will be an array of hashes. each hash is a set of attributes
+ def query(table_name,attribute_descriptions,options)
+ if options.has_key?(:limit) and !options.has_key?(:order_by)
+ session_cache_result=@session_cache.query(table_name,attribute_descriptions,options)
+ if options[:limit]==session_cache_result.length
+ return session_cache_result
end
- else
- @logger.error("blank primary key")
end
- end
- if options and options[:order_by]
- result.sort! do |a,b|
- a_value=a[options[:order_by]]
- b_value=b[options[:order_by]]
- if options[:order] && options[:order]!=:ascending
- if !a_value
- 1
- else
- if b_value
- b_value <=> a_value
+
+ the_query=build_query(table_name, attribute_descriptions, options)
+ max=250
+ if options[:limit]
+ max=options[:limit].to_i
+
+ end
+ page_size=max>250?250:max
+ sdb_result,token=sdb_query_with_attributes(table_name,the_query,page_size)
+
+ while !(token.nil? || token.empty? || sdb_result.length>=max)
+ page_size=max- sdb_result.length
+ page_size=page_size>250?250:page_size
+ partial_results,token=sdb_query_with_attributes(table_name,the_query,page_size,token)
+
+ sdb_result.merge!( partial_results)
+ end
+ result=[]
+ sdb_result.each{|primary_key,sdb_row|
+ attributes =parse_attributes(attribute_descriptions,sdb_row)
+ if attributes
+ # #attributes[:primary_key]=primary_key
+
+ result<<attributes
+ end
+ }
+ if options and options[:order_by]
+ result.sort! do |a,b|
+ a_value=a[options[:order_by]]
+ b_value=b[options[:order_by]]
+ if options[:order] && options[:order]!=:ascending
+ if !a_value
+ 1
else
- -1
+ if b_value
+ b_value <=> a_value
+ else
+ -1
+ end
end
- end
- else
- if !b_value
- 1
else
- if a_value
- a_value <=> b_value
+ if !b_value
+ 1
else
- -1
+ if a_value
+ a_value <=> b_value
+ else
+ -1
+ end
end
end
end
end
+ if options[:limit] && result.length>options[:limit]
+ result=result[0..(options[:limit]-1)]
+ end
+ return result
end
- if options[:limit] && result.length>options[:limit]
- result=result[0..(options[:limit]-1)]
+ def find_one(table_name, primary_key,attribute_descriptions)#, non_clob_attribute_names, clob_attribute_names)
+ session_cache_result=@session_cache.find_one(table_name, make_cache_key(table_name,primary_key),attribute_descriptions)
+ return session_cache_result if session_cache_result
+ # if @use_cache
+ # yaml=@storage.get(@clob_bucket,make_cache_key(table_name,primary_key))
+ # if yaml
+ # result=YAML::load( yaml)
+ # if result.respond_to?(:non_clob_attributes) && result.non_clob_attributes!=nil
+ # return parse_attributes(attribute_descriptions, result.non_clob_attributes)
+ # end
+ #
+ # end
+ # end
+ attributes=parse_attributes(attribute_descriptions,sdb_get_attributes(table_name,primary_key))
+ if attributes
+ # #attributes[:primary_key]=primary_key #put_into_cache(table_name,
+ # primary_key, attributes)
+ end
+
+ attributes
end
- return result
- end
- def find_one(table_name, primary_key,attribute_descriptions)#, non_clob_attribute_names, clob_attribute_names)
- session_cache_result=@session_cache.find_one(table_name, primary_key,attribute_descriptions)
- return session_cache_result if session_cache_result
-# if @use_cache
-# yaml=@storage.get(@clob_bucket,make_cache_key(table_name,primary_key))
-# if yaml
-# result=YAML::load( yaml)
-# if result.respond_to?(:non_clob_attributes) && result.non_clob_attributes!=nil
-# return parse_attributes(attribute_descriptions, result.non_clob_attributes)
-# end
-#
-# end
-# end
- attributes=parse_attributes(attribute_descriptions,sdb_get_attributes(table_name,primary_key))
- if attributes
- attributes[:primary_key]=primary_key
- #put_into_cache(table_name, primary_key, attributes)
+ def make_domain_name(table_name)
+ if @append_table_to_domain
+ @sdb_domain_prefix+"_"+table_name
+ else
+ @sdb_domain_prefix
+ end
end
-
- attributes
- end
- def make_domain_name(table_name)
- if @append_table_to_domain
- @sdb_domain_prefix+"_"+table_name
- else
- @sdb_domain_prefix
- end
- end
- def sdb_get_attributes(table_name,primary_key)
+ def sdb_get_attributes(table_name,primary_key)
- @logger.debug( "SDB get_attributes #{table_name} : #{primary_key}") if @logger
+ @logger.debug( "SDB get_attributes #{table_name} : #{primary_key}") if @logger
- 20.times do |i|
- begin
- return @sdb.get_attributes(make_domain_name(table_name), primary_key)
- rescue Exception => e
- s= "#{e.message}\n#{e.backtrace}"
- @logger.warn(s) if @logger
+ 20.times do |i|
+ begin
+ return @sdb.get_attributes(make_domain_name(table_name), make_cache_key(table_name,primary_key))
+ rescue Exception => e
+ s= "#{e.message}\n#{e.backtrace}"
+ @logger.warn(s) if @logger
- sleep(i*i)
- ensure
+ sleep(i*i)
+ ensure
+ end
end
- end
- end
- def sdb_query(table_name,query,max,token=nil)
+ end
+ def sdb_query(table_name,query,max,token=nil)
@logger.debug( "SDB query:#{table_name}(#{max}) : #{query} #{token}" ) if @logger
- puts "#{table_name} #{query} (#{max}) #{token}"
- 20.times do |i|
- begin
- return @sdb.query(make_domain_name(table_name),query,max,token)
+ # puts "#{table_name} #{query} (#{max}) #{token}"
+ 20.times do |i|
+ begin
+ return @sdb.query(make_domain_name(table_name),query,max,token)
- rescue Exception => e
- s= "#{e.message}\n#{e.backtrace}"
- @logger.error(s) if @logger
+ rescue Exception => e
+ s= "#{e.message}\n#{e.backtrace}"
+ @logger.error(s) if @logger
- sleep(i*i)
- ensure
+ sleep(i*i)
+ ensure
+ end
end
- end
- end
-
- def parse_attributes(attribute_descriptions,attributes)
- if !attributes || attributes.length==0
- return nil
end
- parsed_attributes={}
- attribute_descriptions.each do |attribute_name,attribute_description|
- value=attributes[attribute_name.to_sym]
- if !value
- value=attributes[attribute_name]
+ def sdb_query_with_attributes(table_name,query,max,token=nil)
+
+ @logger.debug( "SDB query:#{table_name}(#{max}) : #{query} #{token}" ) if @logger
+ puts "#{table_name} #{query} (#{max}) #{token}"
+ 20.times do |i|
+ begin
+ return @sdb.query_with_attributes(make_domain_name(table_name),query,max,token)
+
+ rescue Exception => e
+ s= "#{e.message}\n#{e.backtrace}"
+ @logger.error(s) if @logger
+
+ sleep(i*i)
+ ensure
+
+ end
end
- #sdb attributes are often array of one
- if !attribute_description.is_collection && value.respond_to?(:flatten) && value.length==1
- value=value[0]
- end
- parsed_attributes[attribute_name.to_sym]=attribute_description.parse_from_sdb(value)
+
end
- parsed_attributes
- end
- def destroy(table_name, primary_key)
- @sdb.delete_attributes(make_domain_name(table_name), primary_key)
- if @use_cache
- @storage.delete(@clob_bucket,make_cache_key(table_name,primary_key))
+ def parse_attributes(attribute_descriptions,attributes)
+ if !attributes || attributes.length==0
+ return nil
+ end
+ parsed_attributes={}
+ attribute_descriptions.each do |attribute_name,attribute_description|
+ value=attributes[attribute_name.to_sym]
+ if !value
+ value=attributes[attribute_name]
+ end
+ #sdb attributes are often array of one
+ if !attribute_description.is_collection && value.respond_to?(:flatten) && value.length==1
+ value=value[0]
+ end
+ parsed_attributes[attribute_name.to_sym]=attribute_description.parse_from_sdb(value)
+ end
+ parsed_attributes
end
- end
- #parse date in yyyy-mm-dd format
+ def destroy(table_name, primary_key)
+ @sdb.delete_attributes(make_domain_name(table_name),make_cache_key(table_name, primary_key) )
+ # if @use_cache
+ # @storage.delete(@clob_bucket,make_cache_key(table_name,primary_key))
+ # end
+ end
+ #parse date in yyyy-mm-dd format
-def pause
- sleep(2)
-end
-end
+ def pause
+ sleep(2)
+ end
+
+ def clear
+ @session_cache.clear
+ end
+ end
end
\ No newline at end of file