lib/scout/open/stream.rb in scout-gear-6.0.0 vs lib/scout/open/stream.rb in scout-gear-7.1.0
- old
+ new
@@ -29,17 +29,20 @@
consumer_thread = Thread.new(Thread.current) do |parent|
Thread.current["name"] = "Consumer #{Log.fingerprint io}"
Thread.current.report_on_exception = false
consume_stream(io, false, into, into_close)
end
+
io.threads.push(consumer_thread) if io.respond_to?(:threads)
+ Thread.pass until consumer_thread["name"]
+
consumer_thread
else
if into
- Log.medium "Consuming stream #{Log.fingerprint io} -> #{Log.fingerprint into}"
+ Log.low "Consuming stream #{Log.fingerprint io} -> #{Log.fingerprint into}"
else
- Log.medium "Consuming stream #{Log.fingerprint io}"
+ Log.low "Consuming stream #{Log.fingerprint io}"
end
begin
into = into.find if Path === into
@@ -51,11 +54,10 @@
into.sync = true if IO === into
into_close = false unless into.respond_to? :close
io.sync = true
- Log.high "started consuming stream #{Log.fingerprint io}"
begin
while c = io.readpartial(BLOCK_SIZE)
into << c if into
end
rescue EOFError
@@ -65,19 +67,18 @@
io.close unless io.closed?
into.join if into and into_close and into.respond_to?(:joined?) and not into.joined?
into.close if into and into_close and not into.closed?
block.call if block_given?
- Log.high "Done consuming stream #{Log.fingerprint io} into #{into_path || into}"
c
rescue Aborted
- Log.high "Consume stream Aborted #{Log.fingerprint io} into #{into_path || into}"
+ Log.low "Consume stream Aborted #{Log.fingerprint io} into #{into_path || into}"
io.abort $! if io.respond_to? :abort
into.close if into.respond_to?(:closed?) && ! into.closed?
FileUtils.rm into_path if into_path and File.exist?(into_path)
rescue Exception
- Log.high "Consume stream Exception reading #{Log.fingerprint io} into #{into_path || into} - #{$!.message}"
+ Log.low "Consume stream Exception reading #{Log.fingerprint io} into #{into_path || into} - #{$!.message}"
exception = io.stream_exception || $!
io.abort exception if io.respond_to? :abort
into.close if into.respond_to?(:closed?) && ! into.closed?
into_path = into if into_path.nil? && String === into
if into_path and File.exist?(into_path)
@@ -143,16 +144,16 @@
Open.touch path if File.exist? path
content.join if content.respond_to?(:join) and not Path === content and not (content.respond_to?(:joined?) && content.joined?)
Open.notify_write(path)
rescue Aborted
- Log.medium "Aborted sensible_write -- #{ Log.reset << Log.color(:blue, path) }"
+ Log.low "Aborted sensible_write -- #{ Log.reset << Log.color(:blue, path) }"
content.abort if content.respond_to? :abort
Open.rm path if File.exist? path
rescue Exception
exception = (AbortedStream === content and content.exception) ? content.exception : $!
- Log.medium "Exception in sensible_write: [#{Process.pid}] #{exception.message} -- #{ Log.color :blue, path }"
+ Log.low "Exception in sensible_write: [#{Process.pid}] #{exception.message} -- #{ Log.color :blue, path }"
content.abort if content.respond_to? :abort
Open.rm path if File.exist? path
raise exception
rescue
raise $!
@@ -217,20 +218,19 @@
if do_fork
#parent_pid = Process.pid
pid = Process.fork {
- purge_pipes(sin)
- sout.close
begin
+ purge_pipes(sin)
+ sout.close
yield sin
sin.close if close and not sin.closed?
rescue Exception
Log.exception $!
- #Process.kill :INT, parent_pid
Kernel.exit!(-1)
end
Kernel.exit! 0
}
sin.close
@@ -240,22 +240,22 @@
ConcurrentStream.setup sin, :pair => sout
ConcurrentStream.setup sout, :pair => sin
thread = Thread.new do
- Thread.current["name"] = "Pipe input #{Log.fingerprint sin} => #{Log.fingerprint sout}"
- Thread.current.report_on_exception = false
begin
+ Thread.current.report_on_exception = false
+ Thread.current["name"] = "Pipe input #{Log.fingerprint sin} => #{Log.fingerprint sout}"
yield sin
sin.close if close and not sin.closed? and not sin.aborted?
rescue Aborted
- Log.medium "Aborted open_pipe: #{$!.message}"
+ Log.low "Aborted open_pipe: #{$!.message}"
raise $!
rescue Exception
- Log.medium "Exception in open_pipe: #{$!.message}"
+ Log.low "Exception in open_pipe: #{$!.message}"
begin
sout.threads.delete(Thread.current)
sout.pair = []
sout.abort($!) if sout.respond_to?(:abort)
sin.threads.delete(Thread.current)
@@ -267,10 +267,11 @@
end
end
sin.threads = [thread]
sout.threads = [thread]
+ Thread.pass until thread["name"]
end
sout
end
@@ -285,12 +286,12 @@
filename = stream.filename if stream.respond_to? :filename
splitter_thread = Thread.new(Thread.current) do |parent|
begin
- Thread.current["name"] = "Splitter #{Log.fingerprint stream}"
Thread.current.report_on_exception = false
+ Thread.current["name"] = "Splitter #{Log.fingerprint stream}"
skip = [false] * num
begin
while block = stream.readpartial(BLOCK_SIZE)
@@ -315,11 +316,11 @@
rescue Aborted, Interrupt
stream.abort if stream.respond_to? :abort
out_pipes.each do |sout|
sout.abort if sout.respond_to? :abort
end
- Log.medium "Tee aborting #{Log.fingerprint stream}"
+ Log.low "Tee aborting #{Log.fingerprint stream}"
raise $!
rescue Exception
begin
stream.abort($!) if stream.respond_to?(:abort) && ! stream.aborted?
out_pipes.reverse.each do |sout|
@@ -330,11 +331,11 @@
end
end
in_pipes.each do |sin|
sin.close unless sin.closed?
end
- Log.medium "Tee exception #{Log.fingerprint stream}"
+ Log.low "Tee exception #{Log.fingerprint stream}"
rescue
Log.exception $!
ensure
in_pipes.each do |sin|
sin.close unless sin.closed?
@@ -346,10 +347,10 @@
out_pipes.each do |sout|
ConcurrentStream.setup sout, :threads => splitter_thread, :filename => filename, :pair => stream
end
- splitter_thread.wakeup until splitter_thread["name"]
+ Thread.pass until splitter_thread["name"]
main_pipe = out_pipes.first
main_pipe.autojoin = true
main_pipe.callback = Proc.new do