# Copyright (c) 2010-2012 The University of Manchester, UK. # # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: # # * Redistributions of source code must retain the above copyright notice, # this list of conditions and the following disclaimer. # # * Redistributions in binary form must reproduce the above copyright notice, # this list of conditions and the following disclaimer in the documentation # and/or other materials provided with the distribution. # # * Neither the names of The University of Manchester nor the names of its # contributors may be used to endorse or promote products derived from this # software without specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. # # Author: Robert Haines require 'base64' require 'time' require 'rubygems' require 'taverna-baclava' module T2Server # An interface for easily running jobs on a Taverna 2 Server with minimal # setup and configuration required. # # A run can be in one of three states: # * :initialized - The run has been accepted by the server. It may not yet be # ready to run though as its input port may not have been set. # * :running - The run is being run by the server. # * :finished - The run has finished running and its outputs are available # for download. class Run include XML::Methods private_class_method :new # The identifier of this run on the server. attr_reader :identifier alias :id :identifier # The server instance that this run is hosted on. attr_reader :server # :stopdoc: XPaths = { # Run XPath queries :run_desc => XML::Methods.xpath_compile("/nsr:runDescription"), :dir => XML::Methods.xpath_compile("//nss:dir"), :file => XML::Methods.xpath_compile("//nss:file"), :expiry => XML::Methods.xpath_compile("//nsr:expiry"), :workflow => XML::Methods.xpath_compile("//nsr:creationWorkflow"), :status => XML::Methods.xpath_compile("//nsr:status"), :createtime => XML::Methods.xpath_compile("//nsr:createTime"), :starttime => XML::Methods.xpath_compile("//nsr:startTime"), :finishtime => XML::Methods.xpath_compile("//nsr:finishTime"), :wdir => XML::Methods.xpath_compile("//nsr:workingDirectory"), :inputs => XML::Methods.xpath_compile("//nsr:inputs"), :output => XML::Methods.xpath_compile("//nsr:output"), :securectx => XML::Methods.xpath_compile("//nsr:securityContext"), :listeners => XML::Methods.xpath_compile("//nsr:listeners"), :baclava => XML::Methods.xpath_compile("//nsr:baclava"), :inputexp => XML::Methods.xpath_compile("//nsr:expected"), # Port descriptions XPath queries :port_in => XML::Methods.xpath_compile("//port:input"), :port_out => XML::Methods.xpath_compile("//port:output"), # Run security XPath queries :sec_creds => XML::Methods.xpath_compile("//nsr:credentials"), :sec_perms => XML::Methods.xpath_compile("//nsr:permissions"), :sec_trusts => XML::Methods.xpath_compile("//nsr:trusts"), :sec_perm => XML::Methods.xpath_compile("/nsr:permissionsDescriptor/nsr:permission"), :sec_uname => XML::Methods.xpath_compile("nsr:userName"), :sec_uperm => XML::Methods.xpath_compile("nsr:permission"), :sec_cred => XML::Methods.xpath_compile("/nsr:credential"), :sec_suri => XML::Methods.xpath_compile("nss:serviceURI"), :sec_trust => XML::Methods.xpath_compile("/nsr:trustedIdentities/nsr:trust") } # The name to be used internally for retrieving results via baclava BACLAVA_FILE = "out.xml" # New is private but rdoc does not get it right! Hence :stopdoc: section. def initialize(server, uri, credentials = nil) @server = server @uri = uri @identifier = Util.get_path_leaf_from_uri(@uri) @workflow = "" @baclava_in = false @baclava_out = false @credentials = credentials # The following three fields hold cached data about the run that is only # downloaded the first time it is requested. @run_doc = nil @owner = nil @links = nil # initialize ports lists to nil as an empty list means no inputs/outputs @input_ports = nil @output_ports = nil end # :startdoc: # :call-seq: # Run.create(server, workflow) -> run # Run.create(server, workflow, connection_parameters) -> run # Run.create(server, workflow, user_credentials) -> run # Run.create(server, workflow, ...) {|run| ...} # # Create a new run in the :initialized state. The run will be created on # the server with address supplied by _server_. This can either be a # String of the form http://example.com:8888/blah or an already # created instance of T2Server::Server. The _workflow_ must also be # supplied as a string in t2flow or scufl format. User credentials and # connection parameters can be supplied if required but are both optional. # If _server_ is an instance of T2Server::Server then # _connection_parameters_ will be ignored. # # This method will _yield_ the newly created Run if a block is given. def Run.create(server, workflow, *rest) credentials = nil uri = nil conn_params = nil rest.each do |param| case param when URI uri = param when ConnectionParameters conn_params = param when HttpCredentials credentials = param end end if server.class != Server server = Server.new(server, conn_params) end if uri.nil? uri = server.initialize_run(workflow, credentials) end run = new(server, uri, credentials) yield(run) if block_given? run end # :stopdoc: def uuid warn "[DEPRECATION] 'uuid' is deprecated and will be removed in 1.0. " + "Please use Run#id or Run#identifier instead." @identifier end # :startdoc: # :call-seq: # owner -> String # # Get the username of the owner of this run. The owner is the user who # created the run on the server. def owner @owner = _get_run_owner if @owner.nil? @owner end # :call-seq: # delete # # Delete this run from the server. def delete @server.delete(@uri, @credentials) end # :stopdoc: def inputs warn "[DEPRECATION] 'inputs' is deprecated and will be removed in 1.0." links[:inputs] end def set_input(input, value) warn "[DEPRECATION] 'Run#set_input' is deprecated and will be removed " + "in 1.0. Input ports are set directly instead. The most direct " + "replacement for this method is: 'Run#input_port(input).value = value'" input_port(input).value = value end def set_input_file(input, filename) warn "[DEPRECATION] 'Run#set_input_file' is deprecated and will be " + "removed in 1.0. Input ports are set directly instead. The most " + "direct replacement for this method is: " + "'Run#input_port(input).remote_file = filename'" input_port(input).remote_file = filename end # :startdoc: # :call-seq: # input_ports -> Hash # # Return a hash (name, port) of all the input ports this run expects. def input_ports @input_ports = _get_input_port_info if @input_ports.nil? @input_ports end # :call-seq: # input_port(port) -> Port # # Get _port_. def input_port(port) input_ports[port] end # :call-seq: # output_ports -> Hash # # Return a hash (name, port) of all the output ports this run has. Until # the run is finished this method will return _nil_. def output_ports if finished? and @output_ports.nil? @output_ports = _get_output_port_info end @output_ports end # :call-seq: # output_port(port) -> Port # # Get output port _port_. def output_port(port) output_ports[port] if finished? end # :stopdoc: def get_output_ports warn "[DEPRECATION] 'get_output_ports' is deprecated and will be " + "removed in 1.0. Please use 'Run#output_ports' instead." lists, items = _ls_ports("out") items + lists end def get_output(output, refs=false) warn "[DEPRECATION] 'get_output' is deprecated and will be removed " + "in 1.0. Please use 'Run#output_port(port).values' instead." _get_output(output, refs) end def get_output_refs(output) warn "[DEPRECATION] 'get_output_refs' is deprecated and will be " + "removed in 1.0. Please use 'Run#output_port(port).data' instead." _get_output(output, true) end # :startdoc: # :call-seq: # expiry -> string # # Return the expiry time of this run as an instance of class Time. def expiry Time.parse(@server.read(links[:expiry], "text/plain", @credentials)) end # :call-seq: # expiry=(time) -> bool # # Set the expiry time of this run to _time_. _time_ should either be a Time # object or something that the Time class can parse. If the value given # does not specify a date then today's date will be assumed. If a time/date # in the past is specified, the expiry time will not be changed. def expiry=(time) unless time.instance_of? Time time = Time.parse(time) end # need to massage the xmlschema format slightly as the server cannot # parse timezone offsets with a colon (eg +00:00) date_str = time.xmlschema(2) date_str = date_str[0..-4] + date_str[-2..-1] @server.update(links[:expiry], date_str, "text/plain", @credentials) end # :call-seq: # workflow -> string # # Get the workflow that this run represents. def workflow if @workflow == "" @workflow = @server.read(links[:workflow], "application/xml", @credentials) end @workflow end # :call-seq: # status -> string # # Get the status of this run. Status can be one of :initialized, # :running or :finished. def status text_to_state(@server.read(links[:status], "text/plain", @credentials)) end # :call-seq: # start # # Start this run on the server. # # Raises RunStateError if the run is not in the :initialized state. def start state = status raise RunStateError.new(state, :initialized) if state != :initialized # set all the inputs _check_and_set_inputs unless baclava_input? @server.update(links[:status], state_to_text(:running), "text/plain", @credentials) end # :call-seq: # wait(check_interval = 1) # # Wait (block) for this run to finish. How often (in seconds) the run is # tested for completion can be specified with check_interval. # # Raises RunStateError if the run is still in the :initialised state. def wait(*params) state = status raise RunStateError.new(state, :running) if state == :initialized interval = 1 params.each do |param| case param when Hash warn "[DEPRECATION] 'Run#wait(params={})' is deprecated and will " + "be removed in 1.0. Please use Run#wait(check_interval) instead." interval = param[:interval] || 1 when Integer interval = param end end # wait until finished? sleep(interval) end end # :call-seq: # exitcode -> integer # # Get the return code of the run. Zero indicates success. def exitcode @server.read(links[:exitcode], "text/plain", @credentials).to_i end # :call-seq: # stdout -> string # # Get anything that the run printed to the standard out stream. def stdout @server.read(links[:stdout], "text/plain", @credentials) end # :call-seq: # stderr -> string # # Get anything that the run printed to the standard error stream. def stderr @server.read(links[:stderr], "text/plain", @credentials) end # :call-seq: # mkdir(dir) -> bool # # Create a directory in the run's working directory on the server. This # could be used to store input data. def mkdir(dir) dir = Util.strip_path_slashes(dir) @server.mkdir(links[:wdir], dir, @credentials) end # :call-seq: # upload_file(filename, params={}) -> string # # Upload a file, with name _filename_, to the server. Possible values that # can be passed in via _params_ are: # * :dir - The directory to upload to. If this is not left blank the # corresponding directory will need to have been created by Run#mkdir. # * :rename - Save the file on the server with a different name. # # The name of the file on the server is returned. def upload_file(filename, params={}) location = params[:dir] || "" uri = Util.append_to_uri_path(links[:wdir], location) rename = params[:rename] || "" file_uri = @server.upload_file(filename, uri, rename, @credentials) Util.get_path_leaf_from_uri(file_uri) end # :call-seq: # upload_data(data, remote_name, remote_directory = "") -> bool # # Upload data to the server and store it in remote_file. The # remote directory to put this file in can also be specified, but if it is # it must first have been created by a call to Run#mkdir. def upload_data(data, remote_name, remote_directory = "") location_uri = Util.append_to_uri_path(links[:wdir], remote_directory) @server.upload_data(data, remote_name, location_uri, @credentials) end # :stopdoc: def upload_input_file(input, filename, params={}) warn "[DEPRECATION] 'Run#upload_input_file' is deprecated and will be " + "removed in 1.0. Input ports are set directly instead. The most " + "direct replacement for this method is: " + "'Run#input_port(input).file = filename'" input_port(input).file = filename end # :startdoc: # :call-seq: # baclava_input=(filename) -> bool # # Use a baclava file for the workflow inputs. def baclava_input=(filename) state = status raise RunStateError.new(state, :initialized) if state != :initialized file = upload_file(filename) result = @server.update(links[:baclava], file, "text/plain", @credentials) @baclava_in = true if result result end # :stopdoc: def upload_baclava_input(filename) warn "[DEPRECATION] 'upload_baclava_input' is deprecated and will be " + "removed in 1.0. Please use 'Run#baclava_input=' instead." self.baclava_input = filename end def upload_baclava_file(filename) warn "[DEPRECATION] 'upload_baclava_file' is deprecated and will be " + "removed in 1.0. Please use 'Run#baclava_input=' instead." self.baclava_input = filename end # :startdoc: # :call-seq: # request_baclava_output -> bool # # Set the server to save the outputs of this run in baclava format. This # must be done before the run is started. def request_baclava_output return if @baclava_out state = status raise RunStateError.new(state, :initialized) if state != :initialized @baclava_out = @server.update(links[:output], BACLAVA_FILE, "text/plain", @credentials) end # :stopdoc: def set_baclava_output(name="") warn "[DEPRECATION] 'set_baclava_output' is deprecated and will be " + "removed in 1.0. Please use 'Run#request_baclava_output' instead." self.request_baclava_output end # :startdoc: # :call-seq: # baclava_input? -> bool # # Have the inputs to this run been set by a baclava document? def baclava_input? @baclava_in end # :call-seq: # baclava_output? -> bool # # Has this run been set to return results in baclava format? def baclava_output? @baclava_out end # :call-seq: # baclava_output -> string # # Get the outputs of this run in baclava format. This can only be done if # the output has been requested in baclava format by #set_baclava_output # before starting the run. def baclava_output state = status raise RunStateError.new(state, :finished) if state != :finished raise AccessForbiddenError.new("baclava output") if !@baclava_out baclava_uri = Util.append_to_uri_path(links[:wdir], BACLAVA_FILE) @server.read(baclava_uri, "*/*", @credentials) end # :stopdoc: def get_baclava_output warn "[DEPRECATION] 'get_baclava_output' is deprecated and will be " + "removed in 1.0. Please use 'Run#baclava_output' instead." baclava_output end # :startdoc: # :call-seq: # zip_output -> binary blob # # Get the working directory of this run directly from the server in zip # format. def zip_output state = status raise RunStateError.new(state, :finished) if state != :finished output_uri = Util.append_to_uri_path(links[:wdir], "out") @server.read(output_uri, "application/zip", @credentials) end # :call-seq: # initialized? -> bool # # Is this run in the :initialized state? def initialized? status == :initialized end # :call-seq: # running? -> bool # # Is this run in the :running state? def running? status == :running end # :call-seq: # finished? -> bool # # Is this run in the :finished state? def finished? status == :finished end # :call-seq: # create_time -> string # # Get the creation time of this run as an instance of class Time. def create_time Time.parse(@server.read(links[:createtime], "text/plain", @credentials)) end # :call-seq: # start_time -> string # # Get the start time of this run as an instance of class Time. def start_time Time.parse(@server.read(links[:starttime], "text/plain", @credentials)) end # :call-seq: # finish_time -> string # # Get the finish time of this run as an instance of class Time. def finish_time Time.parse(@server.read(links[:finishtime], "text/plain", @credentials)) end # :call-seq: # owner? -> bool # # Are the credentials being used to access this run those of the owner? # The owner of the run can give other users certain access rights to their # runs but only the owner can change these rights - or even see what they # are. Sometimes it is useful to know if the user accessing the run is # actually the owner of it or not. def owner? @credentials.username == owner end # :call-seq: # grant_permission(username, permission) -> username # # Grant the user the stated permission. A permission can be one of # :none, :read, :update or :destroy. # Only the owner of a run may grant permissions on it. +nil+ is returned # if a user other than the owner uses this method. def grant_permission(username, permission) return unless owner? value = XML::Fragments::PERM_UPDATE % [username, permission.to_s] @server.create(links[:sec_perms], value, "application/xml", @credentials) end # :call-seq: # permissions -> hash # # Return a hash (username => permission) of all the permissions set for # this run. Only the owner of a run may query its permissions. +nil+ is # returned if a user other than the owner uses this method. def permissions return unless owner? perms = {} doc = xml_document(@server.read(links[:sec_perms], "application/xml", @credentials)) xpath_find(doc, XPaths[:sec_perm]).each do |p| user = xml_node_content(xpath_first(p, XPaths[:sec_uname])) perm = xml_node_content(xpath_first(p, XPaths[:sec_uperm])).to_sym perms[user] = perm end perms end # :call-seq: # permission(username) -> permission # # Return the permission granted to the supplied username, if any. Only the # owner of a run may query its permissions. +nil+ is returned if a user # other than the owner uses this method. def permission(username) return unless owner? permissions[username] end # :call-seq: # revoke_permission(username) -> bool # # Revoke whatever permissions that have been granted to the user. Only the # owner of a run may revoke permissions on it. +nil+ is returned if a user # other than the owner uses this method. def revoke_permission(username) return unless owner? uri = Util.append_to_uri_path(links[:sec_perms], username) @server.delete(uri, @credentials) end # :call-seq: # add_password_credential(service_uri, username, password) -> URI # # Provide a username and password credential for the secure service at the # specified URI. The URI of the credential on the server is returned. Only # the owner of a run may supply credentials for it. +nil+ is returned if a # user other than the owner uses this method. def add_password_credential(uri, username, password) return unless owner? # Is this a new credential, or an update? cred_uri = credential(uri) # basic uri checks uri = _check_cred_uri(uri) cred = XML::Fragments::USERPASS_CRED % [uri, username, password] value = XML::Fragments::CREDENTIAL % cred if cred_uri.nil? @server.create(links[:sec_creds], value, "application/xml", @credentials) else @server.update(cred_uri, value, "application/xml", @credentials) end end # :call-seq: # add_keypair_credential(service_uri, filename, password, # alias = "Imported Certificate", type = :pkcs12) -> URI # # Provide a client certificate credential for the secure service at the # specified URI. You will need to provide the password to unlock the # private key. You will also need to provide the 'alias' or 'friendlyName' # of the key you wish to use if it differs from the default. The URI of the # credential on the server is returned. Only the owner of a run may supply # credentials for it. +nil+ is returned if a user other than the owner uses # this method. def add_keypair_credential(uri, filename, password, name = "Imported Certificate", type = :pkcs12) return unless owner? type = type.to_s.upcase contents = Base64.encode64(IO.read(filename)) # basic uri checks uri = _check_cred_uri(uri) cred = XML::Fragments::KEYPAIR_CRED % [uri, name, contents, type, password] value = XML::Fragments::CREDENTIAL % cred @server.create(links[:sec_creds], value, "application/xml", @credentials) end # :call-seq: # credentials -> Hash # # Return a hash (service_uri => credential_uri) of all the credentials # provided for this run. Only the owner of a run may query its credentials. # +nil+ is returned if a user other than the owner uses this method. def credentials return unless owner? creds = {} doc = xml_document(@server.read(links[:sec_creds], "application/xml", @credentials)) xpath_find(doc, XPaths[:sec_cred]).each do |c| uri = URI.parse(xml_node_content(xpath_first(c, XPaths[:sec_suri]))) cred_uri = URI.parse(xml_node_attribute(c, "href")) creds[uri] = cred_uri end creds end # :call-seq: # credential(service_uri) -> URI # # Return the URI of the credential set for the supplied service, if # any. Only the owner of a run may query its credentials. +nil+ is # returned if a user other than the owner uses this method. def credential(uri) return unless owner? credentials[uri] end # :call-seq: # delete_credential(service_uri) -> bool # # Delete the credential that has been provided for the specified service. # Only the owner of a run may delete its credentials. +nil+ is returned if # a user other than the owner uses this method. def delete_credential(uri) return unless owner? @server.delete(credentials[uri], @credentials) end # :call-seq: # delete_all_credentials -> bool # # Delete all credentials associated with this workflow run. Only the owner # of a run may delete its credentials. +nil+ is returned if a user other # than the owner uses this method. def delete_all_credentials return unless owner? @server.delete(links[:sec_creds], @credentials) end # :call-seq: # add_trust(filename, type = :x509) -> URI # # Add a trusted identity (server public key) to verify peers when using # https connections to Web Services. The URI of the trust on the server is # returned. Only the owner of a run may add a trust. +nil+ is returned if # a user other than the owner uses this method. def add_trust(filename, type = :x509) return unless owner? type = type.to_s.upcase contents = Base64.encode64(IO.read(filename)) value = XML::Fragments::TRUST % [contents, type] @server.create(links[:sec_trusts], value, "application/xml", @credentials) end # :call-seq: # trusts -> Array # # Return a list of all the URIs of trusts that have been registered for # this run. At present there is no way to differentiate between trusts # without noting the URI returned when originally uploaded. Only the owner # of a run may query its trusts. +nil+ is returned if a user other than the # owner uses this method. def trusts return unless owner? t_uris = [] doc = xml_document(@server.read(links[:sec_trusts], "application/xml", @credentials)) xpath_find(doc, XPaths[:sec_trust]). each do |t| t_uris << URI.parse(xml_node_attribute(t, "href")) end t_uris end # :call-seq: # delete_trust(URI) -> bool # # Delete the trust with the provided URI. Only the owner of a run may # delete its trusts. +nil+ is returned if a user other than the owner uses # this method. def delete_trust(uri) return unless owner? @server.delete(uri, @credentials) end # :call-seq: # delete_all_trusts -> bool # # Delete all trusted identities associated with this workflow run. Only # the owner of a run may delete its trusts. +nil+ is returned if a user # other than the owner uses this method. def delete_all_trusts return unless owner? @server.delete(links[:sec_trusts], @credentials) end # :stopdoc: # Outputs are represented as a directory structure with the eventual list # items (leaves) as files. This method (not part of the public API) # downloads a file from the run's working directory. def download_output_data(uri, range = nil) @server.read(uri, "application/octet-stream", range, @credentials) end # :startdoc: private def links @links = _get_run_links if @links.nil? @links end # Check each input to see if it requires a list input and call the # requisite upload method for the entire set of inputs. def _check_and_set_inputs lists = false input_ports.each_value do |port| if port.depth > 0 lists = true break end end lists ? _fake_lists : _set_all_inputs end # Set all the inputs on the server. The inputs must have been set prior to # this call using the InputPort API. def _set_all_inputs input_ports.each_value do |port| next unless port.set? if port.file? # If we're using a local file upload it first then set the port to # use a remote file. unless port.remote_file? file = upload_file(port.file) port.remote_file = file end xml_value = xml_text_node(port.file) uri = Util.append_to_uri_path(links[:inputs], "input/#{port.name}") @server.update(uri, XML::Fragments::RUNINPUTFILE % xml_value, "application/xml", @credentials) else xml_value = xml_text_node(port.value) uri = Util.append_to_uri_path(links[:inputs], "input/#{port.name}") @server.update(uri, XML::Fragments::RUNINPUTVALUE % xml_value, "application/xml", @credentials) end end end # Fake being able to handle lists as inputs by converting everything into # one big baclava document and uploading that. This has to be done for all # inputs or none at all. The inputs must have been set prior to this call # using the InputPort API. def _fake_lists data_map = {} input_ports.each_value do |port| next unless port.set? if port.file? unless port.remote_file? file = File.read(port.file) data_map[port.name] = Taverna::Baclava::Node.new(file) end else data_map[port.name] = Taverna::Baclava::Node.new(port.value) end end # Create and upload the baclava data. baclava = Taverna::Baclava::Writer.write(data_map) upload_data(baclava, "in.baclava") @server.update(links[:baclava], "in.baclava", "text/plain", @credentials) end # Check that the uri passed in is suitable for credential use: # * rserve uris must not have a path. # * http(s) uris must have at least "/" as their path. def _check_cred_uri(uri) u = URI(uri) case u.scheme when "rserve" u.path = "" when /https?/ u.path = "/" if u.path == "" end u.to_s end # List a directory in the run's workspace on the server. If dir is left # blank then / is listed. As there is no concept of changing into a # directory (cd) in Taverna Server then all paths passed into _ls_ports # should be full paths starting at "root". The contents of a directory are # returned as a list of two lists, "lists" and "values" respectively. def _ls_ports(dir="", top=true) dir = Util.strip_path_slashes(dir) uri = Util.append_to_uri_path(links[:wdir], dir) dir_list = @server.read(uri, "*/*", @credentials) # compile a list of directory entries stripping the # directory name from the front of each filename lists = [] values = [] doc = xml_document(dir_list) xpath_find(doc, XPaths[:dir]).each do |e| if top lists << xml_node_content(e).split('/')[-1] else index = (xml_node_attribute(e, 'name').to_i - 1) lists[index] = xml_node_content(e).split('/')[-1] end end xpath_find(doc, XPaths[:file]).each do |e| if top values << xml_node_content(e).split('/')[-1] else index = (xml_node_attribute(e, 'name').to_i - 1) values[index] = xml_node_content(e).split('/')[-1] end end [lists, values] end def _get_output(output, refs=false, top=true) output = Util.strip_path_slashes(output) # if at the top level we need to check if the port represents a list # or a singleton value if top lists, items = _ls_ports("out") if items.include? output if refs return "#{@server.uri}/rest/runs/#{@identifier}/" + "#{links[:wdir]}/out/#{output}" else out_uri = Util.append_to_uri_path(links[:wdir], "out/#{output}") return @server.read(out_uri, "application/octet-stream", @credentials) end end end # we're not at the top level so look at the contents of the output port lists, items = _ls_ports("out/#{output}", false) # build up lists of results result = [] # for each list recurse into it and add the items to the result lists.each do |list| result << _get_output("#{output}/#{list}", refs, false) end # for each item, add it to the output list items.each do |item| if refs result << "#{@server.uri}/rest/runs/#{@identifier}/" + "#{links[:wdir]}/out/#{output}/#{item}" else out_uri = Util.append_to_uri_path(links[:wdir], "out/#{output}/#{item}") result << @server.read(out_uri, "application/octet-stream", @credentials) end end result end def _get_input_port_info ports = {} port_desc = @server.read(links[:inputexp], "application/xml", @credentials) doc = xml_document(port_desc) xpath_find(doc, XPaths[:port_in]).each do |inp| port = InputPort.new(self, inp) ports[port.name] = port end ports end def _get_output_port_info ports = {} begin port_desc = @server.read(links[:output], "application/xml", @credentials) rescue AttributeNotFoundError => anfe return ports end doc = xml_document(port_desc) xpath_find(doc, XPaths[:port_out]).each do |out| port = OutputPort.new(self, out) ports[port.name] = port end ports end def _get_run_description if @run_doc.nil? @run_doc = xml_document(@server.read(@uri, "application/xml", @credentials)) end @run_doc end def _get_run_owner doc = _get_run_description xpath_attr(doc, XPaths[:run_desc], "owner") end def _get_run_links doc = _get_run_description # first parse out the basic stuff links = {} [:expiry, :workflow, :status, :createtime, :starttime, :finishtime, :wdir, :inputs, :output, :securectx, :listeners].each do |key| links[key] = URI.parse(xpath_attr(doc, XPaths[key], "href")) end # get inputs inputs = @server.read(links[:inputs], "application/xml",@credentials) doc = xml_document(inputs) links[:baclava] = URI.parse(xpath_attr(doc, XPaths[:baclava], "href")) links[:inputexp] = URI.parse(xpath_attr(doc, XPaths[:inputexp], "href")) # set io properties links[:io] = Util.append_to_uri_path(links[:listeners], "io") links[:stdout] = Util.append_to_uri_path(links[:io], "properties/stdout") links[:stderr] = Util.append_to_uri_path(links[:io], "properties/stderr") links[:exitcode] = Util.append_to_uri_path(links[:io], "properties/exitcode") # security properties - only available to the owner of a run if owner? securectx = @server.read(links[:securectx], "application/xml", @credentials) doc = xml_document(securectx) [:sec_creds, :sec_perms, :sec_trusts].each do |key| #links[key] = "#{links[:securectx]}/" + xpath_attr(doc, XPaths[key], # "href").split('/')[-1] links[key] = Util.append_to_uri_path(links[:securectx], xpath_attr(doc, XPaths[key], "href").split('/')[-1]) end end links end # :stopdoc: STATE2TEXT = { :initialized => "Initialized", :running => "Operating", :finished => "Finished", :stopped => "Stopped" } TEXT2STATE = { "Initialized" => :initialized, "Operating" => :running, "Finished" => :finished, "Stopped" => :stopped } # :startdoc: def state_to_text(state) STATE2TEXT[state.to_sym] end def text_to_state(text) TEXT2STATE[text] end end end