require 'fcntl'
require 'open3'

class MPlayer
  # Owns the mplayer child process: spawns it, feeds commands to its stdin and
  # runs one reader thread (a Stream) per output pipe.
  class Worker # :nodoc:all
    include ColorDebugMessages

    # Reads one of mplayer's output pipes on its own thread, matches each line
    # against a pattern table and reports the results back to the Worker.
    class Stream
      include ColorDebugMessages

      # Patterns applied to every stdout line. For each entry:
      #   :re   - regexp matched against the line
      #   :stat - stat names, assigned in order from the regexp's captures
      #   :call - optional extra callback fired after the stats were updated
      MATCH_STDOUT = { # :nodoc:
        :version => {
          :re => /^MPlayer\s(\S+)\s\(C\)/,
          :stat => [:version]
        },
        :server => {
          :re => /^Connecting to server (\S+)\[(\d+\.\d+\.\d+\.\d+)\]:/,
          :stat => [:server, :server_ip]
        },
        :stream_info => {
          :re => /^ICY Info: StreamTitle='(.*?)';StreamUrl='(.*?)';/,
          :stat => [:stream_title, :stream_url]
        },
        :position_percent => {
          :re => /^ANS_PERCENT_POSITION=(\d+[.0-9]*)/,
          :stat => [:position]
        },
        :position_seconds => {
          :re => /^ANS_TIME_POSITION=(\d+[.0-9]*)/,
          :stat => [:played_seconds]
        },
        :total_time => {
          :re => /^ANS_LENGTH=(\d+[.0-9]*)/,
          :stat => [:total_time]
        },
        :audio_info => {
          :re => /^AUDIO: (\d+) Hz, (\d+) ch, (\S+), ([0-9.]+) kbit/,
          :stat => [:audio_sample_rate, :audio_channels, :audio_format, :audio_data_rate],
          :call => :audio_stats
        },
        :video_info => {
          :re => /^VIDEO:\s+\[(\S{4})\]\s+(\d+)x(\d+)\s+(\d+)bpp\s+(\d+\.\d+)\s+fps/,
          :stat => [:video_fourcc, :video_x_size, :video_y_size, :video_bpp, :video_fps],
          :call => :video_stats
        },
        :video_decoder => {
          :re => /^Opening video decoder: \[(\S+)\]/,
          :stat => [:video_decoder]
        },
        :audio_decoder => {
          :re => /^Opening audio decoder: \[(\S+)\]/,
          :stat => [:audio_decoder]
        },
        :video_codec => {
          :re => /^Selected video codec: \[(\S+)\]/,
          :stat => [:video_codec]
        },
        :audio_codec => {
          :re => /^Selected audio codec: \[(\S+)\]/,
          :stat => [:audio_codec]
        }
      }

      # Patterns applied to every stderr line; any match flags a :stderr
      # stream error on the worker (see #process_stderr).
      MATCH_STDERR = { # :nodoc:
        :file_not_found => {
          :re => /^File not found: /,
          :call => :file_error
        }
      }

      attr_reader :parent, :type, :io

      # p           - owning player object; must respond to #opts
      # w           - the Worker that receives callbacks and error flags
      # stream_type - :stdout or :stderr; selects the process_<type> handler
      # stream_io   - the IO object to read from
      def initialize(p, w, stream_type, stream_io)
        @parent = p
        @worker = w
        @type = stream_type
        @io = stream_io
        @line = ''
        @outlist = Array.new
        @stats = Hash.new
        @select_wait_time = p.opts[:select_wait_time]
        @sent_update_position = false
      end

      # Tags a log message with this stream's type.
      def prefix(msg)
        "STREAM [#{@type}] #{msg}"
      end

      # Logging helpers: forward to the inherited implementations (presumably
      # the ones mixed in by ColorDebugMessages) with the stream prefix added.
      def debug(msg); super prefix(msg); end
      def info(msg); super prefix(msg); end
      def warn(msg); super prefix(msg); end

      # Reports a failure of the given type (:eof, :stderr, :exception) to the
      # worker.
      def stream_error(type)
        @worker.flag_stream_error(type)
      end

      # Queues a callback on the worker. :update_stat callbacks are dropped
      # when the stat already holds the reported value, so only changes are
      # propagated.
      def callback!(name, *args)
        case name
        when :update_stat
          stat = args[0]
          val = args[1]
          if @stats[stat] == val
            return # only propagate changes
          else
            @stats[stat] = val
          end
        end
        @worker.queue_callback [name, args]
      end

      # Matches +line+ against each entry of +patterns+ and stops at the first
      # hit: captures are converted to numbers where they look numeric, the
      # entry's :stat fields are updated from them in order, and the optional
      # :call callback is fired.
      #
      # Returns the name of the matching pattern, or nil when nothing matched.
      def check_line(patterns, line)
        patterns.each_pair do |name, pat|
          md = pat[:re].match(line)
          next unless md

          args = md.captures.map do |x|
            case x
            # String#to_i is always base 10. Kernel#Integer honours radix
            # prefixes, so "010" would become 8 and "08" would raise.
            when /^\d+$/ then x.to_i
            when /^\d+\.\d+$/ then Float(x)
            else x
            end
          end
          (pat[:stat] || []).each do |field|
            callback! :update_stat, field, args.shift
          end
          callback! pat[:call] if pat[:call]
          return name
        end
        nil
      end

      # Handler for a line read from stdout.
      def process_stdout(line)
        check_line(MATCH_STDOUT, line)
      end

      # Handler for a line read from stderr: any recognised message is
      # treated as a stream error.
      def process_stderr(line)
        if check_line(MATCH_STDERR, line)
          stream_error(:stderr)
        end
      end

      # Dispatches a line to process_stdout / process_stderr based on @type.
      def process_line(line)
        # debug "LINE> \"#{line}\""
        send "process_#{@type}", line
        # callback! @type, line
      end

      # Reads the next "\r"-terminated chunk from the IO, splits it into
      # lines and processes each one. At end of file the :eof error is
      # flagged and the read loop is stopped.
      def process_stream
        lines = @io.gets("\r")
        if lines.nil?
          # Nothing more will ever arrive; without this the loop in #run
          # would spin on gets returning nil until somebody calls #kill.
          @alive = false
          return stream_error(:eof)
        end
        lines.split(/\n/).each do |line|
          process_line(line.chomp)
        end
      end

      # Starts the reader thread. The thread loops over #process_stream until
      # the stream is killed or reaches EOF, and always closes the IO on exit.
      def run
        @thread = Thread.new do
          @alive = true
          begin
            # @io.fcntl(Fcntl::F_SETFL,Fcntl::O_NONBLOCK)
            debug "start"
            process_stream while @alive
            debug "clean end!"
          rescue IOError => e
            # #kill closes the IO from another thread, which surfaces here
            # as a "stream closed" IOError; that is the normal way out.
            if e.to_s =~ /stream closed/
              debug "stream closed!"
            else
              raise BadStream, e.to_s
            end
          rescue => e
            warn "Unexpected error when parsing MPlayer's IO stream!"
            warn "error was: #{e}"
            stream_error(:exception)
          ensure
            cleanup
          end
        end
      end

      # Closes the IO if it is still open.
      def cleanup
        @io.close unless @io.closed?
      end

      # Asks the reader thread to stop and closes the IO underneath it.
      def kill
        @alive = false
        cleanup
      end

      # Waits for the reader thread (if any) to finish and forgets it.
      def join
        @thread.join if @thread
        @thread = nil
      end
    end

    attr_reader :parent, :io

    # Spawns mplayer for the parent's configured path and starts one reader
    # Stream per output pipe.
    #
    # p - owning player object; must respond to #opts and #callback!
    #
    # Raises BadStream when the process or its streams cannot be set up.
    def initialize(p)
      @parent = p
      @pid = nil
      @streams = Array.new
      @pending = Array.new
      @mutex = Mutex.new
      @failed = nil
      @thread_safe_callbacks = @parent.opts[:thread_safe_callbacks]
      @shutdown_in_progress = false
      begin
        info "running mplayer >>> #{cmdline}"
        @io_stdin, @io_stdout, @io_stderr = Open3.popen3(cmdline)
        create_stream(:stdout, @io_stdout)
        create_stream(:stderr, @io_stderr)
        send_each_stream :run
      rescue
        raise BadStream, "couldn't create streams to mplayer: #{$!}"
      end
      debug "mplayer threads created!"
    end

    # Builds the shell command line used to launch mplayer for +target+
    # (defaults to the parent's :path option).
    #
    # NOTE(review): +target+ is only wrapped in double quotes before being
    # handed to the shell by Open3.popen3; a path containing quotes or other
    # shell metacharacters is not escaped. Confirm callers pass trusted paths.
    def cmdline(target = parent.opts[:path])
      cmd = "#{parent.opts[:program]} -slave "
      cmd += "-wid #{parent.opts[:embed].call} " if parent.opts[:embed]
      cmd += "-playlist " if target =~ /\.m3u$/
      cmd += '"' + target.to_s + '"'
    end

    # Runs the given block while holding the worker's mutex.
    def lock!
      @mutex.synchronize do
        yield
      end
    end

    # Delivers a callback ([name, args]) to the parent. With thread-safe
    # callbacks enabled it is queued for #dispatch_callbacks instead of being
    # invoked directly on the calling (reader) thread.
    def queue_callback(args)
      if @thread_safe_callbacks
        lock! do
          @pending.push(args)
        end
      else
        @parent.callback! args.first, *(args.last)
      end
    end

    # Invokes every queued callback on the calling thread. No-op unless
    # thread-safe callbacks are enabled.
    def dispatch_callbacks
      return unless @thread_safe_callbacks
      list = nil
      # Swap the queue out under the lock, then run the callbacks without
      # holding it.
      lock! do
        list = @pending
        @pending = Array.new
      end
      list.each do |args|
        @parent.callback! args.first, *(args.last)
      end
    end

    # Writes a line to mplayer's stdin; any failure triggers a shutdown.
    def send_to_stdin(str)
      begin
        @io_stdin.puts str
      rescue => e
        warn "Couldn't write to mplayer's stdin!"
        warn "error was: #{e}"
        shutdown!
      end
    end

    # Validates and sends a command (its parts joined by spaces) to mplayer.
    # Skipped with a debug message once stdin has been closed.
    def send_command(*args)
      cmd = args.join(' ')
      if @io_stdin.nil?
        debug "cannot send \"#{cmd}\" - stdin closed"
      else
        Command.validate! args
        send_to_stdin cmd
      end
    end

    # Creates a Stream of the given type for +io+, registers it and returns
    # it.
    def create_stream(type, io)
      stream = Stream.new(parent, self, type, io)
      @streams.push(stream)
      stream
    end

    # Calls the method named by the first argument on every registered
    # stream, passing along the remaining arguments.
    #
    # Raises BadStream when a stream does not respond to the method.
    def send_each_stream(*args)
      cmd = args.shift
      cmd_str = "#{cmd.to_s}(#{args.join(', ')})"
      if @streams.length < 1
        warn "No streams available for \"#{cmd_str}\""
      else
        debug "Sending each stream: #{cmd_str}"
        @streams.each do |stream|
          if stream.respond_to? cmd
            stream.send(cmd, *args)
          else
            raise BadStream, "stream command not valid: #{cmd_str}"
          end
        end
      end
    end

    # Records a stream failure; only the first reported failure is kept.
    def flag_stream_error(type)
      lock! do
        @failed = type if @failed.nil?
      end
    end

    # Polls the worker: requests the current position, dispatches pending
    # callbacks and checks for stream failures.
    #
    # Returns true while no failure was flagged and streams exist; otherwise
    # logs the reason, shuts the worker down and returns the result of
    # #shutdown!.
    def ok?
      get_position
      dispatch_callbacks
      err = nil
      lock! do
        err = @failed
      end
      return true if err.nil? and @streams.length > 0
      case err
      when :eof
        info "MPlayer process shut itself down!"
        close_stdin
      when :stderr
        warn "Caugh error message on MPlayer's STDERR"
      when :exception
        warn "Unexpected IO stream failure!"
      end
      shutdown!
    end

    # Closes our end of mplayer's stdin (if open) and forgets it, so later
    # #send_command calls are skipped.
    def close_stdin
      @io_stdin.close if @io_stdin and !@io_stdin.closed?
      @io_stdin = nil
    end

    # Asks mplayer for the current time and percent position.
    def get_position
      send_command :get_time_pos
      send_command :get_percent_pos
    end

    # Asks mplayer for the total length and fires the parent's :startup
    # callback.
    def startup!
      send_command :get_time_length
      @parent.callback! :startup
    end

    # Stops mplayer and the reader threads. Fires :pre_shutdown before and
    # :shutdown after the teardown; calls made while a shutdown is already in
    # progress are ignored.
    def shutdown!
      if @shutdown_in_progress
        debug "shutdown already in progress, skipping shutdown call..."
        return
      end
      @parent.callback! :pre_shutdown
      # give mplayer its close signal
      debug "Sending QUIT to mplayer..."
      @shutdown_in_progress = true
      send_command :quit
      # close our side of the IO
      close_stdin
      # then wait for the threads to cleanup after themselves
      info "Waiting for worker thread to exit..."
      send_each_stream :kill
      send_each_stream :join
      @streams = Array.new
      info "MPlayer process cleaned up!"
      @parent.callback! :shutdown
    end
  end
end