lib/perobs/FlatFile.rb in perobs-2.5.0 vs lib/perobs/FlatFile.rb in perobs-3.0.0

- old
+ new

@@ -27,68 +27,97 @@ require 'zlib' require 'perobs/Log' require 'perobs/FlatFileBlobHeader' -require 'perobs/IndexTree' -require 'perobs/FreeSpaceManager' +require 'perobs/BTree' +require 'perobs/SpaceTree' module PEROBS # The FlatFile class manages the storage file of the FlatFileDB. It contains # a sequence of blobs Each blob consists of header and the actual # blob data bytes. class FlatFile + # The number of entries in a single BTree node of the index file. + INDEX_BTREE_ORDER = 65 + # Create a new FlatFile object for a database in the given path. # @param dir [String] Directory path for the data base file def initialize(dir) @db_dir = dir @f = nil - @index = IndexTree.new(dir) - @space_list = FreeSpaceManager.new(dir) + @index = BTree.new(@db_dir, 'index', INDEX_BTREE_ORDER) + @space_list = SpaceTree.new(@db_dir) end # Open the flat file for reading and writing. def open file_name = File.join(@db_dir, 'database.blobs') + new_db_created = false begin if File.exist?(file_name) @f = File.open(file_name, 'rb+') else - PEROBS.log.info 'New database.blobs file created' + PEROBS.log.info "New FlatFile database '#{file_name}' created" @f = File.open(file_name, 'wb+') + new_db_created = true end rescue IOError => e - PEROBS.log.fatal "Cannot open flat file database #{file_name}: " + + PEROBS.log.fatal "Cannot open FlatFile database #{file_name}: " + e.message end unless @f.flock(File::LOCK_NB | File::LOCK_EX) - PEROBS.log.fatal 'Database is locked by another process' + PEROBS.log.fatal "FlatFile database '#{file_name}' is locked by " + + "another process" end - @index.open - @space_list.open + + begin + @index.open(!new_db_created) + @space_list.open + rescue FatalError + # Ensure that the index is really closed. + @index.close + # Erase it completely + @index.erase + # Then create it again. + @index.open + + # Ensure that the spaces list is really closed. + @space_list.close + # Erase it completely + @space_list.erase + # Then create it again + @space_list.open + + regenerate_index_and_spaces + end end # Close the flat file. This method must be called to ensure that all data # is really written into the filesystem. def close @space_list.close @index.close - @f.flush - @f.flock(File::LOCK_UN) - @f.close - @f = nil + + if @f + @f.flush + @f.flock(File::LOCK_UN) + @f.close + @f = nil + end end # Force outstanding data to be written to the filesystem. def sync begin @f.flush rescue IOError => e PEROBS.log.fatal "Cannot sync flat file database: #{e.message}" end + @index.sync end # Delete the blob for the specified ID. # @param id [Integer] ID of the object to be deleted # @return [Boolean] True if object was deleted, false otherwise @@ -103,20 +132,14 @@ # Delete the blob that is stored at the specified address. # @param addr [Integer] Address of the blob to delete # @param id [Integer] ID of the blob to delete def delete_obj_by_address(addr, id) - @index.delete_value(id) + @index.remove(id) header = FlatFileBlobHeader.read_at(@f, addr, id) - begin - @f.seek(addr) - @f.write([ 0 ].pack('C')) - @f.flush - @space_list.add_space(addr, header.length) - rescue IOError => e - PEROBS.log.fatal "Cannot erase blob for ID #{header.id}: #{e.message}" - end + header.clear_flags + @space_list.add_space(addr, header.length) end # Delete all unmarked objects. def delete_unmarked_objects PEROBS.log.info "Deleting unmarked objects..." @@ -134,16 +157,27 @@ PEROBS.log.info "#{deleted_ids.length} unmarked objects deleted " + "in #{Time.now - t} seconds" deleted_ids end - # Write the given object into the file. This method assumes that no other - # entry with the given ID exists already in the file. + # Write the given object into the file. This method never uses in-place + # updates for existing objects. A new copy is inserted first and only when + # the insert was successful, the old copy is deleted and the index + # updated. # @param id [Integer] ID of the object # @param raw_obj [String] Raw object as String # @return [Integer] position of the written blob in the blob file def write_obj_by_id(id, raw_obj) + # Check if we have already an object with the given ID. We'll mark it as + # outdated and save the header for later deletion. In case this + # operation is aborted or interrupted we ensure that we either have the + # old or the new version available. + if (old_addr = find_obj_addr_by_id(id)) + old_header = FlatFileBlobHeader.read_at(@f, old_addr) + old_header.set_outdated_flag + end + crc = checksum(raw_obj) # If the raw_obj is larger then 256 characters we will compress it to # safe some space in the database file. For smaller strings the # performance impact of compression is not compensated by writing @@ -166,16 +200,16 @@ if raw_obj.length > header.length PEROBS.log.fatal "Object (#{raw_obj.length}) is longer than " + "blob space (#{header.length})." end if header.is_valid? - PEROBS.log.fatal "Entry (mark: #{header.mark}) is already used." + PEROBS.log.fatal "Entry (flags: #{header.flags}) is already used." end end - @f.seek(addr) - FlatFileBlobHeader.new(compressed ? (1 << 2) | 1 : 1, raw_obj.length, - id, crc).write(@f) + flags = 1 << FlatFileBlobHeader::VALID_FLAG_BIT + flags |= (1 << FlatFileBlobHeader::COMPRESSED_FLAG_BIT) if compressed + FlatFileBlobHeader.new(@f, addr, flags, raw_obj.length, id, crc).write @f.write(raw_obj) if length != -1 && raw_obj.length < length # The new object was not appended and it did not completely fill the # free space. So we have to write a new header to mark the remaining # empty space. @@ -184,16 +218,24 @@ "header (space: #{length} bytes, object: #{raw_obj.length} " + "bytes)." end space_address = @f.pos space_length = length - FlatFileBlobHeader::LENGTH - raw_obj.length - FlatFileBlobHeader.new(0, space_length, 0, 0).write(@f) + FlatFileBlobHeader.new(@f, space_address, 0, space_length, + 0, 0).write # Register the new space with the space list. @space_list.add_space(space_address, space_length) if space_length > 0 end - @f.flush - @index.put_value(id, addr) + if old_addr + # If we had an existing object stored for the ID we have to mark + # this entry as deleted now. + old_header.clear_flags + else + @f.flush + end + # Once the blob has been written we can update the index as well. + @index.insert(id, addr) rescue IOError => e PEROBS.log.fatal "Cannot write blob for ID #{id} to FlatFileDB: " + e.message end @@ -202,11 +244,11 @@ # Find the address of the object with the given ID. # @param id [Integer] ID of the object # @return [Integer] Offset in the flat file or nil if not found def find_obj_addr_by_id(id) - @index.get_value(id) + @index.get(id) end # Read the object with the given ID. # @param id [Integer] ID of the object # @return [String or nil] Raw object data if found, otherwise nil @@ -236,13 +278,18 @@ buf = @f.read(header.length) rescue IOError => e PEROBS.log.fatal "Cannot read blob for ID #{id}: #{e.message}" end - # Uncompress the data if the compression bit is set in the mark byte. + # Uncompress the data if the compression bit is set in the flags byte. if header.is_compressed? - buf = Zlib.inflate(buf) + begin + buf = Zlib.inflate(buf) + rescue Zlib::BufError, Zlib::DataError + PEROBS.log.fatal "Corrupted compressed block with ID " + + "#{header.id} found." + end end if checksum(buf) != header.crc PEROBS.log.fatal "Checksum failure while reading blob ID #{id}" end @@ -260,19 +307,11 @@ # Mark the object at the specified address. # @param addr [Integer] Offset in the file # @param id [Integer] ID of the object def mark_obj_by_address(addr, id) - header = FlatFileBlobHeader.read_at(@f, addr, id) - begin - @f.seek(addr) - @f.write([ header.mark | (1 << 1) ].pack('C')) - @f.flush - rescue IOError => e - PEROBS.log.fatal "Marking of FlatFile blob with ID #{id} " + - "failed: #{e.message}" - end + FlatFileBlobHeader.read_at(@f, addr, id).set_mark_flag end # Return true if the object with the given ID is marked, false otherwise. # @param id [Integer] ID of the object def is_marked_by_id?(id) @@ -295,28 +334,22 @@ each_blob_header do |pos, header| total_blob_count += 1 if header.is_valid? && header.is_marked? # Clear all valid and marked blocks. marked_blob_count += 1 - begin - @f.seek(pos) - @f.write([ header.mark & 0b11111101 ].pack('C')) - @f.flush - rescue IOError => e - PEROBS.log.fatal "Unmarking of FlatFile blob with ID #{blob_id} " + - "failed: #{e.message}" - end + header.clear_mark_flag end end PEROBS.log.info "#{marked_blob_count} marks in #{total_blob_count} " + "objects cleared in #{Time.now - t} seconds" end # Eliminate all the holes in the file. This is an in-place # implementation. No additional space will be needed on the file system. def defragmentize distance = 0 + new_file_size = 0 deleted_blobs = 0 valid_blobs = 0 t = Time.now PEROBS.log.info "Defragmenting FlatFile" # Iterate over all entries. @@ -333,21 +366,23 @@ buf = @f.read(entry_bytes) # Write the buffer right after the end of the previous entry. @f.seek(pos - distance) @f.write(buf) # Update the index with the new position - @index.put_value(header.id, pos - distance) + @index.insert(header.id, pos - distance) # Mark the space between the relocated current entry and the # next valid entry as deleted space. - FlatFileBlobHeader.new(0, distance - FlatFileBlobHeader::LENGTH, - 0, 0).write(@f) + FlatFileBlobHeader.new(@f, @f.pos, 0, + distance - FlatFileBlobHeader::LENGTH, + 0, 0).write @f.flush rescue IOError => e PEROBS.log.fatal "Error while moving blob for ID #{header.id}: " + e.message end end + new_file_size = pos + FlatFileBlobHeader::LENGTH + header.length else deleted_blobs += 1 distance += entry_bytes end end @@ -355,11 +390,11 @@ PEROBS.log.info "#{distance / 1000} KiB/#{deleted_blobs} blobs of " + "#{@f.size / 1000} KiB/#{valid_blobs} blobs or " + "#{'%.1f' % (distance.to_f / @f.size * 100.0)}% reclaimed" @f.flush - @f.truncate(@f.size - distance) + @f.truncate(new_file_size) @f.flush @space_list.clear sync end @@ -391,60 +426,120 @@ # Reclaim the space saved by compressing entries. defragmentize end + # Check (and repair) the FlatFile. + # @param repair [Boolean] True if errors should be fixed. + # @return [Integer] Number of errors found def check(repair = false) - return unless @f + errors = 0 + return errors unless @f t = Time.now PEROBS.log.info "Checking FlatFile database" + "#{repair ? ' in repair mode' : ''}..." # First check the database blob file. Each entry should be readable and - # correct. + # correct and all IDs must be unique. We use a shadow index to keep + # track of the already found IDs. + new_index = BTree.new(@db_dir, 'new-index', INDEX_BTREE_ORDER) + new_index.erase + new_index.open + each_blob_header do |pos, header| if header.is_valid? # We have a non-deleted entry. begin @f.seek(pos + FlatFileBlobHeader::LENGTH) buf = @f.read(header.length) + if buf.length != header.length + PEROBS.log.error "Premature end of file in blob with ID " + + "#{header.id}." + discard_damaged_blob(header) if repair + errors += 1 + next + end + # Uncompress the data if the compression bit is set in the mark # byte. - buf = Zlib.inflate(buf) if header.is_compressed? + if header.is_compressed? + begin + buf = Zlib.inflate(buf) + rescue Zlib::BufError, Zlib::DataError + PEROBS.log.error "Corrupted compressed block with ID " + + "#{header.id} found." + discard_damaged_blob(header) if repair + errors += 1 + next + end + end if header.crc && checksum(buf) != header.crc - if repair - PEROBS.log.error "Checksum failure while checking blob " + - "with ID #{header.id}. Deleting object." - delete_obj_by_address(pos, header.id) - else - PEROBS.log.fatal "Checksum failure while checking blob " + - "with ID #{header.id}" - end + PEROBS.log.error "Checksum failure while checking blob " + + "with ID #{header.id}" + discard_damaged_blob(header) if repair + errors += 1 + next end rescue IOError => e PEROBS.log.fatal "Check of blob with ID #{header.id} failed: " + e.message end + + # Check if the ID has already been found in the file. + if (previous_address = new_index.get(header.id)) + PEROBS.log.error "Multiple blobs for ID #{header.id} found. " + + "Addresses: #{previous_address}, #{pos}" + previous_header = FlatFileBlobHeader.read_at(@f, previous_address, + header.id) + if repair + # We have two blobs with the same ID and we must discard one of + # them. + if header.is_outdated? + discard_damaged_blob(header) + elsif previous_header.is_outdated? + discard_damaged_blob(previous_header) + else + PEROBS.log.error "None of the blobs with same ID have " + + "the outdated flag set. Deleting the smaller one." + discard_damaged_blob(header.length < previous_header.length ? + header : previous_header) + end + next + end + else + # ID is unique so far. Add it to the shadow index. + new_index.insert(header.id, pos) + end + end end + # We no longer need the new index. + new_index.close + new_index.erase # Now we check the index data. It must be correct and the entries must # match the blob file. All entries in the index must be in the blob file # and vise versa. begin - unless @index.check(self) && @space_list.check(self) && - cross_check_entries + index_ok = @index.check do |id, address| + has_id_at?(id, address) + end + unless index_ok && @space_list.check(self) && cross_check_entries regenerate_index_and_spaces if repair end rescue PEROBS::FatalError + errors += 1 regenerate_index_and_spaces if repair end sync if repair - PEROBS.log.info "check_db completed in #{Time.now - t} seconds" + PEROBS.log.info "check_db completed in #{Time.now - t} seconds. " + + "#{errors} errors found." + + errors end # This method clears the index tree and the free space list and # regenerates them from the FlatFile. def regenerate_index_and_spaces @@ -452,11 +547,11 @@ @index.clear @space_list.clear each_blob_header do |pos, header| if header.is_valid? - @index.put_value(header.id, pos) + @index.insert(header.id, pos) else @space_list.add_space(pos, header.length) if header.length > 0 end end end @@ -472,23 +567,21 @@ end def inspect s = '[' each_blob_header do |pos, header| - s << "{ :pos => #{pos}, :mark => #{header.mark}, " + + s << "{ :pos => #{pos}, :flags => #{header.flags}, " + ":length => #{header.length}, :id => #{header.id}, " + ":crc => #{header.crc}" if header.is_valid? s << ", :value => #{@f.read(header.length)}" end s << " }\n" end s + ']' end - - private def each_blob_header(&block) pos = 0 begin @@ -527,29 +620,37 @@ def checksum(raw_obj) Zlib.crc32(raw_obj, 0) end def cross_check_entries + errors = 0 + each_blob_header do |pos, header| if !header.is_valid? if header.length > 0 unless @space_list.has_space?(pos, header.length) PEROBS.log.error "FlatFile has free space " + "(addr: #{pos}, len: #{header.length}) that is not in " + "FreeSpaceManager" - return false + errors += 1 end end else - unless @index.get_value(header.id) == pos + unless @index.get(header.id) == pos PEROBS.log.error "FlatFile blob at address #{pos} is listed " + - "in index with address #{@index.get_value(header.id)}" - return false + "in index with address #{@index.get(header.id)}" + errors += 1 end end end - true + errors == 0 + end + + def discard_damaged_blob(header) + PEROBS.log.error "Discarding corrupted data blob for ID #{header.id} " + + "at offset #{header.addr}" + header.clear_flags end end end