lib/rbbt/persist.rb in rbbt-util-5.13.30 vs lib/rbbt/persist.rb in rbbt-util-5.13.31
- old
+ new
@@ -16,11 +16,11 @@
end
attr_accessor :lock_dir
def lock_dir
- @lock_dir ||= Rbbt.tmp.tsv_open_locks.find
+ @lock_dir ||= Rbbt.tmp.persist_locks.find
end
end
MEMORY = {} unless defined? MEMORY
MAX_FILE_LENGTH = 150
@@ -280,15 +280,17 @@
alias tee_stream tee_stream_thread
end
def self.get_result(path, type, persist_options, lockfile, &block)
res = yield
+ stream = res if IO === res
+ stream = res.stream if res.respond_to? :stream
- if persist_options[:no_load] == :stream
- case res
- when IO
- res = tee_stream(res, path, type, res.respond_to?(:callback)? res.callback : nil, res.respond_to?(:abort_callback)? res.abort_callback : nil)
+ if stream
+ if persist_options[:no_load] == :stream
+ res = tee_stream(stream, path, type, stream.respond_to?(:callback)? stream.callback : nil, stream.respond_to?(:abort_callback)? stream.abort_callback : nil)
+
ConcurrentStream.setup res do
begin
lockfile.unlock #if File.exists? lockfile.path and lockfile.locked?
rescue Exception
Log.medium "Lockfile exception: " << $!.message
@@ -300,31 +302,54 @@
rescue Exception
Log.medium "Lockfile exception: " << $!.message
end
end
raise KeepLocked.new res
- when TSV::Dumper
- stream = res.stream
- res = tee_stream(stream, path, type)
- ConcurrentStream.setup res do
- begin
- stream.callback
- lockfile.unlock #if File.exists? lockfile.path and lockfile.locked?
- rescue Exception
- Log.medium "Lockfile exception: " << $!.message
- end
+ else
+ begin
+ res = case type
+ when :array
+ res.read.split "\n"
+ when :tsv
+ TSV.open(res)
+ else
+ res.read
+ end
+ res.join if res.respond_to? :join
+ res
+ rescue
+ res.abort if res.respond_to? :abort
+ raise $!
end
- res.abort_callback = Proc.new do
- begin
- stream.abort
+ end
+ else
+ res
+ end
+ end
+
+ def self._get_result(path, type, persist_options, lockfile, &block)
+ res = yield
+
+ if persist_options[:no_load] == :stream
+ stream = IO === res ? res : res.stream
+ res = tee_stream(stream, path, type, stream.respond_to?(:callback)? stream.callback : nil, stream.respond_to?(:abort_callback)? stream.abort_callback : nil)
+
+ ConcurrentStream.setup res do
+ begin
lockfile.unlock #if File.exists? lockfile.path and lockfile.locked?
- rescue Exception
- Log.medium "Lockfile exception: " << $!.message
- end
+ rescue Exception
+ Log.medium "Lockfile exception: " << $!.message
end
- raise KeepLocked.new res
end
+ res.abort_callback = Proc.new do
+ begin
+ lockfile.unlock #if File.exists? lockfile.path and lockfile.locked?
+ rescue Exception
+ Log.medium "Lockfile exception: " << $!.message
+ end
+ end
+ raise KeepLocked.new res
end
case res
when IO
begin
@@ -354,28 +379,33 @@
res
end
def self.persist_file(path, type, persist_options, &block)
- if is_persisted?(path, persist_options)
- Log.low "Persist up-to-date: #{ path } - #{Misc.fingerprint persist_options}"
- return path if persist_options[:no_load]
- return load_file(path, type)
- else
+ begin
+ if is_persisted?(path, persist_options)
+ Log.low "Persist up-to-date: #{ path } - #{Misc.fingerprint persist_options}"
+ return path if persist_options[:no_load]
+ return load_file(path, type)
+ else
+ Open.rm path if Open.exists? path
+ end
+ rescue Exception
Open.rm path if Open.exists? path
end
+ lock_filename = Persist.persistence_path(path + '.persist', {:dir => Persist.lock_dir})
begin
-
- lock_filename = Persist.persistence_path(path + '.persist', {:dir => Persist.lock_dir})
Misc.lock lock_filename do |lockfile|
- if is_persisted?(path, persist_options)
- Log.low "Persist up-to-date (suddenly): #{ path } - #{Misc.fingerprint persist_options}"
- return path if persist_options[:no_load]
- return load_file(path, type)
+ Misc.insist do
+ if is_persisted?(path, persist_options)
+ Log.low "Persist up-to-date (suddenly): #{ path } - #{Misc.fingerprint persist_options}"
+ return path if persist_options[:no_load]
+ return load_file(path, type)
+ end
end
Log.medium "Persist create: #{ path } - #{type} #{Misc.fingerprint persist_options}"
res = get_result(path, type, persist_options, lockfile, &block)
@@ -385,11 +415,15 @@
end
persist_options[:no_load] ? path : res
end
- rescue
+ rescue Lockfile::StolenLockError
+ Log.medium "Lockfile stolen: #{path}"
+ retry
+ rescue Exception
Log.medium "Error in persist: #{path}#{Open.exists?(path) ? Log.color(:red, " Erasing") : ""}"
+ Log.exception $!
FileUtils.rm path if Open.exists? path
raise $!
end
end