lib/rbbt/workflow/remote_workflow/remote_step.rb in rbbt-util-5.27.5 vs lib/rbbt/workflow/remote_workflow/remote_step.rb in rbbt-util-5.27.6
- old
+ new
@@ -23,12 +23,17 @@
def clean_name
@base_name
end
def cache_file
- digest = Misc.obj2digest([base_url, task, base_name, inputs])
- Rbbt.var.cache.REST[[task, clean_name, digest] * "."].find
+ begin
+ digest = Misc.obj2digest([base_url, task, base_name, inputs])
+ Rbbt.var.cache.REST[[task, clean_name, digest].compact * "."].find
+ rescue
+ Log.exception $!
+ raise $!
+ end
end
def cache_files
Dir.glob(cache_file + '.*')
end
@@ -60,11 +65,10 @@
return @result if no_load == :stream
no_load ? Misc.add_GET_param(path, "_format", "raw") : @result
end
-
def self.get_streams(inputs, stream_input = nil)
new_inputs = {}
inputs.each do |k,v|
stream = stream_input.to_s == k.to_s
if Step === v
@@ -238,10 +242,11 @@
def join
return true if cache_files.any?
init_job unless @url
Log.debug{ "Joining RemoteStep: #{path}" }
+
if IO === @result
res = @result
@result = nil
Misc.consume_stream(res, true)
end
@@ -251,9 +256,10 @@
return self if self.done? || self.aborted? || self.error?
sleep 0.2 unless self.done? || self.aborted? || self.error?
sleep 1 unless self.done? || self.aborted? || self.error?
while not (self.done? || self.aborted? || self.error?)
sleep 3
+ iif [self.done?, self.status, self.info]
end
end
self
end