lib/rbbt/persist.rb in rbbt-util-5.11.1 vs lib/rbbt/persist.rb in rbbt-util-5.11.2

- old
+ new

@@ -170,63 +170,122 @@ else raise "Unknown persistence: #{ type }" end end - def self.tee_stream_fork(stream, path, type, callback = nil) + #def self.tee_stream_fork(stream, path, type, callback = nil) + # file, out = Misc.tee_stream(stream) + + # saver_pid = Process.fork do + # out.close + # stream.close + # Misc.purge_pipes + # begin + # Misc.lock(path) do + # save_file(path, type, file) + # end + # rescue Aborted + # stream.abort if stream.respond_to? :abort + # raise $! + # rescue Exception + # Log.exception $! + # Kernel.exit! -1 + # end + # Kernel.exit! 0 + # end + # file.close + # ConcurrentStream.setup(out, :pids => [saver_pid], :filename => path) + #end + + def self.tee_stream_thread(stream, path, type, callback = nil) file, out = Misc.tee_stream(stream) - saver_pid = Process.fork do - out.close - stream.close - Misc.purge_pipes + saver_thread = Thread.new(Thread.current, path, file) do |parent,path,file| begin + Thread.current["name"] = "file saver: " + path Misc.lock(path) do save_file(path, type, file) end rescue Aborted + Log.error "Persist stream thread aborted: #{ Log.color :blue, path }" stream.abort if stream.respond_to? :abort - raise $! rescue Exception + Log.error "Persist stream thread exception: #{ Log.color :blue, path }" Log.exception $! - Kernel.exit! -1 + stream.abort if stream.respond_to? :abort + parent.raise $! end - Kernel.exit! 0 end - file.close - ConcurrentStream.setup(out, :pids => [saver_pid], :filename => path) + ConcurrentStream.setup(out, :threads => saver_thread, :filename => path) end - def self.tee_stream_thread(stream, path, type, callback = nil) - file, out = Misc.tee_stream(stream) - - saver_thread = Thread.new(Thread.current, path, file) do |parent,path,file| + def self.tee_stream_pipe(stream, path, type, callback = nil) + parent_pid = Process.pid + out = Misc.open_pipe true, false do |sin| begin - Thread.current["name"] = "file saver: " + path - Misc.lock(path) do - save_file(path, type, file) + file, out = Misc.tee_stream(stream) + + saver_th = Thread.new(Thread.current, path, file) do |parent,path,file| + begin + Misc.lock(path) do + save_file(path, type, file) + end + Log.high "Stream pipe saved: #{path}" + rescue Aborted + Log.error "Persist stream pipe exception: #{ Log.color :blue, path }" + stream.abort if stream.respond_to? :abort + rescue Exception + Log.error "Persist stream pipe exception: #{ Log.color :blue, path }" + Log.exception $! + stream.abort if stream.respond_to? :abort + parent.raise $! + end end + + tee_th = Thread.new(Thread.current) do |parent| + begin + while block = out.read(2028) + sin.write block + end + rescue Aborted + Log.error "Tee stream thread aborted" + stream.abort if stream.respond_to? :abort + rescue Exception + stream.abort if stream.respond_to? :abort + Log.exception $! + parent.raise $! + ensure + sin.close unless sin.closed? + end + end + saver_th.join + tee_th.join rescue Aborted - Log.error "Tee stream thread aborted" - stream.abort if stream.respond_to? :abort + tee_th.raise Aborted.new if tee_th and tee_th.alive? + saver_th.raise Aborted.new if saver_th and saver_th.alive? + Kernel.exit! -1 rescue Exception - stream.abort if stream.respond_to? :abort + tee_th.raise Aborted.new if tee_th and tee_th.alive? + saver_th.raise Aborted.new if saver_th and saver_th.alive? Log.exception $! - parent.raise $! + Process.kill :INT, parent_pid + Kernel.exit! -1 end end - ConcurrentStream.setup(out, :threads => saver_thread, :filename => path) + stream.close + out end class << self alias tee_stream tee_stream_thread end + def self.persist(name, type = nil, persist_options = {}) type ||= :marshal - return (persist_options[:repo] || Persist::MEMORY)[persist_options[:file]] ||= yield if type ==:memory and persist_options[:file] + return (persist_options[:repo] || Persist::MEMORY)[persist_options[:file]] ||= yield if type ==:memory and persist_options[:file] and persist_options[:persist] and persist_options[:persist] != :update if FalseClass != persist_options[:persist] other_options = Misc.process_options persist_options, :other path = persistence_path(name, persist_options, other_options || {}) @@ -325,10 +384,11 @@ begin lock_filename = Persist.persistence_path(path + '.persist', {:dir => Persist.lock_dir}) Misc.lock lock_filename do |lockfile| + if is_persisted?(path, persist_options) Log.low "Persist up-to-date (suddenly): #{ path } - #{Misc.fingerprint persist_options}" return path if persist_options[:no_load] return load_file(path, type) end @@ -340,17 +400,25 @@ if persist_options[:no_load] == :stream case res when IO res = tee_stream(res, path, type, res.respond_to?(:callback)? res.callback : nil) ConcurrentStream.setup res do - lockfile.unlock + begin + lockfile.unlock + rescue + Log.warn "Lockfile exception: " << $!.message + end end raise KeepLocked.new res when TSV::Dumper res = tee_stream(res.stream, path, type, res.respond_to?(:callback)? res.callback : nil) ConcurrentStream.setup res do - lockfile.unlock + begin + lockfile.unlock + rescue + Log.warn "Lockfile exception: " << $!.message + end end raise KeepLocked.new res end end @@ -365,19 +433,21 @@ else res.read end rescue res.abort if res.respond_to? :abort + raise $! ensure res.join if res.respond_to? :join end when TSV::Dumper begin io = res.stream res = TSV.open(io) rescue io.abort if io.respond_to? :abort + raise $! ensure io.join if io.respond_to? :join end end @@ -387,11 +457,11 @@ persist_options[:no_load] ? path : res end rescue - Log.high "Error in persist: #{path}#{Open.exists?(path) ? Log.color(:red, " Erasing") : ""}" + Log.error "Error in persist: #{path}#{Open.exists?(path) ? Log.color(:red, " Erasing") : ""}" FileUtils.rm path if Open.exists? path raise $! end end @@ -408,10 +478,10 @@ persist name, :memory, :file => name + "_" << options, &block else file = name repo = options.delete :repo if options and options.any? file << "_" << (options[:key] ? options[:key] : Misc.hash2md5(options)) if options and options.any? - persist name, :memory, options.merge(:repo => repo, :persist => true, :file => file), &block + persist name, :memory, {:repo => repo, :persist => true, :file => file}.merge(options), &block end end end module LocalPersist