lib/rbbt/rest/workflow/stream_task.rb in rbbt-rest-1.8.25 vs lib/rbbt/rest/workflow/stream_task.rb in rbbt-rest-1.8.26
- old
+ new
@@ -85,10 +85,11 @@
clean_stream = Misc.open_pipe do |sin|
begin
copy_until_boundary(stream, sin, boundary)
rescue
+ Log.exception $!
end
end
ConcurrentStream.setup(clean_stream, :filename => filename)
@@ -99,34 +100,35 @@
Log.low "Running streaming job #{[workflow, task] * "/" }: #{Misc.fingerprint task_parameters}"
job = workflow.job(task, name, task_parameters)
job.clean if job.aborted?
- execution_type = case
- when workflow.exec_exports.include?(task)
- "exec"
- when workflow.synchronous_exports.include?(task)
- "synchronous"
- when workflow.asynchronous_exports.include?(task)
- "asynchronous"
- else
- raise "No known export type for #{ workflow } #{ task }. Accesses denied"
- end
+ execution_type = type_of_export(workflow, task)
+ #execution_type = case
+ # when workflow.exec_exports.include?(task)
+ # "exec"
+ # when workflow.synchronous_exports.include?(task)
+ # "synchronous"
+ # when workflow.asynchronous_exports.include?(task)
+ # "asynchronous"
+ # else
+ # raise "No known export type for #{ workflow } #{ task }. Accesses denied"
+ # end
execution_type = "exec" if inputs["_cache_type"] == 'exec'
begin
case execution_type
when "exec", nil
job.exec(:stream)
when "sync", "synchronous", "async", "asynchronous"
if job.done? or job.started?
done_consumer = Thread.new do
- Misc.consume_stream(stream, false)
+ Misc.consume_stream(stream)
end
job.join unless job.done?
else
- job.run(:stream)
+ job.fork(:stream)
end
else
raise "Unknown execution_type: #{execution_type}"
end