lib/rbbt/persist/tsv/sharder.rb in rbbt-util-5.13.23 vs lib/rbbt/persist/tsv/sharder.rb in rbbt-util-5.13.24

- old
+ new

@@ -1,25 +1,68 @@ -require 'rbbt-util' - module Persist + module SharderAdapter + def self.open(path, write, type=nil, &block) + + database = CONNECTIONS[path] ||= Sharder.new(path, write, type, &block) + + database.extend Persist::SharderAdapter unless Persist::SharderAdapter === database + + database + end + + end + class Sharder - attr_accessor :directory, :params, :shard_function, :databases, :closed, :writable, :mutex + attr_accessor :persistence_path, :shard_function, :databases, :closed, :writable, :mutex, :db_type - def initialize(directory, *rest, &block) + def initialize(persistence_path, write = false, db_type=nil, &block) @shard_function = block - @params = rest - @databases = {} - @directory = directory + @persistence_path = Path.setup(persistence_path) @mutex = Mutex.new + @writable = write + @db_type = db_type + + if write + @databases = {} + end end + def <<(key,value) + self[key] = value + end + + def persistence_path=(path) + @persistence_path = path + end + + def databases + @databases ||= begin + hash = {} + @persistence_path.glob('shard-*').each do |f| + shard = File.basename(f).match(/shard-(.*)/)[1] + hash[shard] = Persist.open_database(f, false, :clean, db_type) + end + hash + end + end + def database(key) - shard = shard_function.call(key) - databases[shard] ||= begin - path = File.join(directory, 'shard-' << shard.to_s) - Persist.open_database(path, *params) - end + shard = key =~ /__tsv_/ ? "0" : shard_function.call(key) + if databases.include? shard + databases[shard] + else + database ||= begin + path = File.join(persistence_path, 'shard-' << shard.to_s) + (writable or File.exists?(path)) ? Persist.open_database(path, writable, :clean, db_type) : nil + end + if database + databases[shard] = database + else + Log.warn "Database #{ path } missing" if + nil + end + end end MAX_CHAR = 255.chr def prefix(key) @@ -67,17 +110,21 @@ def read? ! write? end def each - databases.each do |database| + databases.values.each do |database| database.each do |k,v| yield k, v end end end + def include?(key) + self[key] != nil + end + def collect res = [] each do |key, value| res << if block_given? yield key, value @@ -87,11 +134,11 @@ end res end def write_and_read - lock_filename = Persist.persistence_path(File.join(directory, 'write'), {:dir => TSV.lock_dir}) + lock_filename = Persist.persistence_path(File.join(persistence_path, 'write'), {:dir => TSV.lock_dir}) Misc.lock(lock_filename) do @mutex.synchronize do write if @closed or not write? res = begin yield @@ -102,11 +149,11 @@ end end end def write_and_close - lock_filename = Persist.persistence_path(File.join(directory, 'write'), {:dir => TSV.lock_dir}) + lock_filename = Persist.persistence_path(File.join(persistence_path, 'write'), {:dir => TSV.lock_dir}) Misc.lock(lock_filename) do @mutex.synchronize do write if @closed or not write? res = begin yield @@ -135,19 +182,19 @@ self[key] = values end end def keys - databases.values.collect{|d| d.keys }.flatten + databases.values.collect{|d| d.keys }.flatten - TSV::ENTRY_KEYS.to_a end - def []=(key, value) - database(key)[key] = value + def []=(key, value, clean = false) + database(key).send(:[]=, key, value) end - def [](key, value) - database(key)[key] + def [](key, clean=false) + v = database(key).send(:[], key) end def <<(p) return if p.nil? self[p.first] = p.last @@ -162,7 +209,22 @@ end def close databases.values.each{|database| database.close } end + end + + def self.open_sharder(path, write, serializer = nil, tokyocabinet_class = TokyoCabinet::HDB, &shard_function) + write = true unless File.exists? path + + FileUtils.mkdir_p File.dirname(path) unless File.exists?(File.dirname(path)) + + database = Persist::SharderAdapter.open(path, write, tokyocabinet_class, &shard_function) + + unless serializer == :clean + TSV.setup database + database.serializer = serializer if serializer + end + + database end end