lib/perobs/FlatFile.rb in perobs-2.4.2 vs lib/perobs/FlatFile.rb in perobs-2.5.0

- old
+ new

@@ -26,40 +26,21 @@ # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. require 'zlib' require 'perobs/Log' +require 'perobs/FlatFileBlobHeader' require 'perobs/IndexTree' require 'perobs/FreeSpaceManager' module PEROBS # The FlatFile class manages the storage file of the FlatFileDB. It contains - # a sequence of blobs Each blob consists of a 25 byte header and the actual - # blob data bytes. The header has the following structure: - # - # 1 Byte: Mark byte. - # Bit 0: 0 deleted entry, 1 valid entry - # Bit 1: 0 unmarked, 1 marked - # Bit 2 - 7: reserved, must be 0 - # 8 bytes: Length of the data blob in bytes - # 8 bytes: ID of the value in the data blob - # 4 bytes: CRC32 checksum of the data blob - # - # If the bit 0 of the mark byte is 0, only the length is valid. The blob is - # empty. Only of bit 0 is set then entry is valid. + # a sequence of blobs Each blob consists of header and the actual + # blob data bytes. class FlatFile - # Utility class to hold all the data that is stored in a blob header. - class Header < Struct.new(:mark, :length, :id, :crc) - end - - # The 'pack()' format of the header. - BLOB_HEADER_FORMAT = 'CQQL' - # The length of the header in bytes. - BLOB_HEADER_LENGTH = 21 - # Create a new FlatFile object for a database in the given path. # @param dir [String] Directory path for the data base file def initialize(dir) @db_dir = dir @f = nil @@ -123,31 +104,31 @@ # Delete the blob that is stored at the specified address. # @param addr [Integer] Address of the blob to delete # @param id [Integer] ID of the blob to delete def delete_obj_by_address(addr, id) @index.delete_value(id) - header = read_blob_header(addr, id) + header = FlatFileBlobHeader.read_at(@f, addr, id) begin @f.seek(addr) @f.write([ 0 ].pack('C')) @f.flush @space_list.add_space(addr, header.length) - rescue => e + rescue IOError => e PEROBS.log.fatal "Cannot erase blob for ID #{header.id}: #{e.message}" end end # Delete all unmarked objects. def delete_unmarked_objects PEROBS.log.info "Deleting unmarked objects..." t = Time.now deleted_ids = [] - each_blob_header do |pos, mark, length, blob_id, crc| - if (mark & 3 == 1) - delete_obj_by_address(pos, blob_id) - deleted_ids << blob_id + each_blob_header do |pos, header| + if header.is_valid? && !header.is_marked? + delete_obj_by_address(pos, header.id) + deleted_ids << header.id end end defragmentize PEROBS.log.info "#{deleted_ids.length} unmarked objects deleted " + @@ -159,43 +140,55 @@ # entry with the given ID exists already in the file. # @param id [Integer] ID of the object # @param raw_obj [String] Raw object as String # @return [Integer] position of the written blob in the blob file def write_obj_by_id(id, raw_obj) + crc = checksum(raw_obj) + + # If the raw_obj is larger then 256 characters we will compress it to + # safe some space in the database file. For smaller strings the + # performance impact of compression is not compensated by writing + # less data to the storage. + compressed = false + if raw_obj.length > 256 + raw_obj = Zlib.deflate(raw_obj) + compressed = true + end + addr, length = find_free_blob(raw_obj.length) begin if length != -1 # Just a safeguard so we don't overwrite current data. - header = read_blob_header(addr) + header = FlatFileBlobHeader.read_at(@f, addr) if header.length != length PEROBS.log.fatal "Length in free list (#{length}) and header " + "(#{header.length}) don't match." end if raw_obj.length > header.length PEROBS.log.fatal "Object (#{raw_obj.length}) is longer than " + "blob space (#{header.length})." end - if header.mark != 0 - PEROBS.log.fatal "Mark (#{header.mark}) is not 0." + if header.is_valid? + PEROBS.log.fatal "Entry (mark: #{header.mark}) is already used." end end @f.seek(addr) - @f.write([ 1, raw_obj.length, id, checksum(raw_obj)]. - pack(BLOB_HEADER_FORMAT)) + FlatFileBlobHeader.new(compressed ? (1 << 2) | 1 : 1, raw_obj.length, + id, crc).write(@f) @f.write(raw_obj) if length != -1 && raw_obj.length < length # The new object was not appended and it did not completely fill the # free space. So we have to write a new header to mark the remaining # empty space. - unless length - raw_obj.length >= BLOB_HEADER_LENGTH + unless length - raw_obj.length >= FlatFileBlobHeader::LENGTH PEROBS.log.fatal "Not enough space to append the empty space " + "header (space: #{length} bytes, object: #{raw_obj.length} " + "bytes)." end space_address = @f.pos - space_length = length - BLOB_HEADER_LENGTH - raw_obj.length - @f.write([ 0, space_length, 0, 0 ].pack(BLOB_HEADER_FORMAT)) + space_length = length - FlatFileBlobHeader::LENGTH - raw_obj.length + FlatFileBlobHeader.new(0, space_length, 0, 0).write(@f) # Register the new space with the space list. @space_list.add_space(space_address, space_length) if space_length > 0 end @f.flush @index.put_value(id, addr) @@ -228,25 +221,35 @@ # Read the object at the specified address. # @param addr [Integer] Offset in the flat file # @param id [Integer] ID of the data blob # @return [String] Raw object data def read_obj_by_address(addr, id) - header = read_blob_header(addr, id) + header = FlatFileBlobHeader.read_at(@f, addr, id) if header.id != id PEROBS.log.fatal "Database index corrupted: Index for object " + "#{id} points to object with ID #{header.id}" end + + buf = nil + begin - @f.seek(addr + BLOB_HEADER_LENGTH) + @f.seek(addr + FlatFileBlobHeader::LENGTH) buf = @f.read(header.length) - if checksum(buf) != header.crc - PEROBS.log.fatal "Checksum failure while reading blob ID #{id}" - end - return buf - rescue => e + rescue IOError => e PEROBS.log.fatal "Cannot read blob for ID #{id}: #{e.message}" end + + # Uncompress the data if the compression bit is set in the mark byte. + if header.is_compressed? + buf = Zlib.inflate(buf) + end + + if checksum(buf) != header.crc + PEROBS.log.fatal "Checksum failure while reading blob ID #{id}" + end + + buf end # Mark the object with the given ID. # @param id [Integer] ID of the object def mark_obj_by_id(id) @@ -257,27 +260,27 @@ # Mark the object at the specified address. # @param addr [Integer] Offset in the file # @param id [Integer] ID of the object def mark_obj_by_address(addr, id) - header = read_blob_header(addr, id) + header = FlatFileBlobHeader.read_at(@f, addr, id) begin @f.seek(addr) - @f.write([ header.mark | 2 ].pack('C')) + @f.write([ header.mark | (1 << 1) ].pack('C')) @f.flush - rescue => e + rescue IOError => e PEROBS.log.fatal "Marking of FlatFile blob with ID #{id} " + "failed: #{e.message}" end end # Return true if the object with the given ID is marked, false otherwise. # @param id [Integer] ID of the object def is_marked_by_id?(id) if (addr = find_obj_addr_by_id(id)) - header = read_blob_header(addr, id) - return (header.mark & 2) == 2 + header = FlatFileBlobHeader.read_at(@f, addr, id) + return header.is_marked? end false end @@ -287,20 +290,20 @@ PEROBS.log.info "Clearing all marks..." total_blob_count = 0 marked_blob_count = 0 - each_blob_header do |pos, mark, length, blob_id, crc| + each_blob_header do |pos, header| total_blob_count += 1 - if (mark & 3 == 3) + if header.is_valid? && header.is_marked? # Clear all valid and marked blocks. marked_blob_count += 1 begin @f.seek(pos) - @f.write([ mark & 0b11111101 ].pack('C')) + @f.write([ header.mark & 0b11111101 ].pack('C')) @f.flush - rescue => e + rescue IOError => e PEROBS.log.fatal "Unmarking of FlatFile blob with ID #{blob_id} " + "failed: #{e.message}" end end end @@ -315,14 +318,14 @@ deleted_blobs = 0 valid_blobs = 0 t = Time.now PEROBS.log.info "Defragmenting FlatFile" # Iterate over all entries. - each_blob_header do |pos, mark, length, blob_id, crc| + each_blob_header do |pos, header| # Total size of the current entry - entry_bytes = BLOB_HEADER_LENGTH + length - if (mark & 1 == 1) + entry_bytes = FlatFileBlobHeader::LENGTH + header.length + if header.is_valid? # We have found a valid entry. valid_blobs += 1 if distance > 0 begin # Read current entry into a buffer @@ -330,18 +333,18 @@ buf = @f.read(entry_bytes) # Write the buffer right after the end of the previous entry. @f.seek(pos - distance) @f.write(buf) # Update the index with the new position - @index.put_value(blob_id, pos - distance) + @index.put_value(header.id, pos - distance) # Mark the space between the relocated current entry and the # next valid entry as deleted space. - @f.write([ 0, distance - BLOB_HEADER_LENGTH, 0, 0 ]. - pack(BLOB_HEADER_FORMAT)) + FlatFileBlobHeader.new(0, distance - FlatFileBlobHeader::LENGTH, + 0, 0).write(@f) @f.flush - rescue => e - PEROBS.log.fatal "Error while moving blob for ID #{blob_id}: " + + rescue IOError => e + PEROBS.log.fatal "Error while moving blob for ID #{header.id}: " + e.message end end else deleted_blobs += 1 @@ -359,37 +362,70 @@ @space_list.clear sync end + # This method iterates over all entries in the FlatFile and removes the + # entry and inserts it again. This is useful to update all entries in + # cased the storage format has changed. + def refresh + # This iteration might look scary as we iterate over the entries while + # while we are rearranging them. Re-inserted items may be inserted + # before or at the current entry and this is fine. They also may be + # inserted after the current entry and will be re-read again unless they + # are inserted after the original file end. + file_size = @f.size + PEROBS.log.info "Refreshing the DB..." + t = Time.now + each_blob_header do |pos, header| + if header.is_valid? + buf = read_obj_by_address(pos, header.id) + delete_obj_by_address(pos, header.id) + write_obj_by_id(header.id, buf) + end + + # Some re-inserted blobs may be inserted after the original file end. + # No need to process those blobs again. + break if pos >= file_size + end + PEROBS.log.info "DB refresh completed in #{Time.now - t} seconds" + + # Reclaim the space saved by compressing entries. + defragmentize + end + def check(repair = false) return unless @f t = Time.now PEROBS.log.info "Checking FlatFile database" + "#{repair ? ' in repair mode' : ''}..." # First check the database blob file. Each entry should be readable and # correct. - each_blob_header do |pos, mark, length, blob_id, crc| - if (mark & 1 == 1) + each_blob_header do |pos, header| + if header.is_valid? # We have a non-deleted entry. begin - @f.seek(pos + BLOB_HEADER_LENGTH) - buf = @f.read(length) - if crc && checksum(buf) != crc + @f.seek(pos + FlatFileBlobHeader::LENGTH) + buf = @f.read(header.length) + # Uncompress the data if the compression bit is set in the mark + # byte. + buf = Zlib.inflate(buf) if header.is_compressed? + + if header.crc && checksum(buf) != header.crc if repair PEROBS.log.error "Checksum failure while checking blob " + - "with ID #{id}. Deleting object." - delete_obj_by_address(pos, blob_id) + "with ID #{header.id}. Deleting object." + delete_obj_by_address(pos, header.id) else PEROBS.log.fatal "Checksum failure while checking blob " + - "with ID #{id}" + "with ID #{header.id}" end end - rescue => e - PEROBS.log.fatal "Check of blob with ID #{blob_id} failed: " + + rescue IOError => e + PEROBS.log.fatal "Check of blob with ID #{header.id} failed: " + e.message end end end @@ -414,126 +450,106 @@ def regenerate_index_and_spaces PEROBS.log.warn "Re-generating FlatFileDB index and space files" @index.clear @space_list.clear - each_blob_header do |pos, mark, length, id, crc| - if mark == 0 - @space_list.add_space(pos, length) if length > 0 + each_blob_header do |pos, header| + if header.is_valid? + @index.put_value(header.id, pos) else - @index.put_value(id, pos) + @space_list.add_space(pos, header.length) if header.length > 0 end end end def has_space?(address, size) - header = read_blob_header(address) + header = FlatFileBlobHeader.read_at(@f, address) header.length == size end def has_id_at?(id, address) - header = read_blob_header(address) + header = FlatFileBlobHeader.read_at(@f, address) header.id == id end def inspect s = '[' - each_blob_header do |pos, mark, length, blob_id, crc| - s << "{ :pos => #{pos}, :mark => #{mark}, " + - ":length => #{length}, :id => #{blob_id}, :crc => #{crc}" - if mark != 0 - s << ", :value => #{@f.read(length)}" + each_blob_header do |pos, header| + s << "{ :pos => #{pos}, :mark => #{header.mark}, " + + ":length => #{header.length}, :id => #{header.id}, " + + ":crc => #{header.crc}" + if header.is_valid? + s << ", :value => #{@f.read(header.length)}" end s << " }\n" end s + ']' end private - def read_blob_header(addr, id = nil) - buf = nil + def each_blob_header(&block) + pos = 0 begin - @f.seek(addr) - buf = @f.read(BLOB_HEADER_LENGTH) - rescue => e + @f.seek(0) + while (header = FlatFileBlobHeader.read(@f)) + yield(pos, header) + + pos += FlatFileBlobHeader::LENGTH + header.length + @f.seek(pos) + end + rescue IOError => e PEROBS.log.fatal "Cannot read blob in flat file DB: #{e.message}" end - if buf.nil? || buf.length != BLOB_HEADER_LENGTH - PEROBS.log.fatal "Cannot read blob header " + - "#{id ? "for ID #{id} " : ''}at address " + - "#{addr}" - end - header = Header.new(*buf.unpack(BLOB_HEADER_FORMAT)) - if id && header.id != id - PEROBS.log.fatal "Mismatch between FlatFile index and blob file " + - "found for entry with ID #{id}/#{header.id}" - end - - return header end def find_free_blob(bytes) address, size = @space_list.get_space(bytes) unless address # We have not found any suitable space. Return the end of the file. return [ @f.size, -1 ] end - if size == bytes || size - BLOB_HEADER_LENGTH >= bytes + if size == bytes || size - FlatFileBlobHeader::LENGTH >= bytes return [ address, size ] end # Return the found space again. It's too small for the new content plus # the gap header. @space_list.add_space(address, size) # We need a space that is large enough to hold the bytes and the gap # header. - @space_list.get_space(bytes + BLOB_HEADER_LENGTH) || [ @f.size, -1 ] + @space_list.get_space(bytes + FlatFileBlobHeader::LENGTH) || + [ @f.size, -1 ] end def checksum(raw_obj) Zlib.crc32(raw_obj, 0) end def cross_check_entries - each_blob_header do |pos, mark, length, blob_id, crc| - if mark == 0 - if length > 0 - unless @space_list.has_space?(pos, length) + each_blob_header do |pos, header| + if !header.is_valid? + if header.length > 0 + unless @space_list.has_space?(pos, header.length) PEROBS.log.error "FlatFile has free space " + - "(addr: #{pos}, len: #{length}) that is not in FreeSpaceManager" + "(addr: #{pos}, len: #{header.length}) that is not in " + + "FreeSpaceManager" return false end end else - unless @index.get_value(blob_id) == pos + unless @index.get_value(header.id) == pos PEROBS.log.error "FlatFile blob at address #{pos} is listed " + - "in index with address #{@index.get_value(blob_id)}" + "in index with address #{@index.get_value(header.id)}" return false end end end true - end - - def each_blob_header(&block) - pos = 0 - begin - @f.seek(0) - while (buf = @f.read(BLOB_HEADER_LENGTH)) - mark, length, id, crc = buf.unpack(BLOB_HEADER_FORMAT) - yield(pos, mark, length, id, crc) - - pos += BLOB_HEADER_LENGTH + length - @f.seek(pos) - end - rescue IOError => e - PEROBS.log.fatal "Cannot read blob in flat file DB: #{e.message}" - end end end end