lib/daybreak/db.rb in daybreak-0.2.1 vs lib/daybreak/db.rb in daybreak-0.2.2

- old
+ new

@@ -3,158 +3,162 @@ # Enumerable for functional goodies like map, each, reduce and friends. # @api public class DB include Enumerable - # Database file name - attr_reader :file - - # Counter of how many records are in - attr_reader :logsize - # Set default value, can be a callable attr_writer :default - @@databases = [] - @@databases_mutex = Mutex.new - - # A handler that will ensure that databases are closed and synced when the - # current process exits. - # @api private - def self.exit_handler - loop do - db = @@databases_mutex.synchronize { @@databases.first } - break unless db - warn "Daybreak database #{db.file} was not closed, state might be inconsistent" - begin - db.close - rescue Exception => ex - warn "Failed to close daybreak database: #{ex.message}" - end - end - end - - at_exit { Daybreak::DB.exit_handler } - # Create a new Daybreak::DB. The second argument is the default value # to store when accessing a previously unset key, this follows the # Hash standard. # @param [String] file the path to the db file - # @param [Hash] options a hash that contains the options for creating a new - # database. You can pass in :serializer, :format or :default. + # @param [Hash] options a hash that contains the options for creating a new database + # @option options [Class] :serializer Serializer class + # @option options [Class] :format Format class + # @option options [Object] :default Default value # @yield [key] a block that will return the default value to store. # @yieldparam [String] key the key to be stored. def initialize(file, options = {}, &block) - @file = file @serializer = (options[:serializer] || Serializer::Default).new - @format = (options[:format] || Format).new - @queue = Queue.new - @table = Hash.new(&method(:hash_default)) + @table = Hash.new &method(:hash_default) + @journal = Journal.new(file, (options[:format] || Format).new, @serializer) do |record| + if !record + @table.clear + elsif record.size == 1 + @table.delete(record.first) + else + @table[record.first] = @serializer.load(record.last) + end + end @default = block ? block : options[:default] - open - @mutex = Mutex.new # Mutex to make #lock thread safe - @worker = Thread.new(&method(:worker)) - @worker.priority = -1 - load + @mutex = Mutex.new # Mutex used by #synchronize and #lock @@databases_mutex.synchronize { @@databases << self } end + # Database file name + # @return [String] database file name + def file + @journal.file + end + # Return default value belonging to key - # @param key the default value to retrieve. + # @param [Object] key the default value to retrieve. + # @return [Object] value the default value def default(key = nil) - @table.default(key) + @table.default(@serializer.key_for(key)) end # Retrieve a value at key from the database. If the default value was specified # when this database was created, that value will be set and returned. Aliased # as <tt>get</tt>. - # @param key the value to retrieve from the database. + # @param [Object] key the value to retrieve from the database. + # @return [Object] the value def [](key) @table[@serializer.key_for(key)] end - alias_method :get, :'[]' + alias_method :get, '[]' # Set a key in the database to be written at some future date. If the data # needs to be persisted immediately, call <tt>db.set(key, value, true)</tt>. - # @param [#to_s] key the key of the storage slot in the database - # @param value the value to store + # @param [Object] key the key of the storage slot in the database + # @param [Object] value the value to store + # @return [Object] the value def []=(key, value) key = @serializer.key_for(key) - @queue << [key, value] + @journal << [key, value] @table[key] = value end - alias_method :set, :'[]=' + alias_method :set, '[]=' # set! flushes data immediately to disk. - # @param key the key of the storage slot in the database - # @param value the value to store + # @param [Object] key the key of the storage slot in the database + # @param [Object] value the value to store + # @return [Object] the value def set!(key, value) set(key, value) flush value end # Delete a key from the database - # @param key the key of the storage slot in the database + # @param [Object] key the key of the storage slot in the database + # @return [Object] the value def delete(key) key = @serializer.key_for(key) - @queue << [key] + @journal << [key] @table.delete(key) end # Immediately delete the key on disk. - # @param key the key of the storage slot in the database + # @param [Object] key the key of the storage slot in the database + # @return [Object] the value def delete!(key) value = delete(key) flush value end # Update database with hash (Fast batch update) + # @param [Hash] hash the key/value hash + # @return [DB] self def update(hash) shash = {} hash.each do |key, value| shash[@serializer.key_for(key)] = value end - @queue << shash + @journal << shash @table.update(shash) self end # Updata database and flush data to disk. + # @param [Hash] hash the key/value hash + # @return [DB] self def update!(hash) update(hash) - flush + @journal.flush end - # Does this db have a value for this key? - # @param key the key to check if the DB has a key. + # Does this db have this key? + # @param [Object] key the key to check if the DB has it + # @return [Boolean] def has_key?(key) @table.has_key?(@serializer.key_for(key)) end alias_method :key?, :has_key? alias_method :include?, :has_key? alias_method :member?, :has_key? + # Does this db have this value? + # @param [Object] value the value to check if the DB has it + # @return [Boolean] def has_value?(value) @table.has_value?(value) end alias_method :value?, :has_value? # Return the number of stored items. - # @return [Integer] + # @return [Fixnum] def size @table.size end alias_method :length, :size # Utility method that will return the size of the database in bytes, # useful for determining when to compact + # @return [Fixnum] def bytesize - @fd.stat.size unless closed? + @journal.bytesize end + # Counter of how many records are in the journal + # @return [Fixnum] + def logsize + @journal.size + end + # Return true if database is empty. # @return [Boolean] def empty? @table.empty? end @@ -166,234 +170,113 @@ def each(&block) @table.each(&block) end # Return the keys in the db. - # @return [Array] + # @return [Array<String>] def keys @table.keys end # Flush all changes to disk. + # @return [DB] self def flush - @queue.flush + @journal.flush self end # Sync the database with what is on disk, by first flushing changes, and - # then reading the file if necessary. - def sync - flush - load + # then loading the new records if necessary. + # @return [DB] self + def load + @journal.load + self end + alias_method :sunrise, :load - # Lock the database for an exclusive commit accross processes and threads + # Lock the database for an exclusive commit across processes and threads + # @note This method performs an expensive locking over process boundaries. + # If you want to synchronize only between threads, use #synchronize. + # @see #synchronize # @yield a block where every change to the database is synced + # @yieldparam [DB] db + # @return result of the block def lock - @mutex.synchronize do - # Flush everything to start with a clean state - # and to protect the @locked variable - flush + @mutex.synchronize { @journal.lock { yield self } } + end - with_flock(File::LOCK_EX) do - load - result = yield - flush - result - end - end + # Synchronize access to the database from multiple threads + # @note Daybreak is not thread safe, if you want to access it from + # multiple threads, all accesses have to be in the #synchronize block. + # @see #lock + # @yield a block where every change to the database is synced + # @yieldparam [DB] db + # @return result of the block + def synchronize + @mutex.synchronize { yield self } end # Remove all keys and values from the database. + # @return [DB] self def clear - flush - with_tmpfile do |path, file| - file.write(@format.header) - file.close - # Clear acts like a compactification - File.rename(path, @file) - end @table.clear - open + @journal.clear self end # Compact the database to remove stale commits and reduce the file size. + # @return [DB] self def compact - sync - with_tmpfile do |path, file| - # Compactified database has the same size -> return - return self if @pos == file.write(dump) - with_flock(File::LOCK_EX) do - # Database was compactified in the meantime - if @pos != nil - # Append changed journal records if the database changed during compactification - file.write(read) - file.close - File.rename(path, @file) - end - end - end - open - load + @journal.compact { @table } + self end # Close the database for reading and writing. + # @return nil def close - @queue << nil - @worker.join - @fd.close - @queue.stop if @queue.respond_to?(:stop) + @journal.close @@databases_mutex.synchronize { @@databases.delete(self) } nil end # Check to see if we've already closed the database. + # @return [Boolean] def closed? - @fd.closed? + @journal.closed? end private - # The block used in @table for new entries - def hash_default(_, key) - if @default != nil - value = @default.respond_to?(:call) ? @default.call(key) : @default - @queue << [key, value] - @table[key] = value - end - end + # @private + @@databases = [] - # Update the @table with records - def load - buf = read - until buf.empty? - record = @format.parse(buf) - if record.size == 1 - @table.delete(record.first) - else - @table[record.first] = @serializer.load(record.last) - end - @logsize += 1 - end - self - end + # @private + @@databases_mutex = Mutex.new - # Open or reopen file - def open - @fd.close if @fd - @fd = File.open(@file, 'ab+') - @fd.advise(:sequential) if @fd.respond_to? :advise - stat = @fd.stat - @inode = stat.ino - @logsize = 0 - write(@format.header) if stat.size == 0 - @pos = nil - end - - # Read new file content - def read - with_flock(File::LOCK_SH) do - # File was opened - unless @pos - @fd.pos = 0 - @format.read_header(@fd) - else - @fd.pos = @pos - end - buf = @fd.read - @pos = @fd.pos - buf - end - end - - # Return database dump as string - def dump - dump = @format.header - # each is faster than inject - @table.each do |record| - record[1] = @serializer.dump(record.last) - dump << @format.dump(record) - end - dump - end - - # Worker thread - def worker + # A handler that will ensure that databases are closed and synced when the + # current process exits. + # @private + def self.exit_handler loop do - case record = @queue.next - when Hash - write_batch(record) - when nil - @queue.pop - break - else - write_record(record) + db = @@databases_mutex.synchronize { @@databases.first } + break unless db + warn "Daybreak database #{db.file} was not closed, state might be inconsistent" + begin + db.close + rescue Exception => ex + warn "Failed to close daybreak database: #{ex.message}" end - @queue.pop end - rescue Exception => ex - warn "Daybreak worker: #{ex.message}" - retry end - # Write batch update - def write_batch(records) - dump = '' - records.each do |record| - record[1] = @serializer.dump(record.last) - dump << @format.dump(record) - end - write(dump) - @logsize += records.size - end + at_exit { Daybreak::DB.exit_handler } - # Write single record - def write_record(record) - record[1] = @serializer.dump(record.last) if record.size > 1 - write(@format.dump(record)) - @logsize += 1 - end - - # Write data to output stream and advance @pos - def write(dump) - with_flock(File::LOCK_EX) do - @fd.write(dump) - # Flush to make sure the file is really updated - @fd.flush + # The block used in @table for new records + def hash_default(_, key) + if @default != nil + value = @default.respond_to?(:call) ? @default.call(key) : @default + @journal << [key, value] + @table[key] = value end - @pos = @fd.pos if @pos && @fd.pos == @pos + dump.bytesize - end - - # Block with file lock - def with_flock(mode) - return yield if @locked - begin - loop do - # HACK: JRuby returns false if the process is already hold by the same process - # see https://github.com/jruby/jruby/issues/496 - Thread.pass until @fd.flock(mode) - # Check if database was compactified in the meantime - # break if not - stat = @fd.stat - break if stat.nlink > 0 && stat.ino == @inode - open - end - @locked = true - yield - ensure - @fd.flock(File::LOCK_UN) - @locked = false - end - end - - # Open temporary file and pass it to the block - def with_tmpfile - path = [@file, $$.to_s(36), Thread.current.object_id.to_s(36)].join - file = File.open(path, 'wb') - yield(path, file) - ensure - file.close unless file.closed? - File.unlink(path) if File.exists?(path) end end end