lib/ffmprb/util/threaded_io_buffer.rb in ffmprb-0.7.5 vs lib/ffmprb/util/threaded_io_buffer.rb in ffmprb-0.9.0
- old
+ new
@@ -1,11 +1,11 @@
module Ffmprb
module Util
- # XXX the events mechanism is currently unused (and commented out) => synchro mechanism not needed
- # XXX partially specc'ed in file_spec
+ # TODO the events mechanism is currently unused (and commented out) => synchro mechanism not needed
+ # XXX *partially* specc'ed in file_spec
class ThreadedIoBuffer
# include Synchro
class << self
@@ -20,23 +20,27 @@
end
# NOTE input/output can be lambdas for single asynchronic io evaluation
# the labdas must be timeout-interrupt-safe (since they are wrapped in timeout blocks)
# NOTE both ios are being opened and closed as soon as possible
- def initialize(input, output)
+ def initialize(input, *outputs) # XXX SPEC ME!!! multiple outputs!!
@input = input
- @output = output
- @q = SizedQueue.new(self.class.blocks_max)
+ @outputs = outputs.inject({}) do |hash, out|
+ hash[out] = SizedQueue.new(self.class.blocks_max)
+ hash
+ end
@stat_blocks_max = 0
@terminate = false
# @events = {}
Thread.new "io buffer main" do
init_reader!
- init_writer_output!
- init_writer!
+ outputs.each do |output|
+ init_writer_output! output
+ init_writer! output
+ end
Thread.join_children!
end
end
#
@@ -79,11 +83,11 @@
# @events[event.to_sym] = true
# end
# handle_synchronously :fire!
#
def blocks_count
- @q.size
+ @outputs.values.map(&:size).max
end
private
def reader_input! # NOTE just for reader thread
@@ -93,50 +97,36 @@
Ffmprb.logger.debug "Opened buffer input: #{@input.path}"
end
@input
end
- def writer_output! # NOTE just for writer thread
- if @output_thr
- @output_thr.join
- @output_thr = nil
+ def writer_output!(output) # NOTE just for writer thread
+ if @output_thrs[output]
+ @output_thrs[output].join
+ @output_thrs[output] = nil
end
- @output unless @output.respond_to?(:call)
+ @output_ios[output]
end
- def init_writer_output!
- return unless @output.respond_to?(:call)
-
- @output_thr = Thread.new("buffer writer output helper") do
- Ffmprb.logger.debug "Opening buffer output"
- @output =
- Thread.timeout_or_live nil, log: "in the buffer writer helper thread", timeout: self.class.timeout do |time|
- fail Error, "giving up buffer writer init since the reader has failed (#{@terminate.message})" if @terminate.kind_of?(Exception)
- @output.call
- end
- Ffmprb.logger.debug "Opened buffer output: #{@output.path}"
- end
- end
-
# NOTE reads all of input, then closes the stream times out on buffer overflow
def init_reader!
Thread.new("buffer reader") do
begin
while s = reader_input!.read(self.class.block_size)
begin
Timeout.timeout(self.class.timeout) do
- @q.enq s
+ output_enq s
end
rescue Timeout::Error # NOTE the queue is probably overflown
@terminate = Error.new("The reader has failed with timeout while queuing")
# timeout!
fail Error, "Looks like we're stuck (#{timeout}s idle) with #{self.class.blocks_max}x#{self.class.block_size}B blocks (buffering #{reader_input!.path}->...)..."
end
@stat_blocks_max = blocks_count if blocks_count > @stat_blocks_max
end
@terminate = true
- @q.enq nil
+ output_enq nil
ensure
begin
reader_input!.close if reader_input!.respond_to?(:close)
rescue
Ffmprb.logger.error "ThreadedIoBuffer input closing error: #{$!.message}"
@@ -145,45 +135,61 @@
Ffmprb.logger.debug "ThreadedIoBuffer reader terminated (blocks max: #{@stat_blocks_max})"
end
end
end
+ def init_writer_output!(output)
+ @output_ios ||= {}
+ return @output_ios[output] = output unless output.respond_to?(:call)
+
+ @output_thrs ||= {}
+ @output_thrs[output] = Thread.new("buffer writer output helper") do
+ Ffmprb.logger.debug "Opening buffer output"
+ @output_ios[output] =
+ Thread.timeout_or_live nil, log: "in the buffer writer helper thread", timeout: self.class.timeout do |time|
+ fail Error, "giving up buffer writer init since the reader has failed (#{@terminate.message})" if @terminate.kind_of?(Exception)
+ output.call
+ end
+ Ffmprb.logger.debug "Opened buffer output: #{@output_ios[output].path}"
+ end
+ end
+
# NOTE writes as much output as possible, then terminates when the reader dies
- def init_writer!
+ def init_writer!(output)
Thread.new("buffer writer") do
broken = false
begin
- while s = @q.deq
+ while s = @outputs[output].deq
next if broken
written = 0
tries = 1
logged_tries = 1/2
while !broken
fail @terminate if @terminate.kind_of?(Exception)
begin
- output = writer_output!
- written = output.write_nonblock(s) if output # NOTE will only be nil if @terminate is an exception
+ output_io = writer_output!(output)
+ written = output_io.write_nonblock(s) if output # NOTE will only be nil if @terminate is an exception
break if written == s.length # NOTE kinda optimisation
s = s[written..-1]
rescue Errno::EAGAIN, Errno::EWOULDBLOCK
if tries == 2 * logged_tries
- Ffmprb.logger.debug "ThreadedIoBuffer writer (to #{output.path}) retrying... (#{tries} writes): #{$!.class}"
+ Ffmprb.logger.debug "ThreadedIoBuffer writer (to #{output_io.path}) retrying... (#{tries} writes): #{$!.class}"
logged_tries = tries
end
sleep 0.01
rescue Errno::EPIPE
broken = true
- Ffmprb.logger.debug "ThreadedIoBuffer writer (to #{output.path}) broken"
+ Ffmprb.logger.debug "ThreadedIoBuffer writer (to #{output_io.path}) broken"
ensure
tries += 1
end
end
end
ensure
# terminated!
begin
- writer_output!.close if !broken && writer_output!.respond_to?(:close)
+ writer_output!(output).close if !broken && writer_output!(output).respond_to?(:close)
rescue
Ffmprb.logger.error "ThreadedIoBuffer output closing error: #{$!.message}"
end
Ffmprb.logger.debug "ThreadedIoBuffer writer terminated (blocks max: #{@stat_blocks_max})"
end
@@ -192,9 +198,15 @@
#
# def wait_for_handler!
# @handler_thr.join if @handler_thr
# @handler_thr = nil
# end
+
+ def output_enq(item)
+ @outputs.values.each do |q|
+ q.enq item
+ end
+ end
end
end