lib/rbbt/persist.rb in rbbt-util-5.12.3 vs lib/rbbt/persist.rb in rbbt-util-5.13.0

- old
+ new

@@ -109,11 +109,21 @@ res = Open.read(path).split("\n", -1) res.pop res when :marshal Open.open(path) do |stream| - Marshal.load(stream) + case stream + when StringIO + begin + Marshal.load(stream) + rescue + Log.exception $! + raise $! + end + else + Marshal.load(stream) + end end when :yaml Open.open(path) do |stream| YAML.load(stream) end @@ -206,23 +216,23 @@ #end def self.tee_stream_thread(stream, path, type, callback = nil) file, out = Misc.tee_stream(stream) - saver_thread = Thread.new(Thread.current, path, file) do |parent,path,file| + saver_thread = Thread.new(Thread.current) do |parent| begin Thread.current["name"] = "file saver: " + path Misc.lock(path) do save_file(path, type, file) end rescue Aborted Log.error "Persist stream thread aborted: #{ Log.color :blue, path }" - stream.abort if stream.respond_to? :abort + file.abort if file.respond_to? :abort rescue Exception Log.error "Persist stream thread exception: #{ Log.color :blue, path }" Log.exception $! - stream.abort if stream.respond_to? :abort + file.abort if file.respond_to? :abort parent.raise $! end end ConcurrentStream.setup(out, :threads => saver_thread, :filename => path) end @@ -288,12 +298,123 @@ class << self alias tee_stream tee_stream_thread end + def self.get_result(path, type, persist_options, lockfile, &block) + res = yield - def self.persist(name, type = nil, persist_options = {}) + if persist_options[:no_load] == :stream + case res + when IO + res = tee_stream(res, path, type, res.respond_to?(:callback)? res.callback : nil) + ConcurrentStream.setup res do + begin + lockfile.unlock if lockfile.locked? + rescue + Log.exception $! + Log.warn "Lockfile exception: " << $!.message + end + end + res.abort_callback = Proc.new do + begin + lockfile.unlock if lockfile.locked? + rescue + Log.exception $! + Log.warn "Lockfile exception: " << $!.message + end + end + raise KeepLocked.new res + when TSV::Dumper + res = tee_stream(res.stream, path, type, res.respond_to?(:callback)? res.callback : nil) + ConcurrentStream.setup res do + begin + lockfile.unlock + rescue + Log.exception $! + Log.warn "Lockfile exception: " << $!.message + end + end + res.abort_callback = Proc.new do + begin + lockfile.unlock + rescue + Log.exception $! + Log.warn "Lockfile exception: " << $!.message + end + end + raise KeepLocked.new res + end + end + + case res + when IO + begin + res = case + when :array + res.read.split "\n" + when :tsv + TSV.open(res) + else + res.read + end + res.join if res.respond_to? :join + rescue + res.abort if res.respond_to? :abort + raise $! + end + when (defined? TSV and TSV::Dumper) + begin + io = res.stream + res = TSV.open(io) + io.join if io.respond_to? :join + rescue + io.abort if io.respond_to? :abort + raise $! + end + end + res + end + + def self.persist_file(path, type, persist_options, &block) + + if is_persisted?(path, persist_options) + Log.low "Persist up-to-date: #{ path } - #{Misc.fingerprint persist_options}" + return path if persist_options[:no_load] + return load_file(path, type) + end + + begin + + lock_filename = Persist.persistence_path(path + '.persist', {:dir => Persist.lock_dir}) + Misc.lock lock_filename do |lockfile| + + if is_persisted?(path, persist_options) + Log.low "Persist up-to-date (suddenly): #{ path } - #{Misc.fingerprint persist_options}" + return path if persist_options[:no_load] + return load_file(path, type) + end + + Log.medium "Persist create: #{ path } - #{type} #{Misc.fingerprint persist_options}" + + res = get_result(path, type, persist_options, lockfile, &block) + + Misc.lock(path) do + save_file(path, type, res) + end + + persist_options[:no_load] ? path : res + end + + rescue + Log.error "Error in persist: #{path}#{Open.exists?(path) ? Log.color(:red, " Erasing") : ""}" + FileUtils.rm path if Open.exists? path + raise $! + end + end + + def self.persist(name, type = nil, persist_options = {}, &block) type ||= :marshal return (persist_options[:repo] || Persist::MEMORY)[persist_options[:file]] ||= yield if type ==:memory and persist_options[:file] and persist_options[:persist] and persist_options[:persist] != :update if FalseClass == persist_options[:persist] @@ -386,113 +507,10 @@ entities end else - - if is_persisted?(path, persist_options) - Log.low "Persist up-to-date: #{ path } - #{Misc.fingerprint persist_options}" - return path if persist_options[:no_load] - return load_file(path, type) - end - - begin - - lock_filename = Persist.persistence_path(path + '.persist', {:dir => Persist.lock_dir}) - Misc.lock lock_filename do |lockfile| - - if is_persisted?(path, persist_options) - Log.low "Persist up-to-date (suddenly): #{ path } - #{Misc.fingerprint persist_options}" - return path if persist_options[:no_load] - return load_file(path, type) - end - - Log.medium "Persist create: #{ path } - #{Misc.fingerprint persist_options}" - - res = yield - - if persist_options[:no_load] == :stream - case res - when IO - res = tee_stream(res, path, type, res.respond_to?(:callback)? res.callback : nil) - ConcurrentStream.setup res do - begin - lockfile.unlock if lockfile.locked? - rescue - Log.exception $! - Log.warn "Lockfile exception: " << $!.message - end - end - res.abort_callback = Proc.new do - begin - lockfile.unlock if lockfile.locked? - rescue - Log.exception $! - Log.warn "Lockfile exception: " << $!.message - end - end - raise KeepLocked.new res - when TSV::Dumper - res = tee_stream(res.stream, path, type, res.respond_to?(:callback)? res.callback : nil) - ConcurrentStream.setup res do - begin - lockfile.unlock - rescue - Log.exception $! - Log.warn "Lockfile exception: " << $!.message - end - end - res.abort_callback = Proc.new do - begin - lockfile.unlock - rescue - Log.exception $! - Log.warn "Lockfile exception: " << $!.message - end - end - raise KeepLocked.new res - end - end - - case res - when IO - begin - res = case - when :array - res.read.split "\n" - when :tsv - TSV.open(res) - else - res.read - end - res.join if res.respond_to? :join - rescue - res.abort if res.respond_to? :abort - raise $! - end - when (defined? TSV and TSV::Dumper) - begin - io = res.stream - res = TSV.open(io) - io.join if io.respond_to? :join - rescue - io.abort if io.respond_to? :abort - raise $! - end - end - - Misc.lock(path) do - save_file(path, type, res) - end - - persist_options[:no_load] ? path : res - end - - rescue - Log.error "Error in persist: #{path}#{Open.exists?(path) ? Log.color(:red, " Erasing") : ""}" - FileUtils.rm path if Open.exists? path - raise $! - end + persist_file(path, type, persist_options, &block) end end end