module Daybreak
# Daybreak::DB contains the public api for Daybreak. It includes
# Enumerable for functional goodies like map, each, reduce and friends.
# @api public
class DB
include Enumerable
# Accessors for the database file, and a counter of how many records are in
# sync with the file.
attr_reader :file, :logsize
attr_writer :default
# Registry of every database that is currently open in this process,
# together with the mutex that guards it.
@databases = []
@databases_mutex = Mutex.new
# A handler that will ensure that databases are closed and synced when the
# current process exits.
at_exit do
  loop do
    db = @databases_mutex.synchronize { @databases.first }
    break unless db
    warn "Daybreak database #{db.file} was not closed, state might be inconsistent"
    begin
      db.close
    rescue Exception => ex
      # Best-effort cleanup while the process is already exiting: report
      # the failure and carry on with the remaining databases.
      warn "Failed to close daybreak database: #{ex.message}"
    ensure
      # A close that raises never reaches its own unregister call, which
      # would leave this db at the head of the list and spin this loop
      # forever. Always drop it here; deleting an already removed entry
      # is a no-op.
      @databases_mutex.synchronize { @databases.delete(db) }
    end
  end
end
class << self
  # Add a database to the list of open databases.
  # @api private
  # @param db the database to track
  def register(db)
    @databases_mutex.synchronize { @databases.push(db) }
  end

  # Remove a database from the list of open databases.
  # @api private
  # @param db the database to stop tracking
  def unregister(db)
    @databases_mutex.synchronize do
      @databases.delete(db)
    end
  end
end
# Create a new Daybreak::DB backed by the given file.
#
# The default for unset keys is either the given block or, when no block
# is passed, options[:default]; the block takes precedence.
# @param [String] file the path to the db file
# @param [Hash] options a hash that contains the options for creating a new
#  database. You can pass in :serializer, :format or :default.
# @yield [key] a block that will return the default value to store.
# @yieldparam [String] key the key to be stored.
def initialize(file, options = {}, &block)
  @file = file
  serializer_class = options[:serializer] || Serializer::Default
  @serializer = serializer_class.new
  format_class = options[:format] || Format
  @format = format_class.new
  @default = block || options[:default]
  @queue = Queue.new
  @table = {}
  open
  @mutex = Mutex.new # Mutex to make #lock thread safe
  # Background thread that writes queued records to the file
  @worker = Thread.new { worker }
  @worker.priority = -1
  # Load the records that are already in the file
  update
  self.class.register(self)
end
# Return the default value belonging to key. A callable default is invoked
# with the key; any other default is returned as is.
# @param key the key whose default value is requested.
def default(key = nil)
  return @default.call(key) if @default.respond_to?(:call)
  @default
end
# Retrieve a value at key from the database. If the key is not present and
# a default was specified when this database was created, the default is
# queued for writing, stored and returned. Aliased as get.
# @param key the key to look up in the database.
def [](key)
  skey = @serializer.key_for(key)
  @table.fetch(skey) do
    # Missing key: without a default there is nothing to store
    next unless @default
    value = default(key)
    @queue << [skey, value]
    @table[skey] = value
  end
end
alias_method :get, :'[]'
# Set a key in the database. The change is visible in memory at once and is
# queued to be written to the file later. If the data needs to be persisted
# immediately, use set! instead. Aliased as set.
# @param [#to_s] key the key of the storage slot in the database
# @param value the value to store
def []=(key, value)
  skey = @serializer.key_for(key)
  @queue << [skey, value]
  @table.store(skey, value)
end
alias_method :set, :'[]='
# Store a value and flush it to disk immediately.
# @param key the key of the storage slot in the database
# @param value the value to store
# @return the value that was passed in
def set!(key, value)
  value.tap do
    set(key, value)
    flush
  end
end
# Delete a key from the database. The deletion is queued to be written to
# the file and applied to the in-memory table right away.
# @param key the key of the storage slot in the database
# @return the value that was stored under the key, if any
def delete(key)
  skey = @serializer.key_for(key)
  @queue << [skey]
  @table.delete(skey)
end
# Delete the key and flush the deletion to disk immediately.
# @param key the key of the storage slot in the database
# @return whatever #delete returned for the key
def delete!(key)
  delete(key).tap { flush }
end
# Does this db have a value for this key? Only the in-memory table is
# consulted.
# @param key the key to look for.
# @return [Boolean]
def has_key?(key)
  skey = @serializer.key_for(key)
  @table.key?(skey)
