#!/usr/bin/env ruby
$: << File.join(File.dirname(__FILE__), '..', 'lib')
require 'daybreak'
# Copy every key/value pair out of a legacy (v1) Daybreak database file
# into a freshly created database in the current Daybreak format.
# @param [String] oldfile path to the old-format database
# @param [String] newfile path where the converted database is written
def convert(oldfile, newfile)
  source = Daybreak1::DB.new(oldfile)
  target = Daybreak::DB.new(newfile)
  source.each { |key, value| target[key] = value }
  source.close!
  target.close
end
module Daybreak1
# Daybreak::DB contains the public api for Daybreak, you may extend it like
# any other Ruby class (i.e. to overwrite serialize and parse). It includes
# Enumerable for functional goodies like map, each, reduce and friends.
#
# NOTE(review): this is a frozen copy of the version-1 implementation, kept
# here only so the converter can read old-format files — do not modernize it.
class DB
  include Enumerable

  # Create a new Daybreak::DB. The second argument is the default value
  # to store when accessing a previously unset key, this follows the
  # Hash standard.
  # @param [String] file the path to the db file
  # @param default the default value to store and return when a key is
  # not yet in the database.
  # @yield [key] a block that will return the default value to store.
  # @yieldparam [String] key the key to be stored.
  def initialize(file, default=nil, &blk)
    @table = {}                       # in-memory cache of all key/value pairs
    @file_name = file
    @writer = Writer.new(@file_name)  # opens (and creates) the log file for append
    # A block, when given, wins over the positional default value.
    @default = block_given? ? blk : default
    read!                             # replay the on-disk log into @table
  end

  # Set a key in the database to be written at some future date. If the data
  # needs to be persisted immediately, call db.set(key, value, true).
  # @param [#to_s] key the key of the storage slot in the database
  # @param value the value to store
  # @param [Boolean] sync if true, sync this value immediately
  def []=(key, value, sync = false)
    key = key.to_s
    write key, value, sync
    @table[key] = value
  end
  alias_method :set, :"[]="

  # set! flushes data immediately to disk.
  # @param [#to_s] key the key of the storage slot in the database
  # @param value the value to store
  def set!(key, value)
    set key, value, true
  end

  # Delete a key from the database
  # @param [#to_s] key the key of the storage slot in the database
  # @param [Boolean] sync if true, sync this deletion immediately
  def delete(key, sync = false)
    key = key.to_s
    # An empty string is written as a placeholder value; the deletion flag
    # inside the record is what actually marks the key as removed.
    write key, '', sync, true
    @table.delete key
  end

  # delete! immediately deletes the key on disk.
  # @param [#to_s] key the key of the storage slot in the database
  def delete!(key)
    delete key, true
  end

  # Retrieve a value at key from the database. If the default value was specified
  # when this database was created, that value will be set and returned. Aliased
  # as get.
  # @param [#to_s] key the value to retrieve from the database.
  def [](key)
    key = key.to_s
    if @table.has_key? key
      @table[key]
    elsif default?
      # Lazily materialize the default: a Proc default is called with the key,
      # anything else is stored verbatim. Note the default gets persisted too.
      set key, Proc === @default ? @default.call(key) : @default
    end
  end
  alias_method :get, :"[]"

  # Iterate over the key, value pairs in the database.
  # @yield [key, value] blk the iterator for each key value pair.
  # @yieldparam [String] key the key.
  # @yieldparam value the value from the database.
  def each
    keys.each { |k| yield(k, get(k)) }
  end

  # Does this db have a default value.
  def default?
    !@default.nil?
  end

  # Does this db have a value for this key?
  # @param [key#to_s] key the key to check if the DB has a key.
  def has_key?(key)
    @table.has_key? key.to_s
  end

  # Return the keys in the db.
  # @return [Array]
  def keys
    @table.keys
  end

  # Return the number of stored items.
  # @return [Integer]
  def length
    @table.keys.length
  end
  alias_method :size, :length

  # Serialize the data for writing to disk, if you don't want to use Marshal
  # overwrite this method.
  # @param value the value to be serialized
  # @return [String]
  def serialize(value)
    Marshal.dump(value)
  end

  # Parse the serialized value from disk, like serialize if you want to use a
  # different serialization method overwrite this method.
  # @param value the value to be parsed
  # @return [String]
  def parse(value)
    Marshal.load(value)
  end

  # Empty the database file.
  def empty!
    @writer.truncate!
    @table.clear
    read!
  end
  alias_method :clear, :empty!

  # Force all queued commits to be written to disk.
  def flush!
    @writer.flush!
  end

  # Close the database for reading and writing.
  def close!
    @writer.close!
  end

  # Compact the database to remove stale commits and reduce the file size.
  def compact!
    # Create a new temporary database; pid plus thread object id keep the
    # name unique across concurrent compactions.
    tmp_file = @file_name + "-#{$$}-#{Thread.current.object_id}"
    copy_db = self.class.new tmp_file
    # Copy the database key by key into the temporary table
    each do |key, value|
      copy_db.set(key, get(key))
    end
    copy_db.close!
    close!
    # Move the copy into place
    File.rename tmp_file, @file_name
    # Reopen this database
    @writer = Writer.new(@file_name)
    @table.clear
    read!
  end

  # Read all values from the log file. If you want to check for changed data
  # call this again.
  def read!
    buf = nil
    File.open(@file_name, 'rb') do |fd|
      # Shared lock: concurrent readers are fine, writers are excluded.
      fd.flock(File::LOCK_SH)
      buf = fd.read
    end
    # Record.deserialize destructively consumes buf via slice!, so this
    # loop terminates once every record has been replayed.
    until buf.empty?
      key, data, deleted = Record.deserialize(buf)
      if deleted
        @table.delete key
      else
        @table[key] = parse(data)
      end
    end
  end

  private

  # Append a record to the on-disk log (and optionally flush right away).
  # @param [String] key the (already stringified) key
  # @param value the value; serialized before being handed to the writer
  # @param [Boolean] sync flush to disk before returning when true
  # @param [Boolean] delete mark the record as a deletion tombstone
  def write(key, value, sync = false, delete = false)
    @writer.write([key, serialize(value), delete])
    flush! if sync
  end
