lib/rbbt/persist.rb in rbbt-util-5.13.3 vs lib/rbbt/persist.rb in rbbt-util-5.13.4
- old
+ new
@@ -188,11 +188,11 @@
else
raise "Unknown persistence: #{ type }"
end
end
- def self.tee_stream_thread(stream, path, type, callback = nil)
+ def self.tee_stream_thread(stream, path, type, callback = nil, abort_callback = nil)
file, out = Misc.tee_stream(stream)
saver_thread = Thread.new(Thread.current) do |parent|
begin
Thread.current["name"] = "file saver: " + path
@@ -207,13 +207,16 @@
file.abort if file.respond_to? :abort
parent.raise $!
end
end
ConcurrentStream.setup(out, :threads => saver_thread, :filename => path)
+ out.callback = callback
+ out.abort_callback = abort_callback
+ out
end
- def self.tee_stream_pipe(stream, path, type, callback = nil)
+ def self.tee_stream_pipe(stream, path, type, callback = nil, abort_callback = nil)
parent_pid = Process.pid
out = Misc.open_pipe true, false do |sin|
begin
file, out = Misc.tee_stream(stream)
@@ -278,42 +281,41 @@
res = yield
if persist_options[:no_load] == :stream
case res
when IO
- res = tee_stream(res, path, type, res.respond_to?(:callback)? res.callback : nil)
+ res = tee_stream(res, path, type, res.respond_to?(:callback)? res.callback : nil, res.respond_to?(:abort_callback)? res.abort_callback : nil)
ConcurrentStream.setup res do
begin
lockfile.unlock if File.exists? lockfile.path and lockfile.locked?
rescue
- Log.exception $!
Log.warn "Lockfile exception: " << $!.message
end
end
res.abort_callback = Proc.new do
begin
lockfile.unlock if File.exists? lockfile.path and lockfile.locked?
rescue
- Log.exception $!
Log.warn "Lockfile exception: " << $!.message
end
end
raise KeepLocked.new res
when TSV::Dumper
- res = tee_stream(res.stream, path, type, res.respond_to?(:callback)? res.callback : nil)
+ stream = res.stream
+ res = tee_stream(stream, path, type)
ConcurrentStream.setup res do
begin
+ stream.callback
lockfile.unlock if File.exists? lockfile.path and lockfile.locked?
rescue
- Log.exception $!
Log.warn "Lockfile exception: " << $!.message
end
end
res.abort_callback = Proc.new do
begin
+ stream.abort
lockfile.unlock if File.exists? lockfile.path and lockfile.locked?
rescue
- Log.exception $!
Log.warn "Lockfile exception: " << $!.message
end
end
raise KeepLocked.new res
end