lib/rbbt/persist.rb in rbbt-util-5.10.2 vs lib/rbbt/persist.rb in rbbt-util-5.11.1

- old
+ new

@@ -153,15 +153,11 @@ Misc.sensiblewrite(path, "") else Misc.sensiblewrite(path, content * "\n" + "\n") end when IO - Misc.sensiblewrite(path) do |file| - while block = content.read(2048) - file.write block - end - end + Misc.sensiblewrite(path, content) else Misc.sensiblewrite(path, content.to_s) end when :marshal_tsv Misc.sensiblewrite(path, Marshal.dump(content.dup)) @@ -179,18 +175,24 @@ def self.tee_stream_fork(stream, path, type, callback = nil) file, out = Misc.tee_stream(stream) saver_pid = Process.fork do out.close - Misc.purge_pipes(stream) + stream.close + Misc.purge_pipes begin Misc.lock(path) do save_file(path, type, file) end + rescue Aborted + stream.abort if stream.respond_to? :abort + raise $! rescue Exception Log.exception $! + Kernel.exit! -1 end + Kernel.exit! 0 end file.close ConcurrentStream.setup(out, :pids => [saver_pid], :filename => path) end @@ -201,11 +203,15 @@ begin Thread.current["name"] = "file saver: " + path Misc.lock(path) do save_file(path, type, file) end + rescue Aborted + Log.error "Tee stream thread aborted" + stream.abort if stream.respond_to? :abort rescue Exception + stream.abort if stream.respond_to? :abort Log.exception $! parent.raise $! end end ConcurrentStream.setup(out, :threads => saver_thread, :filename => path) @@ -316,39 +322,73 @@ return path if persist_options[:no_load] return load_file(path, type) end begin + lock_filename = Persist.persistence_path(path + '.persist', {:dir => Persist.lock_dir}) - Misc.lock lock_filename do + Misc.lock lock_filename do |lockfile| if is_persisted?(path, persist_options) Log.low "Persist up-to-date (suddenly): #{ path } - #{Misc.fingerprint persist_options}" return path if persist_options[:no_load] return load_file(path, type) end - Log.medium "Persist create: #{ path } - #{persist_options.inspect[0..100]}" + Log.medium "Persist create: #{ path } - #{Misc.fingerprint persist_options}" + res = yield + if persist_options[:no_load] == :stream + case res + when IO + res = tee_stream(res, path, type, res.respond_to?(:callback)? res.callback : nil) + ConcurrentStream.setup res do + lockfile.unlock + end + raise KeepLocked.new res + when TSV::Dumper + res = tee_stream(res.stream, path, type, res.respond_to?(:callback)? res.callback : nil) + ConcurrentStream.setup res do + lockfile.unlock + end + raise KeepLocked.new res + end + end + case res - when nil - res = load_file(path) unless persist_options[:no_load] - when IO, StringIO - res = tee_stream(res, path, type, res.respond_to?(:callback)? res.callback : nil) - return res if persist_options[:no_load] == :stream + when IO + begin + res = case + when :array + res.read.split "\n" + when :tsv + TSV.open(res) + else + res.read + end + rescue + res.abort if res.respond_to? :abort + ensure + res.join if res.respond_to? :join + end when TSV::Dumper - res = tee_stream(res.stream, path, type, res.respond_to?(:callback)? res.callback : nil) - return res if persist_options[:no_load] == :stream - else - Misc.lock(path) do - save_file(path, type, res) + begin + io = res.stream + res = TSV.open(io) + rescue + io.abort if io.respond_to? :abort + ensure + io.join if io.respond_to? :join end end - return path if persist_options[:no_load] + Misc.lock(path) do + save_file(path, type, res) + end - res + persist_options[:no_load] ? path : res end + rescue Log.high "Error in persist: #{path}#{Open.exists?(path) ? Log.color(:red, " Erasing") : ""}" FileUtils.rm path if Open.exists? path raise $! end