lib/ffmprb/util/threaded_io_buffer.rb in ffmprb-0.11.3 vs lib/ffmprb/util/threaded_io_buffer.rb in ffmprb-0.11.4
- old
+ new
@@ -27,11 +27,11 @@
# the lambdas must be timeout-interrupt-safe (since they are wrapped in timeout blocks)
# NOTE all ios are being opened and closed as soon as possible
def initialize(input, *outputs, keep_outputs_open_on_input_idle_limit: nil)
super() # NOTE for the monitor, apparently
- Ffmprb.logger.debug "ThreadedIoBuffer initializing with (#{ThreadedIoBuffer.blocks_max}x#{ThreadedIoBuffer.block_size})"
+ Ffmprb.logger.debug{"ThreadedIoBuffer initializing with (#{ThreadedIoBuffer.blocks_max}x#{ThreadedIoBuffer.block_size})"}
@input = input
@outputs = outputs.map do |outp|
OpenStruct.new _io: outp, q: SizedQueue.new(ThreadedIoBuffer.blocks_max)
end
@@ -45,32 +45,33 @@
init_writer_output! output
init_writer! output
end
Thread.join_children!.tap do
- Ffmprb.logger.debug "ThreadedIoBuffer (#{@input.path}->#{@outputs.map(&:io).map(&:path)}) terminated successfully (#{stats})"
+ Ffmprb.logger.debug{"ThreadedIoBuffer (#{@input.path}->#{@outputs.map(&:io).map(&:path)}) terminated successfully (#{stats})"}
end
end
end
+ # TODO?
#
# def once(event, &blk)
# event = event.to_sym
# wait_for_handler!
# if @events[event].respond_to? :call
# fail Error, "Once upon a time (one #once(event) at a time) please"
# elsif @events[event]
- # Ffmprb.logger.debug "ThreadedIoBuffer (post-)reacting to #{event}"
+ # Ffmprb.logger.debug{"ThreadedIoBuffer (post-)reacting to #{event}"}
# @handler_thr = Util::Thread.new "#{event} handler", &blk
# else
- # Ffmprb.logger.debug "ThreadedIoBuffer subscribing to #{event}"
+ # Ffmprb.logger.debug{"ThreadedIoBuffer subscribing to #{event}"}
# @events[event] = blk
# end
# end
# handle_synchronously :once
#
# def reader_done!
- # Ffmprb.logger.debug "ThreadedIoBuffer reader terminated (#{stats})"
+ # Ffmprb.logger.debug{"ThreadedIoBuffer reader terminated (#{stats})"}
# fire! :reader_done
# end
#
# def terminated!
# fire! :terminated
@@ -82,11 +83,11 @@
# protected
#
# def fire!(event)
# wait_for_handler!
- # Ffmprb.logger.debug "ThreadedIoBuffer firing #{event}"
+ # Ffmprb.logger.debug{"ThreadedIoBuffer firing #{event}"}
# if blk = @events.to_h[event.to_sym]
# @handler_thr = Util::Thread.new "#{event} handler", &blk
# end
# @events[event.to_sym] = true
# end
@@ -102,13 +103,13 @@
class AllOutputsBrokenError < Error
end
def reader_input! # NOTE just for reader thread
if @input.respond_to?(:call)
- Ffmprb.logger.debug "Opening buffer input"
+ Ffmprb.logger.debug{"Opening buffer input"}
@input = @input.call
- Ffmprb.logger.debug "Opened buffer input: #{@input.path}"
+ Ffmprb.logger.debug{"Opened buffer input: #{@input.path}"}
end
@input
end
# NOTE to be called after #init_writer_output! only
@@ -135,21 +136,21 @@
stats.add_bytes_in ss.length
s += ss
rescue IO::WaitReadable
if @keep_outputs_open_on_input_idle_limit && stats.bytes_in > 0 && stats.blocks_buff == 0 && timeouts * ThreadedIoBuffer.io_wait_timeout > @keep_outputs_open_on_input_idle_limit
if s.length > 0 # NOTE let's see if it helps outputting an incomplete block
- Ffmprb.logger.debug "ThreadedIoBuffer reader (from #{input_io.path}) giving a chance to write #{s.length}/#{ThreadedIoBuffer.block_size}b after waiting >#{@keep_outputs_open_on_input_idle_limit}s, after reading #{stats.bytes_in}b"
+ Ffmprb.logger.debug{"ThreadedIoBuffer reader (from #{input_io.path}) giving a chance to write #{s.length}/#{ThreadedIoBuffer.block_size}b after waiting >#{@keep_outputs_open_on_input_idle_limit}s, after reading #{stats.bytes_in}b"}
break
else
- Ffmprb.logger.debug "ThreadedIoBuffer reader (from #{input_io.path}) giving up after waiting >#{@keep_outputs_open_on_input_idle_limit}s, after reading #{stats.bytes_in}b, closing outputs"
+ Ffmprb.logger.debug{"ThreadedIoBuffer reader (from #{input_io.path}) giving up after waiting >#{@keep_outputs_open_on_input_idle_limit}s, after reading #{stats.bytes_in}b, closing outputs"}
raise EOFError
end
else
Thread.current.live!
timeouts += 1
if timeouts > 2 * logged_timeouts
- Ffmprb.logger.debug "ThreadedIoBuffer reader (from #{input_io.path}) retrying... (#{timeouts} reads): #{$!.class}"
+ Ffmprb.logger.debug{"ThreadedIoBuffer reader (from #{input_io.path}) retrying... (#{timeouts} reads): #{$!.class}"}
logged_timeouts = timeouts
end
IO.select [input_io], nil, nil, ThreadedIoBuffer.io_wait_timeout
retry
end
@@ -163,11 +164,11 @@
end
end
output_enq! s
end
rescue EOFError
- Ffmprb.logger.debug "ThreadedIoBuffer reader (from #{input_io.path}) breaking off"
+ Ffmprb.logger.debug{"ThreadedIoBuffer reader (from #{input_io.path}) breaking off"}
rescue AllOutputsBrokenError
Ffmprb.logger.info "All outputs broken"
rescue Exception
@reader_failed = Error.new("Reader failed: #{$!}")
raise
@@ -180,26 +181,26 @@
input_io.close if input_io.respond_to?(:close)
rescue
Ffmprb.logger.error "#{$!.class.name} closing ThreadedIoBuffer input: #{$!.message}"
end
# reader_done!
- Ffmprb.logger.debug "ThreadedIoBuffer reader terminated (#{stats})"
+ Ffmprb.logger.debug{"ThreadedIoBuffer reader terminated (#{stats})"}
end
end
end
def init_writer_output!(output)
return output.io = output._io unless output._io.respond_to?(:call)
output.thr = Thread.new("buffer writer output helper") do
- Ffmprb.logger.debug "Opening buffer output"
+ Ffmprb.logger.debug{"Opening buffer output"}
output.io =
Thread.timeout_or_live nil, log: "in the buffer writer helper thread", timeout: ThreadedIoBuffer.timeout do |time|
fail Error, "giving up buffer writer init since the reader has failed (#{@reader_failed.message})" if @reader_failed
output._io.call
end
- Ffmprb.logger.debug "Opened buffer output: #{output.io.path}"
+ Ffmprb.logger.debug{"Opened buffer output: #{output.io.path}"}
end
end
# NOTE writes as much output as possible, then terminates when the reader dies
def init_writer!(output)
@@ -221,33 +222,33 @@
rescue IO::WaitWritable
Thread.current.live!
timeouts += 1
if timeouts > 2 * logged_timeouts
- Ffmprb.logger.debug "ThreadedIoBuffer writer (to #{output_io.path}) retrying... (#{timeouts} writes): #{$!.class}"
+ Ffmprb.logger.debug{"ThreadedIoBuffer writer (to #{output_io.path}) retrying... (#{timeouts} writes): #{$!.class}"}
logged_timeouts = timeouts
end
IO.select nil, [output_io], nil, ThreadedIoBuffer.io_wait_timeout
retry
rescue IO::WaitReadable # NOTE should not really happen, so just for conformance
Ffmprb.logger.error "ThreadedIoBuffer writer (to #{output_io.path}) gets a #{$!} - should not really happen."
IO.select [output_io], nil, ThreadedIoBuffer.io_wait_timeout
retry
end
end
- Ffmprb.logger.debug "ThreadedIoBuffer writer (to #{output_io.path}) breaking off"
+ Ffmprb.logger.debug{"ThreadedIoBuffer writer (to #{output_io.path}) breaking off"}
rescue Errno::EPIPE
- Ffmprb.logger.debug "ThreadedIoBuffer writer (to #{output_io.path}) broken"
+ Ffmprb.logger.debug{"ThreadedIoBuffer writer (to #{output_io.path}) broken"}
output.broken = true
ensure
# terminated!
begin
output_io.close if !output.broken && output_io && output_io.respond_to?(:close)
rescue
Ffmprb.logger.error "#{$!.class.name} closing ThreadedIoBuffer output: #{$!.message}"
end
output.broken = true
- Ffmprb.logger.debug "ThreadedIoBuffer writer (to #{output_io && output_io.path}) terminated (#{stats})"
+ Ffmprb.logger.debug{"ThreadedIoBuffer writer (to #{output_io && output_io.path}) terminated (#{stats})"}
end
end
end
#
# def wait_for_handler!