lib/rbbt/rest/workflow/stream_task.rb in rbbt-rest-1.8.15 vs lib/rbbt/rest/workflow/stream_task.rb in rbbt-rest-1.8.16
- old
+ new
@@ -77,10 +77,11 @@
def run_job(workflow, task, inputs, stream_input, stream, boundary, filename = nil)
name = inputs.delete "jobname"
task_parameters = prepare_job_inputs(workflow, task, inputs)
+ IndiferentHash.setup task_parameters
Misc.add_stream_filename(stream, filename) if filename
clean_stream = Misc.open_pipe do |sin|
begin
@@ -89,15 +90,18 @@
end
end
ConcurrentStream.setup(clean_stream, :filename => filename)
- task_parameters[stream_input] = clean_stream
+ task_parameters[stream_input.to_sym] = clean_stream
task = task.to_sym
+ Log.low "Running streaming job #{[workflow, task] * "/" }: #{Misc.fingerprint task_parameters}"
job = workflow.job(task, name, task_parameters)
+
+ job.clean if job.aborted?
execution_type = case
when workflow.exec_exports.include?(task)
"exec"
when workflow.synchronous_exports.include?(task)
@@ -229,11 +233,10 @@
content_type = env["CONTENT_TYPE"]
tcp_merged_io = Misc.open_pipe do |sin| merge_chunks(tcp_io, sin, buffer) end
-
inputs, stream_input, filename, stream, boundary = get_inputs(content_type, tcp_merged_io)
workflow, task = parse_uri(env)
job = run_job(workflow, task, inputs, stream_input, stream, boundary, filename)
@@ -244,31 +247,31 @@
raise job.messages.last if job.error?
out_stream = TSV.get_stream job
begin
- Log.low "Write response #{tcp_io} "
+ Log.low "Write response #{Misc.fingerprint tcp_io} "
tcp_io.write "HTTP/1.1 200\r\n"
tcp_io.write "Connection: close\r\n"
tcp_io.write "RBBT-STREAMING-JOB-URL: #{ job_url }\r\n"
tcp_io.write "\r\n"
- Log.low "Comsuming response #{tcp_io}"
+ Log.low "Comsuming response #{Misc.fingerprint tcp_io}"
Misc.consume_stream(out_stream, false, tcp_io)
- Log.low "Comsumed response #{tcp_io}"
+ Log.low "Comsumed response #{Misc.fingerprint tcp_io}"
rescue Exception
Log.exception $!
end if out_stream
tcp_io.close unless tcp_io.closed?
Log.low "Closed io #{tcp_io}"
[-1, {}, []]
rescue
Log.exception $!
- io.write "HTTP/1.1 515\r\n"
- io.write "Connection: close\r\n"
- io.write "\r\n"
- io.close_write
+ tcp_io.write "HTTP/1.1 515\r\n"
+ tcp_io.write "Connection: close\r\n"
+ tcp_io.write "\r\n"
+ tcp_io.close_write
raise $!
end
else
Log.low "NOT Hijacking post data"