lib/rbbt/persist.rb in rbbt-util-5.13.30 vs lib/rbbt/persist.rb in rbbt-util-5.13.31

- old
+ new

@@ -16,11 +16,11 @@ end attr_accessor :lock_dir def lock_dir - @lock_dir ||= Rbbt.tmp.tsv_open_locks.find + @lock_dir ||= Rbbt.tmp.persist_locks.find end end MEMORY = {} unless defined? MEMORY MAX_FILE_LENGTH = 150 @@ -280,15 +280,17 @@ alias tee_stream tee_stream_thread end def self.get_result(path, type, persist_options, lockfile, &block) res = yield + stream = res if IO === res + stream = res.stream if res.respond_to? :stream - if persist_options[:no_load] == :stream - case res - when IO - res = tee_stream(res, path, type, res.respond_to?(:callback)? res.callback : nil, res.respond_to?(:abort_callback)? res.abort_callback : nil) + if stream + if persist_options[:no_load] == :stream + res = tee_stream(stream, path, type, stream.respond_to?(:callback)? stream.callback : nil, stream.respond_to?(:abort_callback)? stream.abort_callback : nil) + ConcurrentStream.setup res do begin lockfile.unlock #if File.exists? lockfile.path and lockfile.locked? rescue Exception Log.medium "Lockfile exception: " << $!.message @@ -300,31 +302,54 @@ rescue Exception Log.medium "Lockfile exception: " << $!.message end end raise KeepLocked.new res - when TSV::Dumper - stream = res.stream - res = tee_stream(stream, path, type) - ConcurrentStream.setup res do - begin - stream.callback - lockfile.unlock #if File.exists? lockfile.path and lockfile.locked? - rescue Exception - Log.medium "Lockfile exception: " << $!.message - end + else + begin + res = case type + when :array + res.read.split "\n" + when :tsv + TSV.open(res) + else + res.read + end + res.join if res.respond_to? :join + res + rescue + res.abort if res.respond_to? :abort + raise $! end - res.abort_callback = Proc.new do - begin - stream.abort + end + else + res + end + end + + def self._get_result(path, type, persist_options, lockfile, &block) + res = yield + + if persist_options[:no_load] == :stream + stream = IO === res ? res : res.stream + res = tee_stream(stream, path, type, stream.respond_to?(:callback)? stream.callback : nil, stream.respond_to?(:abort_callback)? stream.abort_callback : nil) + + ConcurrentStream.setup res do + begin lockfile.unlock #if File.exists? lockfile.path and lockfile.locked? - rescue Exception - Log.medium "Lockfile exception: " << $!.message - end + rescue Exception + Log.medium "Lockfile exception: " << $!.message end - raise KeepLocked.new res end + res.abort_callback = Proc.new do + begin + lockfile.unlock #if File.exists? lockfile.path and lockfile.locked? + rescue Exception + Log.medium "Lockfile exception: " << $!.message + end + end + raise KeepLocked.new res end case res when IO begin @@ -354,28 +379,33 @@ res end def self.persist_file(path, type, persist_options, &block) - if is_persisted?(path, persist_options) - Log.low "Persist up-to-date: #{ path } - #{Misc.fingerprint persist_options}" - return path if persist_options[:no_load] - return load_file(path, type) - else + begin + if is_persisted?(path, persist_options) + Log.low "Persist up-to-date: #{ path } - #{Misc.fingerprint persist_options}" + return path if persist_options[:no_load] + return load_file(path, type) + else + Open.rm path if Open.exists? path + end + rescue Exception Open.rm path if Open.exists? path end + lock_filename = Persist.persistence_path(path + '.persist', {:dir => Persist.lock_dir}) begin - - lock_filename = Persist.persistence_path(path + '.persist', {:dir => Persist.lock_dir}) Misc.lock lock_filename do |lockfile| - if is_persisted?(path, persist_options) - Log.low "Persist up-to-date (suddenly): #{ path } - #{Misc.fingerprint persist_options}" - return path if persist_options[:no_load] - return load_file(path, type) + Misc.insist do + if is_persisted?(path, persist_options) + Log.low "Persist up-to-date (suddenly): #{ path } - #{Misc.fingerprint persist_options}" + return path if persist_options[:no_load] + return load_file(path, type) + end end Log.medium "Persist create: #{ path } - #{type} #{Misc.fingerprint persist_options}" res = get_result(path, type, persist_options, lockfile, &block) @@ -385,11 +415,15 @@ end persist_options[:no_load] ? path : res end - rescue + rescue Lockfile::StolenLockError + Log.medium "Lockfile stolen: #{path}" + retry + rescue Exception Log.medium "Error in persist: #{path}#{Open.exists?(path) ? Log.color(:red, " Erasing") : ""}" + Log.exception $! FileUtils.rm path if Open.exists? path raise $! end end