lib/ffmprb/util/threaded_io_buffer.rb in ffmprb-0.10.1 vs lib/ffmprb/util/threaded_io_buffer.rb in ffmprb-0.11.2

- old
+ new

@@ -18,10 +18,13 @@ attr_accessor :io_wait_timeout end + attr_reader :stats + + # NOTE input/output can be lambdas for single asynchronic io evaluation # the lambdas must be timeout-interrupt-safe (since they are wrapped in timeout blocks) # NOTE all ios are being opened and closed as soon as possible def initialize(input, *outputs, keep_outputs_open_on_input_idle_limit: nil) super() # NOTE for the monitor, apparently @@ -31,11 +34,10 @@ @input = input @outputs = outputs.map do |outp| OpenStruct.new _io: outp, q: SizedQueue.new(ThreadedIoBuffer.blocks_max) end @stats = Stats.new(self) - @terminate = false @keep_outputs_open_on_input_idle_limit = keep_outputs_open_on_input_idle_limit # @events = {} Thread.new "io buffer main" do init_reader! @@ -43,11 +45,11 @@ init_writer_output! output init_writer! output end Thread.join_children!.tap do - Ffmprb.logger.debug "ThreadedIoBuffer (#{@input.path}->#{@outputs.map(&:io).map(&:path)}) terminated successfully (#{@stats})" + Ffmprb.logger.debug "ThreadedIoBuffer (#{@input.path}->#{@outputs.map(&:io).map(&:path)}) terminated successfully (#{stats})" end end end # # def once(event, &blk) @@ -64,11 +66,11 @@ # end # end # handle_synchronously :once # # def reader_done! - # Ffmprb.logger.debug "ThreadedIoBuffer reader terminated (#{@stats})" + # Ffmprb.logger.debug "ThreadedIoBuffer reader terminated (#{stats})" # fire! :reader_done # end # # def terminated! # fire! :terminated @@ -90,11 +92,11 @@ # end # handle_synchronously :fire! # def label - "IObuff: Curr/Peak/Max=#{@stats.blocks_buff}/#{@stats.blocks_max}/#{ThreadedIoBuffer.blocks_max} In/Out=#{@stats.bytes_in}/#{@stats.bytes_out}" + "IObuff: Curr/Peak/Max=#{stats.blocks_buff}/#{stats.blocks_max}/#{ThreadedIoBuffer.blocks_max} In/Out=#{stats.bytes_in}/#{stats.bytes_out}" end private class AllOutputsBrokenError < Error @@ -121,65 +123,68 @@ # NOTE reads roughly as much input as writers can write, then closes the stream; times out on buffer overflow def init_reader! Thread.new("buffer reader") do begin input_io = reader_input! - loop do + loop do # NOTE until EOFError, see below s = '' - begin - while s.length < ThreadedIoBuffer.block_size - timeouts = 0 - logged_timeouts = 1 - begin - ss = input_io.read_nonblock(ThreadedIoBuffer.block_size - s.length) - @stats.add_bytes_in ss.length - s += ss - rescue IO::WaitReadable - if !@terminate && @stats.bytes_in > 0 && @stats.blocks_buff == 0 && @keep_outputs_open_on_input_idle_limit && timeouts * ThreadedIoBuffer.io_wait_timeout > @keep_outputs_open_on_input_idle_limit - if s.length > 0 - output_enq! s - s = '' # NOTE let's see if it helps outputting an incomplete block - else - Ffmprb.logger.debug "ThreadedIoBuffer reader (from #{input_io.path}) giving up after waiting >#{@keep_outputs_open_on_input_idle_limit}s, after reading #{@stats.bytes_in}b closing outputs" - @terminate = true - output_enq! nil # NOTE EOF signal - end + while s.length < ThreadedIoBuffer.block_size + timeouts = 0 + logged_timeouts = 1 + begin + ss = input_io.read_nonblock(ThreadedIoBuffer.block_size - s.length) + stats.add_bytes_in ss.length + s += ss + rescue IO::WaitReadable + if @keep_outputs_open_on_input_idle_limit && stats.bytes_in > 0 && stats.blocks_buff == 0 && timeouts * ThreadedIoBuffer.io_wait_timeout > @keep_outputs_open_on_input_idle_limit + if s.length > 0 # NOTE let's see if it helps outputting an incomplete block + Ffmprb.logger.debug "ThreadedIoBuffer reader (from #{input_io.path}) giving a chance to write #{s.length}/#{ThreadedIoBuffer.block_size}b after waiting >#{@keep_outputs_open_on_input_idle_limit}s, after reading #{stats.bytes_in}b" + break else - timeouts += 1 - if !@terminate && timeouts > 2 * logged_timeouts - Ffmprb.logger.debug "ThreadedIoBuffer reader (from #{input_io.path}) retrying... (#{timeouts} reads): #{$!.class}" - logged_timeouts = timeouts - end - IO.select [input_io], nil, nil, ThreadedIoBuffer.io_wait_timeout - retry + Ffmprb.logger.debug "ThreadedIoBuffer reader (from #{input_io.path}) giving up after waiting >#{@keep_outputs_open_on_input_idle_limit}s, after reading #{stats.bytes_in}b, closing outputs" + raise EOFError end - rescue IO::WaitWritable # NOTE should not really happen, so just for conformance - Ffmprb.logger.error "ThreadedIoBuffer reader (from #{input_io.path}) gets a #{$!} - should not really happen." - IO.select nil, [input_io], nil, ThreadedIoBuffer.io_wait_timeout + else + Thread.current.live! + timeouts += 1 + if timeouts > 2 * logged_timeouts + Ffmprb.logger.debug "ThreadedIoBuffer reader (from #{input_io.path}) retrying... (#{timeouts} reads): #{$!.class}" + logged_timeouts = timeouts + end + IO.select [input_io], nil, nil, ThreadedIoBuffer.io_wait_timeout retry end + rescue EOFError + output_enq! s + raise + rescue IO::WaitWritable # NOTE should not really happen, so just for conformance + Ffmprb.logger.error "ThreadedIoBuffer reader (from #{input_io.path}) gets a #{$!} - should not really happen." + IO.select nil, [input_io], nil, ThreadedIoBuffer.io_wait_timeout + retry end - ensure - output_enq! s unless @terminate end + output_enq! s end rescue EOFError - unless @terminate - Ffmprb.logger.debug "ThreadedIoBuffer reader (from #{input_io.path}) breaking off" - @terminate = true - output_enq! nil # NOTE EOF signal - end + Ffmprb.logger.debug "ThreadedIoBuffer reader (from #{input_io.path}) breaking off" rescue AllOutputsBrokenError Ffmprb.logger.info "All outputs broken" + rescue Exception + @reader_failed = Error.new("Reader failed: #{$!}") + raise ensure begin - reader_input!.close if reader_input!.respond_to?(:close) + output_enq! nil # NOTE EOF signal rescue + end + begin + input_io.close if input_io.respond_to?(:close) + rescue Ffmprb.logger.error "#{$!.class.name} closing ThreadedIoBuffer input: #{$!.message}" end # reader_done! - Ffmprb.logger.debug "ThreadedIoBuffer reader terminated (#{@stats})" + Ffmprb.logger.debug "ThreadedIoBuffer reader terminated (#{stats})" end end end def init_writer_output!(output) @@ -187,11 +192,11 @@ output.thr = Thread.new("buffer writer output helper") do Ffmprb.logger.debug "Opening buffer output" output.io = Thread.timeout_or_live nil, log: "in the buffer writer helper thread", timeout: ThreadedIoBuffer.timeout do |time| - fail Error, "giving up buffer writer init since the reader has failed (#{@terminate.message})" if @terminate.kind_of? Exception + fail Error, "giving up buffer writer init since the reader has failed (#{@reader_failed.message})" if @reader_failed output._io.call end Ffmprb.logger.debug "Opened buffer output: #{output.io.path}" end end @@ -199,25 +204,25 @@ # NOTE writes as much output as possible, then terminates when the reader dies def init_writer!(output) Thread.new("buffer writer") do begin output_io = writer_output!(output) - while s = output.q.deq # NOTE until EOF signal - @stats.blocks_for output, output.q.length + while s = output_deq!(output) # NOTE until EOF signal timeouts = 0 logged_timeouts = 1 begin - fail @terminate if @terminate.kind_of? Exception - written = output_io.write_nonblock(s) if output_io # NOTE will only be nil if @terminate is an exception - @stats.add_bytes_out written + fail @reader_failed if @reader_failed # NOTE otherwise, output_io should not be nil + written = output_io.write_nonblock(s) + stats.add_bytes_out written - if written != s.length # NOTE kinda optimisation + if written != s.length s = s[written..-1] raise IO::EAGAINWaitWritable end rescue IO::WaitWritable + Thread.current.live! timeouts += 1 if timeouts > 2 * logged_timeouts Ffmprb.logger.debug "ThreadedIoBuffer writer (to #{output_io.path}) retrying... (#{timeouts} writes): #{$!.class}" logged_timeouts = timeouts end @@ -234,16 +239,16 @@ Ffmprb.logger.debug "ThreadedIoBuffer writer (to #{output_io.path}) broken" output.broken = true ensure # terminated! begin - writer_output!(output).close if !output.broken && writer_output!(output).respond_to?(:close) - output.broken = true + output_io.close if !output.broken && output_io && output_io.respond_to?(:close) rescue Ffmprb.logger.error "#{$!.class.name} closing ThreadedIoBuffer output: #{$!.message}" end - Ffmprb.logger.debug "ThreadedIoBuffer writer (to #{output_io && output_io.path}) terminated (#{@stats})" + output.broken = true + Ffmprb.logger.debug "ThreadedIoBuffer writer (to #{output_io && output_io.path}) terminated (#{stats})" end end end # # def wait_for_handler! @@ -261,12 +266,11 @@ begin # NOTE let's assume there's no race condition here between the possible timeout exception and enq Timeout.timeout(ThreadedIoBuffer.timeout) do output.q.enq item end - @stats.blocks_for output, output.q.length - true + stats.blocks_for output rescue Timeout::Error next if output.broken timeouts += 1 @@ -275,23 +279,30 @@ logged_timeouts = timeouts end retry unless timeouts >= ThreadedIoBuffer.timeout_limit # NOTE the queue has probably overflown - @terminate = Error.new("the writer has failed with timeout limit while queuing") + @reader_failed ||= Error.new("the writer has failed with timeout limit while queuing") # NOTE screw the race condition # timeout! fail Error, "Looks like we're stuck (>#{ThreadedIoBuffer.timeout_limit*ThreadedIoBuffer.timeout}s idle) with #{ThreadedIoBuffer.blocks_max}x#{ThreadedIoBuffer.block_size}b blocks (buffering #{reader_input!.path}->...)..." end end.empty? end + def output_deq!(outp) + outp.q.deq.tap do + stats.blocks_for outp + end + end + class Stats < OpenStruct include MonitorMixin def initialize(proc) @proc = proc - super blocks_max: 0, bytes_in: 0, bytes_out: 0 + @output_blocks = {} + super blocks_buff: 0, blocks_max: 0, bytes_in: 0, bytes_out: 0 end def add_bytes_in(n) synchronize do self.bytes_in += n @@ -304,17 +315,17 @@ self.bytes_out += n @proc.proc_vis_node @proc # NOTE update end end - def blocks_for(outp, n) + def blocks_for(outp) synchronize do - if n > blocks_max - self.blocks_max = n + blocks = @output_blocks[outp.object_id] = outp.q.length + if blocks > blocks_max + self.blocks_max = blocks @proc.proc_vis_node @proc # NOTE update end - (@_outp_blocks ||= {})[outp] = n - self.blocks_buff = @_outp_blocks.values.reduce(0, :+) + self.blocks_buff = @output_blocks.values.reduce(0, :+) end end end