lib/daybreak/writer.rb in daybreak-0.1.2 vs lib/daybreak/writer.rb in daybreak-0.1.3
- old
+ new
@@ -38,24 +38,21 @@
end
private
def open!
- @fd = File.open @file, 'a'
- @fd.binmode
+ @fd = File.open @file, 'ab'
if defined?(Fcntl::O_NONBLOCK)
f = @fd.fcntl(Fcntl::F_GETFL, 0)
@fd.fcntl(Fcntl::F_SETFL, Fcntl::O_NONBLOCK | f)
end
end
# Workers handle the actual fiddly bits of asynchronous io and
# and handle background writes.
class Worker
- include Locking
-
def initialize(fd)
@queue = Queue.new
@fd = fd
@thread = Thread.new { work }
at_exit { finish! }
@@ -67,42 +64,51 @@
end
# Loop and block if we don't have work to do or if
# the file isn't ready for another write just yet.
def work
- buf = ''
- loop do
+ buf, finished = '', false
+ until finished && buf.empty?
record = @queue.pop
- unless record
- @fd.flush
- break
+ if record
+ buf << Record.serialize(record)
+ else
+ finished = true
end
- buf << Record.serialize(record)
read, write = IO.select [], [@fd]
if write and fd = write.first
- lock(fd, File::LOCK_EX) { buf = try_write fd, buf }
+ lock(fd) { buf = try_write fd, buf }
end
end
+ @fd.flush
end
# Try and write the buffer to the file via non blocking file writes.
# If the write fails try again.
def try_write(fd, buf)
- begin
- if defined?(Fcntl::O_NONBLOCK)
- s = fd.write_nonblock(buf)
- else
- s = fd.write(buf)
- end
- if s < buf.length
- buf = buf[s..-1] # didn't finish
- else
- buf = ''
- end
- rescue Errno::EAGAIN
- buf = buf # try this again
+ if defined?(Fcntl::O_NONBLOCK)
+ s = fd.write_nonblock(buf)
+ else
+ s = fd.write(buf)
end
+ if s < buf.length
+ buf = buf[s..-1] # didn't finish
+ else
+ buf = ""
+ end
buf
+ rescue Errno::EAGAIN
+ buf
+ end
+
+ # Lock a file with the type <tt>lock</tt>
+ def lock(fd)
+ fd.flock File::LOCK_EX
+ begin
+ yield
+ ensure
+ fd.flock File::LOCK_UN
+ end
end
# finish! and start up another worker thread.
def flush!
finish!