lib/perobs/FlatFile.rb in perobs-4.1.0 vs lib/perobs/FlatFile.rb in perobs-4.2.0
- old
+ new
@@ -1,10 +1,10 @@
# encoding: UTF-8
#
# = FlatFile.rb -- Persistent Ruby Object Store
#
-# Copyright (c) 2016, 2018 by Chris Schlaeger <chris@taskjuggler.org>
+# Copyright (c) 2016, 2018, 2019 by Chris Schlaeger <chris@taskjuggler.org>
#
# MIT License
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
@@ -29,10 +29,11 @@
require 'perobs/Log'
require 'perobs/FlatFileBlobHeader'
require 'perobs/BTree'
require 'perobs/SpaceTree'
+require 'perobs/SpaceManager'
require 'perobs/IDList'
module PEROBS
# The FlatFile class manages the storage file of the FlatFileDB. It contains
@@ -49,11 +50,18 @@
@db_dir = dir
@progressmeter = progressmeter
@f = nil
@marks = nil
@index = BTree.new(@db_dir, 'index', INDEX_BTREE_ORDER, @progressmeter)
- @space_list = SpaceTree.new(@db_dir, @progressmeter)
+ old_spaces_file = File.join(@db_dir, 'database_spaces.blobs')
+ if File.exist?(old_spaces_file)
+ # PEROBS version 4.1.0 and earlier used this space list format. It is
+ # deprecated now. Newly created DBs use the SpaceManager format.
+ @space_list = SpaceTree.new(@db_dir, @progressmeter)
+ else
+ @space_list = SpaceManager.new(@db_dir, @progressmeter)
+ end
end
# Open the flat file for reading and writing.
def open
file_name = File.join(@db_dir, 'database.blobs')
@@ -132,21 +140,22 @@
header.clear_flags
@space_list.add_space(addr, header.length) if @space_list.is_open?
end
# Delete all unmarked objects.
- def delete_unmarked_objects
+ def delete_unmarked_objects(&block)
# We don't update the index and the space list during this operation as
# we defragmentize the blob file at the end. We'll end the operation
# with an empty space list.
clear_index_files
deleted_objects_count = 0
@progressmeter.start('Sweeping unmarked objects', @f.size) do |pm|
each_blob_header do |header|
if header.is_valid? && !@marks.include?(header.id)
delete_obj_by_address(header.addr, header.id)
+ yield(header.id) if block_given?
deleted_objects_count += 1
end
pm.update(header.addr)
end
@@ -181,48 +190,50 @@
# If the raw_obj is larger than 256 bytes we will compress it to
# save some space in the database file. For smaller strings the
# performance impact of compression is not compensated by writing
# less data to the storage.
compressed = false
- if raw_obj.bytesize > 256
+ raw_obj_bytesize = raw_obj.bytesize
+ if raw_obj_bytesize > 256
raw_obj = Zlib.deflate(raw_obj)
+ raw_obj_bytesize = raw_obj.bytesize
compressed = true
end
- addr, length = find_free_blob(raw_obj.bytesize)
+ addr, length = find_free_blob(raw_obj_bytesize)
begin
if length != -1
# Just a safeguard so we don't overwrite current data.
header = FlatFileBlobHeader.read(@f, addr)
if header.length != length
PEROBS.log.fatal "Length in free list (#{length}) and header " +
"(#{header.length}) for address #{addr} don't match."
end
- if raw_obj.bytesize > header.length
- PEROBS.log.fatal "Object (#{raw_obj.bytesize}) is longer than " +
+ if raw_obj_bytesize > header.length
+ PEROBS.log.fatal "Object (#{raw_obj_bytesize}) is longer than " +
"blob space (#{header.length})."
end
if header.is_valid?
PEROBS.log.fatal "Entry at address #{addr} with flags: " +
"#{header.flags} is already used for ID #{header.id}."
end
end
flags = 1 << FlatFileBlobHeader::VALID_FLAG_BIT
flags |= (1 << FlatFileBlobHeader::COMPRESSED_FLAG_BIT) if compressed
- FlatFileBlobHeader.new(@f, addr, flags, raw_obj.bytesize, id, crc).write
+ FlatFileBlobHeader.new(@f, addr, flags, raw_obj_bytesize, id, crc).write
@f.write(raw_obj)
- if length != -1 && raw_obj.bytesize < length
+ if length != -1 && raw_obj_bytesize < length
# The new object was not appended and it did not completely fill the
# free space. So we have to write a new header to mark the remaining
# empty space.
- unless length - raw_obj.bytesize >= FlatFileBlobHeader::LENGTH
+ unless length - raw_obj_bytesize >= FlatFileBlobHeader::LENGTH
PEROBS.log.fatal "Not enough space to append the empty space " +
- "header (space: #{length} bytes, object: #{raw_obj.bytesize} " +
+ "header (space: #{length} bytes, object: #{raw_obj_bytesize} " +
"bytes)."
end
space_address = @f.pos
- space_length = length - FlatFileBlobHeader::LENGTH - raw_obj.bytesize
+ space_length = length - FlatFileBlobHeader::LENGTH - raw_obj_bytesize
FlatFileBlobHeader.new(@f, space_address, 0, space_length,
0, 0).write
# Register the new space with the space list.
if @space_list.is_open? && space_length > 0
@space_list.add_space(space_address, space_length)
@@ -459,10 +470,11 @@
@progressmeter)
new_index.erase
new_index.open
corrupted_blobs = 0
+ end_of_last_healthy_blob = nil
@progressmeter.start('Checking blobs file', @f.size) do |pm|
corrupted_blobs = each_blob_header do |header|
if header.is_valid?
# We have a non-deleted entry.
begin
@@ -527,16 +539,30 @@
end
else
# ID is unique so far. Add it to the shadow index.
new_index.insert(header.id, header.addr)
end
-
end
+ end_of_last_healthy_blob = header.addr +
+ FlatFileBlobHeader::LENGTH + header.length
pm.update(header.addr)
end
+ if end_of_last_healthy_blob && end_of_last_healthy_blob != @f.size
+ # The blob file ends with a corrupted blob header.
+ PEROBS.log.error "#{@f.size - end_of_last_healthy_blob} corrupted " +
+ 'bytes found at the end of FlatFile.'
+ corrupted_blobs += 1
+ if repair
+ PEROBS.log.error "Truncating FlatFile to " +
+ "#{end_of_last_healthy_blob} bytes by discarding " +
+ "#{@f.size - end_of_last_healthy_blob} bytes"
+ @f.truncate(end_of_last_healthy_blob)
+ end
+ end
+
errors += corrupted_blobs
end
# We no longer need the new index.
new_index.close
@@ -544,11 +570,11 @@
if repair && corrupted_blobs > 0
erase_index_files
defragmentize
regenerate_index_and_spaces
- else
+ elsif corrupted_blobs == 0
# Now we check the index data. It must be correct and the entries must
# match the blob file. All entries in the index must be in the blob file
# and vice versa.
begin
index_ok = @index.check do |id, address|
@@ -573,10 +599,145 @@
"#{errors} errors found."
errors
end
+ # Repair the FlatFile. In contrast to the repair functionality in the
+ # check() method this method is much faster. It simply re-creates the
+ # index and space list from the blob file. It takes no arguments and
+ # returns 0 right away when no blob file handle (@f) is set.
+ # @return [Integer] Number of errors found
+ def repair
+ errors = 0
+ return errors unless @f
+
+ t = Time.now
+ PEROBS.log.info "Repairing FlatFile database"
+
+ # Erase and re-open the index and space list files. We purposely don't
+ # close the files as it would trigger needless flushing.
+ clear_index_files(true)
+
+ # Now we scan the blob file and re-index all blobs and spaces. Corrupted
+ # blobs will be skipped.
+ corrupted_blobs = 0
+ end_of_last_healthy_blob = nil
+ @progressmeter.start('Re-indexing blobs file', @f.size) do |pm|
+ corrupted_blobs = each_blob_header do |header|
+ if header.corruption_start
+ # The blob is preceded by a corrupted area. We write the header of a
+ # deleted blob at the start of this area and register the area with
+ # the space list.
+ if (data_length = header.addr - header.corruption_start -
+ FlatFileBlobHeader::LENGTH) <= 0
+ PEROBS.log.error "Found a corrupted blob that is too small to " +
+ "fit a header (#{data_length}). File must be defragmented."
+ else
+ new_header = FlatFileBlobHeader.new(@f, header.corruption_start,
+ 0, data_length, 0, 0)
+ new_header.write
+ @space_list.add_space(header.corruption_start, data_length)
+ end
+ end
+
+ if header.is_valid?
+ # We have a non-deleted entry.
+ begin
+ @f.seek(header.addr + FlatFileBlobHeader::LENGTH)
+ buf = @f.read(header.length)
+ if buf.bytesize != header.length
+ PEROBS.log.error "Premature end of file in blob with ID " +
+ "#{header.id}."
+ discard_damaged_blob(header)
+ errors += 1
+ next
+ end
+
+ # Uncompress the data if the compression bit is set in the mark
+ # byte.
+ if header.is_compressed?
+ begin
+ buf = Zlib.inflate(buf)
+ rescue Zlib::BufError, Zlib::DataError
+ PEROBS.log.error "Corrupted compressed block with ID " +
+ "#{header.id} found."
+ discard_damaged_blob(header)
+ errors += 1
+ next
+ end
+ end
+
+ if header.crc && checksum(buf) != header.crc
+ PEROBS.log.error "Checksum failure while checking blob " +
+ "with ID #{header.id}"
+ discard_damaged_blob(header)
+ errors += 1
+ next
+ end
+ rescue IOError => e
+ PEROBS.log.fatal "Check of blob with ID #{header.id} failed: " +
+ e.message
+ end
+
+ # Check if the ID has already been found in the file.
+ if (previous_address = @index.get(header.id))
+ PEROBS.log.error "Multiple blobs for ID #{header.id} found. " +
+ "Addresses: #{previous_address}, #{header.addr}"
+ errors += 1
+ previous_header = FlatFileBlobHeader.read(@f, previous_address,
+ header.id)
+ # We have two blobs with the same ID and we must discard one of
+ # them.
+ if header.is_outdated?
+ discard_damaged_blob(header)
+ elsif previous_header.is_outdated?
+ discard_damaged_blob(previous_header)
+ else
+ PEROBS.log.error "None of the blobs with same ID have " +
+ "the outdated flag set. Deleting the smaller one."
+ errors += 1
+ discard_damaged_blob(header.length < previous_header.length ?
+ header : previous_header)
+ end
+ else
+ # ID is unique so far. Add it to the freshly cleared index.
+ @index.insert(header.id, header.addr)
+ end
+
+ else
+ if header.length > 0
+ @space_list.add_space(header.addr, header.length)
+ end
+ end
+ end_of_last_healthy_blob = header.addr +
+ FlatFileBlobHeader::LENGTH + header.length
+
+ pm.update(header.addr)
+ end
+
+ if end_of_last_healthy_blob && end_of_last_healthy_blob != @f.size
+ # The blob file ends with a corrupted blob header.
+ PEROBS.log.error "#{@f.size - end_of_last_healthy_blob} corrupted " +
+ 'bytes found at the end of FlatFile.'
+ corrupted_blobs += 1
+
+ PEROBS.log.error "Truncating FlatFile to " +
+ "#{end_of_last_healthy_blob} bytes by discarding " +
+ "#{@f.size - end_of_last_healthy_blob} bytes"
+ @f.truncate(end_of_last_healthy_blob)
+ end
+
+ errors += corrupted_blobs
+ end
+
+ sync
+ PEROBS.log.info "FlatFile repair completed in #{Time.now - t} seconds. " +
+ "#{errors} errors found."
+
+ errors
+ end
+
# This method clears the index tree and the free space list and
# regenerates them from the FlatFile.
def regenerate_index_and_spaces
PEROBS.log.warn "Re-generating FlatFileDB index and space files"
@index.open unless @index.is_open?
@@ -615,11 +776,15 @@
header = FlatFileBlobHeader.read(@f, address)
!header.is_valid? && header.length == size
end
# Check whether the blob header read from the given address is valid and
# carries the given ID. On the 4.2.0 side of this diff a PEROBS::FatalError
# raised while reading the header is treated as "no such blob" and the
# method returns false instead of propagating the error.
# @param id ID that the blob header is expected to carry
# @param address Address handed to FlatFileBlobHeader.read (presumably a
#        byte offset into the blob file -- verify against callers)
# @return [Boolean] true if a valid header with that ID was found
def has_id_at?(id, address)
- header = FlatFileBlobHeader.read(@f, address)
+ begin
+ header = FlatFileBlobHeader.read(@f, address)
+ rescue PEROBS::FatalError
+ return false
+ end
header.is_valid? && header.id == id
end
def inspect
s = '['
@@ -731,11 +896,11 @@
if !header.is_valid?
if header.length > 0
unless @space_list.has_space?(header.addr, header.length)
PEROBS.log.error "FlatFile has free space " +
"(addr: #{header.addr}, len: #{header.length}) that is " +
- "not in FreeSpaceManager"
+ "not in SpaceManager"
errors += 1
end
end
else
if (index_address = @index.get(header.id)).nil?
@@ -765,52 +930,39 @@
# Open the index and the space list. If opening raises a FatalError, both
# are erased and re-opened, and then rebuilt by calling
# regenerate_index_and_spaces.
# @param abort_on_missing_files [Boolean] Passed through unchanged to
#        @index.open; its exact effect is defined there.
def open_index_files(abort_on_missing_files = false)
begin
@index.open(abort_on_missing_files)
@space_list.open
rescue FatalError
- # Ensure that the index is really closed.
- @index.close
- # Erase it completely
- @index.erase
- # Then create it again.
- @index.open
-
- # Ensure that the spaces list is really closed.
- @space_list.close
- # Erase it completely
- @space_list.erase
- # Then create it again
- @space_list.open
-
+ clear_index_files
regenerate_index_and_spaces
end
end
# Erase the index and the space list. On the 4.2.0 side of this diff the
# preceding close calls are skipped when dont_close_files is true, and a
# space list that is still a SpaceTree is replaced by a new SpaceManager
# after it has been erased.
# @param dont_close_files [Boolean] true to erase without closing first
- def erase_index_files
+ def erase_index_files(dont_close_files = false)
# Ensure that the index is really closed.
- @index.close
+ @index.close unless dont_close_files
# Erase it completely
@index.erase
# Ensure that the spaces list is really closed.
- @space_list.close
+ @space_list.close unless dont_close_files
# Erase it completely
@space_list.erase
+
+ if @space_list.is_a?(SpaceTree)
+ # If we still use the old SpaceTree format, this is the moment to
+ # convert it to the new SpaceManager format.
+ @space_list = SpaceManager.new(@db_dir, @progressmeter)
+ PEROBS.log.warn "Converting space list from SpaceTree format " +
+ "to SpaceManager format"
+ end
end
# Erase the index and the space list and open them again. On the 4.2.0
# side of this diff the erase step is delegated to erase_index_files and
# the dont_close_files argument is passed through to it.
# @param dont_close_files [Boolean] Forwarded to erase_index_files
- def clear_index_files
- # Ensure that the index is really closed.
- @index.close
- # Erase it completely
- @index.erase
- # Then create it again.
- @index.open
+ def clear_index_files(dont_close_files = false)
+ erase_index_files(dont_close_files)
- # Ensure that the spaces list is really closed.
- @space_list.close
- # Erase it completely
- @space_list.erase
- # Then create it again
+ # Then create them again.
+ @index.open
@space_list.open
end
end