lib/rbbt/persist/tsv/sharder.rb in rbbt-util-5.13.36 vs lib/rbbt/persist/tsv/sharder.rb in rbbt-util-5.13.37
- old
+ new
@@ -1,23 +1,23 @@
module Persist
module SharderAdapter
- def self.open(path, write, type=nil, &block)
+ def self.open(path, write, type=nil, options = {}, &block)
- database = CONNECTIONS[path] ||= Sharder.new(path, write, type, &block)
+ database = CONNECTIONS[path] ||= Sharder.new(path, write, type, options, &block)
database.extend Persist::SharderAdapter unless Persist::SharderAdapter === database
database
end
-
end
class Sharder
- attr_accessor :persistence_path, :shard_function, :databases, :closed, :writable, :mutex, :db_type
+ attr_accessor :persistence_path, :shard_function, :databases, :closed, :writable, :mutex, :db_type, :options
- def initialize(persistence_path, write = false, db_type=nil, &block)
+ def initialize(persistence_path, write = false, db_type=nil, options = {}, &block)
@shard_function = block
+ @options = options
@persistence_path = Path.setup(persistence_path)
@mutex = Mutex.new
@writable = write
@db_type = db_type
@@ -30,32 +30,44 @@
self[key] = value
end
def persistence_path=(path)
@persistence_path = path
+ databases.values.each{|db| db.persistence_path = File.join(path, File.basename(db.persistence_path))}
end
def databases
@databases ||= begin
hash = {}
@persistence_path.glob('shard-*').each do |f|
shard = File.basename(f).match(/shard-(.*)/)[1]
- hash[shard] = Persist.open_database(f, false, :clean, db_type)
+ if shard == 'metadata'
+ hash[shard] = Persist.open_database(f, false, :clean, "HDB", @options)
+ else
+ hash[shard] = Persist.open_database(f, false, :clean, db_type, @options)
+ end
end
hash
end
end
def database(key)
- shard = key =~ /__tsv_/ ? "0" : shard_function.call(key)
+ shard = key =~ /__tsv_/ ? "metadata" : shard_function.call(key)
if databases.include? shard
databases[shard]
else
- database ||= begin
+ if shard == 'metadata'
+ database ||= begin
+ path = File.join(persistence_path, 'shard-' << shard.to_s)
+ (writable or File.exists?(path)) ? Persist.open_database(path, writable, :clean, "HDB", @options) : nil
+ end
+ else
+ database ||= begin
path = File.join(persistence_path, 'shard-' << shard.to_s)
- (writable or File.exists?(path)) ? Persist.open_database(path, writable, :clean, db_type) : nil
+ (writable or File.exists?(path)) ? Persist.open_database(path, writable, :clean, db_type, @options) : nil
end
+ end
if database
databases[shard] = database
else
Log.warn "Database #{ path } missing" if
nil
@@ -82,10 +94,11 @@
@closed = true
super
end
def read(force = false)
+ raise "SIOT"
return if not write? and not closed and not force
self.close
databases.each{|d| d.read }
@writable = false
@closed = false
@@ -202,26 +215,33 @@
def write
databases.values.each{|database| database.write }
end
- def read
- databases.values.each{|database| database.read }
+ def read(force = false)
+ databases.values.each{|database| database.read(force) }
end
def close
databases.values.each{|database| database.close }
end
+
+ def size
+ databases.inject(0){|acc,i|
+ shard, db = i;
+ acc += db.size
+ }
+ end
end
- def self.open_sharder(path, write, serializer = nil, tokyocabinet_class = TokyoCabinet::HDB, &shard_function)
+ def self.open_sharder(path, write, serializer = nil, type = TokyoCabinet::HDB, options, &shard_function)
write = true unless File.exists? path
FileUtils.mkdir_p File.dirname(path) unless File.exists?(File.dirname(path))
- database = Persist::SharderAdapter.open(path, write, tokyocabinet_class, &shard_function)
+ database = Persist::SharderAdapter.open(path, write, type, options, &shard_function)
- unless serializer == :clean
+ unless serializer == :clean #or type.to_s == 'fwt'
TSV.setup database
database.serializer = serializer if serializer
end
database