lib/rbbt/persist/tsv/sharder.rb in rbbt-util-5.13.23 vs lib/rbbt/persist/tsv/sharder.rb in rbbt-util-5.13.24
- old
+ new
@@ -1,25 +1,68 @@
-require 'rbbt-util'
-
module Persist
+ module SharderAdapter
+ def self.open(path, write, type=nil, &block)
+
+ database = CONNECTIONS[path] ||= Sharder.new(path, write, type, &block)
+
+ database.extend Persist::SharderAdapter unless Persist::SharderAdapter === database
+
+ database
+ end
+
+ end
+
class Sharder
- attr_accessor :directory, :params, :shard_function, :databases, :closed, :writable, :mutex
+ attr_accessor :persistence_path, :shard_function, :databases, :closed, :writable, :mutex, :db_type
# Builds a sharder. In the added (+) version the given block is kept as the
# shard function, persistence_path is passed through Path.setup, and the
# write flag and db_type are stored as given. The removed (-) lines are the
# previous directory/params based constructor.
- def initialize(directory, *rest, &block)
+ def initialize(persistence_path, write = false, db_type=nil, &block)
@shard_function = block
- @params = rest
- @databases = {}
- @directory = directory
+ @persistence_path = Path.setup(persistence_path)
@mutex = Mutex.new
+ @writable = write
+ @db_type = db_type
+
# Only a writable sharder starts with an empty shard table; otherwise
# @databases stays unset here and the `databases` reader fills it on first use.
+ if write
+ @databases = {}
+ end
end
# Stores value under key through []=.
# NOTE(review): this class defines `<<` again further down with a single
# pair parameter; being later in the class body, that definition replaces
# this one, so this two-parameter form looks unreachable — confirm and remove.
+ def <<(key,value)
+ self[key] = value
+ end
+
# Replaces the stored persistence path with path exactly as given.
# NOTE(review): initialize passes its path through Path.setup and `databases`
# calls glob on the stored value; this setter does no such wrapping —
# confirm callers always hand in an object that responds to glob.
+ def persistence_path=(path)
+ @persistence_path = path
+ end
+
+ def databases
+ @databases ||= begin
+ hash = {}
+ @persistence_path.glob('shard-*').each do |f|
+ shard = File.basename(f).match(/shard-(.*)/)[1]
+ hash[shard] = Persist.open_database(f, false, :clean, db_type)
+ end
+ hash
+ end
+ end
+
def database(key)
- shard = shard_function.call(key)
- databases[shard] ||= begin
- path = File.join(directory, 'shard-' << shard.to_s)
- Persist.open_database(path, *params)
- end
+ shard = key =~ /__tsv_/ ? "0" : shard_function.call(key)
+ if databases.include? shard
+ databases[shard]
+ else
+ database ||= begin
+ path = File.join(persistence_path, 'shard-' << shard.to_s)
+ (writable or File.exists?(path)) ? Persist.open_database(path, writable, :clean, db_type) : nil
+ end
+ if database
+ databases[shard] = database
+ else
+ Log.warn "Database #{ path } missing" if
+ nil
+ end
+ end
end
MAX_CHAR = 255.chr
def prefix(key)
@@ -67,17 +110,21 @@
# True exactly when write? is falsy.
def read?
  !write?
end
# Yields every key/value pair of every shard database, one shard after
# another. The added (+) line walks databases.values; the removed (-) line
# iterated the databases hash itself.
def each
- databases.each do |database|
+ databases.values.each do |database|
database.each do |k,v|
yield k, v
end
end
end
+ def include?(key)
+ self[key] != nil
+ end
+
def collect
res = []
each do |key, value|
res << if block_given?
yield key, value
@@ -87,11 +134,11 @@
end
res
end
def write_and_read
- lock_filename = Persist.persistence_path(File.join(directory, 'write'), {:dir => TSV.lock_dir})
+ lock_filename = Persist.persistence_path(File.join(persistence_path, 'write'), {:dir => TSV.lock_dir})
Misc.lock(lock_filename) do
@mutex.synchronize do
write if @closed or not write?
res = begin
yield
@@ -102,11 +149,11 @@
end
end
end
def write_and_close
- lock_filename = Persist.persistence_path(File.join(directory, 'write'), {:dir => TSV.lock_dir})
+ lock_filename = Persist.persistence_path(File.join(persistence_path, 'write'), {:dir => TSV.lock_dir})
Misc.lock(lock_filename) do
@mutex.synchronize do
write if @closed or not write?
res = begin
yield
@@ -135,19 +182,19 @@
self[key] = values
end
end
# Gathers the keys of every shard database into one flat array. The added
# (+) line additionally subtracts TSV::ENTRY_KEYS from the result
# (presumably TSV's own metadata keys — confirm against TSV).
def keys
- databases.values.collect{|d| d.keys }.flatten
+ databases.values.collect{|d| d.keys }.flatten - TSV::ENTRY_KEYS.to_a
end
# Writes value under key in the shard database selected by database(key).
# In the added (+) version the extra `clean` parameter is accepted but not
# used in this body.
# NOTE(review): database(key) returns nil for a missing shard on a
# non-writable sharder, in which case the send below would raise — confirm
# writes only happen on writable sharders.
- def []=(key, value)
- database(key)[key] = value
+ def []=(key, value, clean = false)
+ database(key).send(:[]=, key, value)
end
- def [](key, value)
- database(key)[key]
+ def [](key, clean=false)
+ v = database(key).send(:[], key)
end
def <<(p)
return if p.nil?
self[p.first] = p.last
@@ -162,7 +209,22 @@
end
# Closes every shard database currently held in the shard table and returns
# the array of those databases.
def close
  databases.values.each(&:close)
end
+ end
+
+ def self.open_sharder(path, write, serializer = nil, tokyocabinet_class = TokyoCabinet::HDB, &shard_function)
+ write = true unless File.exists? path
+
+ FileUtils.mkdir_p File.dirname(path) unless File.exists?(File.dirname(path))
+
+ database = Persist::SharderAdapter.open(path, write, tokyocabinet_class, &shard_function)
+
+ unless serializer == :clean
+ TSV.setup database
+ database.serializer = serializer if serializer
+ end
+
+ database
end
end