lib/rbbt/persist.rb in rbbt-util-5.10.2 vs lib/rbbt/persist.rb in rbbt-util-5.11.1
- old
+ new
@@ -153,15 +153,11 @@
Misc.sensiblewrite(path, "")
else
Misc.sensiblewrite(path, content * "\n" + "\n")
end
when IO
- Misc.sensiblewrite(path) do |file|
- while block = content.read(2048)
- file.write block
- end
- end
+ Misc.sensiblewrite(path, content)
else
Misc.sensiblewrite(path, content.to_s)
end
when :marshal_tsv
Misc.sensiblewrite(path, Marshal.dump(content.dup))
@@ -179,18 +175,24 @@
def self.tee_stream_fork(stream, path, type, callback = nil)
file, out = Misc.tee_stream(stream)
saver_pid = Process.fork do
out.close
- Misc.purge_pipes(stream)
+ stream.close
+ Misc.purge_pipes
begin
Misc.lock(path) do
save_file(path, type, file)
end
+ rescue Aborted
+ stream.abort if stream.respond_to? :abort
+ raise $!
rescue Exception
Log.exception $!
+ Kernel.exit! -1
end
+ Kernel.exit! 0
end
file.close
ConcurrentStream.setup(out, :pids => [saver_pid], :filename => path)
end
@@ -201,11 +203,15 @@
begin
Thread.current["name"] = "file saver: " + path
Misc.lock(path) do
save_file(path, type, file)
end
+ rescue Aborted
+ Log.error "Tee stream thread aborted"
+ stream.abort if stream.respond_to? :abort
rescue Exception
+ stream.abort if stream.respond_to? :abort
Log.exception $!
parent.raise $!
end
end
ConcurrentStream.setup(out, :threads => saver_thread, :filename => path)
@@ -316,39 +322,73 @@
return path if persist_options[:no_load]
return load_file(path, type)
end
begin
+
lock_filename = Persist.persistence_path(path + '.persist', {:dir => Persist.lock_dir})
- Misc.lock lock_filename do
+ Misc.lock lock_filename do |lockfile|
if is_persisted?(path, persist_options)
Log.low "Persist up-to-date (suddenly): #{ path } - #{Misc.fingerprint persist_options}"
return path if persist_options[:no_load]
return load_file(path, type)
end
- Log.medium "Persist create: #{ path } - #{persist_options.inspect[0..100]}"
+ Log.medium "Persist create: #{ path } - #{Misc.fingerprint persist_options}"
+
res = yield
+ if persist_options[:no_load] == :stream
+ case res
+ when IO
+ res = tee_stream(res, path, type, res.respond_to?(:callback)? res.callback : nil)
+ ConcurrentStream.setup res do
+ lockfile.unlock
+ end
+ raise KeepLocked.new res
+ when TSV::Dumper
+ res = tee_stream(res.stream, path, type, res.respond_to?(:callback)? res.callback : nil)
+ ConcurrentStream.setup res do
+ lockfile.unlock
+ end
+ raise KeepLocked.new res
+ end
+ end
+
case res
- when nil
- res = load_file(path) unless persist_options[:no_load]
- when IO, StringIO
- res = tee_stream(res, path, type, res.respond_to?(:callback)? res.callback : nil)
- return res if persist_options[:no_load] == :stream
+ when IO
+ begin
+ res = case
+ when :array
+ res.read.split "\n"
+ when :tsv
+ TSV.open(res)
+ else
+ res.read
+ end
+ rescue
+ res.abort if res.respond_to? :abort
+ ensure
+ res.join if res.respond_to? :join
+ end
when TSV::Dumper
- res = tee_stream(res.stream, path, type, res.respond_to?(:callback)? res.callback : nil)
- return res if persist_options[:no_load] == :stream
- else
- Misc.lock(path) do
- save_file(path, type, res)
+ begin
+ io = res.stream
+ res = TSV.open(io)
+ rescue
+ io.abort if io.respond_to? :abort
+ ensure
+ io.join if io.respond_to? :join
end
end
- return path if persist_options[:no_load]
+ Misc.lock(path) do
+ save_file(path, type, res)
+ end
- res
+ persist_options[:no_load] ? path : res
end
+
rescue
Log.high "Error in persist: #{path}#{Open.exists?(path) ? Log.color(:red, " Erasing") : ""}"
FileUtils.rm path if Open.exists? path
raise $!
end