lib/perobs/FlatFile.rb in perobs-2.4.2 vs lib/perobs/FlatFile.rb in perobs-2.5.0
- old
+ new
@@ -26,40 +26,21 @@
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
require 'zlib'
require 'perobs/Log'
+require 'perobs/FlatFileBlobHeader'
require 'perobs/IndexTree'
require 'perobs/FreeSpaceManager'
module PEROBS
# The FlatFile class manages the storage file of the FlatFileDB. It contains
- # a sequence of blobs Each blob consists of a 25 byte header and the actual
- # blob data bytes. The header has the following structure:
- #
- # 1 Byte: Mark byte.
- # Bit 0: 0 deleted entry, 1 valid entry
- # Bit 1: 0 unmarked, 1 marked
- # Bit 2 - 7: reserved, must be 0
- # 8 bytes: Length of the data blob in bytes
- # 8 bytes: ID of the value in the data blob
- # 4 bytes: CRC32 checksum of the data blob
- #
- # If the bit 0 of the mark byte is 0, only the length is valid. The blob is
- # empty. Only of bit 0 is set then entry is valid.
+ # a sequence of blobs. Each blob consists of a header and the actual
+ # blob data bytes.
class FlatFile
- # Utility class to hold all the data that is stored in a blob header.
- class Header < Struct.new(:mark, :length, :id, :crc)
- end
-
- # The 'pack()' format of the header.
- BLOB_HEADER_FORMAT = 'CQQL'
- # The length of the header in bytes.
- BLOB_HEADER_LENGTH = 21
-
# Create a new FlatFile object for a database in the given path.
# @param dir [String] Directory path for the data base file
def initialize(dir)
@db_dir = dir
@f = nil
@@ -123,31 +104,31 @@
# Delete the blob that is stored at the specified address.
# @param addr [Integer] Address of the blob to delete
# @param id [Integer] ID of the blob to delete
def delete_obj_by_address(addr, id)
@index.delete_value(id)
- header = read_blob_header(addr, id)
+ header = FlatFileBlobHeader.read_at(@f, addr, id)
begin
@f.seek(addr)
@f.write([ 0 ].pack('C'))
@f.flush
@space_list.add_space(addr, header.length)
- rescue => e
+ rescue IOError => e
PEROBS.log.fatal "Cannot erase blob for ID #{header.id}: #{e.message}"
end
end
# Delete all unmarked objects.
def delete_unmarked_objects
PEROBS.log.info "Deleting unmarked objects..."
t = Time.now
deleted_ids = []
- each_blob_header do |pos, mark, length, blob_id, crc|
- if (mark & 3 == 1)
- delete_obj_by_address(pos, blob_id)
- deleted_ids << blob_id
+ each_blob_header do |pos, header|
+ if header.is_valid? && !header.is_marked?
+ delete_obj_by_address(pos, header.id)
+ deleted_ids << header.id
end
end
defragmentize
PEROBS.log.info "#{deleted_ids.length} unmarked objects deleted " +
@@ -159,43 +140,55 @@
# entry with the given ID exists already in the file.
# @param id [Integer] ID of the object
# @param raw_obj [String] Raw object as String
# @return [Integer] position of the written blob in the blob file
def write_obj_by_id(id, raw_obj)
+ crc = checksum(raw_obj)
+
+ # If the raw_obj is larger than 256 characters we will compress it to
+ # save some space in the database file. For smaller strings the
+ # performance impact of compression is not compensated by writing
+ # less data to the storage.
+ compressed = false
+ if raw_obj.length > 256
+ raw_obj = Zlib.deflate(raw_obj)
+ compressed = true
+ end
+
addr, length = find_free_blob(raw_obj.length)
begin
if length != -1
# Just a safeguard so we don't overwrite current data.
- header = read_blob_header(addr)
+ header = FlatFileBlobHeader.read_at(@f, addr)
if header.length != length
PEROBS.log.fatal "Length in free list (#{length}) and header " +
"(#{header.length}) don't match."
end
if raw_obj.length > header.length
PEROBS.log.fatal "Object (#{raw_obj.length}) is longer than " +
"blob space (#{header.length})."
end
- if header.mark != 0
- PEROBS.log.fatal "Mark (#{header.mark}) is not 0."
+ if header.is_valid?
+ PEROBS.log.fatal "Entry (mark: #{header.mark}) is already used."
end
end
@f.seek(addr)
- @f.write([ 1, raw_obj.length, id, checksum(raw_obj)].
- pack(BLOB_HEADER_FORMAT))
+ FlatFileBlobHeader.new(compressed ? (1 << 2) | 1 : 1, raw_obj.length,
+ id, crc).write(@f)
@f.write(raw_obj)
if length != -1 && raw_obj.length < length
# The new object was not appended and it did not completely fill the
# free space. So we have to write a new header to mark the remaining
# empty space.
- unless length - raw_obj.length >= BLOB_HEADER_LENGTH
+ unless length - raw_obj.length >= FlatFileBlobHeader::LENGTH
PEROBS.log.fatal "Not enough space to append the empty space " +
"header (space: #{length} bytes, object: #{raw_obj.length} " +
"bytes)."
end
space_address = @f.pos
- space_length = length - BLOB_HEADER_LENGTH - raw_obj.length
- @f.write([ 0, space_length, 0, 0 ].pack(BLOB_HEADER_FORMAT))
+ space_length = length - FlatFileBlobHeader::LENGTH - raw_obj.length
+ FlatFileBlobHeader.new(0, space_length, 0, 0).write(@f)
# Register the new space with the space list.
@space_list.add_space(space_address, space_length) if space_length > 0
end
@f.flush
@index.put_value(id, addr)
@@ -228,25 +221,35 @@
# Read the object at the specified address.
# @param addr [Integer] Offset in the flat file
# @param id [Integer] ID of the data blob
# @return [String] Raw object data
def read_obj_by_address(addr, id)
- header = read_blob_header(addr, id)
+ header = FlatFileBlobHeader.read_at(@f, addr, id)
if header.id != id
PEROBS.log.fatal "Database index corrupted: Index for object " +
"#{id} points to object with ID #{header.id}"
end
+
+ buf = nil
+
begin
- @f.seek(addr + BLOB_HEADER_LENGTH)
+ @f.seek(addr + FlatFileBlobHeader::LENGTH)
buf = @f.read(header.length)
- if checksum(buf) != header.crc
- PEROBS.log.fatal "Checksum failure while reading blob ID #{id}"
- end
- return buf
- rescue => e
+ rescue IOError => e
PEROBS.log.fatal "Cannot read blob for ID #{id}: #{e.message}"
end
+
+ # Uncompress the data if the compression bit is set in the mark byte.
+ if header.is_compressed?
+ buf = Zlib.inflate(buf)
+ end
+
+ if checksum(buf) != header.crc
+ PEROBS.log.fatal "Checksum failure while reading blob ID #{id}"
+ end
+
+ buf
end
# Mark the object with the given ID.
# @param id [Integer] ID of the object
def mark_obj_by_id(id)
@@ -257,27 +260,27 @@
# Mark the object at the specified address.
# @param addr [Integer] Offset in the file
# @param id [Integer] ID of the object
def mark_obj_by_address(addr, id)
- header = read_blob_header(addr, id)
+ header = FlatFileBlobHeader.read_at(@f, addr, id)
begin
@f.seek(addr)
- @f.write([ header.mark | 2 ].pack('C'))
+ @f.write([ header.mark | (1 << 1) ].pack('C'))
@f.flush
- rescue => e
+ rescue IOError => e
PEROBS.log.fatal "Marking of FlatFile blob with ID #{id} " +
"failed: #{e.message}"
end
end
# Return true if the object with the given ID is marked, false otherwise.
# @param id [Integer] ID of the object
def is_marked_by_id?(id)
if (addr = find_obj_addr_by_id(id))
- header = read_blob_header(addr, id)
- return (header.mark & 2) == 2
+ header = FlatFileBlobHeader.read_at(@f, addr, id)
+ return header.is_marked?
end
false
end
@@ -287,20 +290,20 @@
PEROBS.log.info "Clearing all marks..."
total_blob_count = 0
marked_blob_count = 0
- each_blob_header do |pos, mark, length, blob_id, crc|
+ each_blob_header do |pos, header|
total_blob_count += 1
- if (mark & 3 == 3)
+ if header.is_valid? && header.is_marked?
# Clear all valid and marked blocks.
marked_blob_count += 1
begin
@f.seek(pos)
- @f.write([ mark & 0b11111101 ].pack('C'))
+ @f.write([ header.mark & 0b11111101 ].pack('C'))
@f.flush
- rescue => e
+ rescue IOError => e
PEROBS.log.fatal "Unmarking of FlatFile blob with ID #{blob_id} " +
"failed: #{e.message}"
end
end
end
@@ -315,14 +318,14 @@
deleted_blobs = 0
valid_blobs = 0
t = Time.now
PEROBS.log.info "Defragmenting FlatFile"
# Iterate over all entries.
- each_blob_header do |pos, mark, length, blob_id, crc|
+ each_blob_header do |pos, header|
# Total size of the current entry
- entry_bytes = BLOB_HEADER_LENGTH + length
- if (mark & 1 == 1)
+ entry_bytes = FlatFileBlobHeader::LENGTH + header.length
+ if header.is_valid?
# We have found a valid entry.
valid_blobs += 1
if distance > 0
begin
# Read current entry into a buffer
@@ -330,18 +333,18 @@
buf = @f.read(entry_bytes)
# Write the buffer right after the end of the previous entry.
@f.seek(pos - distance)
@f.write(buf)
# Update the index with the new position
- @index.put_value(blob_id, pos - distance)
+ @index.put_value(header.id, pos - distance)
# Mark the space between the relocated current entry and the
# next valid entry as deleted space.
- @f.write([ 0, distance - BLOB_HEADER_LENGTH, 0, 0 ].
- pack(BLOB_HEADER_FORMAT))
+ FlatFileBlobHeader.new(0, distance - FlatFileBlobHeader::LENGTH,
+ 0, 0).write(@f)
@f.flush
- rescue => e
- PEROBS.log.fatal "Error while moving blob for ID #{blob_id}: " +
+ rescue IOError => e
+ PEROBS.log.fatal "Error while moving blob for ID #{header.id}: " +
e.message
end
end
else
deleted_blobs += 1
@@ -359,37 +362,70 @@
@space_list.clear
sync
end
+ # This method iterates over all entries in the FlatFile and removes each
+ # entry and inserts it again. This is useful to update all entries in
+ # case the storage format has changed.
+ def refresh
+ # This iteration might look scary as we iterate over the entries
+ # while we are rearranging them. Re-inserted items may be inserted
+ # before or at the current entry and this is fine. They also may be
+ # inserted after the current entry and will be re-read again unless they
+ # are inserted after the original file end.
+ file_size = @f.size
+ PEROBS.log.info "Refreshing the DB..."
+ t = Time.now
+ each_blob_header do |pos, header|
+ if header.is_valid?
+ buf = read_obj_by_address(pos, header.id)
+ delete_obj_by_address(pos, header.id)
+ write_obj_by_id(header.id, buf)
+ end
+
+ # Some re-inserted blobs may be inserted after the original file end.
+ # No need to process those blobs again.
+ break if pos >= file_size
+ end
+ PEROBS.log.info "DB refresh completed in #{Time.now - t} seconds"
+
+ # Reclaim the space saved by compressing entries.
+ defragmentize
+ end
+
def check(repair = false)
return unless @f
t = Time.now
PEROBS.log.info "Checking FlatFile database" +
"#{repair ? ' in repair mode' : ''}..."
# First check the database blob file. Each entry should be readable and
# correct.
- each_blob_header do |pos, mark, length, blob_id, crc|
- if (mark & 1 == 1)
+ each_blob_header do |pos, header|
+ if header.is_valid?
# We have a non-deleted entry.
begin
- @f.seek(pos + BLOB_HEADER_LENGTH)
- buf = @f.read(length)
- if crc && checksum(buf) != crc
+ @f.seek(pos + FlatFileBlobHeader::LENGTH)
+ buf = @f.read(header.length)
+ # Uncompress the data if the compression bit is set in the mark
+ # byte.
+ buf = Zlib.inflate(buf) if header.is_compressed?
+
+ if header.crc && checksum(buf) != header.crc
if repair
PEROBS.log.error "Checksum failure while checking blob " +
- "with ID #{id}. Deleting object."
- delete_obj_by_address(pos, blob_id)
+ "with ID #{header.id}. Deleting object."
+ delete_obj_by_address(pos, header.id)
else
PEROBS.log.fatal "Checksum failure while checking blob " +
- "with ID #{id}"
+ "with ID #{header.id}"
end
end
- rescue => e
- PEROBS.log.fatal "Check of blob with ID #{blob_id} failed: " +
+ rescue IOError => e
+ PEROBS.log.fatal "Check of blob with ID #{header.id} failed: " +
e.message
end
end
end
@@ -414,126 +450,106 @@
def regenerate_index_and_spaces
PEROBS.log.warn "Re-generating FlatFileDB index and space files"
@index.clear
@space_list.clear
- each_blob_header do |pos, mark, length, id, crc|
- if mark == 0
- @space_list.add_space(pos, length) if length > 0
+ each_blob_header do |pos, header|
+ if header.is_valid?
+ @index.put_value(header.id, pos)
else
- @index.put_value(id, pos)
+ @space_list.add_space(pos, header.length) if header.length > 0
end
end
end
def has_space?(address, size)
- header = read_blob_header(address)
+ header = FlatFileBlobHeader.read_at(@f, address)
header.length == size
end
def has_id_at?(id, address)
- header = read_blob_header(address)
+ header = FlatFileBlobHeader.read_at(@f, address)
header.id == id
end
def inspect
s = '['
- each_blob_header do |pos, mark, length, blob_id, crc|
- s << "{ :pos => #{pos}, :mark => #{mark}, " +
- ":length => #{length}, :id => #{blob_id}, :crc => #{crc}"
- if mark != 0
- s << ", :value => #{@f.read(length)}"
+ each_blob_header do |pos, header|
+ s << "{ :pos => #{pos}, :mark => #{header.mark}, " +
+ ":length => #{header.length}, :id => #{header.id}, " +
+ ":crc => #{header.crc}"
+ if header.is_valid?
+ s << ", :value => #{@f.read(header.length)}"
end
s << " }\n"
end
s + ']'
end
private
- def read_blob_header(addr, id = nil)
- buf = nil
+ def each_blob_header(&block)
+ pos = 0
begin
- @f.seek(addr)
- buf = @f.read(BLOB_HEADER_LENGTH)
- rescue => e
+ @f.seek(0)
+ while (header = FlatFileBlobHeader.read(@f))
+ yield(pos, header)
+
+ pos += FlatFileBlobHeader::LENGTH + header.length
+ @f.seek(pos)
+ end
+ rescue IOError => e
PEROBS.log.fatal "Cannot read blob in flat file DB: #{e.message}"
end
- if buf.nil? || buf.length != BLOB_HEADER_LENGTH
- PEROBS.log.fatal "Cannot read blob header " +
- "#{id ? "for ID #{id} " : ''}at address " +
- "#{addr}"
- end
- header = Header.new(*buf.unpack(BLOB_HEADER_FORMAT))
- if id && header.id != id
- PEROBS.log.fatal "Mismatch between FlatFile index and blob file " +
- "found for entry with ID #{id}/#{header.id}"
- end
-
- return header
end
def find_free_blob(bytes)
address, size = @space_list.get_space(bytes)
unless address
# We have not found any suitable space. Return the end of the file.
return [ @f.size, -1 ]
end
- if size == bytes || size - BLOB_HEADER_LENGTH >= bytes
+ if size == bytes || size - FlatFileBlobHeader::LENGTH >= bytes
return [ address, size ]
end
# Return the found space again. It's too small for the new content plus
# the gap header.
@space_list.add_space(address, size)
# We need a space that is large enough to hold the bytes and the gap
# header.
- @space_list.get_space(bytes + BLOB_HEADER_LENGTH) || [ @f.size, -1 ]
+ @space_list.get_space(bytes + FlatFileBlobHeader::LENGTH) ||
+ [ @f.size, -1 ]
end
# Compute the CRC32 checksum of the given raw object data.
# @param raw_obj [String] Raw object data
# @return [Integer] CRC32 checksum of raw_obj, computed from a start value of 0
def checksum(raw_obj)
  initial_crc = 0
  Zlib.crc32(raw_obj, initial_crc)
