lib/daybreak/db.rb in daybreak-0.2.0 vs lib/daybreak/db.rb in daybreak-0.2.1
- old
+ new
@@ -3,45 +3,40 @@
# Enumerable for functional goodies like map, each, reduce and friends.
# @api public
class DB
include Enumerable
- # Accessors for the database file, and a counter of how many records are in
- # sync with the file.
- attr_reader :file, :logsize
+ # Database file name
+ attr_reader :file
+
+ # Counter of how many records are in sync with the file
+ attr_reader :logsize
+
+ # Set the default value; it can be a callable
attr_writer :default
- @databases = []
- @databases_mutex = Mutex.new
+ @@databases = []
+ @@databases_mutex = Mutex.new
# A handler that will ensure that databases are closed and synced when the
# current process exits.
- at_exit do
+ # @api private
+ def self.exit_handler
loop do
- db = @databases_mutex.synchronize { @databases.first }
+ db = @@databases_mutex.synchronize { @@databases.first }
break unless db
warn "Daybreak database #{db.file} was not closed, state might be inconsistent"
begin
db.close
rescue Exception => ex
warn "Failed to close daybreak database: #{ex.message}"
end
end
end
- class << self
- # @api private
- def register(db)
- @databases_mutex.synchronize { @databases << db }
- end
+ at_exit { Daybreak::DB.exit_handler }
- # @api private
- def unregister(db)
- @databases_mutex.synchronize { @databases.delete(db) }
- end
- end
-
# Create a new Daybreak::DB. The second argument is the default value
# to store when accessing a previously unset key; this follows the
# Hash standard.
# @param [String] file the path to the db file
# @param [Hash] options a hash that contains the options for creating a new
@@ -50,41 +45,33 @@
# @yieldparam [String] key the key to be stored.
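+ # A usage sketch with a hypothetical file name:
+ # @example Open a database with a default block
+ #   db = Daybreak::DB.new('test.db') { |key| key.size }
+ #   db['four'] #=> 4
+ #   db.close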
def initialize(file, options = {}, &block)
@file = file
@serializer = (options[:serializer] || Serializer::Default).new
@format = (options[:format] || Format).new
- @default = block ? block : options[:default]
@queue = Queue.new
- @table = {}
+ @table = Hash.new(&method(:hash_default))
+ @default = block ? block : options[:default]
open
@mutex = Mutex.new # Mutex to make #lock thread safe
@worker = Thread.new(&method(:worker))
@worker.priority = -1
- update
- self.class.register(self)
+ load
+ @@databases_mutex.synchronize { @@databases << self }
end
# Return the default value belonging to key
# @param key the key whose default value to retrieve.
def default(key = nil)
- @default.respond_to?(:call) ? @default.call(key) : @default
+ @table.default(key)
end
# Retrieve a value at key from the database. If the default value was specified
# when this database was created, that value will be set and returned. Aliased
# as <tt>get</tt>.
# @param key the key to retrieve from the database.
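+ # @example (illustrative key and value)
+ #   db['name'] = 'foo'
+ #   db['name']     #=> "foo"
+ #   db.get('name') #=> "foo"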
def [](key)
- skey = @serializer.key_for(key)
- value = @table[skey]
- if value != nil || @table.has_key?(skey)
- value
- elsif @default
- value = default(key)
- @queue << [skey, value]
- @table[skey] = value
- end
+ @table[@serializer.key_for(key)]
end
alias_method :get, :'[]'
# Set a key in the database to be written to disk at a later time. If the data
# needs to be persisted immediately, call <tt>db.set(key, value, true)</tt>.
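+ # @example (illustrative key and value)
+ #   db['key'] = 'value' # queued and written asynchronously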
@@ -120,10 +107,27 @@
value = delete(key)
flush
value
end
+ # Update the database with a hash (fast batch update)
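+ # A usage sketch with illustrative keys and values:
+ # @example
+ #   db.update('a' => 1, 'b' => 2) # one queue entry for the whole batch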
+ def update(hash)
+ shash = {}
+ hash.each do |key, value|
+ shash[@serializer.key_for(key)] = value
+ end
+ @queue << shash
+ @table.update(shash)
+ self
+ end
+
+ # Update the database and flush the data to disk.
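+ # @example (illustrative)
+ #   db.update!('a' => 1) # like #update, plus a flush to disk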
+ def update!(hash)
+ update(hash)
+ flush
+ end
+
# Does this db have a value for this key?
# @param key the key to check for.
def has_key?(key)
@table.has_key?(@serializer.key_for(key))
end
@@ -141,10 +145,16 @@
def size
@table.size
end
alias_method :length, :size
+ # Utility method that returns the size of the database file in bytes,
+ # useful for determining when to compact.
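+ # A sketch of a compaction heuristic; the threshold is hypothetical:
+ # @example
+ #   db.compact if db.bytesize && db.bytesize > 10 * 1024 * 1024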
+ def bytesize
+ @fd.stat.size unless closed?
+ end
+
# Return true if database is empty.
# @return [Boolean]
def empty?
@table.empty?
end
@@ -164,28 +174,30 @@
end
# Flush all changes to disk.
def flush
@queue.flush
+ self
end
# Sync the database with what is on disk by first flushing changes and
# then reading the file if necessary.
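+ # @example Pick up writes made by another process (illustrative key)
+ #   db.sync
+ #   db['key'] # now reflects the on-disk state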
def sync
flush
- update
+ load
end
# Lock the database for an exclusive commit across processes and threads
# @yield a block where every change to the database is synced
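+ # A sketch of a cross-process increment with an illustrative key:
+ # @example
+ #   db.lock do
+ #     db['counter'] = (db['counter'] || 0) + 1
+ #   end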
def lock
@mutex.synchronize do
- # We need a flush before exclusive
- # so that @exclusive is not modified by the worker
+ # Flush everything to start with a clean state
+ # and to protect the @locked variable
flush
- exclusive do
- update
+
+ with_flock(File::LOCK_EX) do
+ load
result = yield
flush
result
end
end
@@ -207,106 +219,93 @@
# Compact the database to remove stale commits and reduce the file size.
def compact
sync
with_tmpfile do |path, file|
- compactsize = file.write(dump)
- exclusive do
- stat = @fd.stat
- # Check if database was compactified at the same time
- if stat.nlink > 0 && stat.ino == @inode
- # Compactified database has the same size -> return
- return self if stat.size == compactsize
+ # Return if the compacted dump has the same size as the current file
+ return self if @pos == file.write(dump)
+ with_flock(File::LOCK_EX) do
+ # Rename only if the database was not compactified in the meantime
+ if @pos != nil
# Append changed journal records if the database changed during compactification
file.write(read)
file.close
File.rename(path, @file)
end
end
end
open
- update
- self
+ load
end
# Close the database for reading and writing.
def close
@queue << nil
@worker.join
@fd.close
@queue.stop if @queue.respond_to?(:stop)
- self.class.unregister(self)
+ @@databases_mutex.synchronize { @@databases.delete(self) }
nil
end
# Check to see if we've already closed the database.
def closed?
@fd.closed?
end
private
- # Update the @table with records read from the file, and increment @logsize
- def update
- buf = new_records
+ # The default block used in @table for missing keys
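+ # (Installed in #initialize via Hash.new(&method(:hash_default)), so it
+ # runs whenever @table is read at a missing key.)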
+ def hash_default(_, key)
+ if @default != nil
+ value = @default.respond_to?(:call) ? @default.call(key) : @default
+ @queue << [key, value]
+ @table[key] = value
+ end
+ end
+
+ # Update the @table with records read from the file and increment @logsize
+ def load
+ buf = read
until buf.empty?
record = @format.parse(buf)
if record.size == 1
@table.delete(record.first)
else
@table[record.first] = @serializer.load(record.last)
end
@logsize += 1
end
+ self
end
- # Read new records from journal log and return buffer
- def new_records
- loop do
- unless @exclusive
- # HACK: JRuby returns false if the process is already hold by the same process
- # see https://github.com/jruby/jruby/issues/496
- Thread.pass until @fd.flock(File::LOCK_SH)
- end
- # Check if database was compactified in the meantime
- # break if not
- stat = @fd.stat
- break if stat.nlink > 0 && stat.ino == @inode
- open
- end
-
- # Read new journal records
- read
- ensure
- @fd.flock(File::LOCK_UN) unless @exclusive
- end
-
+ # Open or reopen the database file
def open
@fd.close if @fd
@fd = File.open(@file, 'ab+')
@fd.advise(:sequential) if @fd.respond_to? :advise
stat = @fd.stat
@inode = stat.ino
@logsize = 0
- if stat.size == 0
- @fd.write(@format.header)
- @fd.flush
- end
+ write(@format.header) if stat.size == 0
@pos = nil
end
+ # Read new file content since the last read position
def read
- # File was opened
- unless @pos
- @fd.pos = 0
- @format.read_header(@fd)
- else
- @fd.pos = @pos
+ with_flock(File::LOCK_SH) do
+ # The file was freshly opened, read the header first
+ unless @pos
+ @fd.pos = 0
+ @format.read_header(@fd)
+ else
+ @fd.pos = @pos
+ end
+ buf = @fd.read
+ @pos = @fd.pos
+ buf
end
- buf = @fd.read
- @pos = @fd.pos
- buf
end
# Return database dump as string
def dump
dump = @format.header
@@ -319,52 +318,72 @@
end
# Worker thread that processes the write queue
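+ # Queue protocol: a Hash is a batch update, nil is the stop signal,
+ # anything else is a single record ([key, value] or [key] for a delete).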
def worker
loop do
- record = @queue.next
- write_record(record) if record
+ case record = @queue.next
+ when Hash
+ write_batch(record)
+ when nil
+ @queue.pop
+ break
+ else
+ write_record(record)
+ end
@queue.pop
- break unless record
end
rescue Exception => ex
warn "Daybreak worker: #{ex.message}"
retry
end
- # Write record to output stream and
- # advance input stream
+ # Write batch update
+ def write_batch(records)
+ dump = ''
+ records.each do |record|
+ record[1] = @serializer.dump(record.last)
+ dump << @format.dump(record)
+ end
+ write(dump)
+ @logsize += records.size
+ end
+
+ # Write single record
def write_record(record)
record[1] = @serializer.dump(record.last) if record.size > 1
- record = @format.dump(record)
- exclusive do
- @fd.write(record)
+ write(@format.dump(record))
+ @logsize += 1
+ end
+
+ # Write data to output stream and advance @pos
+ def write(dump)
+ with_flock(File::LOCK_EX) do
+ @fd.write(dump)
# Flush to make sure the file is really updated
@fd.flush
end
- @pos = @fd.pos if @pos && @fd.pos == @pos + record.bytesize
- @logsize += 1
+ @pos = @fd.pos if @pos && @fd.pos == @pos + dump.bytesize
end
- # Lock database exclusively
- def exclusive
- return yield if @exclusive
+ # Run the block while holding a file lock of the given mode
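+ # (#read takes File::LOCK_SH and #write takes File::LOCK_EX; the @locked
+ # flag makes nested calls simply yield.)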
+ def with_flock(mode)
+ return yield if @locked
begin
loop do
# HACK: JRuby returns false if the lock is already held by the same process
# see https://github.com/jruby/jruby/issues/496
- Thread.pass until @fd.flock(File::LOCK_EX)
+ Thread.pass until @fd.flock(mode)
# Check if the database was compactified in the meantime;
# if it was not, we hold a lock on the current file and can stop retrying
stat = @fd.stat
break if stat.nlink > 0 && stat.ino == @inode
open
end
- @exclusive = true
+ @locked = true
yield
ensure
@fd.flock(File::LOCK_UN)
- @exclusive = false
+ @locked = false
end
end
# Open temporary file and pass it to the block
def with_tmpfile