lib/ffmprb/util/threaded_io_buffer.rb in ffmprb-0.7.5 vs lib/ffmprb/util/threaded_io_buffer.rb in ffmprb-0.9.0

- old
+ new

@@ -1,11 +1,11 @@ module Ffmprb module Util - # XXX the events mechanism is currently unused (and commented out) => synchro mechanism not needed - # XXX partially specc'ed in file_spec + # TODO the events mechanism is currently unused (and commented out) => synchro mechanism not needed + # XXX *partially* specc'ed in file_spec class ThreadedIoBuffer # include Synchro class << self @@ -20,23 +20,27 @@ end # NOTE input/output can be lambdas for single asynchronic io evaluation # the labdas must be timeout-interrupt-safe (since they are wrapped in timeout blocks) # NOTE both ios are being opened and closed as soon as possible - def initialize(input, output) + def initialize(input, *outputs) # XXX SPEC ME!!! multiple outputs!! @input = input - @output = output - @q = SizedQueue.new(self.class.blocks_max) + @outputs = outputs.inject({}) do |hash, out| + hash[out] = SizedQueue.new(self.class.blocks_max) + hash + end @stat_blocks_max = 0 @terminate = false # @events = {} Thread.new "io buffer main" do init_reader! - init_writer_output! - init_writer! + outputs.each do |output| + init_writer_output! output + init_writer! output + end Thread.join_children! end end # @@ -79,11 +83,11 @@ # @events[event.to_sym] = true # end # handle_synchronously :fire! # def blocks_count - @q.size + @outputs.values.map(&:size).max end private def reader_input! # NOTE just for reader thread @@ -93,50 +97,36 @@ Ffmprb.logger.debug "Opened buffer input: #{@input.path}" end @input end - def writer_output! # NOTE just for writer thread - if @output_thr - @output_thr.join - @output_thr = nil + def writer_output!(output) # NOTE just for writer thread + if @output_thrs[output] + @output_thrs[output].join + @output_thrs[output] = nil end - @output unless @output.respond_to?(:call) + @output_ios[output] end - def init_writer_output! - return unless @output.respond_to?(:call) - - @output_thr = Thread.new("buffer writer output helper") do - Ffmprb.logger.debug "Opening buffer output" - @output = - Thread.timeout_or_live nil, log: "in the buffer writer helper thread", timeout: self.class.timeout do |time| - fail Error, "giving up buffer writer init since the reader has failed (#{@terminate.message})" if @terminate.kind_of?(Exception) - @output.call - end - Ffmprb.logger.debug "Opened buffer output: #{@output.path}" - end - end - # NOTE reads all of input, then closes the stream times out on buffer overflow def init_reader! Thread.new("buffer reader") do begin while s = reader_input!.read(self.class.block_size) begin Timeout.timeout(self.class.timeout) do - @q.enq s + output_enq s end rescue Timeout::Error # NOTE the queue is probably overflown @terminate = Error.new("The reader has failed with timeout while queuing") # timeout! fail Error, "Looks like we're stuck (#{timeout}s idle) with #{self.class.blocks_max}x#{self.class.block_size}B blocks (buffering #{reader_input!.path}->...)..." end @stat_blocks_max = blocks_count if blocks_count > @stat_blocks_max end @terminate = true - @q.enq nil + output_enq nil ensure begin reader_input!.close if reader_input!.respond_to?(:close) rescue Ffmprb.logger.error "ThreadedIoBuffer input closing error: #{$!.message}" @@ -145,45 +135,61 @@ Ffmprb.logger.debug "ThreadedIoBuffer reader terminated (blocks max: #{@stat_blocks_max})" end end end + def init_writer_output!(output) + @output_ios ||= {} + return @output_ios[output] = output unless output.respond_to?(:call) + + @output_thrs ||= {} + @output_thrs[output] = Thread.new("buffer writer output helper") do + Ffmprb.logger.debug "Opening buffer output" + @output_ios[output] = + Thread.timeout_or_live nil, log: "in the buffer writer helper thread", timeout: self.class.timeout do |time| + fail Error, "giving up buffer writer init since the reader has failed (#{@terminate.message})" if @terminate.kind_of?(Exception) + output.call + end + Ffmprb.logger.debug "Opened buffer output: #{@output_ios[output].path}" + end + end + # NOTE writes as much output as possible, then terminates when the reader dies - def init_writer! + def init_writer!(output) Thread.new("buffer writer") do broken = false begin - while s = @q.deq + while s = @outputs[output].deq next if broken written = 0 tries = 1 logged_tries = 1/2 while !broken fail @terminate if @terminate.kind_of?(Exception) begin - output = writer_output! - written = output.write_nonblock(s) if output # NOTE will only be nil if @terminate is an exception + output_io = writer_output!(output) + written = output_io.write_nonblock(s) if output # NOTE will only be nil if @terminate is an exception break if written == s.length # NOTE kinda optimisation s = s[written..-1] rescue Errno::EAGAIN, Errno::EWOULDBLOCK if tries == 2 * logged_tries - Ffmprb.logger.debug "ThreadedIoBuffer writer (to #{output.path}) retrying... (#{tries} writes): #{$!.class}" + Ffmprb.logger.debug "ThreadedIoBuffer writer (to #{output_io.path}) retrying... (#{tries} writes): #{$!.class}" logged_tries = tries end sleep 0.01 rescue Errno::EPIPE broken = true - Ffmprb.logger.debug "ThreadedIoBuffer writer (to #{output.path}) broken" + Ffmprb.logger.debug "ThreadedIoBuffer writer (to #{output_io.path}) broken" ensure tries += 1 end end end ensure # terminated! begin - writer_output!.close if !broken && writer_output!.respond_to?(:close) + writer_output!(output).close if !broken && writer_output!(output).respond_to?(:close) rescue Ffmprb.logger.error "ThreadedIoBuffer output closing error: #{$!.message}" end Ffmprb.logger.debug "ThreadedIoBuffer writer terminated (blocks max: #{@stat_blocks_max})" end @@ -192,9 +198,15 @@ # # def wait_for_handler! # @handler_thr.join if @handler_thr # @handler_thr = nil # end + + def output_enq(item) + @outputs.values.each do |q| + q.enq item + end + end end end