end
def cross_check_entries
- each_blob_header do |pos, mark, length, blob_id, crc|
- if mark == 0
- if length > 0
- unless @space_list.has_space?(pos, length)
+ each_blob_header do |pos, header|
+ if !header.is_valid?
+ if header.length > 0
+ unless @space_list.has_space?(pos, header.length)
PEROBS.log.error "FlatFile has free space " +
- "(addr: #{pos}, len: #{length}) that is not in FreeSpaceManager"
+ "(addr: #{pos}, len: #{header.length}) that is not in " +
+ "FreeSpaceManager"
return false
end
end
else
- unless @index.get_value(blob_id) == pos
+ unless @index.get_value(header.id) == pos
PEROBS.log.error "FlatFile blob at address #{pos} is listed " +
- "in index with address #{@index.get_value(blob_id)}"
+ "in index with address #{@index.get_value(header.id)}"
return false
end
end
end
true
- end
-
- def each_blob_header(&block)
- pos = 0
- begin
- @f.seek(0)
- while (buf = @f.read(BLOB_HEADER_LENGTH))
- mark, length, id, crc = buf.unpack(BLOB_HEADER_FORMAT)
- yield(pos, mark, length, id, crc)
-
- pos += BLOB_HEADER_LENGTH + length
- @f.seek(pos)
- end
- rescue IOError => e
- PEROBS.log.fatal "Cannot read blob in flat file DB: #{e.message}"
- end
end
end
end