lib/rbbt/rest/workflow/stream_task.rb in rbbt-rest-1.8.10 vs lib/rbbt/rest/workflow/stream_task.rb in rbbt-rest-1.8.11

- old
+ new

@@ -1,312 +1,259 @@
 class StreamWorkflowTask
+  include WorkflowRESTHelpers
+  include RbbtRESTHelpers
+
   def initialize(app)
     @app = app
   end

+  EOL = "\r\n"
+
+  def parse_uri(env)
+    uri = env["REQUEST_URI"]
+    _n, workflow, task = uri.split("/")
+    workflow = begin
+                 Kernel.const_get(workflow)
+               rescue
+                 raise "Could not accept task for workflow: #{ workflow }"
+               end
+    [workflow, task]
+  end
+
   def read_normal_inputs(io, boundary, stream_input)
-    content = ""
-    content_start = false
-    variable = nil
-    filename = nil
     inputs = {}
+    input_name = nil
+    variable_chunk = nil
+    filename = nil
+
     while line = io.gets
       line.chomp!
-      if line == "--" + boundary
-        if variable
-          inputs[variable] = content
+      chunk_start = line == "--" + boundary
+
+      if chunk_start
+        if input_name
+          inputs[input_name] = variable_chunk
         end
         content_start = false
-        content = ""
+      elsif content_start
+        if variable_chunk.empty?
+          variable_chunk << line
+        else
+          variable_chunk << "\n" << line
+        end
       elsif line =~ /^Content.* name="([^\s;"]*)"/
-        variable = $1
+        input_name = $1
         filename = line.match(/filename="([^"]+)"/)[1] if line =~ /filename/
-        content = ""
       elsif line.empty?
+        variable_chunk = ""
+        break if input_name == stream_input
         content_start = true
-        break if variable == stream_input
-      else
-        content << line if content_start
       end
     end

     [inputs, filename]
   end

-  def parse_uri(env)
-    uri = env["REQUEST_URI"]
-    _n, workflow, task = uri.split("/")
-    workflow = begin
-      Kernel.const_get(workflow)
-    rescue
-      raise "Could not accept task for workflow: #{ workflow }"
-    end
-    [workflow, task]
+  def copy_until_boundary(sin, sout, boundary)
+    while line = sin.gets
+      break if line.include? boundary
+      sout.write line
+    end
   end

-  EOL = "\r\n"
-  def read_chunk(sin, rest = "")
-    parts = []
-    c = sin.read(1024)
-    #c = sin.gets
-    raise "Early connection close" if c.nil?
-    c = rest << c unless rest.empty?
-    c = c[2..-1] if c[0..1] == EOL
-    index = c.index EOL
-    while index
-      part = c[0..index-1]
-      parts << part
-      c = c[index+2..-1]
-      index = c.index EOL
-    end
-    rest = c
-    [parts, rest]
+  def get_inputs(content_type, stream)
+    boundary = content_type.match(/boundary=([^\s;]*)/)[1]
+    stream_input = content_type.match(/stream=([^\s;]*)/)[1]
+    inputs, filename = read_normal_inputs(stream, boundary, stream_input)
+
+    IndiferentHash.setup(inputs)
+
+    [inputs, stream_input, filename, stream, boundary]
   end

-  def copy_chunked_stream(sin, sout, boundary)
+  def run_job(workflow, task, inputs, stream_input, stream, boundary, filename = nil)
+    name = inputs.delete "jobname"
-    rest = ""
-    done = false
-    content = false
+    task_parameters = prepare_job_inputs(workflow, task, inputs)

-    while not done
-      parts, rest = read_chunk(sin, rest)
-      while parts.any?
-        part = parts.shift
-        if content
-          part.split("\n").each do |line|
-            sout.puts line
-            if line.include? boundary
-              done = true
-              break
-            end
+    Misc.add_stream_filename(stream, filename) if filename
+
+    task_parameters[stream_input] = stream
+
+    task = task.to_sym
+
+    job = workflow.job(task, name, task_parameters)
+
+    execution_type = case
+                     when workflow.exec_exports.include?(task)
+                       "exec"
+                     when workflow.synchronous_exports.include?(task)
+                       "synchronous"
+                     when workflow.asynchronous_exports.include?(task)
+                       "asynchronous"
+                     else
+                       raise "No known export type for #{ workflow } #{ task }. Accesses denied"
+                     end
+
+    execution_type = "exec" if inputs["_cache_type"] == 'exec'
+
+    begin
+      case execution_type
+      when "exec", nil
+        job.exec(:stream)
+      when "sync", "synchronous", "async", "asynchronous"
+        if job.done? or job.started?
+          done_consumer = Thread.new do
+            Misc.consume_stream(stream, false)
           end
-          content = false
+          job.join unless job.done?
         else
-          content = true
+          job.run(:stream)
         end
+      else
+        raise "Unknown execution_type: #{execution_type}"
       end
+
+    rescue Aborted, Interrupt
+      job.abort
+      stream.write "HTTP/1.1 500\r\n"
+      stream.close_write
+    rescue Exception
+      job.exception $!
+      stream.write "HTTP/1.1 500\r\n"
+      stream.close_write
     end
-
+    job
   end

-  def merge_chunks(sin, sout)
+  def _merge_chunks(sin, sout)
-    rest = ""
-    done = false
-    content = true
-
-    while not done
-      chunk_size_str = ""
-      while chunk_size_str.strip.empty?
-        chunk_size_str = sin.gets
-        raise "Error reading chunks" if chunk_size_str.nil?
+    begin
+      while true
+        chunk_size_str = ""
+        stop = false
+        while chunk_size_str.strip.empty?
+          chunk_size_str = sin.gets
+          raise "Empty chunk size" if chunk_size_str.nil? or chunk_size_str.strip.empty?
+          chunk_size_str = "" if chunk_size_str.nil?
+        end
+        break if stop
+        size = chunk_size_str.strip.to_i(16)
+        break if size == 0
+        chunk = sin.read(size)
+        bound = sin.read(2)
+        raise "bound not right: #{ bound }" if bound != EOL
+        raise "Size does not match: #{[chunk.length, size] * " != "}" if chunk.length != size
+        sout.write chunk
       end
-      size = chunk_size_str.strip.to_i(16)
-      break if size == 0
-      chunk = sin.read(size)
-      bound = sin.read(2)
-      raise "Size does not match" if false and chunk.length != size
-      sout.write chunk
-    end
+    rescue Aborted
+      raise $!
+    rescue Exception
+      Log.exception $!
+      raise $!
+    ensure
+      if sin.respond_to? :close_read
+        sin.close_read
+      else
+        sin.close unless sin.closed?
+      end
+      if sin.respond_to? :threads
+        sin.threads.each do |th| th.raise Aborted end
+      end
-    sout.write rest
-    sout.close
+    end
   end

-  def copy_until_boundary(sin, sout, boundary)
-    while line = sin.gets
-      break if line.include? boundary
-      sout.write line
+  def merge_chunks(sin, sout, buffer)
+    if buffer.nil?
+      _merge_chunks(sin, sout)
+    else
+      ssin = Misc.open_pipe do |s|
+        begin
+          s << buffer
+          while c = sin.readpartial(Misc::BLOCK_SIZE)
+            s << c
+          end
+        rescue Aborted, IOError
+        rescue Exception
+        ensure
+          sin.close_read
+          s.close
+        end
+      end
+      _merge_chunks(ssin, sout)
     end
   end

+  def do_stream(env)
+    uri = env["REQUEST_URI"]
+    post = env["REQUEST_METHOD"]
+    hijack = !!env["rack.hijack"]
+    content_type = env["CONTENT_TYPE"]
+    encoding = env["HTTP_TRANSFER_ENCODING"]
+    id = env["HTTP_RBBT_ID"]
+    id = id + ": " + Thread.current.object_id.to_s if id
+    post == "POST" and hijack and content_type and content_type.include? "Rbbt_Param_Stream" and encoding == 'chunked'
+  end
+
   def call(env)
-    if env["REQUEST_METHOD"] == "POST" and env["rack.hijack"] and env["CONTENT_TYPE"] and env["CONTENT_TYPE"].include? "Rbbt_Param_Stream" and env["HTTP_TRANSFER_ENCODING"] == 'chunked'
-      Log.high "Hijacking post data"
-      inputs = {}
-      content_type = env["CONTENT_TYPE"]
-      boundary = content_type.match(/boundary=([^\s;]*)/)[1]
-      stream_input = content_type.match(/stream=([^\s;]*)/)[1]
-      post_stream_chunked = env["rack.hijack"].call
-      job_url = nil
+    if do_stream(env)
       begin
-        post_stream = Misc.open_pipe do |sin|
-          merge_chunks(post_stream_chunked, sin)
-        end
-        inputs, filename = read_normal_inputs(post_stream, boundary, stream_input)
+        client = env["rack.hijack"]
+        buffer = client.instance_variable_get('@buffer')
+        tcp_io = client.call
+        Log.low "YES Hijacking post data #{tcp_io}"

-        input_stream_out, input_stream_in = Misc.pipe
-        Misc.add_stream_filename(input_stream_out, filename) if filename
-        inputs[stream_input] = input_stream_out
+        content_type = env["CONTENT_TYPE"]

-        workflow, task = parse_uri(env)
-        name = inputs.delete "jobname"
-        job = workflow.job(task, name, inputs)
-        Log.high "Run job #{job.path} with inputs #{Misc.fingerprint(inputs)}"
+        tcp_merged_io = Misc.open_pipe do |sin| merge_chunks(tcp_io, sin, buffer) end

-        job_url = File.join("/", workflow.to_s, task, job.name)
+        inputs, stream_input, filename, stream, boundary = get_inputs(content_type, tcp_merged_io)

-        task = task.to_sym
-        execution_type = case
-                         when workflow.exec_exports.include?(task)
-                           "exec"
-                         when workflow.synchronous_exports.include?(task)
-                           "synchronous"
-                         when workflow.asynchronous_exports.include?(task)
-                           "asynchronous"
-                         else
-                           raise "No known export type for #{ workflow } #{ task }. Accesses denied"
-                         end
+        workflow, task = parse_uri(env)

-        execution_type = "exec" if inputs["_cache_type"] == 'exec'
-        Log.info "Streaming task with execution_type: #{ execution_type }"
+        job = run_job(workflow, task, inputs, stream_input, stream, boundary, filename)

-        case execution_type
-        when "exec", nil
-          job.exec(:stream)
-        when "sync", "synchronous", "async", "asynchronous"
-          if job.done?
-            done_consumer = Thread.new do
-              while c = post_stream.read(1024)
-              end
-            end
-          else
-            job.run(:stream)
-          end
-        else
-          raise "Unknown execution_type: #{execution_type}"
-        end
+        job_url = File.join("/", workflow.to_s, task, job.name)

-        t_in = Thread.new do
-          begin
-            copy_until_boundary(post_stream, input_stream_in, boundary)
-            input_stream_in.close
-            post_stream.close_read
-          rescue
-            Log.exception $!
-          end
-        end unless job.done?
+        out_stream = TSV.get_stream job

-        job_output = TSV.get_stream job
-        t_out = Thread.new do
-          begin
-            post_stream_chunked.write "HTTP/1.1 200\r\n"
-            post_stream_chunked.write "RBBT-STREAMING-JOB-URL: #{ job_url }\r\n"
-            post_stream_chunked.write "\r\n"
-            while c = job_output.read(1024)
-              post_stream_chunked.write c
-            end
-            job_output.join if job_output.respond_to? :join
-            post_stream_chunked.close_write
-            done_consumer.join if done_consumer
-          rescue
-            Log.exception $!
-            job.abort
-          end
-        end
+        begin
+          Log.low "Write response #{tcp_io} "
+          tcp_io.write "HTTP/1.1 200\r\n"
+          tcp_io.write "Connection: close\r\n"
+          tcp_io.write "RBBT-STREAMING-JOB-URL: #{ job_url }\r\n"
+          tcp_io.write "\r\n"
+          Log.low "Comsuming response #{tcp_io}"
+          Misc.consume_stream(out_stream, false, tcp_io)
+          Log.low "Comsumed response #{tcp_io}"
+        rescue Exception
+          Log.exception $!
+        end if out_stream

-      end
+        tcp_io.close unless tcp_io.closed?
+        Log.low "Closed io #{tcp_io}"

-      [200, {}, nil]
+        [-1, {}, []]
+      rescue
+        Log.exception $!
+        io.write "HTTP/1.1 515\r\n"
+        io.write "Connection: close\r\n"
+        io.write "\r\n"
+        io.close_write
+        raise $!
+      end
     else
       Log.high "NOT Hijacking post data"
+
       @app.call(env)
     end
   end

-  #def call_old(env)
-  #  if env["REQUEST_METHOD"] == "POST" and env["rack.hijack"] and env["CONTENT_TYPE"] and env["CONTENT_TYPE"].include? "Rbbt_Param_Stream" and env["HTTP_TRANSFER_ENCODING"] == 'chunked'
-  #    Log.high "Hijacking post data"
-  #    inputs = {}
-  #    content_type = env["CONTENT_TYPE"]
-  #    boundary = content_type.match(/boundary=([^\s;]*)/)[1]
-  #    stream_input = content_type.match(/stream=([^\s;]*)/)[1]
-  #    post_stream = env["rack.hijack"].call
-  #    job_url = nil
-  #    begin
-  #      inputs, filename = read_normal_inputs(post_stream, boundary, stream_input)
-
-  #      input_stream_out, input_stream_in = Misc.pipe
-  #      Misc.add_stream_filename(input_stream_out, filename) if filename
-  #      inputs[stream_input] = input_stream_out
-
-  #      workflow, task = parse_uri(env)
-  #      name = inputs.delete "jobname"
-  #      job = workflow.job(task, name, inputs)
-  #      Log.high "Run job #{job.path} with inputs #{Misc.fingerprint(inputs)}"
-
-  #      job_url = File.join("/", workflow.to_s, task, job.name)
-
-  #      task = task.to_sym
-  #      execution_type = case
-  #                       when workflow.exec_exports.include?(task)
-  #                         "exec"
-  #                       when workflow.synchronous_exports.include?(task)
-  #                         "synchronous"
-  #                       when workflow.asynchronous_exports.include?(task)
-  #                         "asynchronous"
-  #                       else
-  #                         raise "No known export type for #{ workflow } #{ task }. Accesses denied"
-  #                       end
-
-  #      execution_type = "exec" if inputs["_cache_type"] == 'exec'
-  #      Log.info "Streaming task with execution_type: #{ execution_type }"
-
-  #      case execution_type
-  #      when "exec", nil
-  #        job.exec(:stream)
-  #      when "sync", "synchronous", "async", "asynchronous"
-  #        if job.done?
-  #          done_consumer = Thread.new do
-  #            while c = post_stream.read(1024)
-  #            end
-  #          end
-  #        else
-  #          job.run(:stream)
-  #        end
-  #      else
-  #        raise "Unknown execution_type: #{execution_type}"
-  #      end
-
-  #      t_in = Thread.new do
-  #        begin
-  #          copy_chunked_stream(post_stream, input_stream_in, boundary)
-  #        rescue
-  #          Log.exception $!
-  #        end
-  #      end unless job.done?
-
-  #      job_output = TSV.get_stream job
-  #      t_out = Thread.new do
-  #        begin
-  #          post_stream.write "HTTP/1.1 200\r\n"
-  #          post_stream.write "RBBT-STREAMING-JOB-URL: #{ job_url }\r\n"
-  #          post_stream.write "\r\n"
-  #          while c = job_output.read(1024)
-  #            post_stream.write c
-  #          end
-  #          job_output.join if job_output.respond_to? :join
-  #          post_stream.close_write
-  #          done_consumer.join if done_consumer
-  #        rescue
-  #          Log.exception $!
-  #          job.abort
-  #        end
-  #      end
-
-  #    end
-  #    [200, {}, nil]
-  #  else
-  #    Log.high "NOT Hijacking post data"
-  #    @app.call(env)
-  #  end
-  #end
 end
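For orientation (not shown in the diff itself): in both versions the class is an ordinary Rack middleware, wrapping the downstream app via initialize(app) and either hijacking the connection in call(env) or delegating with @app.call(env). Below is a minimal sketch of how such a middleware might be mounted, assuming a hypothetical config.ru; the WorkflowApp constant is illustrative and not part of rbbt-rest:

    # config.ru (hypothetical sketch)
    # StreamWorkflowTask sits in front of the regular workflow REST app so it
    # can hijack chunked "Rbbt_Param_Stream" POSTs; other requests are passed on.
    require 'rbbt/rest/workflow/stream_task'

    use StreamWorkflowTask    # the middleware from this file
    run WorkflowApp           # illustrative downstream Rack application

In 1.8.11 a request is hijacked only when do_stream(env) holds: a POST on a hijack-capable server, Transfer-Encoding: chunked, and a Content-Type that includes "Rbbt_Param_Stream" (its boundary= and stream= parameters are parsed later by get_inputs); any other request falls through to @app.call(env).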