lib/ffmprb/util.rb in ffmprb-0.7.0 vs lib/ffmprb/util.rb in ffmprb-0.7.3

- old
+ new

@@ -1,61 +1,92 @@ # require 'ffmprb/util/synchro' require 'ffmprb/util/thread' -require 'ffmprb/util/io_buffer' +require 'ffmprb/util/threaded_io_buffer' require 'open3' module Ffmprb module Util + class TimeLimitError < Error; end + class << self attr_accessor :ffmpeg_cmd, :ffprobe_cmd + attr_accessor :cmd_timeout - def ffprobe(*args) - sh *ffprobe_cmd, *args + def ffprobe(*args, limit: nil, timeout: cmd_timeout) + sh *ffprobe_cmd, *args, limit: limit, timeout: timeout end - def ffmpeg(*args) + def ffmpeg(*args, limit: nil, timeout: cmd_timeout) args = ['-loglevel', 'debug'] + args if Ffmprb.debug - sh *ffmpeg_cmd, '-y', *args, output: :stderr + sh *ffmpeg_cmd, '-y', *args, output: :stderr, limit: limit, timeout: timeout end - def sh(*cmd, output: :stdout, log: :stderr) - cmd = cmd.to_a.map(&:to_s) - cmd_str = cmd.join(' ') - Ffmprb.logger.info cmd_str - Open3.popen3(*cmd) do |stdin, stdout, stderr, wait_thr| - stdin.close + def sh(*cmd, output: :stdout, log: :stderr, limit: nil, timeout: cmd_timeout) + cmd = cmd.map &:to_s unless cmd.size == 1 + cmd_str = cmd.size != 1 ? cmd.map{|c| "\"#{c}\""}.join(' ') : cmd.first + timeout = [timeout, limit].compact.min + thr = Thread.new "`#{cmd_str}`" do + Ffmprb.logger.info "Popening `#{cmd_str}`..." + Open3.popen3(*cmd) do |stdin, stdout, stderr, wait_thr| + begin + stdin.close - # XXX process timeouting/cleanup here will be appreciated + log_cmd = cmd.first.upcase if log + stdout_r = Reader.new(stdout, output == :stdout, log == :stdout && log_cmd) + stderr_r = Reader.new(stderr, true, log == :stderr && log_cmd) - begin - log_cmd = "#{(cmd.respond_to?(:first)? cmd : cmd.split(' ')).first.upcase}: " if log - stdout_r = Reader.new(stdout, output == :stdout, log == :stdout && log_cmd) - stderr_r = Reader.new(stderr, true, log == :stderr && log_cmd) + Thread.timeout_or_live(limit, log: "while waiting for `#{cmd_str}`", timeout: timeout) do |time| + fail Error, "#{cmd_str}:\n#{stderr_r.read}" unless + wait_thr.value.exitstatus == 0 # NOTE blocking + end + Ffmprb.logger.debug "FINISHED: #{cmd_str}" - raise Error, "#{cmd_str}:\n#{stderr_r.read}" unless - wait_thr.value.exitstatus == 0 # NOTE blocks + Thread.join_children! limit, timeout: timeout - # NOTE only one of them will return non-nil, see above - stdout_r.read || stderr_r.read - ensure - begin - stdout_r.join if stdout_r - stdout_r = nil - stderr_r.join if stderr_r - rescue - Ffmprb.logger.error "Thread joining error: #{$!.message}" - stderr_r.join if stdout_r + # NOTE only one of them will return non-nil, see above + stdout_r.read || stderr_r.read + ensure + process_dead! wait_thr, cmd_str, limit end - Ffmprb.logger.debug "FINISHED: #{cmd_str}" end end + thr.value end + protected + + def process_dead!(wait_thr, cmd_str, limit) + grace = limit ? limit/4 : 1 + return unless wait_thr.alive? + + # NOTE a simplistic attempt to gracefully terminate a child process + # the successful completion is via exception... + begin + Ffmprb.logger.info "Sorry it came to this, but I'm terminating `#{cmd_str}`(#{wait_thr.pid})..." + ::Process.kill 'TERM', wait_thr.pid + sleep grace + Ffmprb.logger.info "Very sorry it came to this, but I'm terminating `#{cmd_str}`(#{wait_thr.pid}) again..." + ::Process.kill 'TERM', wait_thr.pid + sleep grace + Ffmprb.logger.warn "Die `#{cmd_str}`(#{wait_thr.pid}), die!.. (killing amok)" + ::Process.kill 'KILL', wait_thr.pid + sleep grace + Ffmprb.logger.warn "Checking if `#{cmd_str}`(#{wait_thr.pid}) finally dead..." + ::Process.kill 0, wait_thr.pid + Ffmprb.logger.error "Still alive -- `#{cmd_str}`(#{wait_thr.pid}), giving up..." + rescue Errno::ESRCH + Ffmprb.logger.info "Apparently `#{cmd_str}`(#{wait_thr.pid}) is dead..." + end + + fail Error, "System error or something: waiting for the thread running `#{cmd_str}`(#{wait_thr.pid})..." unless + wait_thr.join limit + end + end class Reader < Thread @@ -63,11 +94,11 @@ @output = '' @queue = Queue.new super "reader" do begin while s = input.gets - Ffmprb.logger.debug log + s.chomp if log + Ffmprb.logger.debug "#{log}: #{s.chomp}" if log @output << s if store end @queue.enq @output rescue Exception @queue.enq Error.new("Exception in a reader thread") @@ -76,10 +107,10 @@ end def read case res = @queue.deq when Exception - raise res + fail res when '' nil else res end