lib/perobs/FlatFile.rb in perobs-2.5.0 vs lib/perobs/FlatFile.rb in perobs-3.0.0
- old
+ new
@@ -27,68 +27,97 @@
require 'zlib'
require 'perobs/Log'
require 'perobs/FlatFileBlobHeader'
-require 'perobs/IndexTree'
-require 'perobs/FreeSpaceManager'
+require 'perobs/BTree'
+require 'perobs/SpaceTree'
module PEROBS
# The FlatFile class manages the storage file of the FlatFileDB. It contains
# a sequence of blobs. Each blob consists of a header and the actual
# blob data bytes.
class FlatFile
+ # The number of entries in a single BTree node of the index file.
+ INDEX_BTREE_ORDER = 65
+
# Create a new FlatFile object for a database in the given path.
# @param dir [String] Directory path for the database file
def initialize(dir)
@db_dir = dir
@f = nil
- @index = IndexTree.new(dir)
- @space_list = FreeSpaceManager.new(dir)
+ @index = BTree.new(@db_dir, 'index', INDEX_BTREE_ORDER)
+ @space_list = SpaceTree.new(@db_dir)
end
# Open the flat file for reading and writing.
def open
file_name = File.join(@db_dir, 'database.blobs')
+ new_db_created = false
begin
if File.exist?(file_name)
@f = File.open(file_name, 'rb+')
else
- PEROBS.log.info 'New database.blobs file created'
+ PEROBS.log.info "New FlatFile database '#{file_name}' created"
@f = File.open(file_name, 'wb+')
+ new_db_created = true
end
rescue IOError => e
- PEROBS.log.fatal "Cannot open flat file database #{file_name}: " +
+ PEROBS.log.fatal "Cannot open FlatFile database #{file_name}: " +
e.message
end
unless @f.flock(File::LOCK_NB | File::LOCK_EX)
- PEROBS.log.fatal 'Database is locked by another process'
+ PEROBS.log.fatal "FlatFile database '#{file_name}' is locked by " +
+ "another process"
end
- @index.open
- @space_list.open
+
+ begin
+ @index.open(!new_db_created)
+ @space_list.open
+ rescue FatalError
+ # Ensure that the index is really closed.
+ @index.close
+ # Erase it completely
+ @index.erase
+ # Then create it again.
+ @index.open
+
+ # Ensure that the spaces list is really closed.
+ @space_list.close
+ # Erase it completely
+ @space_list.erase
+ # Then create it again
+ @space_list.open
+
+ regenerate_index_and_spaces
+ end
end
# Close the flat file. This method must be called to ensure that all data
# is really written into the filesystem.
def close
@space_list.close
@index.close
- @f.flush
- @f.flock(File::LOCK_UN)
- @f.close
- @f = nil
+
+ if @f
+ @f.flush
+ @f.flock(File::LOCK_UN)
+ @f.close
+ @f = nil
+ end
end
# Force outstanding data to be written to the filesystem.
def sync
begin
@f.flush
rescue IOError => e
PEROBS.log.fatal "Cannot sync flat file database: #{e.message}"
end
+ @index.sync
end
# Delete the blob for the specified ID.
# @param id [Integer] ID of the object to be deleted
# @return [Boolean] True if object was deleted, false otherwise
@@ -103,20 +132,14 @@
# Delete the blob that is stored at the specified address.
# @param addr [Integer] Address of the blob to delete
# @param id [Integer] ID of the blob to delete
def delete_obj_by_address(addr, id)
- @index.delete_value(id)
+ @index.remove(id)
header = FlatFileBlobHeader.read_at(@f, addr, id)
- begin
- @f.seek(addr)
- @f.write([ 0 ].pack('C'))
- @f.flush
- @space_list.add_space(addr, header.length)
- rescue IOError => e
- PEROBS.log.fatal "Cannot erase blob for ID #{header.id}: #{e.message}"
- end
+ header.clear_flags
+ @space_list.add_space(addr, header.length)
end
# Delete all unmarked objects.
def delete_unmarked_objects
PEROBS.log.info "Deleting unmarked objects..."
@@ -134,16 +157,27 @@
PEROBS.log.info "#{deleted_ids.length} unmarked objects deleted " +
"in #{Time.now - t} seconds"
deleted_ids
end
- # Write the given object into the file. This method assumes that no other
- # entry with the given ID exists already in the file.
+ # Write the given object into the file. This method never uses in-place
+ # updates for existing objects. A new copy is inserted first and only when
+ # the insert was successful, the old copy is deleted and the index
+ # updated.
# @param id [Integer] ID of the object
# @param raw_obj [String] Raw object as String
# @return [Integer] position of the written blob in the blob file
def write_obj_by_id(id, raw_obj)
+ # Check if we have already an object with the given ID. We'll mark it as
+ # outdated and save the header for later deletion. In case this
+ # operation is aborted or interrupted we ensure that we either have the
+ # old or the new version available.
+ if (old_addr = find_obj_addr_by_id(id))
+ old_header = FlatFileBlobHeader.read_at(@f, old_addr)
+ old_header.set_outdated_flag
+ end
+
crc = checksum(raw_obj)
# If the raw_obj is larger than 256 characters we will compress it to
# save some space in the database file. For smaller strings the
# performance impact of compression is not compensated by writing
@@ -166,16 +200,16 @@
if raw_obj.length > header.length
PEROBS.log.fatal "Object (#{raw_obj.length}) is longer than " +
"blob space (#{header.length})."
end
if header.is_valid?
- PEROBS.log.fatal "Entry (mark: #{header.mark}) is already used."
+ PEROBS.log.fatal "Entry (flags: #{header.flags}) is already used."
end
end
- @f.seek(addr)
- FlatFileBlobHeader.new(compressed ? (1 << 2) | 1 : 1, raw_obj.length,
- id, crc).write(@f)
+ flags = 1 << FlatFileBlobHeader::VALID_FLAG_BIT
+ flags |= (1 << FlatFileBlobHeader::COMPRESSED_FLAG_BIT) if compressed
+ FlatFileBlobHeader.new(@f, addr, flags, raw_obj.length, id, crc).write
@f.write(raw_obj)
if length != -1 && raw_obj.length < length
# The new object was not appended and it did not completely fill the
# free space. So we have to write a new header to mark the remaining
# empty space.
@@ -184,16 +218,24 @@
"header (space: #{length} bytes, object: #{raw_obj.length} " +
"bytes)."
end
space_address = @f.pos
space_length = length - FlatFileBlobHeader::LENGTH - raw_obj.length
- FlatFileBlobHeader.new(0, space_length, 0, 0).write(@f)
+ FlatFileBlobHeader.new(@f, space_address, 0, space_length,
+ 0, 0).write
# Register the new space with the space list.
@space_list.add_space(space_address, space_length) if space_length > 0
end
- @f.flush
- @index.put_value(id, addr)
+ if old_addr
+ # If we had an existing object stored for the ID we have to mark
+ # this entry as deleted now.
+ old_header.clear_flags
+ else
+ @f.flush
+ end
+ # Once the blob has been written we can update the index as well.
+ @index.insert(id, addr)
rescue IOError => e
PEROBS.log.fatal "Cannot write blob for ID #{id} to FlatFileDB: " +
e.message
end
@@ -202,11 +244,11 @@
# Find the address of the object with the given ID.
# @param id [Integer] ID of the object
# @return [Integer] Offset in the flat file or nil if not found
def find_obj_addr_by_id(id)
- @index.get_value(id)
+ @index.get(id)
end
# Read the object with the given ID.
# @param id [Integer] ID of the object
# @return [String or nil] Raw object data if found, otherwise nil
@@ -236,13 +278,18 @@
buf = @f.read(header.length)
rescue IOError => e
PEROBS.log.fatal "Cannot read blob for ID #{id}: #{e.message}"
end
- # Uncompress the data if the compression bit is set in the mark byte.
+ # Uncompress the data if the compression bit is set in the flags byte.
if header.is_compressed?
- buf = Zlib.inflate(buf)
+ begin
+ buf = Zlib.inflate(buf)
+ rescue Zlib::BufError, Zlib::DataError
+ PEROBS.log.fatal "Corrupted compressed block with ID " +
+ "#{header.id} found."
+ end
end
if checksum(buf) != header.crc
PEROBS.log.fatal "Checksum failure while reading blob ID #{id}"
end
@@ -260,19 +307,11 @@
# Mark the object at the specified address.
# @param addr [Integer] Offset in the file
# @param id [Integer] ID of the object
def mark_obj_by_address(addr, id)
- header = FlatFileBlobHeader.read_at(@f, addr, id)
- begin
- @f.seek(addr)
- @f.write([ header.mark | (1 << 1) ].pack('C'))
- @f.flush
- rescue IOError => e
- PEROBS.log.fatal "Marking of FlatFile blob with ID #{id} " +
- "failed: #{e.message}"
- end
+ FlatFileBlobHeader.read_at(@f, addr, id).set_mark_flag
end
# Return true if the object with the given ID is marked, false otherwise.
# @param id [Integer] ID of the object
def is_marked_by_id?(id)
@@ -295,28 +334,22 @@
each_blob_header do |pos, header|
total_blob_count += 1
if header.is_valid? && header.is_marked?
# Clear all valid and marked blocks.
marked_blob_count += 1
- begin
- @f.seek(pos)
- @f.write([ header.mark & 0b11111101 ].pack('C'))
- @f.flush
- rescue IOError => e
- PEROBS.log.fatal "Unmarking of FlatFile blob with ID #{blob_id} " +
- "failed: #{e.message}"
- end
+ header.clear_mark_flag
end
end
PEROBS.log.info "#{marked_blob_count} marks in #{total_blob_count} " +
"objects cleared in #{Time.now - t} seconds"
end
# Eliminate all the holes in the file. This is an in-place
# implementation. No additional space will be needed on the file system.
def defragmentize
distance = 0
+ new_file_size = 0
deleted_blobs = 0
valid_blobs = 0
t = Time.now
PEROBS.log.info "Defragmenting FlatFile"
# Iterate over all entries.
@@ -333,21 +366,23 @@
buf = @f.read(entry_bytes)
# Write the buffer right after the end of the previous entry.
@f.seek(pos - distance)
@f.write(buf)
# Update the index with the new position
- @index.put_value(header.id, pos - distance)
+ @index.insert(header.id, pos - distance)
# Mark the space between the relocated current entry and the
# next valid entry as deleted space.
- FlatFileBlobHeader.new(0, distance - FlatFileBlobHeader::LENGTH,
- 0, 0).write(@f)
+ FlatFileBlobHeader.new(@f, @f.pos, 0,
+ distance - FlatFileBlobHeader::LENGTH,
+ 0, 0).write
@f.flush
rescue IOError => e
PEROBS.log.fatal "Error while moving blob for ID #{header.id}: " +
e.message
end
end
+ new_file_size = pos + FlatFileBlobHeader::LENGTH + header.length
else
deleted_blobs += 1
distance += entry_bytes
end
end
@@ -355,11 +390,11 @@
PEROBS.log.info "#{distance / 1000} KiB/#{deleted_blobs} blobs of " +
"#{@f.size / 1000} KiB/#{valid_blobs} blobs or " +
"#{'%.1f' % (distance.to_f / @f.size * 100.0)}% reclaimed"
@f.flush
- @f.truncate(@f.size - distance)
+ @f.truncate(new_file_size)
@f.flush
@space_list.clear
sync
end
@@ -391,60 +426,120 @@
# Reclaim the space saved by compressing entries.
defragmentize
end
+ # Check (and repair) the FlatFile.
+ # @param repair [Boolean] True if errors should be fixed.
+ # @return [Integer] Number of errors found
def check(repair = false)
- return unless @f
+ errors = 0
+ return errors unless @f
t = Time.now
PEROBS.log.info "Checking FlatFile database" +
"#{repair ? ' in repair mode' : ''}..."
# First check the database blob file. Each entry should be readable and
- # correct.
+ # correct and all IDs must be unique. We use a shadow index to keep
+ # track of the already found IDs.
+ new_index = BTree.new(@db_dir, 'new-index', INDEX_BTREE_ORDER)
+ new_index.erase
+ new_index.open
+
each_blob_header do |pos, header|
if header.is_valid?
# We have a non-deleted entry.
begin
@f.seek(pos + FlatFileBlobHeader::LENGTH)
buf = @f.read(header.length)
+ if buf.length != header.length
+ PEROBS.log.error "Premature end of file in blob with ID " +
+ "#{header.id}."
+ discard_damaged_blob(header) if repair
+ errors += 1
+ next
+ end
+
# Uncompress the data if the compression bit is set in the mark
# byte.
- buf = Zlib.inflate(buf) if header.is_compressed?
+ if header.is_compressed?
+ begin
+ buf = Zlib.inflate(buf)
+ rescue Zlib::BufError, Zlib::DataError
+ PEROBS.log.error "Corrupted compressed block with ID " +
+ "#{header.id} found."
+ discard_damaged_blob(header) if repair
+ errors += 1
+ next
+ end
+ end
if header.crc && checksum(buf) != header.crc
- if repair
- PEROBS.log.error "Checksum failure while checking blob " +
- "with ID #{header.id}. Deleting object."
- delete_obj_by_address(pos, header.id)
- else
- PEROBS.log.fatal "Checksum failure while checking blob " +
- "with ID #{header.id}"
- end
+ PEROBS.log.error "Checksum failure while checking blob " +
+ "with ID #{header.id}"
+ discard_damaged_blob(header) if repair
+ errors += 1
+ next
end
rescue IOError => e
PEROBS.log.fatal "Check of blob with ID #{header.id} failed: " +
e.message
end
+
+ # Check if the ID has already been found in the file.
+ if (previous_address = new_index.get(header.id))
+ PEROBS.log.error "Multiple blobs for ID #{header.id} found. " +
+ "Addresses: #{previous_address}, #{pos}"
+ previous_header = FlatFileBlobHeader.read_at(@f, previous_address,
+ header.id)
+ if repair
+ # We have two blobs with the same ID and we must discard one of
+ # them.
+ if header.is_outdated?
+ discard_damaged_blob(header)
+ elsif previous_header.is_outdated?
+ discard_damaged_blob(previous_header)
+ else
+ PEROBS.log.error "None of the blobs with same ID have " +
+ "the outdated flag set. Deleting the smaller one."
+ discard_damaged_blob(header.length < previous_header.length ?
+ header : previous_header)
+ end
+ next
+ end
+ else
+ # ID is unique so far. Add it to the shadow index.
+ new_index.insert(header.id, pos)
+ end
+
end
end
+ # We no longer need the new index.
+ new_index.close
+ new_index.erase
# Now we check the index data. It must be correct and the entries must
# match the blob file. All entries in the index must be in the blob file
# and vice versa.
begin
- unless @index.check(self) && @space_list.check(self) &&
- cross_check_entries
+ index_ok = @index.check do |id, address|
+ has_id_at?(id, address)
+ end
+ unless index_ok && @space_list.check(self) && cross_check_entries
regenerate_index_and_spaces if repair
end
rescue PEROBS::FatalError
+ errors += 1
regenerate_index_and_spaces if repair
end
sync if repair
- PEROBS.log.info "check_db completed in #{Time.now - t} seconds"
+ PEROBS.log.info "check_db completed in #{Time.now - t} seconds. " +
+ "#{errors} errors found."
+
+ errors
end
# This method clears the index tree and the free space list and
# regenerates them from the FlatFile.
def regenerate_index_and_spaces
@@ -452,11 +547,11 @@
@index.clear
@space_list.clear
each_blob_header do |pos, header|
if header.is_valid?
- @index.put_value(header.id, pos)
+ @index.insert(header.id, pos)
else
@space_list.add_space(pos, header.length) if header.length > 0
end
end
end
@@ -472,23 +567,21 @@
end
def inspect
s = '['
each_blob_header do |pos, header|
- s << "{ :pos => #{pos}, :mark => #{header.mark}, " +
+ s << "{ :pos => #{pos}, :flags => #{header.flags}, " +
":length => #{header.length}, :id => #{header.id}, " +
":crc => #{header.crc}"
if header.is_valid?
s << ", :value => #{@f.read(header.length)}"
end
s << " }\n"
end
s + ']'
end
-
-
private
def each_blob_header(&block)
pos = 0
begin
@@ -527,29 +620,37 @@
def checksum(raw_obj)
Zlib.crc32(raw_obj, 0)
end
def cross_check_entries
+ errors = 0
+
each_blob_header do |pos, header|
if !header.is_valid?
if header.length > 0
unless @space_list.has_space?(pos, header.length)
PEROBS.log.error "FlatFile has free space " +
"(addr: #{pos}, len: #{header.length}) that is not in " +
"FreeSpaceManager"
- return false
+ errors += 1
end
end
else
- unless @index.get_value(header.id) == pos
+ unless @index.get(header.id) == pos
PEROBS.log.error "FlatFile blob at address #{pos} is listed " +
- "in index with address #{@index.get_value(header.id)}"
- return false
+ "in index with address #{@index.get(header.id)}"
+ errors += 1
end
end
end
- true
+ errors == 0
+ end
+
+ def discard_damaged_blob(header)
+ PEROBS.log.error "Discarding corrupted data blob for ID #{header.id} " +
+ "at offset #{header.addr}"
+ header.clear_flags
end
end
end