lib/scout/open/stream.rb in scout-gear-6.0.0 vs lib/scout/open/stream.rb in scout-gear-7.1.0

- old
+ new

@@ -29,17 +29,20 @@ consumer_thread = Thread.new(Thread.current) do |parent| Thread.current["name"] = "Consumer #{Log.fingerprint io}" Thread.current.report_on_exception = false consume_stream(io, false, into, into_close) end + io.threads.push(consumer_thread) if io.respond_to?(:threads) + Thread.pass until consumer_thread["name"] + consumer_thread else if into - Log.medium "Consuming stream #{Log.fingerprint io} -> #{Log.fingerprint into}" + Log.low "Consuming stream #{Log.fingerprint io} -> #{Log.fingerprint into}" else - Log.medium "Consuming stream #{Log.fingerprint io}" + Log.low "Consuming stream #{Log.fingerprint io}" end begin into = into.find if Path === into @@ -51,11 +54,10 @@ into.sync = true if IO === into into_close = false unless into.respond_to? :close io.sync = true - Log.high "started consuming stream #{Log.fingerprint io}" begin while c = io.readpartial(BLOCK_SIZE) into << c if into end rescue EOFError @@ -65,19 +67,18 @@ io.close unless io.closed? into.join if into and into_close and into.respond_to?(:joined?) and not into.joined? into.close if into and into_close and not into.closed? block.call if block_given? - Log.high "Done consuming stream #{Log.fingerprint io} into #{into_path || into}" c rescue Aborted - Log.high "Consume stream Aborted #{Log.fingerprint io} into #{into_path || into}" + Log.low "Consume stream Aborted #{Log.fingerprint io} into #{into_path || into}" io.abort $! if io.respond_to? :abort into.close if into.respond_to?(:closed?) && ! into.closed? FileUtils.rm into_path if into_path and File.exist?(into_path) rescue Exception - Log.high "Consume stream Exception reading #{Log.fingerprint io} into #{into_path || into} - #{$!.message}" + Log.low "Consume stream Exception reading #{Log.fingerprint io} into #{into_path || into} - #{$!.message}" exception = io.stream_exception || $! io.abort exception if io.respond_to? :abort into.close if into.respond_to?(:closed?) && ! into.closed? into_path = into if into_path.nil? && String === into if into_path and File.exist?(into_path) @@ -143,16 +144,16 @@ Open.touch path if File.exist? path content.join if content.respond_to?(:join) and not Path === content and not (content.respond_to?(:joined?) && content.joined?) Open.notify_write(path) rescue Aborted - Log.medium "Aborted sensible_write -- #{ Log.reset << Log.color(:blue, path) }" + Log.low "Aborted sensible_write -- #{ Log.reset << Log.color(:blue, path) }" content.abort if content.respond_to? :abort Open.rm path if File.exist? path rescue Exception exception = (AbortedStream === content and content.exception) ? content.exception : $! - Log.medium "Exception in sensible_write: [#{Process.pid}] #{exception.message} -- #{ Log.color :blue, path }" + Log.low "Exception in sensible_write: [#{Process.pid}] #{exception.message} -- #{ Log.color :blue, path }" content.abort if content.respond_to? :abort Open.rm path if File.exist? path raise exception rescue raise $! @@ -217,20 +218,19 @@ if do_fork #parent_pid = Process.pid pid = Process.fork { - purge_pipes(sin) - sout.close begin + purge_pipes(sin) + sout.close yield sin sin.close if close and not sin.closed? rescue Exception Log.exception $! - #Process.kill :INT, parent_pid Kernel.exit!(-1) end Kernel.exit! 0 } sin.close @@ -240,22 +240,22 @@ ConcurrentStream.setup sin, :pair => sout ConcurrentStream.setup sout, :pair => sin thread = Thread.new do - Thread.current["name"] = "Pipe input #{Log.fingerprint sin} => #{Log.fingerprint sout}" - Thread.current.report_on_exception = false begin + Thread.current.report_on_exception = false + Thread.current["name"] = "Pipe input #{Log.fingerprint sin} => #{Log.fingerprint sout}" yield sin sin.close if close and not sin.closed? and not sin.aborted? rescue Aborted - Log.medium "Aborted open_pipe: #{$!.message}" + Log.low "Aborted open_pipe: #{$!.message}" raise $! rescue Exception - Log.medium "Exception in open_pipe: #{$!.message}" + Log.low "Exception in open_pipe: #{$!.message}" begin sout.threads.delete(Thread.current) sout.pair = [] sout.abort($!) if sout.respond_to?(:abort) sin.threads.delete(Thread.current) @@ -267,10 +267,11 @@ end end sin.threads = [thread] sout.threads = [thread] + Thread.pass until thread["name"] end sout end @@ -285,12 +286,12 @@ filename = stream.filename if stream.respond_to? :filename splitter_thread = Thread.new(Thread.current) do |parent| begin - Thread.current["name"] = "Splitter #{Log.fingerprint stream}" Thread.current.report_on_exception = false + Thread.current["name"] = "Splitter #{Log.fingerprint stream}" skip = [false] * num begin while block = stream.readpartial(BLOCK_SIZE) @@ -315,11 +316,11 @@ rescue Aborted, Interrupt stream.abort if stream.respond_to? :abort out_pipes.each do |sout| sout.abort if sout.respond_to? :abort end - Log.medium "Tee aborting #{Log.fingerprint stream}" + Log.low "Tee aborting #{Log.fingerprint stream}" raise $! rescue Exception begin stream.abort($!) if stream.respond_to?(:abort) && ! stream.aborted? out_pipes.reverse.each do |sout| @@ -330,11 +331,11 @@ end end in_pipes.each do |sin| sin.close unless sin.closed? end - Log.medium "Tee exception #{Log.fingerprint stream}" + Log.low "Tee exception #{Log.fingerprint stream}" rescue Log.exception $! ensure in_pipes.each do |sin| sin.close unless sin.closed? @@ -346,10 +347,10 @@ out_pipes.each do |sout| ConcurrentStream.setup sout, :threads => splitter_thread, :filename => filename, :pair => stream end - splitter_thread.wakeup until splitter_thread["name"] + Thread.pass until splitter_thread["name"] main_pipe = out_pipes.first main_pipe.autojoin = true main_pipe.callback = Proc.new do