lib/daybreak/db.rb in daybreak-0.1.3 vs lib/daybreak/db.rb in daybreak-0.2.0

- old
+ new

module Daybreak
  # Daybreak::DB contains the public api for Daybreak. It includes
  # Enumerable for functional goodies like map, each, reduce and friends.
  # @api public
  class DB
    include Enumerable

    # Accessors for the database file, and a counter of how many records are in
    # sync with the file.
    attr_reader :file, :logsize
    attr_writer :default

    # Registry of databases that were opened and not yet closed. These are
    # class-level instance variables of DB itself, guarded by a mutex.
    @databases = []
    @databases_mutex = Mutex.new

    # A handler that will ensure that databases are closed and synced when the
    # current process exits.
    at_exit do
      loop do
        # Remove the database from the registry *before* closing it. If #close
        # raises before it reaches #unregister, peeking with `first` would hand
        # back the same database forever and the process could never exit.
        db = @databases_mutex.synchronize { @databases.shift }
        break unless db
        warn "Daybreak database #{db.file} was not closed, state might be inconsistent"
        begin
          db.close
        rescue Exception => ex
          # Best effort during shutdown: report and carry on with the next one.
          warn "Failed to close daybreak database: #{ex.message}"
        end
      end
    end

    class << self
      # Add a database to the list of open databases.
      # @api private
      def register(db)
        @databases_mutex.synchronize { @databases << db }
      end

      # Remove a database from the list of open databases.
      # @api private
      def unregister(db)
        @databases_mutex.synchronize { @databases.delete(db) }
      end
    end

    # Create a new Daybreak::DB. A default for previously unset keys can be
    # given either through the :default option or as a block; the block takes
    # precedence. This follows the Hash standard.
    # @param [String] file the path to the db file
    # @param [Hash] options a hash that contains the options for creating a new
    #  database. You can pass in :serializer, :format or :default.
    # @yield [key] a block that will return the default value to store.
    # @yieldparam key the key to be stored.
    def initialize(file, options = {}, &block)
      @file = file
      @serializer = (options[:serializer] || Serializer::Default).new
      @format = (options[:format] || Format).new
      @default = block ? block : options[:default]
      @queue = Queue.new
      @table = {}
      open
      @mutex = Mutex.new # Mutex to make #lock thread safe
      @worker = Thread.new(&method(:worker))
      @worker.priority = -1
      update
      # Always go through DB: the registry ivars live on DB only, so
      # `self.class.register` fails with a nil mutex for subclasses.
      DB.register(self)
    end

    # Return default value belonging to key. If the configured default
    # responds to #call it is called with the key, otherwise it is returned
    # as is.
    # @param key the key to compute the default value for.
    def default(key = nil)
      @default.respond_to?(:call) ? @default.call(key) : @default
    end

    # Retrieve a value at key from the database. If the default value was specified
    # when this database was created, that value will be set and returned. Aliased
    # as <tt>get</tt>.
    # @param key the value to retrieve from the database.
    # @return the stored value, the freshly stored default, or nil.
    def [](key)
      skey = @serializer.key_for(key)
      value = @table[skey]
      # A non-nil value proves the key exists; only a nil result needs the
      # second lookup to tell "stored nil" apart from "missing".
      if value != nil || @table.has_key?(skey)
        value
      elsif @default
        value = default(key)
        @queue << [skey, value]
        @table[skey] = value
      end
    end
    alias_method :get, :'[]'

    # Set a key in the database to be written at some future date. If the data
    # needs to be persisted immediately, use <tt>set!</tt> instead.
    # @param key the key of the storage slot in the database
    # @param value the value to store
    # @return the value that was stored
    def []=(key, value)
      key = @serializer.key_for(key)
      @queue << [key, value]
      @table[key] = value
    end
    alias_method :set, :'[]='

    # set! flushes data immediately to disk.
    # @param key the key of the storage slot in the database
    # @param value the value to store
    # @return the value that was stored
    def set!(key, value)
      set(key, value)
      flush
      value
    end

    # Delete a key from the database
    # @param key the key of the storage slot in the database
    # @return the value that was stored under key, as returned by Hash#delete
    def delete(key)
      key = @serializer.key_for(key)
      # A one-element record marks a deletion (see #update).
      @queue << [key]
      @table.delete(key)
    end

    # Immediately delete the key on disk.
    # @param key the key of the storage slot in the database
    # @return the value that was stored under key
    def delete!(key)
      value = delete(key)
      flush
      value
    end

    # Does this db have a value for this key?
    # @param key the key to check if the DB has a key.
    # @return [Boolean]
    def has_key?(key)
      @table.has_key?(@serializer.key_for(key))
    end
    alias_method :key?, :has_key?
    alias_method :include?, :has_key?
    alias_method :member?, :has_key?

    # Does this db contain the given value under any key?
    # @param value the value to look for.
    # @return [Boolean]
    def has_value?(value)
      @table.has_value?(value)
    end
    alias_method :value?, :has_value?

    # Return the number of stored items.
    # @return [Integer]
    def size
      @table.size
    end
    alias_method :length, :size

    # Return true if database is empty.
    # @return [Boolean]
    def empty?
      @table.empty?
    end

    # Iterate over the key, value pairs in the database.
    # @yield [key, value] blk the iterator for each key value pair.
    # @yieldparam key the key.
    # @yieldparam value the value from the database.
    def each(&block)
      @table.each(&block)
    end

    # Return the keys in the db.
    # @return [Array]
    def keys
      @table.keys
    end

    # Flush all changes to disk.
    def flush
      @queue.flush
    end

    # Sync the database with what is on disk, by first flushing changes, and
    # then reading the file if necessary.
    def sync
      flush
      update
    end

    # Lock the database for an exclusive commit across processes and threads
    # @yield a block where every change to the database is synced
    # @return the result of the block
    def lock
      @mutex.synchronize do
        # We need a flush before exclusive
        # so that @exclusive is not modified by the worker
        flush
        exclusive do
          update
          result = yield
          flush
          result
        end
      end
    end

    # Remove all keys and values from the database.
    # @return [Daybreak::DB] self
    def clear
      flush
      with_tmpfile do |path, file|
        file.write(@format.header)
        file.close
        # Clear acts like a compactification
        File.rename(path, @file)
      end
      @table.clear
      open
      self
    end

    # Compact the database to remove stale commits and reduce the file size.
    # @return [Daybreak::DB] self
    def compact
      sync
      with_tmpfile do |path, file|
        compactsize = file.write(dump)
        exclusive do
          stat = @fd.stat
          # Check if database was compactified at the same time
          if stat.nlink > 0 && stat.ino == @inode
            # Compactified database has the same size -> return
            return self if stat.size == compactsize
            # Append changed journal records if the database changed during compactification
            file.write(read)
            file.close
            File.rename(path, @file)
          end
        end
      end
      open
      update
      self
    end

    # Close the database for reading and writing.
    # @return [nil]
    def close
      # nil is the sentinel that tells the worker thread to terminate.
      @queue << nil
      @worker.join
      @fd.close
      @queue.stop if @queue.respond_to?(:stop)
      DB.unregister(self)
      nil
    end

    # Check to see if we've already closed the database.
    # @return [Boolean]
    def closed?
      @fd.closed?
    end

    private

    # Update the @table with records read from the file, and increment @logsize
    def update
      buf = new_records
      until buf.empty?
        record = @format.parse(buf)
        if record.size == 1
          # One-element record: the key was deleted.
          @table.delete(record.first)
        else
          @table[record.first] = @serializer.load(record.last)
        end
        @logsize += 1
      end
    end

    # Read new records from journal log and return buffer
    def new_records
      loop do
        unless @exclusive
          # HACK: JRuby returns false if the lock is already held by the same process
          # see https://github.com/jruby/jruby/issues/496
          Thread.pass until @fd.flock(File::LOCK_SH)
        end
        # Check if database was compactified in the meantime
        # break if not
        stat = @fd.stat
        break if stat.nlink > 0 && stat.ino == @inode
        open
      end

      # Read new journal records
      read
    ensure
      @fd.flock(File::LOCK_UN) unless @exclusive
    end

    # (Re)open the database file in binary append/read mode, remember its
    # inode, reset @logsize and write the format header if the file is empty.
    # Resets @pos so that the next #read starts right after the header.
    def open
      @fd.close if @fd
      @fd = File.open(@file, 'ab+')
      @fd.advise(:sequential) if @fd.respond_to? :advise
      stat = @fd.stat
      @inode = stat.ino
      @logsize = 0
      if stat.size == 0
        @fd.write(@format.header)
        @fd.flush
      end
      @pos = nil
    end

    # Read everything after the last read position (or after the header when
    # the file was just opened), remember the new position and return the
    # bytes that were read.
    def read
      if @pos
        @fd.pos = @pos
      else
        # File was opened
        @fd.pos = 0
        @format.read_header(@fd)
      end
      buf = @fd.read
      @pos = @fd.pos
      buf
    end

    # Return database dump as string
    def dump
      dump = @format.header
      # each is faster than inject
      @table.each do |record|
        record[1] = @serializer.dump(record.last)
        dump << @format.dump(record)
      end
      dump
    end

    # Worker thread: writes queued records to the file until it receives the
    # nil sentinel pushed by #close. A record is only removed from the queue
    # after it has been written.
    # NOTE(review): the retry is unbounded, so a persistently failing write
    # keeps warning and retrying the same record - confirm this is intended.
    def worker
      loop do
        record = @queue.next
        write_record(record) if record
        @queue.pop
        break unless record
      end
    rescue Exception => ex
      warn "Daybreak worker: #{ex.message}"
      retry
    end

    # Write record to output stream and
    # advance input stream
    def write_record(record)
      record[1] = @serializer.dump(record.last) if record.size > 1
      record = @format.dump(record)
      exclusive do
        @fd.write(record)
        # Flush to make sure the file is really updated
        @fd.flush
      end
      # Only skip over our own record if nothing else was appended in between;
      # otherwise leave @pos alone so the next #read picks up everything.
      @pos = @fd.pos if @pos && @fd.pos == @pos + record.bytesize
      @logsize += 1
    end

    # Lock database exclusively. Nested calls simply yield.
    def exclusive
      return yield if @exclusive
      begin
        loop do
          # HACK: JRuby returns false if the lock is already held by the same process
          # see https://github.com/jruby/jruby/issues/496
          Thread.pass until @fd.flock(File::LOCK_EX)
          # Check if database was compactified in the meantime
          # break if not
          stat = @fd.stat
          break if stat.nlink > 0 && stat.ino == @inode
          open
        end
        @exclusive = true
        yield
      ensure
        @fd.flock(File::LOCK_UN)
        @exclusive = false
      end
    end

    # Open temporary file and pass it to the block. The file is closed and
    # removed afterwards unless the block renamed it into place.
    # @yield [path, file] the temporary path and the open File
    def with_tmpfile
      path = [@file, $$.to_s(36), Thread.current.object_id.to_s(36)].join
      file = File.open(path, 'wb')
      yield(path, file)
    ensure
      # file is nil when File.open itself failed; do not mask that error.
      file.close if file && !file.closed?
      # File.exists? no longer exists on current Rubies; File.exist? works everywhere.
      File.unlink(path) if path && File.exist?(path)
    end
  end
end