lib/rbbt/persist/tsv/sharder.rb in rbbt-util-5.13.36 vs lib/rbbt/persist/tsv/sharder.rb in rbbt-util-5.13.37

- old
+ new

@@ -1,23 +1,23 @@ module Persist module SharderAdapter - def self.open(path, write, type=nil, &block) + def self.open(path, write, type=nil, options = {}, &block) - database = CONNECTIONS[path] ||= Sharder.new(path, write, type, &block) + database = CONNECTIONS[path] ||= Sharder.new(path, write, type, options, &block) database.extend Persist::SharderAdapter unless Persist::SharderAdapter === database database end - end class Sharder - attr_accessor :persistence_path, :shard_function, :databases, :closed, :writable, :mutex, :db_type + attr_accessor :persistence_path, :shard_function, :databases, :closed, :writable, :mutex, :db_type, :options - def initialize(persistence_path, write = false, db_type=nil, &block) + def initialize(persistence_path, write = false, db_type=nil, options = {}, &block) @shard_function = block + @options = options @persistence_path = Path.setup(persistence_path) @mutex = Mutex.new @writable = write @db_type = db_type @@ -30,32 +30,44 @@ self[key] = value end def persistence_path=(path) @persistence_path = path + databases.values.each{|db| db.persistence_path = File.join(path, File.basename(db.persistence_path))} end def databases @databases ||= begin hash = {} @persistence_path.glob('shard-*').each do |f| shard = File.basename(f).match(/shard-(.*)/)[1] - hash[shard] = Persist.open_database(f, false, :clean, db_type) + if shard == 'metadata' + hash[shard] = Persist.open_database(f, false, :clean, "HDB", @options) + else + hash[shard] = Persist.open_database(f, false, :clean, db_type, @options) + end end hash end end def database(key) - shard = key =~ /__tsv_/ ? "0" : shard_function.call(key) + shard = key =~ /__tsv_/ ? "metadata" : shard_function.call(key) if databases.include? shard databases[shard] else - database ||= begin + if shard == 'metadata' + database ||= begin + path = File.join(persistence_path, 'shard-' << shard.to_s) + (writable or File.exists?(path)) ? Persist.open_database(path, writable, :clean, "HDB", @options) : nil + end + else + database ||= begin path = File.join(persistence_path, 'shard-' << shard.to_s) - (writable or File.exists?(path)) ? Persist.open_database(path, writable, :clean, db_type) : nil + (writable or File.exists?(path)) ? Persist.open_database(path, writable, :clean, db_type, @options) : nil end + end if database databases[shard] = database else Log.warn "Database #{ path } missing" if nil @@ -82,10 +94,11 @@ @closed = true super end def read(force = false) + raise "SIOT" return if not write? and not closed and not force self.close databases.each{|d| d.read } @writable = false @closed = false @@ -202,26 +215,33 @@ def write databases.values.each{|database| database.write } end - def read - databases.values.each{|database| database.read } + def read(force = false) + databases.values.each{|database| database.read(force) } end def close databases.values.each{|database| database.close } end + + def size + databases.inject(0){|acc,i| + shard, db = i; + acc += db.size + } + end end - def self.open_sharder(path, write, serializer = nil, tokyocabinet_class = TokyoCabinet::HDB, &shard_function) + def self.open_sharder(path, write, serializer = nil, type = TokyoCabinet::HDB, options, &shard_function) write = true unless File.exists? path FileUtils.mkdir_p File.dirname(path) unless File.exists?(File.dirname(path)) - database = Persist::SharderAdapter.open(path, write, tokyocabinet_class, &shard_function) + database = Persist::SharderAdapter.open(path, write, type, options, &shard_function) - unless serializer == :clean + unless serializer == :clean #or type.to_s == 'fwt' TSV.setup database database.serializer = serializer if serializer end database