lib/scout/open/stream.rb in scout-gear-7.1.0 vs lib/scout/open/stream.rb in scout-gear-7.2.0

- old
+ new

@@ -1,7 +1,7 @@ module Open - BLOCK_SIZE = 1024 + BLOCK_SIZE = 1024 * 8 class << self attr_accessor :sensible_write_lock_dir def sensible_write_lock_dir @@ -50,13 +50,11 @@ dir = File.dirname(into) Open.mkdir dir unless File.exist?(dir) into_path, into = into, File.open(into, 'w') end - into.sync = true if IO === into into_close = false unless into.respond_to? :close - io.sync = true begin while c = io.readpartial(BLOCK_SIZE) into << c if into end @@ -75,11 +73,11 @@ io.abort $! if io.respond_to? :abort into.close if into.respond_to?(:closed?) && ! into.closed? FileUtils.rm into_path if into_path and File.exist?(into_path) rescue Exception Log.low "Consume stream Exception reading #{Log.fingerprint io} into #{into_path || into} - #{$!.message}" - exception = io.stream_exception || $! + exception = (io.respond_to?(:stream_exception) && io.stream_exception) ? io.stream_exception : $! io.abort exception if io.respond_to? :abort into.close if into.respond_to?(:closed?) && ! into.closed? into_path = into if into_path.nil? && String === into if into_path and File.exist?(into_path) FileUtils.rm into_path @@ -108,22 +106,22 @@ if File.exist? path and not force Log.warn "Path exists in sensible_write, not forcing update: #{ path }" Open.consume_stream content else - FileUtils.mkdir_p File.dirname(tmp_path) unless File.directory? File.dirname(tmp_path) + FileUtils.mkdir_p File.dirname(tmp_path) unless File.directory?(File.dirname(tmp_path)) FileUtils.rm_f tmp_path if File.exist? tmp_path begin case when block_given? File.open(tmp_path, 'wb', &block) when String === content File.open(tmp_path, 'wb') do |f| f.write content end when (IO === content or StringIO === content or File === content) Open.write(tmp_path) do |f| - f.sync = true + #f.sync = true begin while block = content.readpartial(BLOCK_SIZE) f.write block end rescue EOFError @@ -343,11 +341,10 @@ raise $! end end end - out_pipes.each do |sout| ConcurrentStream.setup sout, :threads => splitter_thread, :filename => filename, :pair => stream end Thread.pass until splitter_thread["name"] @@ -398,7 +395,41 @@ more = stream.read(missing) str << more end str end + + def self.sort_stream(stream, header_hash = "#", cmd_args = "-u") + Open.open_pipe do |sin| + line = stream.gets + while line =~ /^#{header_hash}/ do + sin.puts line + line = stream.gets + end + + line_stream = Open.open_pipe do |line_stream_in| + line_stream_in.puts line + begin + Open.consume_stream(stream, false, line_stream_in) + rescue + raise $! + end + end + + sorted = CMD.cmd("env LC_ALL=C sort #{cmd_args || ""}", :in => line_stream, :pipe => true) + + begin + Open.consume_stream(sorted, false, sin) + rescue + Log.exception $! + begin + sorted.raise($!) if sorted.respond_to? :raise + stream.raise($!) if stream.respond_to? :raise + ensure + raise $! + end + end + end + end + end