lib/daybreak/writer.rb in daybreak-0.1.2 vs lib/daybreak/writer.rb in daybreak-0.1.3

- old
+ new

@@ -38,24 +38,21 @@ end private def open! - @fd = File.open @file, 'a' - @fd.binmode + @fd = File.open @file, 'ab' if defined?(Fcntl::O_NONBLOCK) f = @fd.fcntl(Fcntl::F_GETFL, 0) @fd.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK | f) end end # Workers handle the actual fiddly bits of asynchronous io and # and handle background writes. class Worker - include Locking - def initialize(fd) @queue = Queue.new @fd = fd @thread = Thread.new { work } at_exit { finish! } @@ -67,42 +64,51 @@ end # Loop and block if we don't have work to do or if # the file isn't ready for another write just yet. def work - buf = '' - loop do + buf, finished = '', false + until finished && buf.empty? record = @queue.pop - unless record - @fd.flush - break + if record + buf << Record.serialize(record) + else + finished = true end - buf << Record.serialize(record) read, write = IO.select [], [@fd] if write and fd = write.first - lock(fd, File::LOCK_EX) { buf = try_write fd, buf } + lock(fd) { buf = try_write fd, buf } end end + @fd.flush end # Try and write the buffer to the file via non blocking file writes. # If the write fails try again. def try_write(fd, buf) - begin - if defined?(Fcntl::O_NONBLOCK) - s = fd.write_nonblock(buf) - else - s = fd.write(buf) - end - if s < buf.length - buf = buf[s..-1] # didn't finish - else - buf = '' - end - rescue Errno::EAGAIN - buf = buf # try this again + if defined?(Fcntl::O_NONBLOCK) + s = fd.write_nonblock(buf) + else + s = fd.write(buf) end + if s < buf.length + buf = buf[s..-1] # didn't finish + else + buf = "" + end buf + rescue Errno::EAGAIN + buf + end + + # Lock a file with the type <tt>lock</tt> + def lock(fd) + fd.flock File::LOCK_EX + begin + yield + ensure + fd.flock File::LOCK_UN + end end # finish! and start up another worker thread. def flush! finish!