lib/ffmprb/util/threaded_io_buffer.rb in ffmprb-0.10.1 vs lib/ffmprb/util/threaded_io_buffer.rb in ffmprb-0.11.2
- old
+ new
@@ -18,10 +18,13 @@
attr_accessor :io_wait_timeout
end
+ attr_reader :stats
+
+
# NOTE input/output can be lambdas for single asynchronic io evaluation
# the lambdas must be timeout-interrupt-safe (since they are wrapped in timeout blocks)
# NOTE all ios are being opened and closed as soon as possible
def initialize(input, *outputs, keep_outputs_open_on_input_idle_limit: nil)
super() # NOTE for the monitor, apparently
@@ -31,11 +34,10 @@
@input = input
@outputs = outputs.map do |outp|
OpenStruct.new _io: outp, q: SizedQueue.new(ThreadedIoBuffer.blocks_max)
end
@stats = Stats.new(self)
- @terminate = false
@keep_outputs_open_on_input_idle_limit = keep_outputs_open_on_input_idle_limit
# @events = {}
Thread.new "io buffer main" do
init_reader!
@@ -43,11 +45,11 @@
init_writer_output! output
init_writer! output
end
Thread.join_children!.tap do
- Ffmprb.logger.debug "ThreadedIoBuffer (#{@input.path}->#{@outputs.map(&:io).map(&:path)}) terminated successfully (#{@stats})"
+ Ffmprb.logger.debug "ThreadedIoBuffer (#{@input.path}->#{@outputs.map(&:io).map(&:path)}) terminated successfully (#{stats})"
end
end
end
#
# def once(event, &blk)
@@ -64,11 +66,11 @@
# end
# end
# handle_synchronously :once
#
# def reader_done!
- # Ffmprb.logger.debug "ThreadedIoBuffer reader terminated (#{@stats})"
+ # Ffmprb.logger.debug "ThreadedIoBuffer reader terminated (#{stats})"
# fire! :reader_done
# end
#
# def terminated!
# fire! :terminated
@@ -90,11 +92,11 @@
# end
# handle_synchronously :fire!
#
def label
- "IObuff: Curr/Peak/Max=#{@stats.blocks_buff}/#{@stats.blocks_max}/#{ThreadedIoBuffer.blocks_max} In/Out=#{@stats.bytes_in}/#{@stats.bytes_out}"
+ "IObuff: Curr/Peak/Max=#{stats.blocks_buff}/#{stats.blocks_max}/#{ThreadedIoBuffer.blocks_max} In/Out=#{stats.bytes_in}/#{stats.bytes_out}"
end
private
class AllOutputsBrokenError < Error
@@ -121,65 +123,68 @@
# NOTE reads roughly as much input as writers can write, then closes the stream; times out on buffer overflow
def init_reader!
Thread.new("buffer reader") do
begin
input_io = reader_input!
- loop do
+ loop do # NOTE until EOFError, see below
s = ''
- begin
- while s.length < ThreadedIoBuffer.block_size
- timeouts = 0
- logged_timeouts = 1
- begin
- ss = input_io.read_nonblock(ThreadedIoBuffer.block_size - s.length)
- @stats.add_bytes_in ss.length
- s += ss
- rescue IO::WaitReadable
- if !@terminate && @stats.bytes_in > 0 && @stats.blocks_buff == 0 && @keep_outputs_open_on_input_idle_limit && timeouts * ThreadedIoBuffer.io_wait_timeout > @keep_outputs_open_on_input_idle_limit
- if s.length > 0
- output_enq! s
- s = '' # NOTE let's see if it helps outputting an incomplete block
- else
- Ffmprb.logger.debug "ThreadedIoBuffer reader (from #{input_io.path}) giving up after waiting >#{@keep_outputs_open_on_input_idle_limit}s, after reading #{@stats.bytes_in}b closing outputs"
- @terminate = true
- output_enq! nil # NOTE EOF signal
- end
+ while s.length < ThreadedIoBuffer.block_size
+ timeouts = 0
+ logged_timeouts = 1
+ begin
+ ss = input_io.read_nonblock(ThreadedIoBuffer.block_size - s.length)
+ stats.add_bytes_in ss.length
+ s += ss
+ rescue IO::WaitReadable
+ if @keep_outputs_open_on_input_idle_limit && stats.bytes_in > 0 && stats.blocks_buff == 0 && timeouts * ThreadedIoBuffer.io_wait_timeout > @keep_outputs_open_on_input_idle_limit
+ if s.length > 0 # NOTE let's see if it helps outputting an incomplete block
+ Ffmprb.logger.debug "ThreadedIoBuffer reader (from #{input_io.path}) giving a chance to write #{s.length}/#{ThreadedIoBuffer.block_size}b after waiting >#{@keep_outputs_open_on_input_idle_limit}s, after reading #{stats.bytes_in}b"
+ break
else
- timeouts += 1
- if !@terminate && timeouts > 2 * logged_timeouts
- Ffmprb.logger.debug "ThreadedIoBuffer reader (from #{input_io.path}) retrying... (#{timeouts} reads): #{$!.class}"
- logged_timeouts = timeouts
- end
- IO.select [input_io], nil, nil, ThreadedIoBuffer.io_wait_timeout
- retry
+ Ffmprb.logger.debug "ThreadedIoBuffer reader (from #{input_io.path}) giving up after waiting >#{@keep_outputs_open_on_input_idle_limit}s, after reading #{stats.bytes_in}b, closing outputs"
+ raise EOFError
end
- rescue IO::WaitWritable # NOTE should not really happen, so just for conformance
- Ffmprb.logger.error "ThreadedIoBuffer reader (from #{input_io.path}) gets a #{$!} - should not really happen."
- IO.select nil, [input_io], nil, ThreadedIoBuffer.io_wait_timeout
+ else
+ Thread.current.live!
+ timeouts += 1
+ if timeouts > 2 * logged_timeouts
+ Ffmprb.logger.debug "ThreadedIoBuffer reader (from #{input_io.path}) retrying... (#{timeouts} reads): #{$!.class}"
+ logged_timeouts = timeouts
+ end
+ IO.select [input_io], nil, nil, ThreadedIoBuffer.io_wait_timeout
retry
end
+ rescue EOFError
+ output_enq! s
+ raise
+ rescue IO::WaitWritable # NOTE should not really happen, so just for conformance
+ Ffmprb.logger.error "ThreadedIoBuffer reader (from #{input_io.path}) gets a #{$!} - should not really happen."
+ IO.select nil, [input_io], nil, ThreadedIoBuffer.io_wait_timeout
+ retry
end
- ensure
- output_enq! s unless @terminate
end
+ output_enq! s
end
rescue EOFError
- unless @terminate
- Ffmprb.logger.debug "ThreadedIoBuffer reader (from #{input_io.path}) breaking off"
- @terminate = true
- output_enq! nil # NOTE EOF signal
- end
+ Ffmprb.logger.debug "ThreadedIoBuffer reader (from #{input_io.path}) breaking off"
rescue AllOutputsBrokenError
Ffmprb.logger.info "All outputs broken"
+ rescue Exception
+ @reader_failed = Error.new("Reader failed: #{$!}")
+ raise
ensure
begin
- reader_input!.close if reader_input!.respond_to?(:close)
+ output_enq! nil # NOTE EOF signal
rescue
+ end
+ begin
+ input_io.close if input_io.respond_to?(:close)
+ rescue
Ffmprb.logger.error "#{$!.class.name} closing ThreadedIoBuffer input: #{$!.message}"
end
# reader_done!
- Ffmprb.logger.debug "ThreadedIoBuffer reader terminated (#{@stats})"
+ Ffmprb.logger.debug "ThreadedIoBuffer reader terminated (#{stats})"
end
end
end
def init_writer_output!(output)
@@ -187,11 +192,11 @@
output.thr = Thread.new("buffer writer output helper") do
Ffmprb.logger.debug "Opening buffer output"
output.io =
Thread.timeout_or_live nil, log: "in the buffer writer helper thread", timeout: ThreadedIoBuffer.timeout do |time|
- fail Error, "giving up buffer writer init since the reader has failed (#{@terminate.message})" if @terminate.kind_of? Exception
+ fail Error, "giving up buffer writer init since the reader has failed (#{@reader_failed.message})" if @reader_failed
output._io.call
end
Ffmprb.logger.debug "Opened buffer output: #{output.io.path}"
end
end
@@ -199,25 +204,25 @@
# NOTE writes as much output as possible, then terminates when the reader dies
def init_writer!(output)
Thread.new("buffer writer") do
begin
output_io = writer_output!(output)
- while s = output.q.deq # NOTE until EOF signal
- @stats.blocks_for output, output.q.length
+ while s = output_deq!(output) # NOTE until EOF signal
timeouts = 0
logged_timeouts = 1
begin
- fail @terminate if @terminate.kind_of? Exception
- written = output_io.write_nonblock(s) if output_io # NOTE will only be nil if @terminate is an exception
- @stats.add_bytes_out written
+ fail @reader_failed if @reader_failed # NOTE otherwise, output_io should not be nil
+ written = output_io.write_nonblock(s)
+ stats.add_bytes_out written
- if written != s.length # NOTE kinda optimisation
+ if written != s.length
s = s[written..-1]
raise IO::EAGAINWaitWritable
end
rescue IO::WaitWritable
+ Thread.current.live!
timeouts += 1
if timeouts > 2 * logged_timeouts
Ffmprb.logger.debug "ThreadedIoBuffer writer (to #{output_io.path}) retrying... (#{timeouts} writes): #{$!.class}"
logged_timeouts = timeouts
end
@@ -234,16 +239,16 @@
Ffmprb.logger.debug "ThreadedIoBuffer writer (to #{output_io.path}) broken"
output.broken = true
ensure
# terminated!
begin
- writer_output!(output).close if !output.broken && writer_output!(output).respond_to?(:close)
- output.broken = true
+ output_io.close if !output.broken && output_io && output_io.respond_to?(:close)
rescue
Ffmprb.logger.error "#{$!.class.name} closing ThreadedIoBuffer output: #{$!.message}"
end
- Ffmprb.logger.debug "ThreadedIoBuffer writer (to #{output_io && output_io.path}) terminated (#{@stats})"
+ output.broken = true
+ Ffmprb.logger.debug "ThreadedIoBuffer writer (to #{output_io && output_io.path}) terminated (#{stats})"
end
end
end
#
# def wait_for_handler!
@@ -261,12 +266,11 @@
begin
# NOTE let's assume there's no race condition here between the possible timeout exception and enq
Timeout.timeout(ThreadedIoBuffer.timeout) do
output.q.enq item
end
- @stats.blocks_for output, output.q.length
- true
+ stats.blocks_for output
rescue Timeout::Error
next if output.broken
timeouts += 1
@@ -275,23 +279,30 @@
logged_timeouts = timeouts
end
retry unless timeouts >= ThreadedIoBuffer.timeout_limit # NOTE the queue has probably overflown
- @terminate = Error.new("the writer has failed with timeout limit while queuing")
+ @reader_failed ||= Error.new("the writer has failed with timeout limit while queuing") # NOTE screw the race condition
# timeout!
fail Error, "Looks like we're stuck (>#{ThreadedIoBuffer.timeout_limit*ThreadedIoBuffer.timeout}s idle) with #{ThreadedIoBuffer.blocks_max}x#{ThreadedIoBuffer.block_size}b blocks (buffering #{reader_input!.path}->...)..."
end
end.empty?
end
+ def output_deq!(outp)
+ outp.q.deq.tap do
+ stats.blocks_for outp
+ end
+ end
+
class Stats < OpenStruct
include MonitorMixin
def initialize(proc)
@proc = proc
- super blocks_max: 0, bytes_in: 0, bytes_out: 0
+ @output_blocks = {}
+ super blocks_buff: 0, blocks_max: 0, bytes_in: 0, bytes_out: 0
end
def add_bytes_in(n)
synchronize do
self.bytes_in += n
@@ -304,17 +315,17 @@
self.bytes_out += n
@proc.proc_vis_node @proc # NOTE update
end
end
- def blocks_for(outp, n)
+ def blocks_for(outp)
synchronize do
- if n > blocks_max
- self.blocks_max = n
+ blocks = @output_blocks[outp.object_id] = outp.q.length
+ if blocks > blocks_max
+ self.blocks_max = blocks
@proc.proc_vis_node @proc # NOTE update
end
- (@_outp_blocks ||= {})[outp] = n
- self.blocks_buff = @_outp_blocks.values.reduce(0, :+)
+ self.blocks_buff = @output_blocks.values.reduce(0, :+)
end
end
end