lib/perobs/FlatFile.rb in perobs-4.1.0 vs lib/perobs/FlatFile.rb in perobs-4.2.0

- old
+ new

@@ -1,10 +1,10 @@ # encoding: UTF-8 # # = FlatFile.rb -- Persistent Ruby Object Store # -# Copyright (c) 2016, 2018 by Chris Schlaeger <chris@taskjuggler.org> +# Copyright (c) 2016, 2018, 2019 by Chris Schlaeger <chris@taskjuggler.org> # # MIT License # # Permission is hereby granted, free of charge, to any person obtaining # a copy of this software and associated documentation files (the @@ -29,10 +29,11 @@ require 'perobs/Log' require 'perobs/FlatFileBlobHeader' require 'perobs/BTree' require 'perobs/SpaceTree' +require 'perobs/SpaceManager' require 'perobs/IDList' module PEROBS # The FlatFile class manages the storage file of the FlatFileDB. It contains @@ -49,11 +50,18 @@ @db_dir = dir @progressmeter = progressmeter @f = nil @marks = nil @index = BTree.new(@db_dir, 'index', INDEX_BTREE_ORDER, @progressmeter) - @space_list = SpaceTree.new(@db_dir, @progressmeter) + old_spaces_file = File.join(@db_dir, 'database_spaces.blobs') + if File.exist?(old_spaces_file) + # PEROBS version 4.1.0 and earlier used this space list format. It is + # deprecated now. Newly created DBs use the SpaceManager format. + @space_list = SpaceTree.new(@db_dir, @progressmeter) + else + @space_list = SpaceManager.new(@db_dir, @progressmeter) + end end # Open the flat file for reading and writing. def open file_name = File.join(@db_dir, 'database.blobs') @@ -132,21 +140,22 @@ header.clear_flags @space_list.add_space(addr, header.length) if @space_list.is_open? end # Delete all unmarked objects. - def delete_unmarked_objects + def delete_unmarked_objects(&block) # We don't update the index and the space list during this operation as # we defragmentize the blob file at the end. We'll end the operation # with an empty space list. clear_index_files deleted_objects_count = 0 @progressmeter.start('Sweeping unmarked objects', @f.size) do |pm| each_blob_header do |header| if header.is_valid? 
&& !@marks.include?(header.id) delete_obj_by_address(header.addr, header.id) + yield(header.id) if block_given? deleted_objects_count += 1 end pm.update(header.addr) end @@ -181,48 +190,50 @@ # If the raw_obj is larger than 256 characters we will compress it to # save some space in the database file. For smaller strings the # performance impact of compression is not compensated by writing # less data to the storage. compressed = false - if raw_obj.bytesize > 256 + raw_obj_bytesize = raw_obj.bytesize + if raw_obj_bytesize > 256 raw_obj = Zlib.deflate(raw_obj) + raw_obj_bytesize = raw_obj.bytesize compressed = true end - addr, length = find_free_blob(raw_obj.bytesize) + addr, length = find_free_blob(raw_obj_bytesize) begin if length != -1 # Just a safeguard so we don't overwrite current data. header = FlatFileBlobHeader.read(@f, addr) if header.length != length PEROBS.log.fatal "Length in free list (#{length}) and header " + "(#{header.length}) for address #{addr} don't match." end - if raw_obj.bytesize > header.length - PEROBS.log.fatal "Object (#{raw_obj.bytesize}) is longer than " + + if raw_obj_bytesize > header.length + PEROBS.log.fatal "Object (#{raw_obj_bytesize}) is longer than " + "blob space (#{header.length})." end if header.is_valid? PEROBS.log.fatal "Entry at address #{addr} with flags: " + "#{header.flags} is already used for ID #{header.id}." end end flags = 1 << FlatFileBlobHeader::VALID_FLAG_BIT flags |= (1 << FlatFileBlobHeader::COMPRESSED_FLAG_BIT) if compressed - FlatFileBlobHeader.new(@f, addr, flags, raw_obj.bytesize, id, crc).write + FlatFileBlobHeader.new(@f, addr, flags, raw_obj_bytesize, id, crc).write @f.write(raw_obj) - if length != -1 && raw_obj.bytesize < length + if length != -1 && raw_obj_bytesize < length # The new object was not appended and it did not completely fill the # free space. So we have to write a new header to mark the remaining # empty space.
- unless length - raw_obj.bytesize >= FlatFileBlobHeader::LENGTH + unless length - raw_obj_bytesize >= FlatFileBlobHeader::LENGTH PEROBS.log.fatal "Not enough space to append the empty space " + - "header (space: #{length} bytes, object: #{raw_obj.bytesize} " + + "header (space: #{length} bytes, object: #{raw_obj_bytesize} " + "bytes)." end space_address = @f.pos - space_length = length - FlatFileBlobHeader::LENGTH - raw_obj.bytesize + space_length = length - FlatFileBlobHeader::LENGTH - raw_obj_bytesize FlatFileBlobHeader.new(@f, space_address, 0, space_length, 0, 0).write # Register the new space with the space list. if @space_list.is_open? && space_length > 0 @space_list.add_space(space_address, space_length) @@ -459,10 +470,11 @@ @progressmeter) new_index.erase new_index.open corrupted_blobs = 0 + end_of_last_healthy_blob = nil @progressmeter.start('Checking blobs file', @f.size) do |pm| corrupted_blobs = each_blob_header do |header| if header.is_valid? # We have a non-deleted entry. begin @@ -527,16 +539,30 @@ end else # ID is unique so far. Add it to the shadow index. new_index.insert(header.id, header.addr) end - end + end_of_last_healthy_blob = header.addr + + FlatFileBlobHeader::LENGTH + header.length pm.update(header.addr) end + if end_of_last_healthy_blob && end_of_last_healthy_blob != @f.size + # The blob file ends with a corrupted blob header. + PEROBS.log.error "#{@f.size - end_of_last_healthy_blob} corrupted " + + 'bytes found at the end of FlatFile.' + corrupted_blobs += 1 + if repair + PEROBS.log.error "Truncating FlatFile to " + + "#{end_of_last_healthy_blob} bytes by discarding " + + "#{@f.size - end_of_last_healthy_blob} bytes" + @f.truncate(end_of_last_healthy_blob) + end + end + errors += corrupted_blobs end # We no longer need the new index. new_index.close @@ -544,11 +570,11 @@ if repair && corrupted_blobs > 0 erase_index_files defragmentize regenerate_index_and_spaces - else + elsif corrupted_blobs == 0 # Now we check the index data. 
It must be correct and the entries must # match the blob file. All entries in the index must be in the blob file # and vice versa. begin index_ok = @index.check do |id, address| @@ -573,10 +599,145 @@ "#{errors} errors found." errors end + # Repair the FlatFile. In contrast to the repair functionality in the + # check() method this method is much faster. It simply re-creates the + # index and space list from the blob file. + # @param repair [Boolean] True if errors should be fixed. + # @return [Integer] Number of errors found + def repair + errors = 0 + return errors unless @f + + t = Time.now + PEROBS.log.info "Repairing FlatFile database" + + # Erase and re-open the index and space list files. We purposely don't + # close the files as it would trigger needless flushing. + clear_index_files(true) + + # Now we scan the blob file and re-index all blobs and spaces. Corrupted + # blobs will be skipped. + corrupted_blobs = 0 + end_of_last_healthy_blob = nil + @progressmeter.start('Re-indexing blobs file', @f.size) do |pm| + corrupted_blobs = each_blob_header do |header| + if header.corruption_start + # The blob is preceded by a corrupted area. We create a new + # header of a deleted blob for this area and write the new blob + # over it. + if (data_length = header.addr - header.corruption_start - + FlatFileBlobHeader::LENGTH) <= 0 + PEROBS.log.error "Found a corrupted blob that is too small to " + + "fit a header (#{data_length}). File must be defragmented." + else + new_header = FlatFileBlobHeader.new(@f, header.corruption_start, + 0, data_length, 0, 0) + new_header.write + @space_list.add_space(header.corruption_start, data_length) + end + end + + if header.is_valid? + # We have a non-deleted entry. + begin + @f.seek(header.addr + FlatFileBlobHeader::LENGTH) + buf = @f.read(header.length) + if buf.bytesize != header.length + PEROBS.log.error "Premature end of file in blob with ID " + + "#{header.id}."
+ discard_damaged_blob(header) + errors += 1 + next + end + + # Uncompress the data if the compression bit is set in the mark + # byte. + if header.is_compressed? + begin + buf = Zlib.inflate(buf) + rescue Zlib::BufError, Zlib::DataError + PEROBS.log.error "Corrupted compressed block with ID " + + "#{header.id} found." + discard_damaged_blob(header) + errors += 1 + next + end + end + + if header.crc && checksum(buf) != header.crc + PEROBS.log.error "Checksum failure while checking blob " + + "with ID #{header.id}" + discard_damaged_blob(header) + errors += 1 + next + end + rescue IOError => e + PEROBS.log.fatal "Check of blob with ID #{header.id} failed: " + + e.message + end + + # Check if the ID has already been found in the file. + if (previous_address = @index.get(header.id)) + PEROBS.log.error "Multiple blobs for ID #{header.id} found. " + + "Addresses: #{previous_address}, #{header.addr}" + errors += 1 + previous_header = FlatFileBlobHeader.read(@f, previous_address, + header.id) + # We have two blobs with the same ID and we must discard one of + # them. + if header.is_outdated? + discard_damaged_blob(header) + elsif previous_header.is_outdated? + discard_damaged_blob(previous_header) + else + PEROBS.log.error "None of the blobs with same ID have " + + "the outdated flag set. Deleting the smaller one." + errors += 1 + discard_damaged_blob(header.length < previous_header.length ? + header : previous_header) + end + else + # ID is unique so far. Add it to the shadow index. + @index.insert(header.id, header.addr) + end + + else + if header.length > 0 + @space_list.add_space(header.addr, header.length) + end + end + end_of_last_healthy_blob = header.addr + + FlatFileBlobHeader::LENGTH + header.length + + pm.update(header.addr) + end + + if end_of_last_healthy_blob && end_of_last_healthy_blob != @f.size + # The blob file ends with a corrupted blob header. 
+ PEROBS.log.error "#{@f.size - end_of_last_healthy_blob} corrupted " + + 'bytes found at the end of FlatFile.' + corrupted_blobs += 1 + + PEROBS.log.error "Truncating FlatFile to " + + "#{end_of_last_healthy_blob} bytes by discarding " + + "#{@f.size - end_of_last_healthy_blob} bytes" + @f.truncate(end_of_last_healthy_blob) + end + + errors += corrupted_blobs + end + + sync + PEROBS.log.info "FlatFile repair completed in #{Time.now - t} seconds. " + + "#{errors} errors found." + + errors + end + # This method clears the index tree and the free space list and # regenerates them from the FlatFile. def regenerate_index_and_spaces PEROBS.log.warn "Re-generating FlatFileDB index and space files" @index.open unless @index.is_open? @@ -615,11 +776,15 @@ header = FlatFileBlobHeader.read(@f, address) !header.is_valid? && header.length == size end def has_id_at?(id, address) - header = FlatFileBlobHeader.read(@f, address) + begin + header = FlatFileBlobHeader.read(@f, address) + rescue PEROBS::FatalError + return false + end header.is_valid? && header.id == id end def inspect s = '[' @@ -731,11 +896,11 @@ if !header.is_valid? if header.length > 0 unless @space_list.has_space?(header.addr, header.length) PEROBS.log.error "FlatFile has free space " + "(addr: #{header.addr}, len: #{header.length}) that is " + - "not in FreeSpaceManager" + "not in SpaceManager" errors += 1 end end else if (index_address = @index.get(header.id)).nil? @@ -765,52 +930,39 @@ def open_index_files(abort_on_missing_files = false) begin @index.open(abort_on_missing_files) @space_list.open rescue FatalError - # Ensure that the index is really closed. - @index.close - # Erase it completely - @index.erase - # Then create it again. - @index.open - - # Ensure that the spaces list is really closed. 
- @space_list.close - # Erase it completely - @space_list.erase - # Then create it again - @space_list.open - + clear_index_files regenerate_index_and_spaces end end - def erase_index_files + def erase_index_files(dont_close_files = false) # Ensure that the index is really closed. - @index.close + @index.close unless dont_close_files # Erase it completely @index.erase # Ensure that the spaces list is really closed. - @space_list.close + @space_list.close unless dont_close_files # Erase it completely @space_list.erase + + if @space_list.is_a?(SpaceTree) + # If we still use the old SpaceTree format, this is the moment to + # convert it to the new SpaceManager format. + @space_list = SpaceManager.new(@db_dir, @progressmeter) + PEROBS.log.warn "Converting space list from SpaceTree format " + + "to SpaceManager format" + end end - def clear_index_files - # Ensure that the index is really closed. - @index.close - # Erase it completely - @index.erase - # Then create it again. - @index.open + def clear_index_files(dont_close_files = false) + erase_index_files(dont_close_files) - # Ensure that the spaces list is really closed. - @space_list.close - # Erase it completely - @space_list.erase - # Then create it again + # Then create them again. + @index.open @space_list.open end end