lib/perobs/FlatFile.rb in perobs-3.0.2 vs lib/perobs/FlatFile.rb in perobs-4.0.0
- old
+ new
@@ -46,10 +46,11 @@
# @param dir [String] Directory path for the data base file
def initialize(dir)
@db_dir = dir
# Handle of the flat file. It stays nil while the file is not open.
@f = nil
@index = BTree.new(@db_dir, 'index', INDEX_BTREE_ORDER)
# Marks are tracked in a BTree of their own that is keyed by object ID.
+ @marks = BTree.new(@db_dir, 'marks', INDEX_BTREE_ORDER)
@space_list = SpaceTree.new(@db_dir)
end
# Open the flat file for reading and writing.
def open
@@ -69,10 +70,11 @@
end
unless @f.flock(File::LOCK_NB | File::LOCK_EX)
PEROBS.log.fatal "FlatFile database '#{file_name}' is locked by " +
"another process"
end
+ @f.sync = true
begin
@index.open(!new_db_created)
@space_list.open
rescue FatalError
@@ -101,19 +103,21 @@
@index.close
if @f
@f.flush
@f.flock(File::LOCK_UN)
+ @f.fsync
@f.close
@f = nil
end
end
# Force outstanding data to be written to the filesystem.
def sync
begin
@f.flush
+ @f.fsync
rescue IOError => e
PEROBS.log.fatal "Cannot sync flat file database: #{e.message}"
end
@index.sync
@space_list.sync
@@ -146,11 +150,11 @@
PEROBS.log.info "Deleting unmarked objects..."
t = Time.now
deleted_ids = []
each_blob_header do |pos, header|
- if header.is_valid? && !header.is_marked?
+ if header.is_valid? && @marks.get(header.id).nil?
delete_obj_by_address(pos, header.id)
deleted_ids << header.id
end
end
defragmentize
@@ -182,54 +186,48 @@
# If the raw_obj is larger than 256 bytes we will compress it to
# save some space in the database file. For smaller strings the
# performance impact of compression is not compensated by writing
# less data to the storage.
compressed = false
- if raw_obj.length > 256
+ if raw_obj.bytesize > 256
raw_obj = Zlib.deflate(raw_obj)
compressed = true
end
- addr, length = find_free_blob(raw_obj.length)
+ addr, length = find_free_blob(raw_obj.bytesize)
begin
if length != -1
# Just a safeguard so we don't overwrite current data.
header = FlatFileBlobHeader.read_at(@f, addr)
if header.length != length
PEROBS.log.fatal "Length in free list (#{length}) and header " +
"(#{header.length}) for address #{addr} don't match."
end
- if raw_obj.length > header.length
- PEROBS.log.fatal "Object (#{raw_obj.length}) is longer than " +
+ if raw_obj.bytesize > header.length
+ PEROBS.log.fatal "Object (#{raw_obj.bytesize}) is longer than " +
"blob space (#{header.length})."
end
if header.is_valid?
PEROBS.log.fatal "Entry at address #{addr} with flags: " +
"#{header.flags} is already used for ID #{header.id}."
end
end
flags = 1 << FlatFileBlobHeader::VALID_FLAG_BIT
flags |= (1 << FlatFileBlobHeader::COMPRESSED_FLAG_BIT) if compressed
- if old_addr && old_header.is_marked?
- # This method might be called in the middle of an operation that
- # uses the mark flag. We must ensure that the flag is carried over
- # to the new header.
- flags |= (1 << FlatFileBlobHeader::MARK_FLAG_BIT)
- end
- FlatFileBlobHeader.new(@f, addr, flags, raw_obj.length, id, crc).write
+ FlatFileBlobHeader.new(@f, addr, flags, raw_obj.bytesize, id, crc).write
@f.write(raw_obj)
- if length != -1 && raw_obj.length < length
+ if length != -1 && raw_obj.bytesize < length
# The new object was not appended and it did not completely fill the
# free space. So we have to write a new header to mark the remaining
# empty space.
- unless length - raw_obj.length >= FlatFileBlobHeader::LENGTH
+ unless length - raw_obj.bytesize >= FlatFileBlobHeader::LENGTH
PEROBS.log.fatal "Not enough space to append the empty space " +
- "header (space: #{length} bytes, object: #{raw_obj.length} " +
+ "header (space: #{length} bytes, object: #{raw_obj.bytesize} " +
"bytes)."
end
space_address = @f.pos
- space_length = length - FlatFileBlobHeader::LENGTH - raw_obj.length
+ space_length = length - FlatFileBlobHeader::LENGTH - raw_obj.bytesize
FlatFileBlobHeader.new(@f, space_address, 0, space_length,
0, 0).write
# Register the new space with the space list.
@space_list.add_space(space_address, space_length) if space_length > 0
end
@@ -270,10 +268,19 @@
end
nil
end
+ def search_object(id)
+ each_blob_header do |pos, header|
+ return read_obj_by_address(pos, id)
+ end
+
+ nil
+ end
+
+
# Read the object at the specified address.
# @param addr [Integer] Offset in the flat file
# @param id [Integer] ID of the data blob
# @return [String] Raw object data
def read_obj_by_address(addr, id)
@@ -310,51 +317,23 @@
end
# Mark the object with the given ID.
# @param id [Integer] ID of the object
def mark_obj_by_id(id)
- if (addr = find_obj_addr_by_id(id))
- mark_obj_by_address(addr, id)
- end
# A mark is an entry for the ID in the marks BTree. The readers visible in
# this file only test whether the key exists, so the value 0 is just a
# placeholder.
+ @marks.insert(id, 0)
end
- # Mark the object at the specified address.
- # @param addr [Integer] Offset in the file
- # @param id [Integer] ID of the object
- def mark_obj_by_address(addr, id)
- FlatFileBlobHeader.read_at(@f, addr, id).set_mark_flag
- end
-
# Return true if the object with the given ID is marked, false otherwise.
# @param id [Integer] ID of the object
# @return [Boolean] true if the marks BTree has an entry for the ID
def is_marked_by_id?(id)
- if (addr = find_obj_addr_by_id(id))
- header = FlatFileBlobHeader.read_at(@f, addr, id)
- return header.is_marked?
- end
-
- false
# The object counts as marked when a lookup of its ID in the marks BTree
# returns anything but nil.
+ !@marks.get(id).nil?
end
# Clear all marks.
def clear_all_marks
- t = Time.now
- PEROBS.log.info "Clearing all marks..."
-
- total_blob_count = 0
- marked_blob_count = 0
-
- each_blob_header do |pos, header|
- total_blob_count += 1
- if header.is_valid? && header.is_marked?
- # Clear all valid and marked blocks.
- marked_blob_count += 1
- header.clear_mark_flag
- end
- end
- PEROBS.log.info "#{marked_blob_count} marks in #{total_blob_count} " +
- "objects cleared in #{Time.now - t} seconds"
# The marks live in their own BTree, so no blob header is touched here.
# NOTE(review): presumably erase discards all stored marks and open then
# provides a fresh, empty tree -- confirm against the BTree class.
+ @marks.erase
+ @marks.open
end
# Eliminate all the holes in the file. This is an in-place
# implementation. No additional space will be needed on the file system.
def defragmentize
@@ -462,10 +441,10 @@
if header.is_valid?
# We have a non-deleted entry.
begin
@f.seek(pos + FlatFileBlobHeader::LENGTH)
buf = @f.read(header.length)
- if buf.length != header.length
+ if buf.bytesize != header.length
PEROBS.log.error "Premature end of file in blob with ID " +
"#{header.id}."
discard_damaged_blob(header) if repair
errors += 1
next