#!/usr/bin/env ruby
# Copies every key/value pair from a database stored in the on-disk format
# implemented by the Daybreak1 module below into a database opened with the
# Daybreak library that is loaded from ../lib.
#
# Usage: converter olddb newdb

$LOAD_PATH << File.join(File.dirname(__FILE__), '..', 'lib')
# Daybreak1 uses Queue and Zlib directly, so load them explicitly instead of
# relying on 'daybreak' to pull them in.
require 'thread'
require 'zlib'
require 'daybreak'

# Copy all records from the database at +oldfile+ into the one at +newfile+.
# Both databases are closed even if the copy raises part-way through.
# @param [String] oldfile path of the database readable by Daybreak1::DB
# @param [String] newfile path of the database to write with Daybreak::DB
def convert(oldfile, newfile)
  olddb = newdb = nil
  olddb = Daybreak1::DB.new(oldfile)
  newdb = Daybreak::DB.new(newfile)
  olddb.each { |key, value| newdb[key] = value }
ensure
  olddb.close! if olddb
  newdb.close if newdb
end

# Self-contained implementation of the database format read by this script.
module Daybreak1
  # Daybreak1::DB is the key/value store used to read the source database.
  # You may extend it like any other Ruby class (i.e. to overwrite serialize
  # and parse). It includes Enumerable for functional goodies like map, each,
  # reduce and friends.
  class DB
    include Enumerable

    # Create a new DB. The second argument is the default value
    # to store when accessing a previously unset key, this follows the
    # Hash standard.
    # @param [String] file the path to the db file
    # @param default the default value to store and return when a key is
    #  not yet in the database.
    # @yield [key] a block that will return the default value to store.
    # @yieldparam [String] key the key to be stored.
    def initialize(file, default = nil, &blk)
      @table = {}
      @file_name = file
      # The writer opens the file in append mode, so it exists before read!.
      @writer = Writer.new(@file_name)
      @default = block_given? ? blk : default
      read!
    end

    # Set a key in the database to be written at some future date. If the data
    # needs to be persisted immediately, call db.set(key, value, true).
    # @param [#to_s] key the key of the storage slot in the database
    # @param value the value to store
    # @param [Boolean] sync if true, sync this value immediately
    # @return the stored value
    def []=(key, value, sync = false)
      key = key.to_s
      write key, value, sync
      @table[key] = value
    end
    alias_method :set, :"[]="

    # set! flushes data immediately to disk.
    # @param [#to_s] key the key of the storage slot in the database
    # @param value the value to store
    def set!(key, value)
      set key, value, true
    end

    # Delete a key from the database.
    # @param [#to_s] key the key of the storage slot in the database
    # @param [Boolean] sync if true, sync this deletion immediately
    # @return the value previously held in memory for the key, if any
    def delete(key, sync = false)
      key = key.to_s
      # A deletion is logged as a record with an empty value and the
      # deletion flag set.
      write key, '', sync, true
      @table.delete key
    end

    # delete! immediately deletes the key on disk.
    # @param [#to_s] key the key of the storage slot in the database
    def delete!(key)
      delete key, true
    end

    # Retrieve a value at key from the database. If the default value was
    # specified when this database was created, that value will be set and
    # returned. Aliased as get.
    # @param [#to_s] key the key to retrieve from the database.
    # @return the stored value, the freshly stored default, or nil
    def [](key)
      key = key.to_s
      if @table.has_key? key
        @table[key]
      elsif default?
        set key, Proc === @default ? @default.call(key) : @default
      end
    end
    alias_method :get, :"[]"

    # Iterate over the key, value pairs in the database.
    # @yield [key, value] blk the iterator for each key value pair.
    # @yieldparam [String] key the key.
    # @yieldparam value the value from the database.
    def each
      keys.each { |k| yield(k, get(k)) }
    end

    # Does this db have a default value.
    def default?
      !@default.nil?
    end

    # Does this db have a value for this key?
    # @param [#to_s] key the key to look up.
    def has_key?(key)
      @table.has_key? key.to_s
    end

    # Return the keys in the db.
    # @return [Array]
    def keys
      @table.keys
    end

    # Return the number of stored items.
    # @return [Integer]
    def length
      @table.size
    end
    alias_method :size, :length

    # Serialize the data for writing to disk, if you don't want to use Marshal
    # overwrite this method.
    # @param value the value to be serialized
    # @return [String]
    def serialize(value)
      Marshal.dump(value)
    end

    # Parse the serialized value from disk, like serialize if you want to use
    # a different serialization method overwrite this method.
    # NOTE(review): Marshal.load can instantiate arbitrary objects; only open
    # database files that come from a trusted source.
    # @param [String] value the serialized bytes to be parsed
    # @return the deserialized object
    def parse(value)
      Marshal.load(value)
    end

    # Empty the database file.
    def empty!
      @writer.truncate!
      @table.clear
      read!
    end
    alias_method :clear, :empty!

    # Force all queued commits to be written to disk.
    def flush!
      @writer.flush!
    end

    # Close the database for reading and writing.
    def close!
      @writer.close!
    end

    # Compact the database to remove stale commits and reduce the file size.
    def compact!
      # Create a new temporary database, named after the current process id
      # and thread.
      tmp_file = @file_name + "-#{$$}-#{Thread.current.object_id}"
      copy_db = self.class.new tmp_file

      # Copy the database key by key into the temporary table
      each { |key, value| copy_db.set(key, value) }

      copy_db.close!
      close!

      # Move the copy into place
      File.rename tmp_file, @file_name

      # Reopen this database
      @writer = Writer.new(@file_name)
      @table.clear
      read!
    end

    # Read all values from the log file. If you want to check for changed data
    # call this again.
    # @raise [Record::CorruptDataError] if a record is truncated or fails its
    #  CRC check
    def read!
      buf = nil
      File.open(@file_name, 'rb') do |fd|
        fd.flock(File::LOCK_SH)
        buf = fd.read
      end
      # Record.deserialize consumes one record from the front of buf per call.
      until buf.empty?
        key, data, deleted = Record.deserialize(buf)
        if deleted
          @table.delete key
        else
          @table[key] = parse(data)
        end
      end
    end

    private

    # Queue a record for the writer and optionally flush it straight away.
    def write(key, value, sync = false, delete = false)
      @writer.write([key, serialize(value), delete])
      flush! if sync
    end
  end

  # Records define how data is serialized and read from disk.
  #
  # A record is laid out as: a 32-bit big-endian key length whose top bit is
  # the deletion flag, the key bytes, a 32-bit big-endian value length, the
  # value bytes and finally a 32-bit big-endian CRC32 of everything before it.
  module Record
    # Thrown when either key or data is missing
    # NOTE(review): inherits from Exception, so a bare `rescue` will not
    # catch it; left unchanged so existing rescue clauses keep working.
    class UnnacceptableDataError < Exception; end

    # Thrown when there is a CRC mismatch between the data from the disk
    # and what was written to disk previously, or when a record is truncated.
    class CorruptDataError < Exception; end

    extend self

    # The mask a record uses to check for deletion.
    DELETION_MASK = 1 << 31

    # The serialized representation of the key value pair plus the CRC.
    # @param [Array] record a [key, data, deleted] triple
    # @return [String]
    # @raise [UnnacceptableDataError] if the key or the data is missing
    def serialize(record)
      raise UnnacceptableDataError, 'key and data must be defined' unless record[0] && record[1]
      s = key_data_string(record)
      s << crc_string(s)
    end

    # Read one record from the front of a buffer. The bytes that make up the
    # record are removed from the buffer.
    # @param [String] buf binary string holding serialized records
    # @return [Array] a [key, data, deleted] triple
    # @raise [CorruptDataError] if the record is truncated or its CRC does
    #  not match
    def deserialize(buf)
      masked = read32(buf)
      # The low 31 bits hold the key length, the top bit the deletion flag.
      key = buf.slice!(0, masked & (DELETION_MASK - 1))
      data = buf.slice!(0, read32(buf))
      record = [key, data, (masked & DELETION_MASK) != 0]
      raise CorruptDataError, 'CRC mismatch' unless buf.slice!(0, 4) == crc_string(key_data_string(record))
      record
    end

    private

    # Return the deletion flag plus two length prefixed cells
    def key_data_string(record)
      part(record[0], record[0].bytesize + (record[2] ? DELETION_MASK : 0)) <<
        part(record[1], record[1].bytesize)
    end

    # CRC32 of the string packed as a 32-bit big-endian integer.
    def crc_string(s)
      [Zlib.crc32(s, 0)].pack('N')
    end

    # Length prefix followed by the data. The data is appended as raw bytes so
    # that a non-ASCII key cannot trigger an encoding compatibility error.
    def part(data, length)
      [length].pack('N') << data.dup.force_encoding(Encoding::BINARY)
    end

    # Remove and decode a 32-bit big-endian integer from the front of buf.
    def read32(buf)
      word = buf.slice!(0, 4)
      raise CorruptDataError, 'truncated record' if word.nil? || word.bytesize < 4
      word.unpack('N')[0]
    end
  end

  # Writers handle the actual fiddly task of committing data to disk.
  # They have a Worker instance that writes in a select loop.
  class Writer
    # Open up the file, ready it for binary and nonblocking writing.
    # @param [String] file the path of the file to append to
    def initialize(file)
      @file = file
      open!
      @worker = Worker.new(@fd)
    end

    # Send a record to the workers queue.
    def write(record)
      @worker.enqueue record
    end

    # Finish writing
    def finish!
      @worker.finish!
    end

    # Flush pending commits, and restart the worker.
    def flush!
      @worker.flush!
    end

    # Finish writing and close the file descriptor.
    def close!
      finish!
      @fd.close
    end

    # Truncate the file.
    def truncate!
      finish!
      @fd.truncate(0)
      @fd.pos = 0
    end

    private

    # Open the file for binary appending and, when Fcntl is loaded, switch
    # the descriptor to non-blocking mode.
    def open!
      @fd = File.open(@file, 'ab')
      return unless defined?(Fcntl::O_NONBLOCK)
      flags = @fd.fcntl(Fcntl::F_GETFL, 0)
      @fd.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK | flags)
    end

    # Workers handle the actual fiddly bits of asynchronous io and
    # handle background writes.
    class Worker
      # Start a background thread that writes queued records to fd.
      # @param [File] fd the file to write to
      def initialize(fd)
        @queue = Queue.new
        @fd = fd
        @thread = Thread.new { work }
        at_exit { finish! }
      end

      # Queue up a write to be committed later.
      def enqueue(record)
        @queue << record
      end

      # Loop and block if we don't have work to do or if
      # the file isn't ready for another write just yet.
      # A nil in the queue marks the end of the work; the loop then keeps
      # writing until the buffer has been drained.
      def work
        buf = String.new
        finished = false
        until finished && buf.empty?
          # Once the nil sentinel has been seen nothing more will be queued
          # for this run, so popping again would block forever while unwritten
          # bytes are still sitting in buf.
          unless finished
            record = @queue.pop
            if record
              buf << Record.serialize(record)
            else
              finished = true
            end
          end
          _, writable = IO.select([], [@fd])
          fd = writable && writable.first
          lock(fd) { buf = try_write(fd, buf) } if fd
        end
        @fd.flush
      end

      # Try and write the buffer to the file via non blocking file writes.
      # If the write fails try again.
      # @return [String] the bytes that still have to be written
      def try_write(fd, buf)
        written = if defined?(Fcntl::O_NONBLOCK)
                    fd.write_nonblock(buf)
                  else
                    fd.write(buf)
                  end
        # write reports bytes, so the remainder must be cut by bytes too.
        written < buf.bytesize ? buf.byteslice(written..-1) : String.new
      rescue Errno::EAGAIN
        buf
      end

      # Hold an exclusive lock on the file while the block runs.
      def lock(fd)
        fd.flock File::LOCK_EX
        begin
          yield
        ensure
          fd.flock File::LOCK_UN
        end
      end

      # finish! and start up another worker thread.
      def flush!
        finish!
        @thread = Thread.new { work }
        true
      end

      # Push a nil through the queue and block until the write loop is
      # finished.
      def finish!
        @queue.push nil
        @thread.join
      end
    end
  end
end

if ARGV.size == 2
  convert(ARGV[0], ARGV[1])
else
  puts "Usage: #{$0} olddb newdb"
end