lib/rbbt/rest/client.rb in rbbt-rest-1.3.2 vs lib/rbbt/rest/client.rb in rbbt-rest-1.3.3
- old
+ new
@@ -2,179 +2,17 @@
require 'json'
require 'rbbt/workflow'
require 'rbbt/workflow/step'
require 'rbbt/util/misc'
-class WorkflowRESTClient
+require 'rbbt/rest/client/get'
+require 'rbbt/rest/client/adaptor'
+require 'rbbt/rest/client/step'
- def self.fix_hash(hash, fix_values = false)
- fixed = {}
- hash.each do |key, value|
- fixed[key.to_sym] = case
- when Hash === value
- fix_hash(value)
- when (fix_values and String === value)
- value.to_sym
- else
- value
- end
- end
- fixed
- end
+class WorkflowRESTClient
+ include Workflow
- def self.get_raw(url, params = {})
- Log.debug{ "RestClient get_raw: #{ url } - #{Misc.fingerprint params}" }
- params = params.merge({ :_format => 'raw' })
- Misc.insist(2, 0.5) do
- RestClient.get(URI.encode(url), :params => params)
- end
- end
-
- def self.post_jobname(url, params = {})
- Log.debug{ "RestClient post_jobname: #{ url } - #{Misc.fingerprint params}" }
- params = params.merge({ :_format => 'jobname' })
- RestClient.post(URI.encode(url), params)
- end
-
- def self.get_json(url, params = {})
- Log.debug{ "RestClient get_json: #{ url } - #{Misc.fingerprint params }" }
- params = params.merge({ :_format => 'json' })
- begin
- res = RestClient.get(URI.encode(url), :params => params)
- rescue => e
- raise JSON.parse(e.response)["message"]
- end
- begin
- JSON.parse(res)
- rescue
- res
- end
- end
-
- def self.post_json(url, params = {})
- if url =~ /_cache_type=:exec/
- JSON.parse(Open.open(url, :nocache => true))
- else
- params = params.merge({ :_format => 'json' })
- res = RestClient.post(URI.encode(url), params)
- begin
- JSON.parse(res)
- rescue
- res
- end
- end
- end
-
- class RemoteStep < Step
-
- attr_accessor :url, :base_url, :task, :name, :inputs, :result_type, :result_description
-
- def name
- (Array === @url ? @url.first : @url).split("/").last
- end
-
- def task_name
- (Array === @url ? @url.first : @url).split("/")[-2]
- end
-
- def info
- info = WorkflowRESTClient.get_json(File.join(url, 'info'))
- info = WorkflowRESTClient.fix_hash(info)
- info[:status] = info[:status].to_sym if String === info[:status]
- info
- end
-
- def done?
- status.to_s == 'done'
- end
-
- def fork
- @name = WorkflowRESTClient.post_jobname(File.join(base_url, task.to_s), inputs.merge(:jobname => @name, :_cache_type => :asynchronous))
- @url = File.join(base_url, task.to_s, @name)
- self
- end
-
- def initialize(base_url, task = nil, name = nil, inputs = nil, result_type = nil, result_description = nil, exec = false)
- if task.nil?
- @url = base_url
- else
- @base_url, @task, @name, @inputs, @result_type, @result_description = base_url, task, name, inputs, result_type, result_description
- if exec
- @url = [File.join(base_url, task.to_s), inputs]
- else
- self.fork
- end
- end
- end
-
- def _exec(noload = false)
- if Array === @url
- url, params = @url
- else
- url, params = @url, {:_cache_type => :synchronous}
- end
-
- params[:jobname] = @name if @name
-
- if noload and %w(boolean string tsv).include? result_type
- WorkflowRESTClient.get_raw(url, params)
- else
- case result_type
- when :string
- WorkflowRESTClient.get_raw(url, params)
- when :boolean
- WorkflowRESTClient.get_raw(url, params) == "true"
- when :tsv
- TSV.open(StringIO.new(WorkflowRESTClient.get_raw(url, params)))
- when :annotations
- Annotated.load_tsv(TSV.open(StringIO.new(WorkflowRESTClient.get_raw(url, params))))
- else
- WorkflowRESTClient.get_json(url, params)
- end
- end
- end
-
- def exec
- res = _exec
- prepare_result(res, result_type)
- end
-
- def run(noload = false)
- if noload
- _exec(noload)
- else
- exec
- end
- end
-
- def load
- exec
- end
-
- def join
- exec
- self
- end
-
- def status
- info[:status]
- end
-
- def clean
- WorkflowRESTClient.get_raw(url, :_update => :clean)
- self
- end
-
- def files
- WorkflowRESTClient.get_json(File.join(url, 'files'))
- end
-
- def file(file)
- WorkflowRESTClient.get_json(File.join(url, 'file', file))
- end
- end
-
attr_accessor :url, :name, :exec_exports, :asynchronous_exports, :synchronous_exports
def initialize(url, name)
Log.debug{ "Loading remote workflow #{ name }: #{ url }" }
@url, @name = url, name
@@ -183,107 +21,10 @@
def to_s
name
end
- def workflow_description
- WorkflowRESTClient.get_raw(File.join(url, 'description'))
- end
-
- def documentation
- @documention ||= IndiferentHash.setup(WorkflowRESTClient.get_json(File.join(url, "documentation"),{}))
- end
-
- def task_info(task)
- @task_info ||= {}
- @task_info[task]
-
- if @task_info[task].nil?
- task_info = WorkflowRESTClient.get_json(File.join(url, task.to_s, 'info'))
- task_info = WorkflowRESTClient.fix_hash(task_info)
-
- task_info[:result_type] = task_info[:result_type].to_sym
- task_info[:export] = task_info[:export].to_sym
- task_info[:input_types] = WorkflowRESTClient.fix_hash(task_info[:input_types], true)
- task_info[:inputs] = task_info[:inputs].collect{|input| input.to_sym }
-
- @task_info[task] = task_info
- end
- @task_info[task]
- end
-
- def exported_tasks
- (@asynchronous_exports + @synchronous_exports + @exec_exports).compact.flatten
- end
-
- def tasks
- @tasks ||= Hash.new do |hash,task_name|
- info = task_info(task_name)
- task = Task.setup info do |*args|
- raise "This is a remote task"
- end
- task.name = task_name.to_sym
- hash[task_name] = task
- end
- end
-
- def load_tasks
- exported_tasks.each{|name| tasks[name]}
- nil
- end
-
- def task_dependencies
- @task_dependencies ||= Hash.new do |hash,task|
- hash[task] = if exported_tasks.include? task
- WorkflowRESTClient.get_json(File.join(url, task.to_s, 'dependencies'))
- else
- []
- end
- end
- end
-
- def rec_dependencies(taskname)
- if task_dependencies.include? taskname
- deps = task_dependencies[taskname].select{|dep| String === dep or Symbol === dep}
- deps.concat deps.collect{|dep| rec_dependencies(dep)}.compact.flatten
- deps.uniq
- else
- []
- end
- end
-
- def rec_inputs(taskname)
- [taskname].concat(rec_dependencies(taskname)).inject([]){|acc, tn| acc.concat tasks[tn.to_sym].inputs}
- end
-
- def rec_input_defaults(taskname)
- [taskname].concat(rec_dependencies(taskname)).inject({}){|acc, tn| acc.merge tasks[tn.to_sym].input_defaults}.
- tap{|h| IndiferentHash.setup(h) }
- end
-
- def rec_input_types(taskname)
- [taskname].concat(rec_dependencies(taskname)).inject({}){|acc, tn| acc.merge tasks[tn.to_sym].input_types}.
- tap{|h| IndiferentHash.setup(h) }
- end
-
- def rec_input_descriptions(taskname)
- [taskname].concat(rec_dependencies(taskname)).inject({}){|acc, tn| acc.merge tasks[tn.to_sym].input_descriptions}.
- tap{|h| IndiferentHash.setup(h) }
- end
-
- def rec_input_options(taskname)
- [taskname].concat(rec_dependencies(taskname)).inject({}){|acc, tn| acc.merge tasks[tn.to_sym].input_options}.
- tap{|h| IndiferentHash.setup(h) }
- end
-
- def init_remote_tasks
- task_exports = WorkflowRESTClient.get_json(url)
- @asynchronous_exports = task_exports["asynchronous"].collect{|task| task.to_sym }
- @synchronous_exports = task_exports["synchronous"].collect{|task| task.to_sym }
- @exec_exports = task_exports["exec"].collect{|task| task.to_sym }
- end
-
def job(task, name, inputs)
task_info = task_info(task)
fixed_inputs = {}
inputs.each do |k,v|
fixed_inputs[k] = case v
@@ -291,80 +32,18 @@
v.to_s
else
v
end
end
+
RemoteStep.new(url, task, name, fixed_inputs, task_info[:result_type], task_info[:result_description], @exec_exports.include?(task))
end
def load_id(id)
task, name = id.split("/")
- step = RemoteStep.new File.join(url, id)
+ step = RemoteStep.new url, task, nil
+ step.name = name
step.result_type = task_info(task)[:result_type]
step.result_description = task_info(task)[:result_description]
step
- end
-
- def doc(task = nil)
-
- if task.nil?
- puts self.to_s
- puts "=" * self.to_s.length
- puts
-
- puts "## TASKS"
- puts
- tasks.each do |name,task|
- puts " * #{ name }:"
- puts " " << task.description if task.description and not task.description.empty?
- puts
- end
- else
-
- if Task === task
- task_name = task.name
- else
- task_name = task
- task = self.tasks[task_name]
- end
- dependencies = self.rec_dependencies(task_name).collect{|dep_name| self.tasks[dep_name.to_sym]}
-
- task.doc(dependencies)
- end
- end
-
- def doc(task = nil)
-
- if task.nil?
- puts Log.color :magenta, self.to_s
- puts Log.color :magenta, "=" * self.to_s.length
- if self.documentation[:description] and not self.documentation[:description].empty?
- puts
- puts Misc.format_paragraph self.documentation[:description]
- end
- puts
-
- puts Log.color :magenta, "## TASKS"
- if self.documentation[:task_description] and not self.documentation[:task_description].empty?
- puts
- puts Misc.format_paragraph self.documentation[:task_description]
- end
- puts
-
- tasks.each do |name,task|
- puts Misc.format_definition_list_item(name.to_s, task.description || "", 80, 30, :yellow)
- end
-
- else
-
- if Task === task
- task_name = task.name
- else
- task_name = task
- task = self.tasks[task_name]
- end
- dependencies = self.rec_dependencies(task_name).collect{|dep_name| self.tasks[dep_name.to_sym]}
-
- task.doc(dependencies)
- end
end
end