lib/ffmprb/util/threaded_io_buffer.rb in ffmprb-0.11.3 vs lib/ffmprb/util/threaded_io_buffer.rb in ffmprb-0.11.4

- old
+ new

@@ -27,11 +27,11 @@ # the lambdas must be timeout-interrupt-safe (since they are wrapped in timeout blocks) # NOTE all ios are being opened and closed as soon as possible def initialize(input, *outputs, keep_outputs_open_on_input_idle_limit: nil) super() # NOTE for the monitor, apparently - Ffmprb.logger.debug "ThreadedIoBuffer initializing with (#{ThreadedIoBuffer.blocks_max}x#{ThreadedIoBuffer.block_size})" + Ffmprb.logger.debug{"ThreadedIoBuffer initializing with (#{ThreadedIoBuffer.blocks_max}x#{ThreadedIoBuffer.block_size})"} @input = input @outputs = outputs.map do |outp| OpenStruct.new _io: outp, q: SizedQueue.new(ThreadedIoBuffer.blocks_max) end @@ -45,32 +45,33 @@ init_writer_output! output init_writer! output end Thread.join_children!.tap do - Ffmprb.logger.debug "ThreadedIoBuffer (#{@input.path}->#{@outputs.map(&:io).map(&:path)}) terminated successfully (#{stats})" + Ffmprb.logger.debug{"ThreadedIoBuffer (#{@input.path}->#{@outputs.map(&:io).map(&:path)}) terminated successfully (#{stats})"} end end end + # TODO? # # def once(event, &blk) # event = event.to_sym # wait_for_handler! # if @events[event].respond_to? :call # fail Error, "Once upon a time (one #once(event) at a time) please" # elsif @events[event] - # Ffmprb.logger.debug "ThreadedIoBuffer (post-)reacting to #{event}" + # Ffmprb.logger.debug{"ThreadedIoBuffer (post-)reacting to #{event}"} # @handler_thr = Util::Thread.new "#{event} handler", &blk # else - # Ffmprb.logger.debug "ThreadedIoBuffer subscribing to #{event}" + # Ffmprb.logger.debug{"ThreadedIoBuffer subscribing to #{event}"} # @events[event] = blk # end # end # handle_synchronously :once # # def reader_done! - # Ffmprb.logger.debug "ThreadedIoBuffer reader terminated (#{stats})" + # Ffmprb.logger.debug{"ThreadedIoBuffer reader terminated (#{stats})"} # fire! :reader_done # end # # def terminated! # fire! :terminated @@ -82,11 +83,11 @@ # protected # # def fire!(event) # wait_for_handler! - # Ffmprb.logger.debug "ThreadedIoBuffer firing #{event}" + # Ffmprb.logger.debug{"ThreadedIoBuffer firing #{event}"} # if blk = @events.to_h[event.to_sym] # @handler_thr = Util::Thread.new "#{event} handler", &blk # end # @events[event.to_sym] = true # end @@ -102,13 +103,13 @@ class AllOutputsBrokenError < Error end def reader_input! # NOTE just for reader thread if @input.respond_to?(:call) - Ffmprb.logger.debug "Opening buffer input" + Ffmprb.logger.debug{"Opening buffer input"} @input = @input.call - Ffmprb.logger.debug "Opened buffer input: #{@input.path}" + Ffmprb.logger.debug{"Opened buffer input: #{@input.path}"} end @input end # NOTE to be called after #init_writer_output! only @@ -135,21 +136,21 @@ stats.add_bytes_in ss.length s += ss rescue IO::WaitReadable if @keep_outputs_open_on_input_idle_limit && stats.bytes_in > 0 && stats.blocks_buff == 0 && timeouts * ThreadedIoBuffer.io_wait_timeout > @keep_outputs_open_on_input_idle_limit if s.length > 0 # NOTE let's see if it helps outputting an incomplete block - Ffmprb.logger.debug "ThreadedIoBuffer reader (from #{input_io.path}) giving a chance to write #{s.length}/#{ThreadedIoBuffer.block_size}b after waiting >#{@keep_outputs_open_on_input_idle_limit}s, after reading #{stats.bytes_in}b" + Ffmprb.logger.debug{"ThreadedIoBuffer reader (from #{input_io.path}) giving a chance to write #{s.length}/#{ThreadedIoBuffer.block_size}b after waiting >#{@keep_outputs_open_on_input_idle_limit}s, after reading #{stats.bytes_in}b"} break else - Ffmprb.logger.debug "ThreadedIoBuffer reader (from #{input_io.path}) giving up after waiting >#{@keep_outputs_open_on_input_idle_limit}s, after reading #{stats.bytes_in}b, closing outputs" + Ffmprb.logger.debug{"ThreadedIoBuffer reader (from #{input_io.path}) giving up after waiting >#{@keep_outputs_open_on_input_idle_limit}s, after reading #{stats.bytes_in}b, closing outputs"} raise EOFError end else Thread.current.live! timeouts += 1 if timeouts > 2 * logged_timeouts - Ffmprb.logger.debug "ThreadedIoBuffer reader (from #{input_io.path}) retrying... (#{timeouts} reads): #{$!.class}" + Ffmprb.logger.debug{"ThreadedIoBuffer reader (from #{input_io.path}) retrying... (#{timeouts} reads): #{$!.class}"} logged_timeouts = timeouts end IO.select [input_io], nil, nil, ThreadedIoBuffer.io_wait_timeout retry end @@ -163,11 +164,11 @@ end end output_enq! s end rescue EOFError - Ffmprb.logger.debug "ThreadedIoBuffer reader (from #{input_io.path}) breaking off" + Ffmprb.logger.debug{"ThreadedIoBuffer reader (from #{input_io.path}) breaking off"} rescue AllOutputsBrokenError Ffmprb.logger.info "All outputs broken" rescue Exception @reader_failed = Error.new("Reader failed: #{$!}") raise @@ -180,26 +181,26 @@ input_io.close if input_io.respond_to?(:close) rescue Ffmprb.logger.error "#{$!.class.name} closing ThreadedIoBuffer input: #{$!.message}" end # reader_done! - Ffmprb.logger.debug "ThreadedIoBuffer reader terminated (#{stats})" + Ffmprb.logger.debug{"ThreadedIoBuffer reader terminated (#{stats})"} end end end def init_writer_output!(output) return output.io = output._io unless output._io.respond_to?(:call) output.thr = Thread.new("buffer writer output helper") do - Ffmprb.logger.debug "Opening buffer output" + Ffmprb.logger.debug{"Opening buffer output"} output.io = Thread.timeout_or_live nil, log: "in the buffer writer helper thread", timeout: ThreadedIoBuffer.timeout do |time| fail Error, "giving up buffer writer init since the reader has failed (#{@reader_failed.message})" if @reader_failed output._io.call end - Ffmprb.logger.debug "Opened buffer output: #{output.io.path}" + Ffmprb.logger.debug{"Opened buffer output: #{output.io.path}"} end end # NOTE writes as much output as possible, then terminates when the reader dies def init_writer!(output) @@ -221,33 +222,33 @@ rescue IO::WaitWritable Thread.current.live! timeouts += 1 if timeouts > 2 * logged_timeouts - Ffmprb.logger.debug "ThreadedIoBuffer writer (to #{output_io.path}) retrying... (#{timeouts} writes): #{$!.class}" + Ffmprb.logger.debug{"ThreadedIoBuffer writer (to #{output_io.path}) retrying... (#{timeouts} writes): #{$!.class}"} logged_timeouts = timeouts end IO.select nil, [output_io], nil, ThreadedIoBuffer.io_wait_timeout retry rescue IO::WaitReadable # NOTE should not really happen, so just for conformance Ffmprb.logger.error "ThreadedIoBuffer writer (to #{output_io.path}) gets a #{$!} - should not really happen." IO.select [output_io], nil, ThreadedIoBuffer.io_wait_timeout retry end end - Ffmprb.logger.debug "ThreadedIoBuffer writer (to #{output_io.path}) breaking off" + Ffmprb.logger.debug{"ThreadedIoBuffer writer (to #{output_io.path}) breaking off"} rescue Errno::EPIPE - Ffmprb.logger.debug "ThreadedIoBuffer writer (to #{output_io.path}) broken" + Ffmprb.logger.debug{"ThreadedIoBuffer writer (to #{output_io.path}) broken"} output.broken = true ensure # terminated! begin output_io.close if !output.broken && output_io && output_io.respond_to?(:close) rescue Ffmprb.logger.error "#{$!.class.name} closing ThreadedIoBuffer output: #{$!.message}" end output.broken = true - Ffmprb.logger.debug "ThreadedIoBuffer writer (to #{output_io && output_io.path}) terminated (#{stats})" + Ffmprb.logger.debug{"ThreadedIoBuffer writer (to #{output_io && output_io.path}) terminated (#{stats})"} end end end # # def wait_for_handler!