end
# Records define how data is serialized and read from disk.
#
# On-disk layout of one record (all integers big-endian, pack format 'N'):
#   [4-byte masked key length][key bytes][4-byte data length][data bytes][4-byte CRC32]
# The top bit of the key-length word is the deletion flag (DELETION_MASK);
# the CRC covers everything that precedes it.
module Record
  # Thrown when either key or data is missing.
  # NOTE(review): inherits from Exception rather than StandardError, so a
  # bare rescue will not catch it — kept as-is for legacy compatibility.
  class UnnacceptableDataError < Exception; end

  # Thrown when there is a CRC mismatch between the data from the disk
  # and what was written to disk previously.
  class CorruptDataError < Exception; end

  extend self

  # The mask a record uses to check for deletion: the top bit of the 32-bit
  # key-length word (which limits keys to 2**31 - 1 bytes).
  DELETION_MASK = 1 << 31

  # The serialized representation of the key value pair plus the CRC.
  # @param [Array] record three-element array of key, serialized value
  #   string, and deletion flag
  # @return [String]
  # @raise [UnnacceptableDataError] if key or data is nil
  def serialize(record)
    raise UnnacceptableDataError, 'key and data must be defined' unless record[0] && record[1]
    s = key_data_string(record)
    s << crc_string(s)
  end

  # Read one record destructively off the front of a buffer.
  # @param [String] buf the raw bytes read from disk; consumed via slice!
  # @return [Array] key string, data string, and deletion flag
  # @raise [CorruptDataError] when the stored CRC does not match
  def deserialize(buf)
    record = []
    masked = read32(buf)
    # Read the record's key bytes (length is the low 31 bits of masked)
    record << buf.slice!(0, masked & (DELETION_MASK - 1)) <<
    # Read the record's value bytes
    buf.slice!(0, read32(buf)) <<
    # Set the deletion flag
    ((masked & DELETION_MASK) != 0)
    raise CorruptDataError, 'CRC mismatch' unless buf.slice!(0, 4) == crc_string(key_data_string(record))
    record
  end

  private

  # Return the deletion flag plus two length prefixed cells
  def key_data_string(record)
    part(record[0], record[0].bytesize + (record[2] ? DELETION_MASK : 0)) << part(record[1], record[1].bytesize)
  end

  # CRC32 of s as a 4-byte big-endian string.
  # NOTE(review): relies on Zlib already being loaded (presumably via the
  # daybreak require) — this file never requires 'zlib' itself; confirm.
  def crc_string(s)
    [Zlib.crc32(s, 0)].pack('N')
  end

  # One length-prefixed cell: a 4-byte big-endian length followed by the data.
  def part(data, length)
    [length].pack('N') << data
  end

  # Consume and return a 32-bit big-endian unsigned int from the buffer head.
  def read32(buf)
    buf.slice!(0, 4).unpack('N')[0]
  end
