lib/rbbt/persist.rb in rbbt-util-5.11.1 vs lib/rbbt/persist.rb in rbbt-util-5.11.2
- old
+ new
@@ -170,63 +170,122 @@
else
raise "Unknown persistence: #{ type }"
end
end
- def self.tee_stream_fork(stream, path, type, callback = nil)
+ #def self.tee_stream_fork(stream, path, type, callback = nil)
+ # file, out = Misc.tee_stream(stream)
+
+ # saver_pid = Process.fork do
+ # out.close
+ # stream.close
+ # Misc.purge_pipes
+ # begin
+ # Misc.lock(path) do
+ # save_file(path, type, file)
+ # end
+ # rescue Aborted
+ # stream.abort if stream.respond_to? :abort
+ # raise $!
+ # rescue Exception
+ # Log.exception $!
+ # Kernel.exit! -1
+ # end
+ # Kernel.exit! 0
+ # end
+ # file.close
+ # ConcurrentStream.setup(out, :pids => [saver_pid], :filename => path)
+ #end
+
+ def self.tee_stream_thread(stream, path, type, callback = nil)
file, out = Misc.tee_stream(stream)
- saver_pid = Process.fork do
- out.close
- stream.close
- Misc.purge_pipes
+ saver_thread = Thread.new(Thread.current, path, file) do |parent,path,file|
begin
+ Thread.current["name"] = "file saver: " + path
Misc.lock(path) do
save_file(path, type, file)
end
rescue Aborted
+ Log.error "Persist stream thread aborted: #{ Log.color :blue, path }"
stream.abort if stream.respond_to? :abort
- raise $!
rescue Exception
+ Log.error "Persist stream thread exception: #{ Log.color :blue, path }"
Log.exception $!
- Kernel.exit! -1
+ stream.abort if stream.respond_to? :abort
+ parent.raise $!
end
- Kernel.exit! 0
end
- file.close
- ConcurrentStream.setup(out, :pids => [saver_pid], :filename => path)
+ ConcurrentStream.setup(out, :threads => saver_thread, :filename => path)
end
- def self.tee_stream_thread(stream, path, type, callback = nil)
- file, out = Misc.tee_stream(stream)
-
- saver_thread = Thread.new(Thread.current, path, file) do |parent,path,file|
+ def self.tee_stream_pipe(stream, path, type, callback = nil)
+ parent_pid = Process.pid
+ out = Misc.open_pipe true, false do |sin|
begin
- Thread.current["name"] = "file saver: " + path
- Misc.lock(path) do
- save_file(path, type, file)
+ file, out = Misc.tee_stream(stream)
+
+ saver_th = Thread.new(Thread.current, path, file) do |parent,path,file|
+ begin
+ Misc.lock(path) do
+ save_file(path, type, file)
+ end
+ Log.high "Stream pipe saved: #{path}"
+ rescue Aborted
+ Log.error "Persist stream pipe exception: #{ Log.color :blue, path }"
+ stream.abort if stream.respond_to? :abort
+ rescue Exception
+ Log.error "Persist stream pipe exception: #{ Log.color :blue, path }"
+ Log.exception $!
+ stream.abort if stream.respond_to? :abort
+ parent.raise $!
+ end
end
+
+ tee_th = Thread.new(Thread.current) do |parent|
+ begin
+ while block = out.read(2028)
+ sin.write block
+ end
+ rescue Aborted
+ Log.error "Tee stream thread aborted"
+ stream.abort if stream.respond_to? :abort
+ rescue Exception
+ stream.abort if stream.respond_to? :abort
+ Log.exception $!
+ parent.raise $!
+ ensure
+ sin.close unless sin.closed?
+ end
+ end
+ saver_th.join
+ tee_th.join
rescue Aborted
- Log.error "Tee stream thread aborted"
- stream.abort if stream.respond_to? :abort
+ tee_th.raise Aborted.new if tee_th and tee_th.alive?
+ saver_th.raise Aborted.new if saver_th and saver_th.alive?
+ Kernel.exit! -1
rescue Exception
- stream.abort if stream.respond_to? :abort
+ tee_th.raise Aborted.new if tee_th and tee_th.alive?
+ saver_th.raise Aborted.new if saver_th and saver_th.alive?
Log.exception $!
- parent.raise $!
+ Process.kill :INT, parent_pid
+ Kernel.exit! -1
end
end
- ConcurrentStream.setup(out, :threads => saver_thread, :filename => path)
+ stream.close
+ out
end
class << self
alias tee_stream tee_stream_thread
end
+
def self.persist(name, type = nil, persist_options = {})
type ||= :marshal
- return (persist_options[:repo] || Persist::MEMORY)[persist_options[:file]] ||= yield if type ==:memory and persist_options[:file]
+ return (persist_options[:repo] || Persist::MEMORY)[persist_options[:file]] ||= yield if type ==:memory and persist_options[:file] and persist_options[:persist] and persist_options[:persist] != :update
if FalseClass != persist_options[:persist]
other_options = Misc.process_options persist_options, :other
path = persistence_path(name, persist_options, other_options || {})
@@ -325,10 +384,11 @@
begin
lock_filename = Persist.persistence_path(path + '.persist', {:dir => Persist.lock_dir})
Misc.lock lock_filename do |lockfile|
+
if is_persisted?(path, persist_options)
Log.low "Persist up-to-date (suddenly): #{ path } - #{Misc.fingerprint persist_options}"
return path if persist_options[:no_load]
return load_file(path, type)
end
@@ -340,17 +400,25 @@
if persist_options[:no_load] == :stream
case res
when IO
res = tee_stream(res, path, type, res.respond_to?(:callback)? res.callback : nil)
ConcurrentStream.setup res do
- lockfile.unlock
+ begin
+ lockfile.unlock
+ rescue
+ Log.warn "Lockfile exception: " << $!.message
+ end
end
raise KeepLocked.new res
when TSV::Dumper
res = tee_stream(res.stream, path, type, res.respond_to?(:callback)? res.callback : nil)
ConcurrentStream.setup res do
- lockfile.unlock
+ begin
+ lockfile.unlock
+ rescue
+ Log.warn "Lockfile exception: " << $!.message
+ end
end
raise KeepLocked.new res
end
end
@@ -365,19 +433,21 @@
else
res.read
end
rescue
res.abort if res.respond_to? :abort
+ raise $!
ensure
res.join if res.respond_to? :join
end
when TSV::Dumper
begin
io = res.stream
res = TSV.open(io)
rescue
io.abort if io.respond_to? :abort
+ raise $!
ensure
io.join if io.respond_to? :join
end
end
@@ -387,11 +457,11 @@
persist_options[:no_load] ? path : res
end
rescue
- Log.high "Error in persist: #{path}#{Open.exists?(path) ? Log.color(:red, " Erasing") : ""}"
+ Log.error "Error in persist: #{path}#{Open.exists?(path) ? Log.color(:red, " Erasing") : ""}"
FileUtils.rm path if Open.exists? path
raise $!
end
end
@@ -408,10 +478,10 @@
persist name, :memory, :file => name + "_" << options, &block
else
file = name
repo = options.delete :repo if options and options.any?
file << "_" << (options[:key] ? options[:key] : Misc.hash2md5(options)) if options and options.any?
- persist name, :memory, options.merge(:repo => repo, :persist => true, :file => file), &block
+ persist name, :memory, {:repo => repo, :persist => true, :file => file}.merge(options), &block
end
end
end
module LocalPersist