lib/perobs/FlatFile.rb in perobs-4.0.0 vs lib/perobs/FlatFile.rb in perobs-4.1.0

- old
+ new

@@ -1,10 +1,10 @@ # encoding: UTF-8 # # = FlatFile.rb -- Persistent Ruby Object Store # -# Copyright (c) 2016 by Chris Schlaeger <chris@taskjuggler.org> +# Copyright (c) 2016, 2018 by Chris Schlaeger <chris@taskjuggler.org> # # MIT License # # Permission is hereby granted, free of charge, to any person obtaining # a copy of this software and associated documentation files (the @@ -29,10 +29,11 @@ require 'perobs/Log' require 'perobs/FlatFileBlobHeader' require 'perobs/BTree' require 'perobs/SpaceTree' +require 'perobs/IDList' module PEROBS # The FlatFile class manages the storage file of the FlatFileDB. It contains # a sequence of blobs Each blob consists of header and the actual @@ -42,16 +43,17 @@ # The number of entries in a single BTree node of the index file. INDEX_BTREE_ORDER = 65 # Create a new FlatFile object for a database in the given path. # @param dir [String] Directory path for the data base file - def initialize(dir) + def initialize(dir, progressmeter) @db_dir = dir + @progressmeter = progressmeter @f = nil - @index = BTree.new(@db_dir, 'index', INDEX_BTREE_ORDER) - @marks = BTree.new(@db_dir, 'marks', INDEX_BTREE_ORDER) - @space_list = SpaceTree.new(@db_dir) + @marks = nil + @index = BTree.new(@db_dir, 'index', INDEX_BTREE_ORDER, @progressmeter) + @space_list = SpaceTree.new(@db_dir, @progressmeter) end # Open the flat file for reading and writing. def open file_name = File.join(@db_dir, 'database.blobs') @@ -72,38 +74,24 @@ PEROBS.log.fatal "FlatFile database '#{file_name}' is locked by " + "another process" end @f.sync = true - begin - @index.open(!new_db_created) - @space_list.open - rescue FatalError - # Ensure that the index is really closed. - @index.close - # Erase it completely - @index.erase - # Then create it again. - @index.open - - # Ensure that the spaces list is really closed. - @space_list.close - # Erase it completely - @space_list.erase - # Then create it again - @space_list.open - - regenerate_index_and_spaces - end + open_index_files(!new_db_created) end # Close the flat file. This method must be called to ensure that all data # is really written into the filesystem. def close - @space_list.close - @index.close + @space_list.close if @space_list.is_open? + @index.close if @index.is_open? + if @marks + @marks.erase + @marks = nil + end + if @f @f.flush @f.flock(File::LOCK_UN) @f.fsync @f.close @@ -137,33 +125,40 @@ # Delete the blob that is stored at the specified address. # @param addr [Integer] Address of the blob to delete # @param id [Integer] ID of the blob to delete def delete_obj_by_address(addr, id) - @index.remove(id) - header = FlatFileBlobHeader.read_at(@f, addr, id) + @index.remove(id) if @index.is_open? + header = FlatFileBlobHeader.read(@f, addr, id) header.clear_flags - @space_list.add_space(addr, header.length) + @space_list.add_space(addr, header.length) if @space_list.is_open? end # Delete all unmarked objects. def delete_unmarked_objects - PEROBS.log.info "Deleting unmarked objects..." - t = Time.now + # We don't update the index and the space list during this operation as + # we defragmentize the blob file at the end. We'll end the operation + # with an empty space list. + clear_index_files - deleted_ids = [] - each_blob_header do |pos, header| - if header.is_valid? && @marks.get(header.id).nil? - delete_obj_by_address(pos, header.id) - deleted_ids << header.id + deleted_objects_count = 0 + @progressmeter.start('Sweeping unmarked objects', @f.size) do |pm| + each_blob_header do |header| + if header.is_valid? && !@marks.include?(header.id) + delete_obj_by_address(header.addr, header.id) + deleted_objects_count += 1 + end + + pm.update(header.addr) end end defragmentize - PEROBS.log.info "#{deleted_ids.length} unmarked objects deleted " + - "in #{Time.now - t} seconds" - deleted_ids + # Update the index file and create a new, empty space list. + regenerate_index_and_spaces + + deleted_objects_count end # Write the given object into the file. This method never uses in-place # updates for existing objects. A new copy is inserted first and only when # the insert was successful, the old copy is deleted and the index @@ -175,11 +170,11 @@ # Check if we have already an object with the given ID. We'll mark it as # outdated and save the header for later deletion. In case this # operation is aborted or interrupted we ensure that we either have the # old or the new version available. if (old_addr = find_obj_addr_by_id(id)) - old_header = FlatFileBlobHeader.read_at(@f, old_addr) + old_header = FlatFileBlobHeader.read(@f, old_addr) old_header.set_outdated_flag end crc = checksum(raw_obj) @@ -195,11 +190,11 @@ addr, length = find_free_blob(raw_obj.bytesize) begin if length != -1 # Just a safeguard so we don't overwrite current data. - header = FlatFileBlobHeader.read_at(@f, addr) + header = FlatFileBlobHeader.read(@f, addr) if header.length != length PEROBS.log.fatal "Length in free list (#{length}) and header " + "(#{header.length}) for address #{addr} don't match." end if raw_obj.bytesize > header.length @@ -227,22 +222,26 @@ space_address = @f.pos space_length = length - FlatFileBlobHeader::LENGTH - raw_obj.bytesize FlatFileBlobHeader.new(@f, space_address, 0, space_length, 0, 0).write # Register the new space with the space list. - @space_list.add_space(space_address, space_length) if space_length > 0 + if @space_list.is_open? && space_length > 0 + @space_list.add_space(space_address, space_length) + end end # Once the blob has been written we can update the index as well. - @index.insert(id, addr) + @index.insert(id, addr) if @index.is_open? if old_addr # If we had an existing object stored for the ID we have to mark # this entry as deleted now. old_header.clear_flags # And register the newly freed space with the space list. - @space_list.add_space(old_addr, old_header.length) + if @space_list.is_open? + @space_list.add_space(old_addr, old_header.length) + end else @f.flush end rescue IOError => e PEROBS.log.fatal "Cannot write blob for ID #{id} to FlatFileDB: " + @@ -268,25 +267,21 @@ end nil end - def search_object(id) - each_blob_header do |pos, header| - return read_obj_by_address(pos, id) - end - - nil + # @return [Integer] Number of items stored in the DB. + def item_counter + @index.entries_count end - # Read the object at the specified address. # @param addr [Integer] Offset in the flat file # @param id [Integer] ID of the data blob # @return [String] Raw object data def read_obj_by_address(addr, id) - header = FlatFileBlobHeader.read_at(@f, addr, id) + header = FlatFileBlobHeader.read(@f, addr, id) if header.id != id PEROBS.log.fatal "Database index corrupted: Index for object " + "#{id} points to object with ID #{header.id}" end @@ -317,108 +312,135 @@ end # Mark the object with the given ID. # @param id [Integer] ID of the object def mark_obj_by_id(id) - @marks.insert(id, 0) + @marks.insert(id) end # Return true if the object with the given ID is marked, false otherwise. # @param id [Integer] ID of the object def is_marked_by_id?(id) - !@marks.get(id).nil? + @marks.include?(id) end # Clear alls marks. def clear_all_marks - @marks.erase - @marks.open + if @marks + @marks.clear + else + @marks = IDList.new(@db_dir, 'marks', 8) + end end # Eliminate all the holes in the file. This is an in-place # implementation. No additional space will be needed on the file system. def defragmentize distance = 0 new_file_size = 0 deleted_blobs = 0 + corrupted_blobs = 0 valid_blobs = 0 - t = Time.now - PEROBS.log.info "Defragmenting FlatFile" + # Iterate over all entries. - each_blob_header do |pos, header| - # Total size of the current entry - entry_bytes = FlatFileBlobHeader::LENGTH + header.length - if header.is_valid? - # We have found a valid entry. - valid_blobs += 1 - if distance > 0 - begin - # Read current entry into a buffer - @f.seek(pos) - buf = @f.read(entry_bytes) - # Write the buffer right after the end of the previous entry. - @f.seek(pos - distance) - @f.write(buf) - # Update the index with the new position - @index.insert(header.id, pos - distance) - # Mark the space between the relocated current entry and the - # next valid entry as deleted space. - FlatFileBlobHeader.new(@f, @f.pos, 0, - distance - FlatFileBlobHeader::LENGTH, - 0, 0).write - @f.flush - rescue IOError => e - PEROBS.log.fatal "Error while moving blob for ID #{header.id}: " + - e.message + @progressmeter.start('Defragmentizing blobs file', @f.size) do |pm| + each_blob_header do |header| + # If we have stumbled over a corrupted blob we treat it similar to a + # deleted blob and reuse the space. + if header.corruption_start + distance += header.addr - header.corruption_start + corrupted_blobs += 1 + end + + # Total size of the current entry + entry_bytes = FlatFileBlobHeader::LENGTH + header.length + if header.is_valid? + # We have found a valid entry. + valid_blobs += 1 + if distance > 0 + begin + # Read current entry into a buffer + @f.seek(header.addr) + buf = @f.read(entry_bytes) + # Write the buffer right after the end of the previous entry. + @f.seek(header.addr - distance) + @f.write(buf) + # Mark the space between the relocated current entry and the + # next valid entry as deleted space. + FlatFileBlobHeader.new(@f, @f.pos, 0, + distance - FlatFileBlobHeader::LENGTH, + 0, 0).write + @f.flush + rescue IOError => e + PEROBS.log.fatal "Error while moving blob for ID " + + "#{header.id}: #{e.message}" + end end + new_file_size = header.addr - distance + + FlatFileBlobHeader::LENGTH + header.length + else + deleted_blobs += 1 + distance += entry_bytes end - new_file_size = pos + FlatFileBlobHeader::LENGTH + header.length - else - deleted_blobs += 1 - distance += entry_bytes + + pm.update(header.addr) end end - PEROBS.log.info "FlatFile defragmented in #{Time.now - t} seconds" + PEROBS.log.info "#{distance / 1000} KiB/#{deleted_blobs} blobs of " + "#{@f.size / 1000} KiB/#{valid_blobs} blobs or " + "#{'%.1f' % (distance.to_f / @f.size * 100.0)}% reclaimed" + if corrupted_blobs > 0 + PEROBS.log.info "#{corrupted_blobs} corrupted blob(s) found. Space " + + "was recycled." + end @f.flush @f.truncate(new_file_size) @f.flush - @space_list.clear sync end # This method iterates over all entries in the FlatFile and removes the # entry and inserts it again. This is useful to update all entries in - # cased the storage format has changed. + # case the storage format has changed. def refresh # This iteration might look scary as we iterate over the entries while # while we are rearranging them. Re-inserted items may be inserted # before or at the current entry and this is fine. They also may be # inserted after the current entry and will be re-read again unless they # are inserted after the original file end. file_size = @f.size - PEROBS.log.info "Refreshing the DB..." - t = Time.now - each_blob_header do |pos, header| - if header.is_valid? - buf = read_obj_by_address(pos, header.id) - delete_obj_by_address(pos, header.id) - write_obj_by_id(header.id, buf) - end - # Some re-inserted blobs may be inserted after the original file end. - # No need to process those blobs again. - break if pos >= file_size + # We don't update the index and the space list during this operation as + # we defragmentize the blob file at the end. We'll end the operation + # with an empty space list. + clear_index_files + + @progressmeter.start('Converting objects to new storage format', + @f.size) do |pm| + each_blob_header do |header| + if header.is_valid? + buf = read_obj_by_address(header.addr, header.id) + delete_obj_by_address(header.addr, header.id) + write_obj_by_id(header.id, buf) + end + + # Some re-inserted blobs may be inserted after the original file end. + # No need to process those blobs again. + break if header.addr >= file_size + + pm.update(header.addr) + end end - PEROBS.log.info "DB refresh completed in #{Time.now - t} seconds" # Reclaim the space saved by compressing entries. defragmentize + + # Recreate the index file and create an empty space list. + regenerate_index_and_spaces end # Check (and repair) the FlatFile. # @param repair [Boolean] True if errors should be fixed. # @return [Integer] Number of errors found @@ -431,99 +453,121 @@ "#{repair ? ' in repair mode' : ''}..." # First check the database blob file. Each entry should be readable and # correct and all IDs must be unique. We use a shadow index to keep # track of the already found IDs. - new_index = BTree.new(@db_dir, 'new-index', INDEX_BTREE_ORDER) + new_index = BTree.new(@db_dir, 'new-index', INDEX_BTREE_ORDER, + @progressmeter) new_index.erase new_index.open - each_blob_header do |pos, header| - if header.is_valid? - # We have a non-deleted entry. - begin - @f.seek(pos + FlatFileBlobHeader::LENGTH) - buf = @f.read(header.length) - if buf.bytesize != header.length - PEROBS.log.error "Premature end of file in blob with ID " + - "#{header.id}." - discard_damaged_blob(header) if repair - errors += 1 - next - end + corrupted_blobs = 0 + @progressmeter.start('Checking blobs file', @f.size) do |pm| + corrupted_blobs = each_blob_header do |header| + if header.is_valid? + # We have a non-deleted entry. + begin + @f.seek(header.addr + FlatFileBlobHeader::LENGTH) + buf = @f.read(header.length) + if buf.bytesize != header.length + PEROBS.log.error "Premature end of file in blob with ID " + + "#{header.id}." + discard_damaged_blob(header) if repair + errors += 1 + next + end - # Uncompress the data if the compression bit is set in the mark - # byte. - if header.is_compressed? - begin - buf = Zlib.inflate(buf) - rescue Zlib::BufError, Zlib::DataError - PEROBS.log.error "Corrupted compressed block with ID " + - "#{header.id} found." + # Uncompress the data if the compression bit is set in the mark + # byte. + if header.is_compressed? + begin + buf = Zlib.inflate(buf) + rescue Zlib::BufError, Zlib::DataError + PEROBS.log.error "Corrupted compressed block with ID " + + "#{header.id} found." + discard_damaged_blob(header) if repair + errors += 1 + next + end + end + + if header.crc && checksum(buf) != header.crc + PEROBS.log.error "Checksum failure while checking blob " + + "with ID #{header.id}" discard_damaged_blob(header) if repair errors += 1 next end + rescue IOError => e + PEROBS.log.fatal "Check of blob with ID #{header.id} failed: " + + e.message end - if header.crc && checksum(buf) != header.crc - PEROBS.log.error "Checksum failure while checking blob " + - "with ID #{header.id}" - discard_damaged_blob(header) if repair + # Check if the ID has already been found in the file. + if (previous_address = new_index.get(header.id)) + PEROBS.log.error "Multiple blobs for ID #{header.id} found. " + + "Addresses: #{previous_address}, #{header.addr}" errors += 1 - next - end - rescue IOError => e - PEROBS.log.fatal "Check of blob with ID #{header.id} failed: " + - e.message - end - - # Check if the ID has already been found in the file. - if (previous_address = new_index.get(header.id)) - PEROBS.log.error "Multiple blobs for ID #{header.id} found. " + - "Addresses: #{previous_address}, #{pos}" - previous_header = FlatFileBlobHeader.read_at(@f, previous_address, - header.id) - if repair - # We have two blobs with the same ID and we must discard one of - # them. - if header.is_outdated? - discard_damaged_blob(header) - elsif previous_header.is_outdated? - discard_damaged_blob(previous_header) - else - PEROBS.log.error "None of the blobs with same ID have " + - "the outdated flag set. Deleting the smaller one." - discard_damaged_blob(header.length < previous_header.length ? - header : previous_header) + previous_header = FlatFileBlobHeader.read(@f, previous_address, + header.id) + if repair + # We have two blobs with the same ID and we must discard one of + # them. + if header.is_outdated? + discard_damaged_blob(header) + elsif previous_header.is_outdated? + discard_damaged_blob(previous_header) + else + PEROBS.log.error "None of the blobs with same ID have " + + "the outdated flag set. Deleting the smaller one." + errors += 1 + discard_damaged_blob(header.length < previous_header.length ? + header : previous_header) + end + next end - next + else + # ID is unique so far. Add it to the shadow index. + new_index.insert(header.id, header.addr) end - else - # ID is unique so far. Add it to the shadow index. - new_index.insert(header.id, pos) + end + pm.update(header.addr) end + + errors += corrupted_blobs end + # We no longer need the new index. new_index.close new_index.erase - # Now we check the index data. It must be correct and the entries must - # match the blob file. All entries in the index must be in the blob file - # and vise versa. - begin - index_ok = @index.check do |id, address| - has_id_at?(id, address) - end - unless index_ok && @space_list.check(self) && cross_check_entries + if repair && corrupted_blobs > 0 + erase_index_files + defragmentize + regenerate_index_and_spaces + else + # Now we check the index data. It must be correct and the entries must + # match the blob file. All entries in the index must be in the blob file + # and vise versa. + begin + index_ok = @index.check do |id, address| + has_id_at?(id, address) + end + x_check_errs = 0 + space_check_ok = true + unless index_ok && (space_check_ok = @space_list.check(self)) && + (x_check_errs = cross_check_entries) == 0 + errors += 1 unless index_ok && space_check_ok + errors += x_check_errs + regenerate_index_and_spaces if repair + end + rescue PEROBS::FatalError + errors += 1 regenerate_index_and_spaces if repair end - rescue PEROBS::FatalError - errors += 1 - regenerate_index_and_spaces if repair end sync if repair PEROBS.log.info "check_db completed in #{Time.now - t} seconds. " + "#{errors} errors found." @@ -533,71 +577,128 @@ # This method clears the index tree and the free space list and # regenerates them from the FlatFile. def regenerate_index_and_spaces PEROBS.log.warn "Re-generating FlatFileDB index and space files" + @index.open unless @index.is_open? @index.clear + @space_list.open unless @space_list.is_open? @space_list.clear - each_blob_header do |pos, header| - if header.is_valid? - if (duplicate_pos = @index.get(header.id)) - PEROBS.log.error "FlatFile contains multiple blobs for ID " + - "#{header.id}. First blob is at address #{duplicate_pos}. " + - "Other blob found at address #{pos}." - @space_list.add_space(pos, header.length) if header.length > 0 - discard_damaged_blob(header) + @progressmeter.start('Re-generating database index', @f.size) do |pm| + each_blob_header do |header| + if header.is_valid? + if (duplicate_pos = @index.get(header.id)) + PEROBS.log.error "FlatFile contains multiple blobs for ID " + + "#{header.id}. First blob is at address #{duplicate_pos}. " + + "Other blob found at address #{header.addr}." + if header.length > 0 + @space_list.add_space(header.addr, header.length) + end + discard_damaged_blob(header) + else + @index.insert(header.id, header.addr) + end else - @index.insert(header.id, pos) + if header.length > 0 + @space_list.add_space(header.addr, header.length) + end end - else - @space_list.add_space(pos, header.length) if header.length > 0 + + pm.update(header.addr) end end sync end def has_space?(address, size) - header = FlatFileBlobHeader.read_at(@f, address) + header = FlatFileBlobHeader.read(@f, address) !header.is_valid? && header.length == size end def has_id_at?(id, address) - header = FlatFileBlobHeader.read_at(@f, address) + header = FlatFileBlobHeader.read(@f, address) header.is_valid? && header.id == id end def inspect s = '[' - each_blob_header do |pos, header| - s << "{ :pos => #{pos}, :flags => #{header.flags}, " + + each_blob_header do |header| + s << "{ :pos => #{header.addr}, :flags => #{header.flags}, " + ":length => #{header.length}, :id => #{header.id}, " + ":crc => #{header.crc}" if header.is_valid? s << ", :value => #{@f.read(header.length)}" end s << " }\n" end s + ']' end + def FlatFile::insert_header_checksums(db_dir) + old_file_name = File.join(db_dir, 'database.blobs') + new_file_name = File.join(db_dir, 'database_v4.blobs') + bak_file_name = File.join(db_dir, 'database_v3.blobs') + + old_file = File.open(old_file_name, 'rb') + new_file = File.open(new_file_name, 'wb') + + entries = 0 + while (buf = old_file.read(21)) + flags, length, id, crc = *buf.unpack('CQQL') + blob_data = old_file.read(length) + + # Some basic sanity checking to ensure all reserved bits are 0. Older + # versions of PEROBS used to set bit 1 despite it being reserved now. + unless flags & 0xF0 == 0 + PEROBS.log.fatal "Blob file #{old_file_name} contains illegal " + + "flag byte #{'%02x' % flags} at #{old_file.pos - 21}" + end + + # Check if the blob is valid and current. + if flags & 0x1 == 1 && flags & 0x8 == 0 + # Make sure the bit 1 is not set anymore. + flags = flags & 0x05 + header_str = [ flags, length, id, crc ].pack('CQQL') + header_crc = Zlib.crc32(header_str, 0) + header_str += [ header_crc ].pack('L') + + new_file.write(header_str + blob_data) + entries += 1 + end + end + PEROBS.log.info "Header checksum added to #{entries} entries" + + old_file.close + new_file.close + + File.rename(old_file_name, bak_file_name) + File.rename(new_file_name, old_file_name) + end + private def each_blob_header(&block) - pos = 0 + corrupted_blobs = 0 + begin @f.seek(0) while (header = FlatFileBlobHeader.read(@f)) - yield(pos, header) + if header.corruption_start + corrupted_blobs += 1 + end - pos += FlatFileBlobHeader::LENGTH + header.length - @f.seek(pos) + yield(header) + + @f.seek(header.addr + FlatFileBlobHeader::LENGTH + header.length) end rescue IOError => e PEROBS.log.fatal "Cannot read blob in flat file DB: #{e.message}" end + + corrupted_blobs end def find_free_blob(bytes) address, size = @space_list.get_space(bytes) unless address @@ -623,35 +724,94 @@ end def cross_check_entries errors = 0 - each_blob_header do |pos, header| - if !header.is_valid? - if header.length > 0 - unless @space_list.has_space?(pos, header.length) - PEROBS.log.error "FlatFile has free space " + - "(addr: #{pos}, len: #{header.length}) that is not in " + - "FreeSpaceManager" - errors += 1 + @progressmeter.start('Cross checking blobs and index', @f.size) do |pm| + each_blob_header do |header| + if !header.is_valid? + if header.length > 0 + unless @space_list.has_space?(header.addr, header.length) + PEROBS.log.error "FlatFile has free space " + + "(addr: #{header.addr}, len: #{header.length}) that is " + + "not in FreeSpaceManager" + errors += 1 + end end + else + if (index_address = @index.get(header.id)).nil? + PEROBS.log.error "FlatFile blob at address #{header.addr} " + + "is not listed in the index" + errors +=1 + elsif index_address != header.addr + PEROBS.log.error "FlatFile blob at address #{header.addr} " + + "is listed in index with address #{index_address}" + errors += 1 + end end - else - unless @index.get(header.id) == pos - PEROBS.log.error "FlatFile blob at address #{pos} is listed " + - "in index with address #{@index.get(header.id)}" - errors += 1 - end + + pm.update(header.addr) end end - errors == 0 + errors end def discard_damaged_blob(header) PEROBS.log.error "Discarding corrupted data blob for ID #{header.id} " + "at offset #{header.addr}" header.clear_flags + end + + def open_index_files(abort_on_missing_files = false) + begin + @index.open(abort_on_missing_files) + @space_list.open + rescue FatalError + # Ensure that the index is really closed. + @index.close + # Erase it completely + @index.erase + # Then create it again. + @index.open + + # Ensure that the spaces list is really closed. + @space_list.close + # Erase it completely + @space_list.erase + # Then create it again + @space_list.open + + regenerate_index_and_spaces + end + end + + def erase_index_files + # Ensure that the index is really closed. + @index.close + # Erase it completely + @index.erase + + # Ensure that the spaces list is really closed. + @space_list.close + # Erase it completely + @space_list.erase + end + + def clear_index_files + # Ensure that the index is really closed. + @index.close + # Erase it completely + @index.erase + # Then create it again. + @index.open + + # Ensure that the spaces list is really closed. + @space_list.close + # Erase it completely + @space_list.erase + # Then create it again + @space_list.open end end end