lib/scout/open/stream.rb in scout-gear-7.1.0 vs lib/scout/open/stream.rb in scout-gear-7.2.0
- old
+ new
@@ -1,7 +1,7 @@
module Open
- BLOCK_SIZE = 1024
+ BLOCK_SIZE = 1024 * 8
class << self
attr_accessor :sensible_write_lock_dir
def sensible_write_lock_dir
@@ -50,13 +50,11 @@
dir = File.dirname(into)
Open.mkdir dir unless File.exist?(dir)
into_path, into = into, File.open(into, 'w')
end
- into.sync = true if IO === into
into_close = false unless into.respond_to? :close
- io.sync = true
begin
while c = io.readpartial(BLOCK_SIZE)
into << c if into
end
@@ -75,11 +73,11 @@
io.abort $! if io.respond_to? :abort
into.close if into.respond_to?(:closed?) && ! into.closed?
FileUtils.rm into_path if into_path and File.exist?(into_path)
rescue Exception
Log.low "Consume stream Exception reading #{Log.fingerprint io} into #{into_path || into} - #{$!.message}"
- exception = io.stream_exception || $!
+ exception = (io.respond_to?(:stream_exception) && io.stream_exception) ? io.stream_exception : $!
io.abort exception if io.respond_to? :abort
into.close if into.respond_to?(:closed?) && ! into.closed?
into_path = into if into_path.nil? && String === into
if into_path and File.exist?(into_path)
FileUtils.rm into_path
@@ -108,22 +106,22 @@
if File.exist? path and not force
Log.warn "Path exists in sensible_write, not forcing update: #{ path }"
Open.consume_stream content
else
- FileUtils.mkdir_p File.dirname(tmp_path) unless File.directory? File.dirname(tmp_path)
+ FileUtils.mkdir_p File.dirname(tmp_path) unless File.directory?(File.dirname(tmp_path))
FileUtils.rm_f tmp_path if File.exist? tmp_path
begin
case
when block_given?
File.open(tmp_path, 'wb', &block)
when String === content
File.open(tmp_path, 'wb') do |f| f.write content end
when (IO === content or StringIO === content or File === content)
Open.write(tmp_path) do |f|
- f.sync = true
+ #f.sync = true
begin
while block = content.readpartial(BLOCK_SIZE)
f.write block
end
rescue EOFError
@@ -343,11 +341,10 @@
raise $!
end
end
end
-
out_pipes.each do |sout|
ConcurrentStream.setup sout, :threads => splitter_thread, :filename => filename, :pair => stream
end
Thread.pass until splitter_thread["name"]
@@ -398,7 +395,41 @@
more = stream.read(missing)
str << more
end
str
end
+
+ def self.sort_stream(stream, header_hash = "#", cmd_args = "-u")
+ Open.open_pipe do |sin|
+ line = stream.gets
+ while line =~ /^#{header_hash}/ do
+ sin.puts line
+ line = stream.gets
+ end
+
+ line_stream = Open.open_pipe do |line_stream_in|
+ line_stream_in.puts line
+ begin
+ Open.consume_stream(stream, false, line_stream_in)
+ rescue
+ raise $!
+ end
+ end
+
+ sorted = CMD.cmd("env LC_ALL=C sort #{cmd_args || ""}", :in => line_stream, :pipe => true)
+
+ begin
+ Open.consume_stream(sorted, false, sin)
+ rescue
+ Log.exception $!
+ begin
+ sorted.raise($!) if sorted.respond_to? :raise
+ stream.raise($!) if stream.respond_to? :raise
+ ensure
+ raise $!
+ end
+ end
+ end
+ end
+
end