lib/rbbt/rest/workflow/stream_task.rb in rbbt-rest-1.8.25 vs lib/rbbt/rest/workflow/stream_task.rb in rbbt-rest-1.8.26

- old
+ new

@@ -85,10 +85,11 @@ clean_stream = Misc.open_pipe do |sin| begin copy_until_boundary(stream, sin, boundary) rescue + Log.exception $! end end ConcurrentStream.setup(clean_stream, :filename => filename) @@ -99,34 +100,35 @@ Log.low "Running streaming job #{[workflow, task] * "/" }: #{Misc.fingerprint task_parameters}" job = workflow.job(task, name, task_parameters) job.clean if job.aborted? - execution_type = case - when workflow.exec_exports.include?(task) - "exec" - when workflow.synchronous_exports.include?(task) - "synchronous" - when workflow.asynchronous_exports.include?(task) - "asynchronous" - else - raise "No known export type for #{ workflow } #{ task }. Accesses denied" - end + execution_type = type_of_export(workflow, task) + #execution_type = case + # when workflow.exec_exports.include?(task) + # "exec" + # when workflow.synchronous_exports.include?(task) + # "synchronous" + # when workflow.asynchronous_exports.include?(task) + # "asynchronous" + # else + # raise "No known export type for #{ workflow } #{ task }. Accesses denied" + # end execution_type = "exec" if inputs["_cache_type"] == 'exec' begin case execution_type when "exec", nil job.exec(:stream) when "sync", "synchronous", "async", "asynchronous" if job.done? or job.started? done_consumer = Thread.new do - Misc.consume_stream(stream, false) + Misc.consume_stream(stream) end job.join unless job.done? else - job.run(:stream) + job.fork(:stream) end else raise "Unknown execution_type: #{execution_type}" end