lib/daybreak/db.rb in daybreak-0.1.3 vs lib/daybreak/db.rb in daybreak-0.2.0
- old
+ new
@@ -1,186 +1,380 @@
module Daybreak
- # Daybreak::DB contains the public api for Daybreak, you may extend it like
- # any other Ruby class (i.e. to overwrite serialize and parse). It includes
+ # Daybreak::DB contains the public api for Daybreak. It includes
# Enumerable for functional goodies like map, each, reduce and friends.
+ # @api public
class DB
include Enumerable
+ # Accessors for the database file, and a counter of how many records are in
+ # sync with the file.
+ attr_reader :file, :logsize
+ attr_writer :default
+
+ @databases = []
+ @databases_mutex = Mutex.new
+
+ # A handler that will ensure that databases are closed and synced when the
+ # current process exits.
+ at_exit do
+ loop do
+ db = @databases_mutex.synchronize { @databases.first }
+ break unless db
+ warn "Daybreak database #{db.file} was not closed, state might be inconsistent"
+ begin
+ db.close
+ rescue Exception => ex
+ warn "Failed to close daybreak database: #{ex.message}"
+ end
+ end
+ end
+
+ class << self
+ # @api private
+ def register(db)
+ @databases_mutex.synchronize { @databases << db }
+ end
+
+ # @api private
+ def unregister(db)
+ @databases_mutex.synchronize { @databases.delete(db) }
+ end
+ end
+
# Create a new Daybreak::DB. The second argument is the default value
# to store when accessing a previously unset key, this follows the
# Hash standard.
# @param [String] file the path to the db file
- # @param default the default value to store and return when a key is
- # not yet in the database.
+ # @param [Hash] options a hash that contains the options for creating a new
+ # database. You can pass in :serializer, :format or :default.
# @yield [key] a block that will return the default value to store.
# @yieldparam [String] key the key to be stored.
- def initialize(file, default=nil, &blk)
- @table = {}
- @file_name = file
- @writer = Writer.new(@file_name)
- @default = block_given? ? blk : default
- read!
+ def initialize(file, options = {}, &block)
+ @file = file
+ @serializer = (options[:serializer] || Serializer::Default).new
+ @format = (options[:format] || Format).new
+ @default = block ? block : options[:default]
+ @queue = Queue.new
+ @table = {}
+ open
+ @mutex = Mutex.new # Mutex to make #lock thread safe
+ @worker = Thread.new(&method(:worker))
+ @worker.priority = -1
+ update
+ self.class.register(self)
end
+ # Return default value belonging to key
+ # @param key the default value to retrieve.
+ def default(key = nil)
+ @default.respond_to?(:call) ? @default.call(key) : @default
+ end
+
+ # Retrieve a value at key from the database. If the default value was specified
+ # when this database was created, that value will be set and returned. Aliased
+ # as <tt>get</tt>.
+ # @param key the value to retrieve from the database.
+ def [](key)
+ skey = @serializer.key_for(key)
+ value = @table[skey]
+ if value != nil || @table.has_key?(skey)
+ value
+ elsif @default
+ value = default(key)
+ @queue << [skey, value]
+ @table[skey] = value
+ end
+ end
+ alias_method :get, :'[]'
+
# Set a key in the database to be written at some future date. If the data
# needs to be persisted immediately, call <tt>db.set(key, value, true)</tt>.
# @param [#to_s] key the key of the storage slot in the database
# @param value the value to store
- # @param [Boolean] sync if true, sync this value immediately
- def []=(key, value, sync = false)
- key = key.to_s
- write key, value, sync
+ def []=(key, value)
+ key = @serializer.key_for(key)
+ @queue << [key, value]
@table[key] = value
end
- alias_method :set, :"[]="
+ alias_method :set, :'[]='
# set! flushes data immediately to disk.
- # @param [#to_s] key the key of the storage slot in the database
+ # @param key the key of the storage slot in the database
# @param value the value to store
def set!(key, value)
- set key, value, true
+ set(key, value)
+ flush
+ value
end
# Delete a key from the database
- # @param [#to_s] key the key of the storage slot in the database
- # @param [Boolean] sync if true, sync this deletion immediately
- def delete(key, sync = false)
- key = key.to_s
- write key, '', sync, true
- @table.delete key
+ # @param key the key of the storage slot in the database
+ def delete(key)
+ key = @serializer.key_for(key)
+ @queue << [key]
+ @table.delete(key)
end
- # delete! immediately deletes the key on disk.
- # @param [#to_s] key the key of the storage slot in the database
+ # Immediately delete the key on disk.
+ # @param key the key of the storage slot in the database
def delete!(key)
- delete key, true
+ value = delete(key)
+ flush
+ value
end
- # Retrieve a value at key from the database. If the default value was specified
- # when this database was created, that value will be set and returned. Aliased
- # as <tt>get</tt>.
- # @param [#to_s] key the value to retrieve from the database.
- def [](key)
- key = key.to_s
- if @table.has_key? key
- @table[key]
- elsif default?
- set key, Proc === @default ? @default.call(key) : @default
- end
+ # Does this db have a value for this key?
+ # @param key the key to check if the DB has a key.
+ def has_key?(key)
+ @table.has_key?(@serializer.key_for(key))
end
- alias_method :get, :"[]"
+ alias_method :key?, :has_key?
+ alias_method :include?, :has_key?
+ alias_method :member?, :has_key?
- # Iterate over the key, value pairs in the database.
- # @yield [key, value] blk the iterator for each key value pair.
- # @yieldparam [String] key the key.
- # @yieldparam value the value from the database.
- def each
- keys.each { |k| yield(k, get(k)) }
+ def has_value?(value)
+ @table.has_value?(value)
end
+ alias_method :value?, :has_value?
- # Does this db have a default value.
- def default?
- !@default.nil?
+ # Return the number of stored items.
+ # @return [Integer]
+ def size
+ @table.size
end
+ alias_method :length, :size
- # Does this db have a value for this key?
- # @param [key#to_s] key the key to check if the DB has a key.
- def has_key?(key)
- @table.has_key? key.to_s
+ # Return true if database is empty.
+ # @return [Boolean]
+ def empty?
+ @table.empty?
end
+ # Iterate over the key, value pairs in the database.
+ # @yield [key, value] blk the iterator for each key value pair.
+ # @yieldparam key the key.
+ # @yieldparam value the value from the database.
+ def each(&block)
+ @table.each(&block)
+ end
+
# Return the keys in the db.
- # @return [Array<String>]
+ # @return [Array]
def keys
@table.keys
end
- # Return the number of stored items.
- # @return [Integer]
- def length
- @table.keys.length
+ # Flush all changes to disk.
+ def flush
+ @queue.flush
end
- alias_method :size, :length
- # Serialize the data for writing to disk, if you don't want to use <tt>Marshal</tt>
- # overwrite this method.
- # @param value the value to be serialized
- # @return [String]
- def serialize(value)
- Marshal.dump(value)
+ # Sync the database with what is on disk, by first flushing changes, and
+ # then reading the file if necessary.
+ def sync
+ flush
+ update
end
- # Parse the serialized value from disk, like serialize if you want to use a
- # different serialization method overwrite this method.
- # @param value the value to be parsed
- # @return [String]
- def parse(value)
- Marshal.load(value)
+ # Lock the database for an exclusive commit across processes and threads
+ # @yield a block where every change to the database is synced
+ def lock
+ @mutex.synchronize do
+ # We need a flush before exclusive
+ # so that @exclusive is not modified by the worker
+ flush
+ exclusive do
+ update
+ result = yield
+ flush
+ result
+ end
+ end
end
- # Empty the database file.
- def empty!
- @writer.truncate!
+ # Remove all keys and values from the database.
+ def clear
+ flush
+ with_tmpfile do |path, file|
+ file.write(@format.header)
+ file.close
+ # Clear acts like a compactification
+ File.rename(path, @file)
+ end
@table.clear
- read!
+ open
+ self
end
- alias_method :clear, :empty!
- # Force all queued commits to be written to disk.
- def flush!
- @writer.flush!
+ # Compact the database to remove stale commits and reduce the file size.
+ def compact
+ sync
+ with_tmpfile do |path, file|
+ compactsize = file.write(dump)
+ exclusive do
+ stat = @fd.stat
+ # Check if database was compactified at the same time
+ if stat.nlink > 0 && stat.ino == @inode
+ # Compactified database has the same size -> return
+ return self if stat.size == compactsize
+ # Append changed journal records if the database changed during compactification
+ file.write(read)
+ file.close
+ File.rename(path, @file)
+ end
+ end
+ end
+ open
+ update
+ self
end
# Close the database for reading and writing.
- def close!
- @writer.close!
+ def close
+ @queue << nil
+ @worker.join
+ @fd.close
+ @queue.stop if @queue.respond_to?(:stop)
+ self.class.unregister(self)
+ nil
end
- # Compact the database to remove stale commits and reduce the file size.
- def compact!
- # Create a new temporary database
- tmp_file = @file_name + "-#{$$}-#{Thread.current.object_id}"
- copy_db = self.class.new tmp_file
+ # Check to see if we've already closed the database.
+ def closed?
+ @fd.closed?
+ end
- # Copy the database key by key into the temporary table
- each do |key, value|
- copy_db.set(key, get(key))
+ private
+
+ # Update the @table with records read from the file, and increment @logsize
+ def update
+ buf = new_records
+ until buf.empty?
+ record = @format.parse(buf)
+ if record.size == 1
+ @table.delete(record.first)
+ else
+ @table[record.first] = @serializer.load(record.last)
+ end
+ @logsize += 1
end
- copy_db.close!
+ end
- close!
+ # Read new records from journal log and return buffer
+ def new_records
+ loop do
+ unless @exclusive
+ # HACK: JRuby returns false if the lock is already held by the same process
+ # see https://github.com/jruby/jruby/issues/496
+ Thread.pass until @fd.flock(File::LOCK_SH)
+ end
+ # Check if database was compactified in the meantime
+ # break if not
+ stat = @fd.stat
+ break if stat.nlink > 0 && stat.ino == @inode
+ open
+ end
- # Move the copy into place
- File.rename tmp_file, @file_name
+ # Read new journal records
+ read
+ ensure
+ @fd.flock(File::LOCK_UN) unless @exclusive
+ end
- # Reopen this database
- @writer = Writer.new(@file_name)
- @table.clear
- read!
+ def open
+ @fd.close if @fd
+ @fd = File.open(@file, 'ab+')
+ @fd.advise(:sequential) if @fd.respond_to? :advise
+ stat = @fd.stat
+ @inode = stat.ino
+ @logsize = 0
+ if stat.size == 0
+ @fd.write(@format.header)
+ @fd.flush
+ end
+ @pos = nil
end
- # Read all values from the log file. If you want to check for changed data
- # call this again.
- def read!
- buf = nil
- File.open(@file_name, 'rb') do |fd|
- fd.flock(File::LOCK_SH)
- buf = fd.read
+ def read
+ # File was opened
+ unless @pos
+ @fd.pos = 0
+ @format.read_header(@fd)
+ else
+ @fd.pos = @pos
end
- until buf.empty?
- key, data, deleted = Record.deserialize(buf)
- if deleted
- @table.delete key
- else
- @table[key] = parse(data)
- end
+ buf = @fd.read
+ @pos = @fd.pos
+ buf
+ end
+
+ # Return database dump as string
+ def dump
+ dump = @format.header
+ # each is faster than inject
+ @table.each do |record|
+ record[1] = @serializer.dump(record.last)
+ dump << @format.dump(record)
end
+ dump
end
- private
+ # Worker thread
+ def worker
+ loop do
+ record = @queue.next
+ write_record(record) if record
+ @queue.pop
+ break unless record
+ end
+ rescue Exception => ex
+ warn "Daybreak worker: #{ex.message}"
+ retry
+ end
- def write(key, value, sync = false, delete = false)
- @writer.write([key, serialize(value), delete])
- flush! if sync
+ # Write record to output stream and
+ # advance input stream
+ def write_record(record)
+ record[1] = @serializer.dump(record.last) if record.size > 1
+ record = @format.dump(record)
+ exclusive do
+ @fd.write(record)
+ # Flush to make sure the file is really updated
+ @fd.flush
+ end
+ @pos = @fd.pos if @pos && @fd.pos == @pos + record.bytesize
+ @logsize += 1
+ end
+
+ # Lock database exclusively
+ def exclusive
+ return yield if @exclusive
+ begin
+ loop do
+ # HACK: JRuby returns false if the lock is already held by the same process
+ # see https://github.com/jruby/jruby/issues/496
+ Thread.pass until @fd.flock(File::LOCK_EX)
+ # Check if database was compactified in the meantime
+ # break if not
+ stat = @fd.stat
+ break if stat.nlink > 0 && stat.ino == @inode
+ open
+ end
+ @exclusive = true
+ yield
+ ensure
+ @fd.flock(File::LOCK_UN)
+ @exclusive = false
+ end
+ end
+
+ # Open temporary file and pass it to the block
+ def with_tmpfile
+ path = [@file, $$.to_s(36), Thread.current.object_id.to_s(36)].join
+ file = File.open(path, 'wb')
+ yield(path, file)
+ ensure
+ file.close unless file.closed?
+ File.unlink(path) if File.exists?(path)
end
end
end