lib/rbbt/rest/workflow/stream_task.rb in rbbt-rest-1.8.15 vs lib/rbbt/rest/workflow/stream_task.rb in rbbt-rest-1.8.16

- old
+ new

@@ -77,10 +77,11 @@ def run_job(workflow, task, inputs, stream_input, stream, boundary, filename = nil) name = inputs.delete "jobname" task_parameters = prepare_job_inputs(workflow, task, inputs) + IndiferentHash.setup task_parameters Misc.add_stream_filename(stream, filename) if filename clean_stream = Misc.open_pipe do |sin| begin @@ -89,15 +90,18 @@ end end ConcurrentStream.setup(clean_stream, :filename => filename) - task_parameters[stream_input] = clean_stream + task_parameters[stream_input.to_sym] = clean_stream task = task.to_sym + Log.low "Running streaming job #{[workflow, task] * "/" }: #{Misc.fingerprint task_parameters}" job = workflow.job(task, name, task_parameters) + + job.clean if job.aborted? execution_type = case when workflow.exec_exports.include?(task) "exec" when workflow.synchronous_exports.include?(task) @@ -229,11 +233,10 @@ content_type = env["CONTENT_TYPE"] tcp_merged_io = Misc.open_pipe do |sin| merge_chunks(tcp_io, sin, buffer) end - inputs, stream_input, filename, stream, boundary = get_inputs(content_type, tcp_merged_io) workflow, task = parse_uri(env) job = run_job(workflow, task, inputs, stream_input, stream, boundary, filename) @@ -244,31 +247,31 @@ raise job.messages.last if job.error? out_stream = TSV.get_stream job begin - Log.low "Write response #{tcp_io} " + Log.low "Write response #{Misc.fingerprint tcp_io} " tcp_io.write "HTTP/1.1 200\r\n" tcp_io.write "Connection: close\r\n" tcp_io.write "RBBT-STREAMING-JOB-URL: #{ job_url }\r\n" tcp_io.write "\r\n" - Log.low "Comsuming response #{tcp_io}" + Log.low "Comsuming response #{Misc.fingerprint tcp_io}" Misc.consume_stream(out_stream, false, tcp_io) - Log.low "Comsumed response #{tcp_io}" + Log.low "Comsumed response #{Misc.fingerprint tcp_io}" rescue Exception Log.exception $! end if out_stream tcp_io.close unless tcp_io.closed? Log.low "Closed io #{tcp_io}" [-1, {}, []] rescue Log.exception $! - io.write "HTTP/1.1 515\r\n" - io.write "Connection: close\r\n" - io.write "\r\n" - io.close_write + tcp_io.write "HTTP/1.1 515\r\n" + tcp_io.write "Connection: close\r\n" + tcp_io.write "\r\n" + tcp_io.close_write raise $! end else Log.low "NOT Hijacking post data"