lib/rbbt/persist.rb in rbbt-util-5.12.3 vs lib/rbbt/persist.rb in rbbt-util-5.13.0
- old
+ new
@@ -109,11 +109,21 @@
res = Open.read(path).split("\n", -1)
res.pop
res
when :marshal
Open.open(path) do |stream|
- Marshal.load(stream)
+ case stream
+ when StringIO
+ begin
+ Marshal.load(stream)
+ rescue
+ Log.exception $!
+ raise $!
+ end
+ else
+ Marshal.load(stream)
+ end
end
when :yaml
Open.open(path) do |stream|
YAML.load(stream)
end
@@ -206,23 +216,23 @@
#end
def self.tee_stream_thread(stream, path, type, callback = nil)
file, out = Misc.tee_stream(stream)
- saver_thread = Thread.new(Thread.current, path, file) do |parent,path,file|
+ saver_thread = Thread.new(Thread.current) do |parent|
begin
Thread.current["name"] = "file saver: " + path
Misc.lock(path) do
save_file(path, type, file)
end
rescue Aborted
Log.error "Persist stream thread aborted: #{ Log.color :blue, path }"
- stream.abort if stream.respond_to? :abort
+ file.abort if file.respond_to? :abort
rescue Exception
Log.error "Persist stream thread exception: #{ Log.color :blue, path }"
Log.exception $!
- stream.abort if stream.respond_to? :abort
+ file.abort if file.respond_to? :abort
parent.raise $!
end
end
ConcurrentStream.setup(out, :threads => saver_thread, :filename => path)
end
@@ -288,12 +298,123 @@
class << self
alias tee_stream tee_stream_thread
end
+ def self.get_result(path, type, persist_options, lockfile, &block)
+ res = yield
- def self.persist(name, type = nil, persist_options = {})
+ if persist_options[:no_load] == :stream
+ case res
+ when IO
+ res = tee_stream(res, path, type, res.respond_to?(:callback)? res.callback : nil)
+ ConcurrentStream.setup res do
+ begin
+ lockfile.unlock if lockfile.locked?
+ rescue
+ Log.exception $!
+ Log.warn "Lockfile exception: " << $!.message
+ end
+ end
+ res.abort_callback = Proc.new do
+ begin
+ lockfile.unlock if lockfile.locked?
+ rescue
+ Log.exception $!
+ Log.warn "Lockfile exception: " << $!.message
+ end
+ end
+ raise KeepLocked.new res
+ when TSV::Dumper
+ res = tee_stream(res.stream, path, type, res.respond_to?(:callback)? res.callback : nil)
+ ConcurrentStream.setup res do
+ begin
+ lockfile.unlock
+ rescue
+ Log.exception $!
+ Log.warn "Lockfile exception: " << $!.message
+ end
+ end
+ res.abort_callback = Proc.new do
+ begin
+ lockfile.unlock
+ rescue
+ Log.exception $!
+ Log.warn "Lockfile exception: " << $!.message
+ end
+ end
+ raise KeepLocked.new res
+ end
+ end
+
+ case res
+ when IO
+ begin
+ res = case
+ when :array
+ res.read.split "\n"
+ when :tsv
+ TSV.open(res)
+ else
+ res.read
+ end
+ res.join if res.respond_to? :join
+ rescue
+ res.abort if res.respond_to? :abort
+ raise $!
+ end
+ when (defined? TSV and TSV::Dumper)
+ begin
+ io = res.stream
+ res = TSV.open(io)
+ io.join if io.respond_to? :join
+ rescue
+ io.abort if io.respond_to? :abort
+ raise $!
+ end
+ end
+ res
+ end
+
+ def self.persist_file(path, type, persist_options, &block)
+
+ if is_persisted?(path, persist_options)
+ Log.low "Persist up-to-date: #{ path } - #{Misc.fingerprint persist_options}"
+ return path if persist_options[:no_load]
+ return load_file(path, type)
+ end
+
+ begin
+
+ lock_filename = Persist.persistence_path(path + '.persist', {:dir => Persist.lock_dir})
+ Misc.lock lock_filename do |lockfile|
+
+ if is_persisted?(path, persist_options)
+ Log.low "Persist up-to-date (suddenly): #{ path } - #{Misc.fingerprint persist_options}"
+ return path if persist_options[:no_load]
+ return load_file(path, type)
+ end
+
+ Log.medium "Persist create: #{ path } - #{type} #{Misc.fingerprint persist_options}"
+
+ res = get_result(path, type, persist_options, lockfile, &block)
+
+ Misc.lock(path) do
+ save_file(path, type, res)
+ end
+
+ persist_options[:no_load] ? path : res
+ end
+
+ rescue
+ Log.error "Error in persist: #{path}#{Open.exists?(path) ? Log.color(:red, " Erasing") : ""}"
+ FileUtils.rm path if Open.exists? path
+ raise $!
+ end
+ end
+
+ def self.persist(name, type = nil, persist_options = {}, &block)
type ||= :marshal
return (persist_options[:repo] || Persist::MEMORY)[persist_options[:file]] ||= yield if type ==:memory and persist_options[:file] and persist_options[:persist] and persist_options[:persist] != :update
if FalseClass == persist_options[:persist]
@@ -386,113 +507,10 @@
entities
end
else
-
- if is_persisted?(path, persist_options)
- Log.low "Persist up-to-date: #{ path } - #{Misc.fingerprint persist_options}"
- return path if persist_options[:no_load]
- return load_file(path, type)
- end
-
- begin
-
- lock_filename = Persist.persistence_path(path + '.persist', {:dir => Persist.lock_dir})
- Misc.lock lock_filename do |lockfile|
-
- if is_persisted?(path, persist_options)
- Log.low "Persist up-to-date (suddenly): #{ path } - #{Misc.fingerprint persist_options}"
- return path if persist_options[:no_load]
- return load_file(path, type)
- end
-
- Log.medium "Persist create: #{ path } - #{Misc.fingerprint persist_options}"
-
- res = yield
-
- if persist_options[:no_load] == :stream
- case res
- when IO
- res = tee_stream(res, path, type, res.respond_to?(:callback)? res.callback : nil)
- ConcurrentStream.setup res do
- begin
- lockfile.unlock if lockfile.locked?
- rescue
- Log.exception $!
- Log.warn "Lockfile exception: " << $!.message
- end
- end
- res.abort_callback = Proc.new do
- begin
- lockfile.unlock if lockfile.locked?
- rescue
- Log.exception $!
- Log.warn "Lockfile exception: " << $!.message
- end
- end
- raise KeepLocked.new res
- when TSV::Dumper
- res = tee_stream(res.stream, path, type, res.respond_to?(:callback)? res.callback : nil)
- ConcurrentStream.setup res do
- begin
- lockfile.unlock
- rescue
- Log.exception $!
- Log.warn "Lockfile exception: " << $!.message
- end
- end
- res.abort_callback = Proc.new do
- begin
- lockfile.unlock
- rescue
- Log.exception $!
- Log.warn "Lockfile exception: " << $!.message
- end
- end
- raise KeepLocked.new res
- end
- end
-
- case res
- when IO
- begin
- res = case
- when :array
- res.read.split "\n"
- when :tsv
- TSV.open(res)
- else
- res.read
- end
- res.join if res.respond_to? :join
- rescue
- res.abort if res.respond_to? :abort
- raise $!
- end
- when (defined? TSV and TSV::Dumper)
- begin
- io = res.stream
- res = TSV.open(io)
- io.join if io.respond_to? :join
- rescue
- io.abort if io.respond_to? :abort
- raise $!
- end
- end
-
- Misc.lock(path) do
- save_file(path, type, res)
- end
-
- persist_options[:no_load] ? path : res
- end
-
- rescue
- Log.error "Error in persist: #{path}#{Open.exists?(path) ? Log.color(:red, " Erasing") : ""}"
- FileUtils.rm path if Open.exists? path
- raise $!
- end
+ persist_file(path, type, persist_options, &block)
end
end
end