lib/ffmprb/util/threaded_io_buffer.rb in ffmprb-0.9.6 vs lib/ffmprb/util/threaded_io_buffer.rb in ffmprb-0.10.0

- old
+ new

@@ -1,48 +1,54 @@ +require 'ostruct' + module Ffmprb module Util # TODO the events mechanism is currently unused (and commented out) => synchro mechanism not needed - # XXX *partially* specc'ed in file_spec class ThreadedIoBuffer - # include Synchro + # XXX include Synchro + include ProcVis::Node class << self attr_accessor :blocks_max attr_accessor :block_size attr_accessor :timeout + attr_accessor :timeout_limit + attr_accessor :io_wait_timeout - def default_size - blocks_max * block_size - end - end + # NOTE input/output can be lambdas for single asynchronic io evaluation - # the labdas must be timeout-interrupt-safe (since they are wrapped in timeout blocks) - # NOTE both ios are being opened and closed as soon as possible - def initialize(input, *outputs) # XXX SPEC ME!!! multiple outputs!! + # the lambdas must be timeout-interrupt-safe (since they are wrapped in timeout blocks) + # NOTE all ios are being opened and closed as soon as possible + def initialize(input, *outputs, keep_outputs_open_on_input_idle_limit: nil) + super() # NOTE for the monitor, apparently + Ffmprb.logger.debug "ThreadedIoBuffer initializing with (#{ThreadedIoBuffer.blocks_max}x#{ThreadedIoBuffer.block_size})" + @input = input - @outputs = outputs.inject({}) do |hash, out| - hash[out] = SizedQueue.new(self.class.blocks_max) - hash + @outputs = outputs.map do |outp| + OpenStruct.new _io: outp, q: SizedQueue.new(ThreadedIoBuffer.blocks_max) end - @stat_blocks_max = 0 + @stats = Stats.new(self) @terminate = false + @keep_outputs_open_on_input_idle_limit = keep_outputs_open_on_input_idle_limit # @events = {} Thread.new "io buffer main" do init_reader! - outputs.each do |output| + @outputs.each do |output| init_writer_output! output init_writer! output end - Thread.join_children! + Thread.join_children!.tap do + Ffmprb.logger.debug "ThreadedIoBuffer (#{@input.path}->#{@outputs.map(&:io).map(&:path)}) terminated successfully (#{@stats})" + end end end # # def once(event, &blk) # event = event.to_sym @@ -58,11 +64,11 @@ # end # end # handle_synchronously :once # # def reader_done! - # Ffmprb.logger.debug "ThreadedIoBuffer reader terminated (blocks max: #{@stat_blocks_max})" + # Ffmprb.logger.debug "ThreadedIoBuffer reader terminated (#{@stats})" # fire! :reader_done # end # # def terminated! # fire! :terminated @@ -70,11 +76,11 @@ # # def timeout! # fire! :timeout # end - protected + # protected # # def fire!(event) # wait_for_handler! # Ffmprb.logger.debug "ThreadedIoBuffer firing #{event}" # if blk = @events.to_h[event.to_sym] @@ -82,130 +88,236 @@ # end # @events[event.to_sym] = true # end # handle_synchronously :fire! # - def blocks_count - @outputs.values.map(&:size).max + + def label + "IObuff: Curr/Peak/Max=#{@stats.blocks_buff}/#{@stats.blocks_max}/#{ThreadedIoBuffer.blocks_max} In/Out=#{@stats.bytes_in}/#{@stats.bytes_out}" end private + class AllOutputsBrokenError < Error + end + def reader_input! # NOTE just for reader thread if @input.respond_to?(:call) Ffmprb.logger.debug "Opening buffer input" @input = @input.call Ffmprb.logger.debug "Opened buffer input: #{@input.path}" end @input end + # NOTE to be called after #init_writer_output! only def writer_output!(output) # NOTE just for writer thread - if @output_thrs[output] - @output_thrs[output].join - @output_thrs[output] = nil + if output.thr + output.thr.join + output.thr = nil end - @output_ios[output] + output.io end - # NOTE reads all of input, then closes the stream times out on buffer overflow + # NOTE reads roughly as much input as writers can write, then closes the stream; times out on buffer overflow def init_reader! Thread.new("buffer reader") do begin - while s = reader_input!.read(self.class.block_size) + input_io = reader_input! + loop do + s = '' begin - Timeout.timeout(self.class.timeout) do - output_enq s + while s.length < ThreadedIoBuffer.block_size + timeouts = 0 + logged_timeouts = 1 + begin + ss = input_io.read_nonblock(ThreadedIoBuffer.block_size - s.length) + @stats.add_bytes_in ss.length + s += ss + rescue IO::WaitReadable + if !@terminate && @stats.bytes_in > 0 && @stats.blocks_buff == 0 && @keep_outputs_open_on_input_idle_limit && timeouts * ThreadedIoBuffer.io_wait_timeout > @keep_outputs_open_on_input_idle_limit + if s.length > 0 + output_enq! s + s = '' # NOTE let's see if it helps outputting an incomplete block + else + Ffmprb.logger.debug "ThreadedIoBuffer reader (from #{input_io.path}) giving up after waiting >#{@keep_outputs_open_on_input_idle_limit}s, after reading #{@stats.bytes_in}b closing outputs" + @terminate = true + output_enq! nil # NOTE EOF signal + end + else + timeouts += 1 + if !@terminate && timeouts > 2 * logged_timeouts + Ffmprb.logger.debug "ThreadedIoBuffer reader (from #{input_io.path}) retrying... (#{timeouts} reads): #{$!.class}" + logged_timeouts = timeouts + end + IO.select [input_io], nil, nil, ThreadedIoBuffer.io_wait_timeout + retry + end + rescue IO::WaitWritable # NOTE should not really happen, so just for conformance + Ffmprb.logger.error "ThreadedIoBuffer reader (from #{input_io.path}) gets a #{$!} - should not really happen." + IO.select nil, [input_io], nil, ThreadedIoBuffer.io_wait_timeout + retry + end end - rescue Timeout::Error # NOTE the queue is probably overflown - @terminate = Error.new("The reader has failed with timeout while queuing") - # timeout! - fail Error, "Looks like we're stuck (#{timeout}s idle) with #{self.class.blocks_max}x#{self.class.block_size}B blocks (buffering #{reader_input!.path}->...)..." + ensure + output_enq! s unless @terminate end - @stat_blocks_max = blocks_count if blocks_count > @stat_blocks_max end - @terminate = true - output_enq nil + rescue EOFError + unless @terminate + Ffmprb.logger.debug "ThreadedIoBuffer reader (from #{input_io.path}) breaking off" + @terminate = true + output_enq! nil # NOTE EOF signal + end + rescue AllOutputsBrokenError + Ffmprb.logger.info "All outputs broken" ensure begin reader_input!.close if reader_input!.respond_to?(:close) rescue - Ffmprb.logger.error "ThreadedIoBuffer input closing error: #{$!.message}" + Ffmprb.logger.error "#{$!.class.name} closing ThreadedIoBuffer input: #{$!.message}" end # reader_done! - Ffmprb.logger.debug "ThreadedIoBuffer reader terminated (blocks max: #{@stat_blocks_max})" + Ffmprb.logger.debug "ThreadedIoBuffer reader terminated (#{@stats})" end end end def init_writer_output!(output) - @output_ios ||= {} - return @output_ios[output] = output unless output.respond_to?(:call) + return output.io = output._io unless output._io.respond_to?(:call) - @output_thrs ||= {} - @output_thrs[output] = Thread.new("buffer writer output helper") do + output.thr = Thread.new("buffer writer output helper") do Ffmprb.logger.debug "Opening buffer output" - @output_ios[output] = - Thread.timeout_or_live nil, log: "in the buffer writer helper thread", timeout: self.class.timeout do |time| - fail Error, "giving up buffer writer init since the reader has failed (#{@terminate.message})" if @terminate.kind_of?(Exception) - output.call + output.io = + Thread.timeout_or_live nil, log: "in the buffer writer helper thread", timeout: ThreadedIoBuffer.timeout do |time| + fail Error, "giving up buffer writer init since the reader has failed (#{@terminate.message})" if @terminate.kind_of? Exception + output._io.call end - Ffmprb.logger.debug "Opened buffer output: #{@output_ios[output].path}" + Ffmprb.logger.debug "Opened buffer output: #{output.io.path}" end end # NOTE writes as much output as possible, then terminates when the reader dies def init_writer!(output) Thread.new("buffer writer") do - broken = false begin - while s = @outputs[output].deq - next if broken - written = 0 - tries = 1 - logged_tries = 1/2 - while !broken - fail @terminate if @terminate.kind_of?(Exception) - begin - output_io = writer_output!(output) - written = output_io.write_nonblock(s) if output # NOTE will only be nil if @terminate is an exception - break if written == s.length # NOTE kinda optimisation + output_io = writer_output!(output) + while s = output.q.deq # NOTE until EOF signal + @stats.blocks_for output, output.q.length + timeouts = 0 + logged_timeouts = 1 + begin + fail @terminate if @terminate.kind_of? Exception + written = output_io.write_nonblock(s) if output_io # NOTE will only be nil if @terminate is an exception + @stats.add_bytes_out written + + if written != s.length # NOTE kinda optimisation s = s[written..-1] - rescue Errno::EAGAIN, Errno::EWOULDBLOCK - if tries == 2 * logged_tries - Ffmprb.logger.debug "ThreadedIoBuffer writer (to #{output_io.path}) retrying... (#{tries} writes): #{$!.class}" - logged_tries = tries - end - sleep 0.01 - rescue Errno::EPIPE - broken = true - Ffmprb.logger.debug "ThreadedIoBuffer writer (to #{output_io.path}) broken" - ensure - tries += 1 + raise IO::EAGAINWaitWritable end + + rescue IO::WaitWritable + timeouts += 1 + if timeouts > 2 * logged_timeouts + Ffmprb.logger.debug "ThreadedIoBuffer writer (to #{output_io.path}) retrying... (#{timeouts} writes): #{$!.class}" + logged_timeouts = timeouts + end + IO.select nil, [output_io], nil, ThreadedIoBuffer.io_wait_timeout + retry + rescue IO::WaitReadable # NOTE should not really happen, so just for conformance + Ffmprb.logger.error "ThreadedIoBuffer writer (to #{output_io.path}) gets a #{$!} - should not really happen." + IO.select [output_io], nil, ThreadedIoBuffer.io_wait_timeout + retry end end + Ffmprb.logger.debug "ThreadedIoBuffer writer (to #{output_io.path}) breaking off" + rescue Errno::EPIPE + Ffmprb.logger.debug "ThreadedIoBuffer writer (to #{output_io.path}) broken" + output.broken = true ensure # terminated! begin - writer_output!(output).close if !broken && writer_output!(output).respond_to?(:close) + writer_output!(output).close if !output.broken && writer_output!(output).respond_to?(:close) + output.broken = true rescue - Ffmprb.logger.error "ThreadedIoBuffer output closing error: #{$!.message}" + Ffmprb.logger.error "#{$!.class.name} closing ThreadedIoBuffer output: #{$!.message}" end - Ffmprb.logger.debug "ThreadedIoBuffer writer terminated (blocks max: #{@stat_blocks_max})" + Ffmprb.logger.debug "ThreadedIoBuffer writer (to #{output_io && output_io.path}) terminated (#{@stats})" end end end # # def wait_for_handler! # @handler_thr.join if @handler_thr # @handler_thr = nil # end - def output_enq(item) - @outputs.values.each do |q| - q.enq item + def output_enq!(item) + fail AllOutputsBrokenError if + @outputs.select do |output| + next if output.broken + + timeouts = 0 + logged_timeouts = 1 + begin + # NOTE let's assume there's no race condition here between the possible timeout exception and enq + Timeout.timeout(ThreadedIoBuffer.timeout) do + output.q.enq item + end + @stats.blocks_for output, output.q.length + true + + rescue Timeout::Error + next if output.broken + + timeouts += 1 + if timeouts == 2 * logged_timeouts + Ffmprb.logger.warn "A little bit of timeout (>#{timeouts*ThreadedIoBuffer.timeout}s idle) with #{ThreadedIoBuffer.blocks_max}x#{ThreadedIoBuffer.block_size}b blocks (buffering #{reader_input!.path}->...; #{@outputs.reject(&:io).size}/#{@outputs.size} unopen/total)" + logged_timeouts = timeouts + end + + retry unless timeouts >= ThreadedIoBuffer.timeout_limit # NOTE the queue has probably overflown + + @terminate = Error.new("the writer has failed with timeout limit while queuing") + # timeout! + fail Error, "Looks like we're stuck (>#{ThreadedIoBuffer.timeout_limit*ThreadedIoBuffer.timeout}s idle) with #{ThreadedIoBuffer.blocks_max}x#{ThreadedIoBuffer.block_size}b blocks (buffering #{reader_input!.path}->...)..." + end + end.empty? + end + + class Stats < OpenStruct + include MonitorMixin + + def initialize(proc) + @proc = proc + super blocks_max: 0, bytes_in: 0, bytes_out: 0 end + + def add_bytes_in(n) + synchronize do + self.bytes_in += n + @proc.proc_vis_node @proc # NOTE update + end + end + + def add_bytes_out(n) + synchronize do + self.bytes_out += n + @proc.proc_vis_node @proc # NOTE update + end + end + + def blocks_for(outp, n) + synchronize do + if n > blocks_max + self.blocks_max = n + @proc.proc_vis_node @proc # NOTE update + end + (@_outp_blocks ||= {})[outp] = n + self.blocks_buff = @_outp_blocks.values.reduce(0, :+) + end + end + end end end