lib/daybreak/db.rb in daybreak-0.2.1 vs lib/daybreak/db.rb in daybreak-0.2.2
- old
+ new
@@ -3,158 +3,162 @@
# Enumerable for functional goodies like map, each, reduce and friends.
# @api public
class DB
include Enumerable
- # Database file name
- attr_reader :file
-
- # Counter of how many records are in
- attr_reader :logsize
-
# Set default value, can be a callable
attr_writer :default
- @@databases = []
- @@databases_mutex = Mutex.new
-
- # A handler that will ensure that databases are closed and synced when the
- # current process exits.
- # @api private
- def self.exit_handler
- loop do
- db = @@databases_mutex.synchronize { @@databases.first }
- break unless db
- warn "Daybreak database #{db.file} was not closed, state might be inconsistent"
- begin
- db.close
- rescue Exception => ex
- warn "Failed to close daybreak database: #{ex.message}"
- end
- end
- end
-
- at_exit { Daybreak::DB.exit_handler }
-
# Create a new Daybreak::DB. The second argument is the default value
# to store when accessing a previously unset key, this follows the
# Hash standard.
# @param [String] file the path to the db file
- # @param [Hash] options a hash that contains the options for creating a new
- # database. You can pass in :serializer, :format or :default.
+ # @param [Hash] options a hash that contains the options for creating a new database
+ # @option options [Class] :serializer Serializer class
+ # @option options [Class] :format Format class
+ # @option options [Object] :default Default value
# @yield [key] a block that will return the default value to store.
# @yieldparam [String] key the key to be stored.
def initialize(file, options = {}, &block)
- @file = file
@serializer = (options[:serializer] || Serializer::Default).new
- @format = (options[:format] || Format).new
- @queue = Queue.new
- @table = Hash.new(&method(:hash_default))
+ @table = Hash.new &method(:hash_default)
+ @journal = Journal.new(file, (options[:format] || Format).new, @serializer) do |record|
+ if !record
+ @table.clear
+ elsif record.size == 1
+ @table.delete(record.first)
+ else
+ @table[record.first] = @serializer.load(record.last)
+ end
+ end
@default = block ? block : options[:default]
- open
- @mutex = Mutex.new # Mutex to make #lock thread safe
- @worker = Thread.new(&method(:worker))
- @worker.priority = -1
- load
+ @mutex = Mutex.new # Mutex used by #synchronize and #lock
@@databases_mutex.synchronize { @@databases << self }
end
+ # Database file name
+ # @return [String] database file name
+ def file
+ @journal.file
+ end
+
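A minimal usage sketch of the new constructor options and the #file reader, assuming daybreak 0.2.2 and hypothetical file names; per the constructor above, a default block takes precedence over the :default option.

    require 'daybreak'

    # Static default supplied through the options hash.
    counts = Daybreak::DB.new('counts.db', :default => 0)

    # Per-key default computed by a block; the block wins if both are given.
    cache = Daybreak::DB.new('cache.db') { |key| "missing:#{key}" }

    counts.file       # => "counts.db"
    counts['visits']  # => 0, stored on first access via the default

    counts.close
    cache.close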
# Return default value belonging to key
- # @param key the default value to retrieve.
+ # @param [Object] key the default value to retrieve.
+ # @return [Object] value the default value
def default(key = nil)
- @table.default(key)
+ @table.default(@serializer.key_for(key))
end
# Retrieve a value at key from the database. If the default value was specified
# when this database was created, that value will be set and returned. Aliased
# as <tt>get</tt>.
- # @param key the value to retrieve from the database.
+ # @param [Object] key the value to retrieve from the database.
+ # @return [Object] the value
def [](key)
@table[@serializer.key_for(key)]
end
- alias_method :get, :'[]'
+ alias_method :get, '[]'
# Set a key in the database to be written at some future date. If the data
# needs to be persisted immediately, call <tt>db.set(key, value, true)</tt>.
- # @param [#to_s] key the key of the storage slot in the database
- # @param value the value to store
+ # @param [Object] key the key of the storage slot in the database
+ # @param [Object] value the value to store
+ # @return [Object] the value
def []=(key, value)
key = @serializer.key_for(key)
- @queue << [key, value]
+ @journal << [key, value]
@table[key] = value
end
- alias_method :set, :'[]='
+ alias_method :set, '[]='
# set! flushes data immediately to disk.
- # @param key the key of the storage slot in the database
- # @param value the value to store
+ # @param [Object] key the key of the storage slot in the database
+ # @param [Object] value the value to store
+ # @return [Object] the value
def set!(key, value)
set(key, value)
flush
value
end
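To make the buffering semantics of the two write paths concrete, a short sketch with a hypothetical file name: []=/set queue the record in the journal, while set! additionally flushes before returning.

    require 'daybreak'

    db = Daybreak::DB.new('example.db')

    db['greeting'] = 'hello'   # queued in the journal, flushed later
    db.set('count', 1)         # same as []=
    db.set!('count', 2)        # written and flushed to disk immediately

    db.get('greeting')         # => "hello"
    db['count']                # => 2

    db.close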
# Delete a key from the database
- # @param key the key of the storage slot in the database
+ # @param [Object] key the key of the storage slot in the database
+ # @return [Object] the value
def delete(key)
key = @serializer.key_for(key)
- @queue << [key]
+ @journal << [key]
@table.delete(key)
end
# Immediately delete the key on disk.
- # @param key the key of the storage slot in the database
+ # @param [Object] key the key of the storage slot in the database
+ # @return [Object] the value
def delete!(key)
value = delete(key)
flush
value
end
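A sketch of the two delete variants, plus one consequence of the default machinery worth noting: when a default is configured, reading a deleted key re-stores the default through hash_default (shown near the end of this file). File name hypothetical.

    require 'daybreak'

    db = Daybreak::DB.new('example.db', :default => 0)

    db['hits'] = 5
    db.delete('hits')   # deletion queued in the journal
    db.flush            # or db.delete!('hits') to flush immediately

    # With a default configured, the next read re-stores the default.
    db['hits']          # => 0
    db.key?('hits')     # => true

    db.close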
# Update database with hash (Fast batch update)
+ # @param [Hash] hash the key/value hash
+ # @return [DB] self
def update(hash)
shash = {}
hash.each do |key, value|
shash[@serializer.key_for(key)] = value
end
- @queue << shash
+ @journal << shash
@table.update(shash)
self
end
  # Update database and flush data to disk.
+ # @param [Hash] hash the key/value hash
+ # @return [DB] self
def update!(hash)
update(hash)
- flush
+ @journal.flush
end
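Both batch methods above funnel the whole hash into a single journal record; a sketch with a hypothetical settings file:

    require 'daybreak'

    db = Daybreak::DB.new('settings.db')

    # One journal record for the whole batch.
    db.update('theme' => 'dark', 'lang' => 'en')

    # Same, but flushed to disk before returning.
    db.update!('theme' => 'light')

    db['lang']   # => "en"
    db.close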
- # Does this db have a value for this key?
- # @param key the key to check if the DB has a key.
+ # Does this db have this key?
+ # @param [Object] key the key to check if the DB has it
+ # @return [Boolean]
def has_key?(key)
@table.has_key?(@serializer.key_for(key))
end
alias_method :key?, :has_key?
alias_method :include?, :has_key?
alias_method :member?, :has_key?
+ # Does this db have this value?
+ # @param [Object] value the value to check if the DB has it
+ # @return [Boolean]
def has_value?(value)
@table.has_value?(value)
end
alias_method :value?, :has_value?
# Return the number of stored items.
- # @return [Integer]
+ # @return [Fixnum]
def size
@table.size
end
alias_method :length, :size
# Utility method that will return the size of the database in bytes,
# useful for determining when to compact
+ # @return [Fixnum]
def bytesize
- @fd.stat.size unless closed?
+ @journal.bytesize
end
+ # Counter of how many records are in the journal
+ # @return [Fixnum]
+ def logsize
+ @journal.size
+ end
+
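The three size readers combine into a simple compaction heuristic; the threshold below is an arbitrary illustration, not something the library prescribes.

    require 'daybreak'

    db = Daybreak::DB.new('metrics.db')   # hypothetical path
    100.times { |i| db["key#{i}"] = i }
    db.flush

    db.size       # live records in the in-memory table
    db.logsize    # journal records written, including overwrites and deletes
    db.bytesize   # size of the journal file in bytes

    # Arbitrary heuristic: compact once the journal holds far more
    # records than the live data set.
    db.compact if db.logsize > db.size * 2

    db.close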
# Return true if database is empty.
# @return [Boolean]
def empty?
@table.empty?
end
@@ -166,234 +170,113 @@
def each(&block)
@table.each(&block)
end
# Return the keys in the db.
- # @return [Array]
+ # @return [Array<String>]
def keys
@table.keys
end
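Since DB includes Enumerable, the usual collection helpers work on top of #each; a sketch with hypothetical inventory data:

    require 'daybreak'

    db = Daybreak::DB.new('inventory.db')
    db.update('apples' => 3, 'pears' => 5)

    db.keys              # => ["apples", "pears"]
    db.key?('apples')    # => true
    db.has_value?(5)     # => true

    # Enumerable methods come for free via #each.
    db.map { |key, value| "#{key}=#{value}" }        # => ["apples=3", "pears=5"]
    db.reduce(0) { |sum, (_, value)| sum + value }   # => 8

    db.close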
# Flush all changes to disk.
+ # @return [DB] self
def flush
- @queue.flush
+ @journal.flush
self
end
# Sync the database with what is on disk, by first flushing changes, and
- # then reading the file if necessary.
- def sync
- flush
- load
+ # then loading the new records if necessary.
+ # @return [DB] self
+ def load
+ @journal.load
+ self
end
+ alias_method :sunrise, :load
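A sketch of how #flush and the renamed #load (aliased as #sunrise) cooperate between processes; in practice the writer and reader would be separate processes, collapsed here into one script for brevity. File name hypothetical.

    require 'daybreak'

    db = Daybreak::DB.new('shared.db')

    db['seen'] = true
    db.flush    # push queued records to disk for other processes

    # Pick up records appended by other processes since the last read.
    db.load     # alias: db.sunrise

    db.close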
- # Lock the database for an exclusive commit accross processes and threads
+ # Lock the database for an exclusive commit across processes and threads
+ # @note This method performs an expensive locking over process boundaries.
+ # If you want to synchronize only between threads, use #synchronize.
+ # @see #synchronize
# @yield a block where every change to the database is synced
+ # @yieldparam [DB] db
+ # @return result of the block
def lock
- @mutex.synchronize do
- # Flush everything to start with a clean state
- # and to protect the @locked variable
- flush
+ @mutex.synchronize { @journal.lock { yield self } }
+ end
- with_flock(File::LOCK_EX) do
- load
- result = yield
- flush
- result
- end
- end
+ # Synchronize access to the database from multiple threads
+ # @note Daybreak is not thread safe, if you want to access it from
+ # multiple threads, all accesses have to be in the #synchronize block.
+ # @see #lock
+ # @yield a block where every change to the database is synced
+ # @yieldparam [DB] db
+ # @return result of the block
+ def synchronize
+ @mutex.synchronize { yield self }
end
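The difference between the two blocks is scope: #lock takes the expensive file lock for cross-process exclusivity, while #synchronize only coordinates threads inside one process. A sketch with a hypothetical counter file:

    require 'daybreak'

    db = Daybreak::DB.new('counter.db')

    # Exclusive commit across processes (file locking, expensive).
    db.lock do
      db['hits'] = (db['hits'] || 0) + 1
    end

    # Cheaper: only guards against other threads in this process;
    # all threaded access must go through synchronize.
    db.synchronize do
      db['hits'] = (db['hits'] || 0) + 1
    end

    db.close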
# Remove all keys and values from the database.
+ # @return [DB] self
def clear
- flush
- with_tmpfile do |path, file|
- file.write(@format.header)
- file.close
- # Clear acts like a compactification
- File.rename(path, @file)
- end
@table.clear
- open
+ @journal.clear
self
end
# Compact the database to remove stale commits and reduce the file size.
+ # @return [DB] self
def compact
- sync
- with_tmpfile do |path, file|
- # Compactified database has the same size -> return
- return self if @pos == file.write(dump)
- with_flock(File::LOCK_EX) do
- # Database was compactified in the meantime
- if @pos != nil
- # Append changed journal records if the database changed during compactification
- file.write(read)
- file.close
- File.rename(path, @file)
- end
- end
- end
- open
- load
+ @journal.compact { @table }
+ self
end
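Both maintenance operations rewrite the file: #clear drops everything down to a fresh header, while #compact rewrites the journal from the current in-memory table so stale records disappear. A sketch with a hypothetical path; the exact on-disk behavior now lives in the new Journal class, which is not part of this file.

    require 'daybreak'

    db = Daybreak::DB.new('example.db')

    1000.times { |i| db['counter'] = i }   # many stale journal records
    db.flush

    db.compact   # rewrite the journal from the live table
    db.clear     # drop all keys and reset the file

    db.empty?    # => true
    db.close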
# Close the database for reading and writing.
+ # @return nil
def close
- @queue << nil
- @worker.join
- @fd.close
- @queue.stop if @queue.respond_to?(:stop)
+ @journal.close
@@databases_mutex.synchronize { @@databases.delete(self) }
nil
end
# Check to see if we've already closed the database.
+ # @return [Boolean]
def closed?
- @fd.closed?
+ @journal.closed?
end
private
- # The block used in @table for new entries
- def hash_default(_, key)
- if @default != nil
- value = @default.respond_to?(:call) ? @default.call(key) : @default
- @queue << [key, value]
- @table[key] = value
- end
- end
+ # @private
+ @@databases = []
- # Update the @table with records
- def load
- buf = read
- until buf.empty?
- record = @format.parse(buf)
- if record.size == 1
- @table.delete(record.first)
- else
- @table[record.first] = @serializer.load(record.last)
- end
- @logsize += 1
- end
- self
- end
+ # @private
+ @@databases_mutex = Mutex.new
- # Open or reopen file
- def open
- @fd.close if @fd
- @fd = File.open(@file, 'ab+')
- @fd.advise(:sequential) if @fd.respond_to? :advise
- stat = @fd.stat
- @inode = stat.ino
- @logsize = 0
- write(@format.header) if stat.size == 0
- @pos = nil
- end
-
- # Read new file content
- def read
- with_flock(File::LOCK_SH) do
- # File was opened
- unless @pos
- @fd.pos = 0
- @format.read_header(@fd)
- else
- @fd.pos = @pos
- end
- buf = @fd.read
- @pos = @fd.pos
- buf
- end
- end
-
- # Return database dump as string
- def dump
- dump = @format.header
- # each is faster than inject
- @table.each do |record|
- record[1] = @serializer.dump(record.last)
- dump << @format.dump(record)
- end
- dump
- end
-
- # Worker thread
- def worker
+ # A handler that will ensure that databases are closed and synced when the
+ # current process exits.
+ # @private
+ def self.exit_handler
loop do
- case record = @queue.next
- when Hash
- write_batch(record)
- when nil
- @queue.pop
- break
- else
- write_record(record)
+ db = @@databases_mutex.synchronize { @@databases.first }
+ break unless db
+ warn "Daybreak database #{db.file} was not closed, state might be inconsistent"
+ begin
+ db.close
+ rescue Exception => ex
+ warn "Failed to close daybreak database: #{ex.message}"
end
- @queue.pop
end
- rescue Exception => ex
- warn "Daybreak worker: #{ex.message}"
- retry
end
- # Write batch update
- def write_batch(records)
- dump = ''
- records.each do |record|
- record[1] = @serializer.dump(record.last)
- dump << @format.dump(record)
- end
- write(dump)
- @logsize += records.size
- end
+ at_exit { Daybreak::DB.exit_handler }
- # Write single record
- def write_record(record)
- record[1] = @serializer.dump(record.last) if record.size > 1
- write(@format.dump(record))
- @logsize += 1
- end
-
- # Write data to output stream and advance @pos
- def write(dump)
- with_flock(File::LOCK_EX) do
- @fd.write(dump)
- # Flush to make sure the file is really updated
- @fd.flush
+ # The block used in @table for new records
+ def hash_default(_, key)
+ if @default != nil
+ value = @default.respond_to?(:call) ? @default.call(key) : @default
+ @journal << [key, value]
+ @table[key] = value
end
- @pos = @fd.pos if @pos && @fd.pos == @pos + dump.bytesize
- end
-
- # Block with file lock
- def with_flock(mode)
- return yield if @locked
- begin
- loop do
- # HACK: JRuby returns false if the process is already hold by the same process
- # see https://github.com/jruby/jruby/issues/496
- Thread.pass until @fd.flock(mode)
- # Check if database was compactified in the meantime
- # break if not
- stat = @fd.stat
- break if stat.nlink > 0 && stat.ino == @inode
- open
- end
- @locked = true
- yield
- ensure
- @fd.flock(File::LOCK_UN)
- @locked = false
- end
- end
-
- # Open temporary file and pass it to the block
- def with_tmpfile
- path = [@file, $$.to_s(36), Thread.current.object_id.to_s(36)].join
- file = File.open(path, 'wb')
- yield(path, file)
- ensure
- file.close unless file.closed?
- File.unlink(path) if File.exists?(path)
end
end
end