lib/rbbt/rest/workflow/stream_task.rb in rbbt-rest-1.8.10 vs lib/rbbt/rest/workflow/stream_task.rb in rbbt-rest-1.8.11
- old
+ new
@@ -1,312 +1,259 @@
class StreamWorkflowTask
+ include WorkflowRESTHelpers
+ include RbbtRESTHelpers
+
# Stores the wrapped application. `call` forwards to it every request that
# is not handled as a streamed task submission.
def initialize(app)
@app = app
end
+ EOL = "\r\n"
+
# Derives the workflow and the task name from env["REQUEST_URI"].
#
# The URI is split on "/"; the first piece is ignored, the second is looked
# up as a constant through Kernel.const_get and the third is returned
# untouched as the task name.
#
# Returns [workflow_constant, task_string].
# Raises RuntimeError ("Could not accept task for workflow: ...") when the
# constant lookup fails with a StandardError.
#
# NOTE(review): the lookup resolves any top-level constant named in the URI,
# not only workflows -- confirm that callers restrict this.
# NOTE(review): REQUEST_URI is split as-is, so a query string, if present,
# would stay attached to the last segment -- confirm clients never send one.
+ def parse_uri(env)
+ uri = env["REQUEST_URI"]
+ _n, workflow, task = uri.split("/")
+ workflow = begin
+ Kernel.const_get(workflow)
+ rescue
+ raise "Could not accept task for workflow: #{ workflow }"
+ end
+ [workflow, task]
+ end
+
# Reads the non-streamed parts of a multipart body from `io`, line by line.
#
# io           - object answering `gets`
# boundary     - multipart boundary, without the leading "--"
# stream_input - name of the part whose content must NOT be consumed here
#
# In the new ('+') version each line is handled by the first matching rule:
#   * a line equal to "--" + boundary stores the chunk collected so far under
#     the current input name and ends content mode;
#   * while in content mode the line is appended to the current chunk, with
#     "\n" between lines;
#   * a `Content... name="..."` line sets the current input name, and the
#     filename when the line carries one;
#   * an empty line starts a fresh chunk and enters content mode -- unless the
#     current input is `stream_input`, in which case reading stops and the
#     rest of `io` is left unread.
#
# Returns [inputs, filename]: the Hash of collected values and the last
# filename seen (nil when none was seen).
def read_normal_inputs(io, boundary, stream_input)
- content = ""
- content_start = false
- variable = nil
- filename = nil
inputs = {}
+ input_name = nil
+ variable_chunk = nil
+ filename = nil
+
while line = io.gets
line.chomp!
- if line == "--" + boundary
- if variable
- inputs[variable] = content
+ chunk_start = line == "--" + boundary
+
+ if chunk_start
+ if input_name
+ inputs[input_name] = variable_chunk
end
content_start = false
- content = ""
+ elsif content_start
# NOTE(review): `empty?` is also true after a blank first content line, so
# that blank line is not preserved in the stored value.
+ if variable_chunk.empty?
+ variable_chunk << line
+ else
+ variable_chunk << "\n" << line
+ end
elsif line =~ /^Content.* name="([^\s;"]*)"/
- variable = $1
+ input_name = $1
# NOTE(review): the capture requires at least one character, so a header
# carrying filename="" makes `match` return nil and `[1]` raise NoMethodError.
filename = line.match(/filename="([^"]+)"/)[1] if line =~ /filename/
- content = ""
elsif line.empty?
+ variable_chunk = ""
+ break if input_name == stream_input
content_start = true
- break if variable == stream_input
- else
- content << line if content_start
end
end
[inputs, filename]
end
# The '-' lines below are parse_uri at its former position; the same body is
# re-added near the top of the class in this revision.
- def parse_uri(env)
- uri = env["REQUEST_URI"]
- _n, workflow, task = uri.split("/")
- workflow = begin
- Kernel.const_get(workflow)
- rescue
- raise "Could not accept task for workflow: #{ workflow }"
- end
- [workflow, task]
# Copies lines from `sin` to `sout` until a line containing `boundary` is
# read or `sin.gets` returns nil. The boundary line itself is not written,
# and neither stream is closed here.
#
# NOTE(review): no call to this method remains among the added or context
# lines of this file -- confirm whether it is still needed.
+ def copy_until_boundary(sin, sout, boundary)
+ while line = sin.gets
+ break if line.include? boundary
+ sout.write line
+ end
end
- EOL = "\r\n"
# The '-' lines below are the removed read_chunk helper: it read up to 1024
# bytes, prepended any leftover text and split the result on EOL into
# complete parts plus a remainder.
- def read_chunk(sin, rest = "")
- parts = []
- c = sin.read(1024)
- #c = sin.gets
- raise "Early connection close" if c.nil?
- c = rest << c unless rest.empty?
- c = c[2..-1] if c[0..1] == EOL
- index = c.index EOL
- while index
- part = c[0..index-1]
- parts << part
- c = c[index+2..-1]
- index = c.index EOL
- end
- rest = c
- [parts, rest]
# Parses the request's multipart parameters.
#
# content_type - Content-Type header value; its `boundary=` and `stream=`
#                parameters are extracted with regular expressions
# stream       - the de-chunked request body
#
# Delegates to read_normal_inputs and then passes the resulting Hash to
# IndiferentHash.setup (defined outside this file; presumably it enables
# string/symbol-insensitive key access -- TODO confirm).
#
# Returns [inputs, stream_input, filename, stream, boundary].
#
# NOTE(review): when either parameter is absent `match` returns nil and
# `[1]` raises NoMethodError.
+ def get_inputs(content_type, stream)
+ boundary = content_type.match(/boundary=([^\s;]*)/)[1]
+ stream_input = content_type.match(/stream=([^\s;]*)/)[1]
+ inputs, filename = read_normal_inputs(stream, boundary, stream_input)
+
+ IndiferentHash.setup(inputs)
+
+ [inputs, stream_input, filename, stream, boundary]
end
# The '-' lines in this hunk are the removed copy_chunked_stream method;
# run_job (the '+' lines) is added in its place.
- def copy_chunked_stream(sin, sout, boundary)
# Creates the job for a streamed submission and starts it.
#
# Steps, in order:
#   1. "jobname" is removed from `inputs` and used as the job name.
#   2. prepare_job_inputs(workflow, task, inputs) builds the task parameters
#      (helper defined outside this file; presumably supplied by one of the
#      included helper modules -- TODO confirm).
#   3. When `filename` is given it is attached to `stream` through
#      Misc.add_stream_filename; `stream` is stored under `stream_input`.
#   4. The job is created with workflow.job.
#   5. The execution type comes from the workflow's exec / synchronous /
#      asynchronous export lists. A task in none of them raises; that raise
#      sits before the begin block, so it reaches the caller.
#      inputs["_cache_type"] == 'exec' forces "exec".
#   6. "exec" calls job.exec(:stream). For the other types, a job that is
#      already done or started gets a thread that drains `stream` and is
#      joined if not done; otherwise job.run(:stream) is called.
#
# Errors raised in step 6 are not re-raised: Aborted/Interrupt call
# job.abort, anything else calls job.exception; both then write
# "HTTP/1.1 500\r\n" to `stream` and call close_write on it.
#
# Returns the job in every case that does not raise.
#
# NOTE(review): `boundary` is never used in the new body.
# NOTE(review): `done_consumer` is assigned but never joined in this method.
# NOTE(review): the 500 status line goes to `stream`, which `call` fills with
# the merged request body rather than the client socket -- confirm intent.
+ def run_job(workflow, task, inputs, stream_input, stream, boundary, filename = nil)
+ name = inputs.delete "jobname"
- rest = ""
- done = false
- content = false
+ task_parameters = prepare_job_inputs(workflow, task, inputs)
- while not done
- parts, rest = read_chunk(sin, rest)
- while parts.any?
- part = parts.shift
- if content
- part.split("\n").each do |line|
- sout.puts line
- if line.include? boundary
- done = true
- break
- end
+ Misc.add_stream_filename(stream, filename) if filename
+
+ task_parameters[stream_input] = stream
+
+ task = task.to_sym
+
+ job = workflow.job(task, name, task_parameters)
+
+ execution_type = case
+ when workflow.exec_exports.include?(task)
+ "exec"
+ when workflow.synchronous_exports.include?(task)
+ "synchronous"
+ when workflow.asynchronous_exports.include?(task)
+ "asynchronous"
+ else
+ raise "No known export type for #{ workflow } #{ task }. Accesses denied"
+ end
+
+ execution_type = "exec" if inputs["_cache_type"] == 'exec'
+
+ begin
+ case execution_type
+ when "exec", nil
+ job.exec(:stream)
+ when "sync", "synchronous", "async", "asynchronous"
+ if job.done? or job.started?
+ done_consumer = Thread.new do
+ Misc.consume_stream(stream, false)
end
- content = false
+ job.join unless job.done?
else
- content = true
+ job.run(:stream)
end
+ else
+ raise "Unknown execution_type: #{execution_type}"
end
+
+ rescue Aborted, Interrupt
+ job.abort
+ stream.write "HTTP/1.1 500\r\n"
+ stream.close_write
# NOTE(review): rescuing Exception also catches SystemExit, NoMemoryError and
# similar; the error is recorded on the job and not re-raised.
+ rescue Exception
+ job.exception $!
+ stream.write "HTTP/1.1 500\r\n"
+ stream.close_write
end
- sout.write rest
- sout.close
+ job
end
# This revision renames the two-argument merge_chunks to _merge_chunks and
# adds error handling and cleanup around the loop.
#
# Decodes an HTTP chunked body read from `sin` and writes the payload of
# every chunk to `sout`.
#
# Per chunk: read the size line, parse it as hexadecimal, stop on size 0,
# read `size` bytes and the two bytes after them, and check that those bytes
# are EOL and that the payload has the announced length.
#
# Raises RuntimeError on a nil or blank size line, a wrong terminator or a
# short chunk. Aborted is re-raised as is; any other exception is logged
# with Log.exception and re-raised.
#
# Always, on the way out: closes the read side of `sin` (or `sin` itself if
# it has no close_read) and raises Aborted in each of `sin.threads` when
# `sin` answers to `threads`. `sout` is not closed here.
#
# NOTE(review): to_i(16) yields 0 for a size line that is not hexadecimal,
# so such a line ends the body exactly like the terminating chunk.
- def merge_chunks(sin, sout)
+ def _merge_chunks(sin, sout)
- rest = ""
- done = false
- content = true
-
- while not done
- chunk_size_str = ""
- while chunk_size_str.strip.empty?
- chunk_size_str = sin.gets
- raise "Error reading chunks" if chunk_size_str.nil?
+ begin
+ while true
+ chunk_size_str = ""
# NOTE(review): `stop` is never set to true, so `break if stop` cannot fire.
+ stop = false
+ while chunk_size_str.strip.empty?
+ chunk_size_str = sin.gets
# NOTE(review): this raise covers both nil and blank, so the assignment on
# the following line has no effect and the inner loop runs only once.
+ raise "Empty chunk size" if chunk_size_str.nil? or chunk_size_str.strip.empty?
+ chunk_size_str = "" if chunk_size_str.nil?
+ end
+ break if stop
+ size = chunk_size_str.strip.to_i(16)
+ break if size == 0
+ chunk = sin.read(size)
+ bound = sin.read(2)
+ raise "bound not right: #{ bound }" if bound != EOL
+ raise "Size does not match: #{[chunk.length, size] * " != "}" if chunk.length != size
+ sout.write chunk
end
- size = chunk_size_str.strip.to_i(16)
- break if size == 0
- chunk = sin.read(size)
- bound = sin.read(2)
- raise "Size does not match" if false and chunk.length != size
- sout.write chunk
- end
+ rescue Aborted
+ raise $!
+ rescue Exception
+ Log.exception $!
+ raise $!
+ ensure
+ if sin.respond_to? :close_read
+ sin.close_read
+ else
+ sin.close unless sin.closed?
+ end
+ if sin.respond_to? :threads
+ sin.threads.each do |th| th.raise Aborted end
+ end
- sout.write rest
- sout.close
+ end
end
# The '-' lines below are copy_until_boundary at its former position; the
# same body is re-added earlier in the class in this revision.
- def copy_until_boundary(sin, sout, boundary)
- while line = sin.gets
- break if line.include? boundary
- sout.write line
# De-chunks `sin` into `sout`, optionally replaying bytes already read.
#
# With a nil `buffer` this is a plain call to _merge_chunks(sin, sout).
# Otherwise a pipe is opened with Misc.open_pipe; its block writes `buffer`
# followed by everything readpartial returns from `sin`, and the value
# returned by open_pipe is what _merge_chunks decodes.
#
# NOTE(review): IO#readpartial signals end of input by raising EOFError (an
# IOError), so the first rescue is the normal way out of the copy loop.
# NOTE(review): the empty `rescue Exception` discards every other error
# without logging it -- confirm this is deliberate.
+ def merge_chunks(sin, sout, buffer)
+ if buffer.nil?
+ _merge_chunks(sin, sout)
+ else
+ ssin = Misc.open_pipe do |s|
+ begin
+ s << buffer
+ while c = sin.readpartial(Misc::BLOCK_SIZE)
+ s << c
+ end
+ rescue Aborted, IOError
+ rescue Exception
+ ensure
+ sin.close_read
+ s.close
+ end
+ end
+ _merge_chunks(ssin, sout)
end
end
# Decides whether a request should be handled as a streamed submission.
#
# The result is truthy only when all of these hold: the method is POST,
# env["rack.hijack"] is present, the content type contains
# "Rbbt_Param_Stream", and the transfer encoding is 'chunked'. Otherwise the
# result is false or nil.
#
# NOTE(review): `uri` and `id` are computed but not used in the result.
+ def do_stream(env)
+ uri = env["REQUEST_URI"]
+ post = env["REQUEST_METHOD"]
+ hijack = !!env["rack.hijack"]
+ content_type = env["CONTENT_TYPE"]
+ encoding = env["HTTP_TRANSFER_ENCODING"]
+ id = env["HTTP_RBBT_ID"]
+ id = id + ": " + Thread.current.object_id.to_s if id
+ post == "POST" and hijack and content_type and content_type.include? "Rbbt_Param_Stream" and encoding == 'chunked'
+ end
+
# Rack entry point.
#
# When do_stream(env) is falsy the request is logged as not hijacked and
# passed to @app.call(env), whose result is returned.
#
# Otherwise (new '+' version):
#   1. the connection is hijacked through env["rack.hijack"];
#   2. the chunked body is decoded into a pipe with merge_chunks;
#   3. get_inputs reads the ordinary parameters and stops at the streamed one;
#   4. parse_uri resolves the workflow and task, and run_job starts the job;
#   5. a 200 status line, "Connection: close" and the RBBT-STREAMING-JOB-URL
#      header are written to the socket, then Misc.consume_stream is called
#      with the job's stream and the socket (presumably copying the job
#      output to the client -- TODO confirm);
#   6. the socket is closed and [-1, {}, []] is returned.
#
# Exceptions while writing the response (step 5) are logged and swallowed.
# A StandardError elsewhere in the hijacked path is logged, a 515 response
# is attempted, and the error is re-raised.
def call(env)
- if env["REQUEST_METHOD"] == "POST" and env["rack.hijack"] and env["CONTENT_TYPE"] and env["CONTENT_TYPE"].include? "Rbbt_Param_Stream" and env["HTTP_TRANSFER_ENCODING"] == 'chunked'
- Log.high "Hijacking post data"
- inputs = {}
- content_type = env["CONTENT_TYPE"]
- boundary = content_type.match(/boundary=([^\s;]*)/)[1]
- stream_input = content_type.match(/stream=([^\s;]*)/)[1]
- post_stream_chunked = env["rack.hijack"].call
- job_url = nil
+ if do_stream(env)
begin
- post_stream = Misc.open_pipe do |sin|
- merge_chunks(post_stream_chunked, sin)
- end
- inputs, filename = read_normal_inputs(post_stream, boundary, stream_input)
+ client = env["rack.hijack"]
# NOTE(review): reads a private instance variable of the hijack object;
# presumably body bytes the server had already read -- this depends on the
# server's internals, confirm which servers are supported.
+ buffer = client.instance_variable_get('@buffer')
+ tcp_io = client.call
+ Log.low "YES Hijacking post data #{tcp_io}"
- input_stream_out, input_stream_in = Misc.pipe
- Misc.add_stream_filename(input_stream_out, filename) if filename
- inputs[stream_input] = input_stream_out
+ content_type = env["CONTENT_TYPE"]
- workflow, task = parse_uri(env)
- name = inputs.delete "jobname"
- job = workflow.job(task, name, inputs)
- Log.high "Run job #{job.path} with inputs #{Misc.fingerprint(inputs)}"
+ tcp_merged_io = Misc.open_pipe do |sin| merge_chunks(tcp_io, sin, buffer) end
- job_url = File.join("/", workflow.to_s, task, job.name)
+ inputs, stream_input, filename, stream, boundary = get_inputs(content_type, tcp_merged_io)
- task = task.to_sym
- execution_type = case
- when workflow.exec_exports.include?(task)
- "exec"
- when workflow.synchronous_exports.include?(task)
- "synchronous"
- when workflow.asynchronous_exports.include?(task)
- "asynchronous"
- else
- raise "No known export type for #{ workflow } #{ task }. Accesses denied"
- end
+ workflow, task = parse_uri(env)
- execution_type = "exec" if inputs["_cache_type"] == 'exec'
- Log.info "Streaming task with execution_type: #{ execution_type }"
+ job = run_job(workflow, task, inputs, stream_input, stream, boundary, filename)
- case execution_type
- when "exec", nil
- job.exec(:stream)
- when "sync", "synchronous", "async", "asynchronous"
- if job.done?
- done_consumer = Thread.new do
- while c = post_stream.read(1024)
- end
- end
- else
- job.run(:stream)
- end
- else
- raise "Unknown execution_type: #{execution_type}"
- end
+ job_url = File.join("/", workflow.to_s, task, job.name)
- t_in = Thread.new do
- begin
- copy_until_boundary(post_stream, input_stream_in, boundary)
- input_stream_in.close
- post_stream.close_read
- rescue
- Log.exception $!
- end
- end unless job.done?
+ out_stream = TSV.get_stream job
- job_output = TSV.get_stream job
- t_out = Thread.new do
- begin
- post_stream_chunked.write "HTTP/1.1 200\r\n"
- post_stream_chunked.write "RBBT-STREAMING-JOB-URL: #{ job_url }\r\n"
- post_stream_chunked.write "\r\n"
- while c = job_output.read(1024)
- post_stream_chunked.write c
- end
- job_output.join if job_output.respond_to? :join
- post_stream_chunked.close_write
- done_consumer.join if done_consumer
- rescue
- Log.exception $!
- job.abort
- end
- end
+ begin
+ Log.low "Write response #{tcp_io} "
+ tcp_io.write "HTTP/1.1 200\r\n"
+ tcp_io.write "Connection: close\r\n"
+ tcp_io.write "RBBT-STREAMING-JOB-URL: #{ job_url }\r\n"
+ tcp_io.write "\r\n"
+ Log.low "Comsuming response #{tcp_io}"
+ Misc.consume_stream(out_stream, false, tcp_io)
+ Log.low "Comsumed response #{tcp_io}"
+ rescue Exception
+ Log.exception $!
+ end if out_stream
- end
+ tcp_io.close unless tcp_io.closed?
+ Log.low "Closed io #{tcp_io}"
- [200, {}, nil]
+ [-1, {}, []]
+ rescue
+ Log.exception $!
# NOTE(review): no local named `io` is assigned anywhere in this method.
# Unless an included module provides an `io` method, the four lines below
# raise NameError and mask the original error; presumably `tcp_io` was
# intended.
+ io.write "HTTP/1.1 515\r\n"
+ io.write "Connection: close\r\n"
+ io.write "\r\n"
+ io.close_write
+ raise $!
+ end
else
Log.high "NOT Hijacking post data"
+
@app.call(env)
end
end
- #def call_old(env)
- # if env["REQUEST_METHOD"] == "POST" and env["rack.hijack"] and env["CONTENT_TYPE"] and env["CONTENT_TYPE"].include? "Rbbt_Param_Stream" and env["HTTP_TRANSFER_ENCODING"] == 'chunked'
- # Log.high "Hijacking post data"
- # inputs = {}
- # content_type = env["CONTENT_TYPE"]
- # boundary = content_type.match(/boundary=([^\s;]*)/)[1]
- # stream_input = content_type.match(/stream=([^\s;]*)/)[1]
- # post_stream = env["rack.hijack"].call
- # job_url = nil
- # begin
- # inputs, filename = read_normal_inputs(post_stream, boundary, stream_input)
-
- # input_stream_out, input_stream_in = Misc.pipe
- # Misc.add_stream_filename(input_stream_out, filename) if filename
- # inputs[stream_input] = input_stream_out
-
- # workflow, task = parse_uri(env)
- # name = inputs.delete "jobname"
- # job = workflow.job(task, name, inputs)
- # Log.high "Run job #{job.path} with inputs #{Misc.fingerprint(inputs)}"
-
- # job_url = File.join("/", workflow.to_s, task, job.name)
-
- # task = task.to_sym
- # execution_type = case
- # when workflow.exec_exports.include?(task)
- # "exec"
- # when workflow.synchronous_exports.include?(task)
- # "synchronous"
- # when workflow.asynchronous_exports.include?(task)
- # "asynchronous"
- # else
- # raise "No known export type for #{ workflow } #{ task }. Accesses denied"
- # end
-
- # execution_type = "exec" if inputs["_cache_type"] == 'exec'
- # Log.info "Streaming task with execution_type: #{ execution_type }"
-
- # case execution_type
- # when "exec", nil
- # job.exec(:stream)
- # when "sync", "synchronous", "async", "asynchronous"
- # if job.done?
- # done_consumer = Thread.new do
- # while c = post_stream.read(1024)
- # end
- # end
- # else
- # job.run(:stream)
- # end
- # else
- # raise "Unknown execution_type: #{execution_type}"
- # end
-
- # t_in = Thread.new do
- # begin
- # copy_chunked_stream(post_stream, input_stream_in, boundary)
- # rescue
- # Log.exception $!
- # end
- # end unless job.done?
-
- # job_output = TSV.get_stream job
- # t_out = Thread.new do
- # begin
- # post_stream.write "HTTP/1.1 200\r\n"
- # post_stream.write "RBBT-STREAMING-JOB-URL: #{ job_url }\r\n"
- # post_stream.write "\r\n"
- # while c = job_output.read(1024)
- # post_stream.write c
- # end
- # job_output.join if job_output.respond_to? :join
- # post_stream.close_write
- # done_consumer.join if done_consumer
- # rescue
- # Log.exception $!
- # job.abort
- # end
- # end
-
- # end
- # [200, {}, nil]
- # else
- # Log.high "NOT Hijacking post data"
- # @app.call(env)
- # end
- #end
end