end
alias_method :key?, :has_key?
alias_method :include?, :has_key?
alias_method :member?, :has_key?
# Is this value stored under any key in the in-memory table?
# @param value the value to look for.
# @return [Boolean]
def has_value?(value)
  @table.value?(value)
end
alias_method :value?, :has_value?
# Return the number of items in the in-memory table.
# @return [Integer]
def size
  @table.length
end
alias_method :length, :size
# Return true if the in-memory table holds no items.
# @return [Boolean]
def empty?
  @table.empty?
end
# Iterate over the key, value pairs of the in-memory table. This is the
# method the included Enumerable builds on.
# @yield [key, value] blk the iterator for each key value pair.
# @yieldparam key the key.
# @yieldparam value the value from the database.
def each(&block)
  @table.each(&block)
end
# Return the keys of the in-memory table.
# @return [Array]
def keys
  @table.keys
end
# Flush all changes to disk by flushing the write queue.
def flush
  @queue.flush
end
# Sync the database with what is on disk: pending changes are flushed
# first, then new records are read from the file.
def sync
  flush
  update
end
# Lock the database for an exclusive commit across processes and threads.
# @yield a block where every change to the database is synced
# @return the value of the block
def lock
  @mutex.synchronize do
    # We need a flush before exclusive
    # so that @exclusive is not modified by the worker
    flush
    exclusive do
      # Pick up changes from the file before running the block, and write
      # out the block's own changes before the lock is released.
      update
      (yield).tap { flush }
    end
  end
end
# Remove all keys and values from the database. The database file is
# replaced by a new file that contains only the format header.
# @return [Daybreak::DB] self
def clear
  flush
  with_tmpfile do |tmp_path, tmp|
    tmp.write(@format.header)
    tmp.close
    # Clear acts like a compactification
    File.rename(tmp_path, @file)
  end
  @table.clear
  # Reopen so that the new file is the one being read and written
  open
  self
end
# Compact the database to remove stale commits and reduce the file size.
#
# Flushes and re-reads the journal, writes a dump of the in-memory table to
# a temporary file and, while holding the exclusive lock, renames that file
# over the database file. Afterwards the file is reopened and read again.
# @return [Daybreak::DB] self
def compact
sync
with_tmpfile do |path, file|
compactsize = file.write(dump)
exclusive do
stat = @fd.stat
# Check if database was compactified at the same time
# (@fd must still be linked and have the inode recorded by #open);
# if it was, nothing is renamed and the file is simply reopened below.
if stat.nlink > 0 && stat.ino == @inode
# Compactified database has the same size -> return
# (the early return still runs the ensure clauses of #exclusive and
# #with_tmpfile, so the lock is released and the temp file removed)
return self if stat.size == compactsize
# Append changed journal records if the database changed during compactification
file.write(read)
file.close
File.rename(path, @file)
end
end
end
# Reopen the file (this resets @logsize and the read position) and load it
open
update
self
end
# Close the database for reading and writing: stop the worker thread, close
# the file and remove this database from the list of open databases.
# @return [nil]
def close
  # A nil record tells the worker loop to stop
  @queue << nil
  @worker.join
  @fd.close
  # Not every queue implementation has #stop
  @queue.stop if @queue.respond_to?(:stop)
  self.class.unregister(self)
  nil
end
# Check to see if we've already closed the database, i.e. whether the
# underlying file handle is closed.
# @return [Boolean]
def closed?
  @fd.closed?
end
private
# Update the @table with records read from the file, and increment @logsize
# once per record. A record with a single element deletes that key; any
# other record stores the deserialized last element under the first.
def update
  pending = new_records
  until pending.empty?
    entry = @format.parse(pending)
    if entry.size != 1
      @table[entry.first] = @serializer.load(entry.last)
    else
      @table.delete(entry.first)
    end
    @logsize += 1
  end
end
# Read new records from journal log and return buffer
#
# Takes a shared lock on the file unless the exclusive lock is already held
# (@exclusive is set), reopens the file when it was replaced on disk, and
# then returns whatever #read returns.
# @return the buffer returned by #read
def new_records
loop do
unless @exclusive
# HACK: JRuby returns false if the process is already hold by the same process
# see https://github.com/jruby/jruby/issues/496
Thread.pass until @fd.flock(File::LOCK_SH)
end
# Check if database was compactified in the meantime
# break if not
stat = @fd.stat
break if stat.nlink > 0 && stat.ino == @inode
# The handle no longer refers to the database file: reopen and lock again
open
end
# Read new journal records
read
ensure
# Release the shared lock taken above; an exclusive lock held by the
# caller is left alone.
@fd.flock(File::LOCK_UN) unless @exclusive
end
# (Re)open the database file in append mode. Any previous handle is closed,
# the inode of the new handle is remembered, the record counter and the
# read position are reset, and the format header is written when the file
# is empty.
def open
  @fd.close if @fd
  @fd = File.open(@file, 'ab+')
  @fd.advise(:sequential) if @fd.respond_to?(:advise)
  stat = @fd.stat
  @inode = stat.ino
  @logsize = 0
  if stat.size.zero?
    # Brand new file: start it with the header
    @fd.write(@format.header)
    @fd.flush
  end
  # nil makes the next #read start from the beginning of the file
  @pos = nil