end
# Writer's handle the actually fiddly task of committing data to disk.
# They have a Worker instance that writes in a select loop.
class Writer
  # Open up the file, ready it for binary and nonblocking writing.
  # @param [String] file path of the database log file
  def initialize(file)
    @file = file
    open!
    @worker = Worker.new(@fd)
  end

  # Send a record to the workers queue.
  # @param [Array] record the key / serialized-value / deletion-flag triple
  def write(record)
    @worker.enqueue record
  end

  # Finish writing: block until the worker has drained its queue.
  def finish!
    @worker.finish!
  end

  # Flush pending commits, and restart the worker.
  def flush!
    @worker.flush!
  end

  # Finish writing and close the file descriptor.
  def close!
    finish!
    @fd.close
  end

  # Truncate the file.
  # NOTE(review): this stops the worker thread (finish!) without restarting
  # it; a later flush! is what spawns a new one — verify callers rely on that.
  def truncate!
    finish!
    @fd.truncate(0)
    @fd.pos = 0
  end

  private

  # Open the file in binary append mode and, where Fcntl is available,
  # switch the descriptor over to non-blocking writes.
  def open!
    @fd = File.open @file, 'ab'
    if defined?(Fcntl::O_NONBLOCK)
      f = @fd.fcntl(Fcntl::F_GETFL, 0)
      @fd.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK | f)
    end
  end

  # Workers handle the actual fiddly bits of asynchronous io and
  # and handle background writes.
  class Worker
    # @param [File] fd the already-opened, append-mode file descriptor
    def initialize(fd)
      @queue = Queue.new
      @fd = fd
      @thread = Thread.new { work }
      # Drain pending writes at process exit. NOTE(review): this also fires
      # after an explicit close!; joining the already-dead thread returns
      # immediately, so it appears harmless — confirm.
      at_exit { finish! }
    end

    # Queue up a write to be committed later.
    def enqueue(record)
      @queue << record
    end

    # Loop and block if we don't have work to do or if
    # the file isn't ready for another write just yet.
    # A nil popped from the queue is the shutdown sentinel pushed by finish!.
    def work
      buf, finished = '', false
      until finished && buf.empty?
        record = @queue.pop
        if record
          buf << Record.serialize(record)
        else
          finished = true
        end
        # Block until the fd is writable; no read IOs are watched, so the
        # first element of the select result is always empty.
        read, write = IO.select [], [@fd]
        if write and fd = write.first
          lock(fd) { buf = try_write fd, buf }
        end
      end
      @fd.flush
    end

    # Try and write the buffer to the file via non blocking file writes.
    # If the write fails try again.
    # @return [String] whatever part of buf is still unwritten
    def try_write(fd, buf)
      if defined?(Fcntl::O_NONBLOCK)
        s = fd.write_nonblock(buf)
      else
        s = fd.write(buf)
      end
      if s < buf.length
        buf = buf[s..-1] # didn't finish — keep the unwritten tail
      else
        buf = ""
      end
      buf
    rescue Errno::EAGAIN
      # The fd was not actually ready — return buf untouched to retry later.
      buf
    end

    # Lock a file with the type lock: takes an exclusive advisory lock
    # around the yielded write, always releasing it afterwards.
    def lock(fd)
      fd.flock File::LOCK_EX
      begin
        yield
      ensure
        fd.flock File::LOCK_UN
      end
    end

    # finish! and start up another worker thread.
    def flush!
      finish!
      @thread = Thread.new { work }
      true
    end

    # Push a nil through the queue and block until the write loop is finished.
    def finish!
      @queue.push nil
      @thread.join
    end
  end
end
end
# Entry point: expects exactly two arguments, the old database file and the
# destination path for the converted copy.
if ARGV.size != 2
  # Usage errors belong on stderr with a nonzero exit status so callers and
  # shell scripts can detect the failure; the original printed to stdout
  # and exited 0.
  $stderr.puts "Usage: #{$0} olddb newdb"
  exit 1
else
  convert(ARGV[0], ARGV[1])
end