lib/rbbt/persist.rb in rbbt-util-5.13.3 vs lib/rbbt/persist.rb in rbbt-util-5.13.4

- old
+ new

@@ -188,11 +188,11 @@ else raise "Unknown persistence: #{ type }" end end - def self.tee_stream_thread(stream, path, type, callback = nil) + def self.tee_stream_thread(stream, path, type, callback = nil, abort_callback = nil) file, out = Misc.tee_stream(stream) saver_thread = Thread.new(Thread.current) do |parent| begin Thread.current["name"] = "file saver: " + path @@ -207,13 +207,16 @@ file.abort if file.respond_to? :abort parent.raise $! end end ConcurrentStream.setup(out, :threads => saver_thread, :filename => path) + out.callback = callback + out.abort_callback = abort_callback + out end - def self.tee_stream_pipe(stream, path, type, callback = nil) + def self.tee_stream_pipe(stream, path, type, callback = nil, abort_callback = nil) parent_pid = Process.pid out = Misc.open_pipe true, false do |sin| begin file, out = Misc.tee_stream(stream) @@ -278,42 +281,41 @@ res = yield if persist_options[:no_load] == :stream case res when IO - res = tee_stream(res, path, type, res.respond_to?(:callback)? res.callback : nil) + res = tee_stream(res, path, type, res.respond_to?(:callback)? res.callback : nil, res.respond_to?(:abort_callback)? res.abort_callback : nil) ConcurrentStream.setup res do begin lockfile.unlock if File.exists? lockfile.path and lockfile.locked? rescue - Log.exception $! Log.warn "Lockfile exception: " << $!.message end end res.abort_callback = Proc.new do begin lockfile.unlock if File.exists? lockfile.path and lockfile.locked? rescue - Log.exception $! Log.warn "Lockfile exception: " << $!.message end end raise KeepLocked.new res when TSV::Dumper - res = tee_stream(res.stream, path, type, res.respond_to?(:callback)? res.callback : nil) + stream = res.stream + res = tee_stream(stream, path, type) ConcurrentStream.setup res do begin + stream.callback lockfile.unlock if File.exists? lockfile.path and lockfile.locked? rescue - Log.exception $! Log.warn "Lockfile exception: " << $!.message end end res.abort_callback = Proc.new do begin + stream.abort lockfile.unlock if File.exists? lockfile.path and lockfile.locked? rescue - Log.exception $! Log.warn "Lockfile exception: " << $!.message end end raise KeepLocked.new res end