end
# Read everything from the file that has not been read yet and remember the
# new position. After the file was (re)opened, reading starts right behind
# the header.
# @return [String] the bytes read
def read
  if @pos
    @fd.pos = @pos
  else
    # File was opened
    @fd.pos = 0
    @format.read_header(@fd)
  end
  data = @fd.read
  @pos = @fd.pos
  data
end
# Return database dump as string: the format header followed by one dumped
# record per entry of the in-memory table.
# @return [String]
def dump
  # Work on a copy of the header. Appending to the string returned by the
  # format object would modify (or, if it is frozen, fail on) a string this
  # method does not own.
  out = @format.header.dup
  # each is faster than inject
  @table.each do |key, value|
    out << @format.dump([key, @serializer.dump(value)])
  end
  out
end
# Worker thread
#
# Body of the background thread started in #initialize. It takes records
# from @queue and writes them with #write_record until it sees a nil
# record, which #close pushes to request shutdown.
def worker
loop do
# NOTE(review): presumably #next returns the head of the queue without
# removing it and #pop removes it once it has been written - confirm
# against the queue implementation.
record = @queue.next
write_record(record) if record
@queue.pop
break unless record
end
rescue Exception => ex
# Report the failure and start the loop again so the thread stays alive.
# NOTE(review): the retry is unbounded and Exception also covers
# non-StandardError exceptions, so a persistent failure keeps looping here.
warn "Daybreak worker: #{ex.message}"
retry
end
# Write record to output stream and
# advance input stream
#
# The value of a [key, value] record is serialized, the record is dumped
# with the format and appended to the file under the exclusive lock, and
# @logsize is incremented.
#
# The record passed in is not modified. The worker calls this method again
# after a failed write; if it is handed the same array, serializing the
# value in place would serialize it a second time on the retry.
# @param [Array] record [key, value] to store, or [key] to delete
# @return [Integer] the new @logsize
def write_record(record)
  if record.size > 1
    # Serialize into a copy so the caller's array keeps the raw value
    record = record.dup
    record[1] = @serializer.dump(record.last)
  end
  data = @format.dump(record)
  exclusive do
    @fd.write(data)
    # Flush to make sure the file is really updated
    @fd.flush
  end
  # If the file grew by exactly this record since the last read, move the
  # read position past it so it is not read back in.
  @pos = @fd.pos if @pos && @fd.pos == @pos + data.bytesize
  @logsize += 1
end
# Lock database exclusively
#
# Runs the block while holding an exclusive flock on the database file.
# When @exclusive is already set the block is simply run without taking
# the lock again.
# @yield the block to run while the lock is held
# @return the value of the block
def exclusive
return yield if @exclusive
begin
loop do
# HACK: JRuby returns false if the process is already hold by the same process
# see https://github.com/jruby/jruby/issues/496
Thread.pass until @fd.flock(File::LOCK_EX)
# Check if database was compactified in the meantime
# break if not
stat = @fd.stat
break if stat.nlink > 0 && stat.ino == @inode
# The handle no longer refers to the database file: reopen it and lock
# the new handle on the next iteration
open
end
@exclusive = true
yield
ensure
# Runs even when the block raises: release the lock and clear the flag
@fd.flock(File::LOCK_UN)
@exclusive = false
end
end
# Open temporary file and pass it to the block
#
# The file is created next to the database file; its name is the database
# path followed by the process id and the current thread's object id, both
# in base 36. When the block is done the file is closed and, if it still
# exists under that name (i.e. the block did not rename it), removed.
# @yield [path, file] the path of the temporary file and the open file
# @return the value of the block
def with_tmpfile
  path = [@file, $$.to_s(36), Thread.current.object_id.to_s(36)].join
  file = File.open(path, 'wb')
  yield(path, file)
ensure
  # file is nil when File.open itself failed
  file.close if file && !file.closed?
  # File.exists? was removed in Ruby 3.2; File.exist? works everywhere
  File.unlink(path) if path && File.exist?(path)
end
end
end