lib/t2-server/server.rb in t2-server-0.6.1 vs lib/t2-server/server.rb in t2-server-0.9.0

- old
+ new

@@ -1,6 +1,6 @@ -# Copyright (c) 2010, 2011 The University of Manchester, UK. +# Copyright (c) 2010-2012 The University of Manchester, UK. # # All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: @@ -12,11 +12,11 @@ # this list of conditions and the following disclaimer in the documentation # and/or other materials provided with the distribution. # # * Neither the names of The University of Manchester nor the names of its # contributors may be used to endorse or promote products derived from this -# software without specific prior written permission. +# software without specific prior written permission. # # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE @@ -28,461 +28,457 @@ # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. # # Author: Robert Haines -require 'rubygems' require 'base64' require 'uri' -require 'net/https' -require 'libxml' module T2Server # An interface for directly communicating with one or more Taverna 2 Server # instances. class Server - include LibXML + include XML::Methods - private_class_method :new + # The version of the remote Taverna Server instance. + attr_reader :version - # The URI of this server instance. - attr_reader :uri - - # The maximum number of runs that this server will allow at any one time. - # Runs in any state (+Initialized+, +Running+ and +Finished+) are counted - # against this maximum. - attr_reader :run_limit - - # list of servers we know about - @@servers = [] - # :stopdoc: - # New is private but rdoc does not get it right! Hence :stopdoc: section. - def initialize(uri, username, password) - @uri = uri - @host = @uri.host - @port = @uri.port - @base_path = @uri.path - @rest_path = @uri.path + "/rest" + XPaths = { + # Server top-level XPath queries + :server => XML::Methods.xpath_compile("//nsr:serverDescription"), + :policy => XML::Methods.xpath_compile("//nsr:policy"), + :run => XML::Methods.xpath_compile("//nsr:run"), + :runs => XML::Methods.xpath_compile("//nsr:runs"), - # set up http connection - @http = Net::HTTP.new(@host, @port) + # Server policy XPath queries + :runlimit => XML::Methods.xpath_compile("//nsr:runLimit"), + :permwkf => XML::Methods.xpath_compile("//nsr:permittedWorkflows"), + :permlstn => XML::Methods.xpath_compile("//nsr:permittedListeners"), + :permlstt => XML::Methods.xpath_compile("//nsr:permittedListenerTypes"), + :notify => + XML::Methods.xpath_compile("//nsr:enabledNotificationFabrics") + } + # :startdoc: - # use ssl? - @ssl = uri.scheme == "https" - if ssl? - @username = username - @password = password - - @http.use_ssl = true - @http.verify_mode = OpenSSL::SSL::VERIFY_NONE + # :call-seq: + # new(uri, connection_parameters = nil) -> Server + # new(uri, connection_parameters = nil) {|self| ...} + # + # Create a new Server instance that represents the real server at _uri_. + # If _connection_parameters_ are supplied they will be used to set up the + # network connection to the server. + # + # It will _yield_ itself if a block is given. + def initialize(uri, params = nil) + # we want to use URIs here but strings can be passed in + unless uri.is_a? URI + uri = URI.parse(Util.strip_path_slashes(uri)) end + # strip username and password from the URI if present + if uri.user != nil + uri = URI::HTTP.new(uri.scheme, nil, uri.host, uri.port, nil, + uri.path, nil, nil, nil); + end + + # setup connection + @connection = ConnectionFactory.connect(uri, params) + # add a slash to the end of this address to work around this bug: # http://www.mygrid.org.uk/dev/issues/browse/TAVSERV-113 - @links = parse_description(get_attribute("#{@rest_path}/")) - #@links.each {|key, val| puts "#{key}: #{val}"} - - # get max runs - @run_limit = get_attribute(@links[:runlimit]).to_i - - # initialise run list + server_description = xml_document(get_attribute("#{uri.path}/rest/", + "application/xml")) + @version = get_version(server_description) + @links = get_description(server_description) + @links[:admin] = "#{uri.path}/admin" + + # initialize run object cache @runs = {} - @runs = get_runs + + yield(self) if block_given? end + + # :stopdoc: + def Server.connect(uri, username="", password="") + warn "[DEPRECATION] 'Server#connect' is deprecated and will be " + + "removed in 1.0." + new(uri) + end # :startdoc: # :call-seq: - # Server.connect(uri, username="", password="") -> server + # administrator(credentials = nil) -> Administrator + # administrator(credentials = nil) {|admin| ...} # - # Connect to the server specified by _uri_ which should be of the form: - # http://example.com:8888/blah or https://user:pass@example.com:8888/blah - # - # The username and password can also be passed in separately. - # A Server instance is returned that represents the connection. - def Server.connect(uri, username="", password="") - # we want to use URIs here but strings can be passed in - if !uri.instance_of? URI - uri = URI.parse(uri.strip_path); - end - - # strip username and password from the URI if present - username = uri.user || username - password = uri.password || password - new_uri = URI::HTTP.new(uri.scheme, nil, uri.host, uri.port, nil, - uri.path, nil, nil, nil); - - # see if we've already got this server - server = @@servers.find {|s| s.uri == new_uri} + # Return an instance of the Taverna Server administrator interface. This + # method will _yield_ the newly created administrator if a block is given. + def administrator(credentials = nil) + admin = Administrator.new(self, credentials) - if !server - # no, so create new one and return it - server = new(new_uri, username, password) - @@servers << server - end - - server + yield(admin) if block_given? + admin end # :call-seq: - # server.create_run(workflow) -> run + # create_run(workflow, credentials = nil) -> run + # create_run(workflow, credentials = nil) {|run| ...} # # Create a run on this server using the specified _workflow_. - def create_run(workflow) - uuid = initialize_run(workflow) - @runs[uuid] = Run.create(self, "", uuid) + # This method will _yield_ the newly created Run if a block is given. + def create_run(workflow, credentials = nil) + id = initialize_run(workflow, credentials) + run = Run.create(self, "", credentials, id) + + # cache newly created run object - this must be done per user + user = credentials.nil? ? :all : credentials.username + @runs[user] = {} unless @runs[user] + @runs[user][id] = run + + yield(run) if block_given? + run end # :call-seq: - # server.initialize_run(workflow) -> string + # initialize_run(workflow, credentials = nil) -> string # # Create a run on this server using the specified _workflow_ but do not - # return it as a Run instance. Return its UUID instead. - def initialize_run(workflow) - request = Net::HTTP::Post.new("#{@links[:runs]}") - request.content_type = "application/xml" - if ssl? - request.basic_auth @username, @password - end - begin - response = @http.request(request, Fragments::WORKFLOW % workflow) - rescue InternalHTTPError => e - raise ConnectionError.new(e) - end - - case response - when Net::HTTPCreated - # return the uuid of the newly created run - epr = URI.parse(response['location']) - epr.path[-36..-1] - when Net::HTTPForbidden - raise ServerAtCapacityError.new(@run_limit) - when Net::HTTPUnauthorized - raise AuthorizationError.new(@username) - else - raise UnexpectedServerResponse.new(response) - end + # return it as a Run instance. Return its identifier instead. + def initialize_run(workflow, credentials = nil) + # set up the run object cache - this must be done per user + user = credentials.nil? ? :all : credentials.username + @runs[user] = {} unless @runs[user] + + @connection.POST_run("#{@links[:runs]}", + XML::Fragments::WORKFLOW % workflow, credentials) end # :call-seq: - # server.ssl? -> bool + # uri -> URI # - # Is this server using SSL? - def ssl? - @ssl + # The URI of the connection to the remote Taverna Server. + def uri + @connection.uri end # :call-seq: - # server.runs -> [runs] + # run_limit(credentials = nil) -> num # + # The maximum number of runs that this server will allow at any one time. + # Runs in any state (+Initialized+, +Running+ and +Finished+) are counted + # against this maximum. + def run_limit(credentials = nil) + get_attribute(@links[:runlimit], "text/plain", credentials).to_i + end + + # :call-seq: + # runs(credentials = nil) -> [runs] + # # Return the set of runs on this server. - def runs - get_runs.values + def runs(credentials = nil) + get_runs(credentials).values end # :call-seq: - # server.run(uuid) -> run + # run(identifier, credentials = nil) -> run # # Return the specified run. - def run(uuid) - get_runs[uuid] + def run(identifier, credentials = nil) + get_runs(credentials)[identifier] end # :call-seq: - # server.delete_run(run) -> bool + # delete_run(run, credentials = nil) -> bool # # Delete the specified run from the server, discarding all of its state. - # _run_ can be either a Run instance or a UUID. - def delete_run(run) - # get the uuid from the run if that is what is passed in + # _run_ can be either a Run instance or a identifier. + def delete_run(run, credentials = nil) + # get the identifier from the run if that is what is passed in if run.instance_of? Run - run = run.uuid + run = run.identifier end - request = Net::HTTP::Delete.new("#{@links[:runs]}/#{run}") - if ssl? - request.basic_auth @username, @password - end - begin - response = @http.request(request) - rescue InternalHTTPError => e - raise ConnectionError.new(e) - end - - case response - when Net::HTTPNoContent - # Success, carry on... - @runs.delete(run) + if delete_attribute("#{@links[:runs]}/#{run}", credentials) + # delete cached run object - this must be done per user + user = credentials.nil? ? :all : credentials.username + @runs[user].delete(run) if @runs[user] true - when Net::HTTPNotFound - raise RunNotFoundError.new(run) - when Net::HTTPForbidden - raise AccessForbiddenError.new("run #{run}") - when Net::HTTPUnauthorized - raise AuthorizationError.new(@username) - else - raise UnexpectedServerResponse.new(response) end end # :call-seq: - # server.delete_all_runs + # delete_all_runs(credentials = nil) # # Delete all runs on this server, discarding all of their state. - def delete_all_runs + def delete_all_runs(credentials = nil) # first refresh run list - runs.each {|run| run.delete} + runs(credentials).each {|run| run.delete} end - # :call-seq: - # server.set_run_input(run, input, value) -> bool - # - # Set the workflow input port _input_ on run _run_ to _value_. _run_ can - # be either a Run instance or a UUID. - def set_run_input(run, input, value) - # get the run from the uuid if that is what is passed in + # :stopdoc: + def set_run_input(run, input, value, credentials = nil) + warn "[DEPRECATION] 'Server#set_run_input' is deprecated and will be " + + "removed in 1.0. Input ports are set directly instead. The most " + + "direct replacement for this method is: " + + "'Run#input_port(input).value = value'" + + # get the run from the identifier if that is what is passed in if not run.instance_of? Run - run = run(run) + run = run(run, credentials) end - xml_value = XML::Node.new_text(value) - path = "#{@links[:runs]}/#{run.uuid}/#{run.inputs}/input/#{input}" - set_attribute(path, Fragments::RUNINPUTVALUE % xml_value, "application/xml") - rescue AttributeNotFoundError => e - if get_runs.has_key? run.uuid - raise e - else - raise RunNotFoundError.new(run.uuid) - end + run.input_port(input).value = value end - # :call-seq: - # server.set_run_input_file(run, input, filename) -> bool - # - # Set the workflow input port _input_ on run _run_ to use the file at - # _filename_ for its input. _run_ can be either a Run instance or a UUID. - def set_run_input_file(run, input, filename) - # get the run from the uuid if that is what is passed in + def set_run_input_file(run, input, filename, credentials = nil) + warn "[DEPRECATION] 'Server#set_run_input_file' is deprecated and " + + "will be removed in 1.0. Input ports are set directly instead. The " + + "most direct replacement for this method is: " + + "'Run#input_port(input).remote_file = filename'" + + # get the run from the identifier if that is what is passed in if not run.instance_of? Run - run = run(run) + run = run(run, credentials) end - xml_value = XML::Node.new_text(filename) - path = "#{@links[:runs]}/#{run.uuid}/#{run.inputs}/input/#{input}" - set_attribute(path, Fragments::RUNINPUTFILE % xml_value, "application/xml") - rescue AttributeNotFoundError => e - if get_runs.has_key? run.uuid - raise e - else - raise RunNotFoundError.new(run.uuid) - end + run.input_port(input).remote_file = filename end - # :call-seq: - # server.make_run_dir(run, root, dir) -> bool - # - # Create a directory _dir_ within the directory _root_ on _run_. _run_ can - # be either a Run instance or a UUID. This is mainly for use by Run#mkdir. - def make_run_dir(run, root, dir) - # get the uuid from the run if that is what is passed in + def create_dir(run, root, dir, credentials = nil) + # get the identifier from the run if that is what is passed in if run.instance_of? Run - run = run.uuid + run = run.identifier end raise AccessForbiddenError.new("subdirectories (#{dir})") if dir.include? ?/ - request = Net::HTTP::Post.new("#{@links[:runs]}/#{run}/#{root}") - request.content_type = "application/xml" - if ssl? - request.basic_auth @username, @password + @connection.POST_dir("#{@links[:runs]}/#{run}/#{root}", + XML::Fragments::MKDIR % dir, run, dir, credentials) + end + + def make_run_dir(run, root, dir, credentials = nil) + warn "[DEPRECATION] 'Server#make_run_dir' is deprecated and will be " + + "removed in 1.0. Please use 'Run#mkdir' instead." + + create_dir(run, root, dir, credentials) + end + + def upload_file(run, filename, location, rename, credentials = nil) + contents = IO.read(filename) + rename = filename.split('/')[-1] if rename == "" + + if upload_data(run, contents, rename, location, credentials) + rename end - begin - response = @http.request(request, Fragments::MKDIR % dir) - rescue InternalHTTPError => e - raise ConnectionError.new(e) - end + end - case response - when Net::HTTPCreated - # OK, carry on... - true - when Net::HTTPNotFound - raise RunNotFoundError.new(run) - when Net::HTTPForbidden - raise AccessForbiddenError.new("#{dir} on run #{run}") - when Net::HTTPUnauthorized - raise AuthorizationError.new(@username) - else - raise UnexpectedServerResponse.new(response) + def upload_data(run, data, remote_name, location, credentials = nil) + # get the identifier from the run if that is what is passed in + if run.instance_of? Run + run = run.identifier end + + contents = Base64.encode64(data) + + @connection.POST_file("#{@links[:runs]}/#{run}/#{location}", + XML::Fragments::UPLOAD % [remote_name, contents], run, credentials) end - # :call-seq: - # server.upload_run_file(run, filename, location, rename) -> string - # - # Upload a file to _run_. _run_ can be either a Run instance or a UUID. - # Mainly for internal use by Run#upload_file. - def upload_run_file(run, filename, location, rename) - # get the uuid from the run if that is what is passed in + def upload_run_file(run, filename, location, rename, credentials = nil) + warn "[DEPRECATION] 'Server#upload_run_file' is deprecated and will " + + "be removed in 1.0. Please use 'Run#upload_file' or " + + "'Run#input_port(input).file = filename' instead." + + upload_file(run, filename, location, rename, credentials) + end + + def create_run_attribute(run, path, value, type, credentials = nil) + # get the identifier from the run if that is what is passed in if run.instance_of? Run - run = run.uuid + run = run.identifier end - contents = Base64.encode64(IO.read(filename)) - rename = filename.split('/')[-1] if rename == "" - request = Net::HTTP::Post.new("#{@links[:runs]}/#{run}/#{location}") - request.content_type = "application/xml" - if ssl? - request.basic_auth @username, @password + create_attribute("#{@links[:runs]}/#{run}/#{path}", value, type, + credentials) + rescue AttributeNotFoundError => e + if get_runs(credentials).has_key? run + raise e + else + raise RunNotFoundError.new(run) end - begin - response = @http.request(request, Fragments::UPLOAD % [rename, contents]) - rescue InternalHTTPError => e - raise ConnectionError.new(e) + end + + def get_run_attribute(run, path, type, credentials = nil) + # get the identifier from the run if that is what is passed in + if run.instance_of? Run + run = run.identifier end - case response - when Net::HTTPCreated - # Success, return remote name of uploaded file - rename - when Net::HTTPNotFound - raise RunNotFoundError.new(run) - when Net::HTTPForbidden - raise AccessForbiddenError.new("run #{run}") - when Net::HTTPUnauthorized - raise AuthorizationError.new(@username) + get_attribute("#{@links[:runs]}/#{run}/#{path}", type, credentials) + rescue AttributeNotFoundError => e + if get_runs(credentials).has_key? run + raise e else - raise UnexpectedServerResponse.new(response) + raise RunNotFoundError.new(run) end end - # :call-seq: - # server.get_run_attribute(run, path) -> string - # - # Get the attribute at _path_ in _run_. _run_ can be either a Run instance - # or a UUID. - def get_run_attribute(run, path) - # get the uuid from the run if that is what is passed in + def set_run_attribute(run, path, value, type, credentials = nil) + # get the identifier from the run if that is what is passed in if run.instance_of? Run - run = run.uuid + run = run.identifier end - get_attribute("#{@links[:runs]}/#{run}/#{path}") + set_attribute("#{@links[:runs]}/#{run}/#{path}", value, type, + credentials) rescue AttributeNotFoundError => e - if get_runs.has_key? run + if get_runs(credentials).has_key? run raise e else raise RunNotFoundError.new(run) end end - # :call-seq: - # server.set_run_attribute(run, path, value) -> bool - # - # Set the attribute at _path_ in _run_ to _value_. _run_ can be either a - # Run instance or a UUID. - def set_run_attribute(run, path, value) - # get the uuid from the run if that is what is passed in + def delete_run_attribute(run, path, credentials = nil) + # get the identifier from the run if that is what is passed in if run.instance_of? Run - run = run.uuid + run = run.identifier end - set_attribute("#{@links[:runs]}/#{run}/#{path}", value, "text/plain") + delete_attribute("#{@links[:runs]}/#{run}/#{path}", credentials) rescue AttributeNotFoundError => e - if get_runs.has_key? run + if get_runs(credentials).has_key? run raise e else raise RunNotFoundError.new(run) end end - private - def get_attribute(path) - request = Net::HTTP::Get.new(path) - if ssl? - request.basic_auth @username, @password + def download_run_file(run, path, range, credentials = nil) + # get the identifier from the run if that is what is passed in + if run.instance_of? Run + run = run.identifier end - begin - response = @http.request(request) - rescue InternalHTTPError => e - raise ConnectionError.new(e) - end - case response - when Net::HTTPOK - return response.body - when Net::HTTPNotFound - raise AttributeNotFoundError.new(path) - when Net::HTTPForbidden - raise AccessForbiddenError.new("attribute #{path}") - when Net::HTTPUnauthorized - raise AuthorizationError.new(@username) + get_attribute("#{@links[:runs]}/#{run}/#{path}", + "application/octet-stream", range, credentials) + rescue AttributeNotFoundError => e + if get_runs(credentials).has_key? run + raise e else - raise UnexpectedServerResponse.new(response) + raise RunNotFoundError.new(run) end end - def set_attribute(path, value, type) - request = Net::HTTP::Put.new(path) - request.content_type = type - if ssl? - request.basic_auth @username, @password + def get_admin_attribute(path, credentials = nil) + get_attribute("#{@links[:admin]}/#{path}", "*/*", credentials) + end + + def set_admin_attribute(path, value, credentials = nil) + set_attribute("#{@links[:admin]}/#{path}", value, "text/plain", + credentials) + end + + def admin_resource_writable?(path, credentials = nil) + headers = @connection.OPTIONS("#{@links[:admin]}/#{path}", credentials) + headers["allow"][0].split(",").include? "PUT" + end + # :startdoc: + + private + def create_attribute(path, value, type, credentials = nil) + @connection.POST(path, value, type, credentials) + end + + def get_attribute(path, type, *rest) + credentials = nil + range = nil + + rest.each do |param| + case param + when HttpCredentials + credentials = param + when Range + range = param + when Array + range = param[0]..param[1] + end end + begin - response = @http.request(request, value) - rescue InternalHTTPError => e - raise ConnectionError.new(e) + @connection.GET(path, type, range, credentials) + rescue ConnectionRedirectError => cre + @connection = cre.redirect + retry end + end - case response - when Net::HTTPOK - # OK, so carry on - true - when Net::HTTPNotFound - raise AttributeNotFoundError.new(path) - when Net::HTTPForbidden - raise AccessForbiddenError.new("attribute #{path}") - when Net::HTTPUnauthorized - raise AuthorizationError.new(@username) + def set_attribute(path, value, type, credentials = nil) + @connection.PUT(path, value, type, credentials) + end + + def delete_attribute(path, credentials = nil) + @connection.DELETE(path, credentials) + end + + def get_version(doc) + version = xpath_attr(doc, XPaths[:server], "serverVersion") + if version == nil + raise RuntimeError.new("Taverna Servers prior to version 2.3 " + + "are no longer supported.") else - raise UnexpectedServerResponse.new(response) + return version.to_f end end - def parse_description(desc) - doc = XML::Document.string(desc) - nsmap = Namespaces::MAP - { - :runs => URI.parse(doc.find_first(XPaths::RUNS, nsmap).attributes["href"]).path, - :runlimit => URI.parse(doc.find_first(XPaths::RUNLIMIT, nsmap).attributes["href"]).path, - :permworkflows => URI.parse(doc.find_first(XPaths::PERMWKF, nsmap).attributes["href"]).path, - :permlisteners => URI.parse(doc.find_first(XPaths::PERMLSTN, nsmap).attributes["href"]).path - } + def get_description(doc) + links = {} + links[:runs] = URI.parse(xpath_attr(doc, XPaths[:runs], "href")).path + + links[:policy] = URI.parse(xpath_attr(doc, XPaths[:policy], "href")).path + doc = xml_document(get_attribute(links[:policy], "application/xml")) + + links[:permlisteners] = + URI.parse(xpath_attr(doc, XPaths[:permlstt], "href")).path + links[:notifications] = + URI.parse(xpath_attr(doc, XPaths[:notify], "href")).path + + links[:runlimit] = + URI.parse(xpath_attr(doc, XPaths[:runlimit], "href")).path + links[:permworkflows] = + URI.parse(xpath_attr(doc, XPaths[:permwkf], "href")).path + + links end - def get_runs - run_list = get_attribute("#{@links[:runs]}") + def get_runs(credentials = nil) + run_list = get_attribute("#{@links[:runs]}", "application/xml", + credentials) - doc = XML::Document.string(run_list) + doc = xml_document(run_list) - # get list of run uuids - uuids = [] - doc.find(XPaths::RUN, Namespaces::MAP).each do |run| - uuids << run.attributes["href"].split('/')[-1] + # get list of run identifiers + ids = [] + xpath_find(doc, XPaths[:run]).each do |run| + ids << xml_node_attribute(run, "href").split('/')[-1] end + # cache run objects - this must be done per user + user = credentials.nil? ? :all : credentials.username + @runs[user] = {} unless @runs[user] + # add new runs - uuids.each do |uuid| - if !@runs.has_key? uuid - @runs[uuid] = Run.create(self, "", uuid) + ids.each do |id| + if !@runs[user].has_key? id + @runs[user][id] = Run.create(self, "", credentials, id) end end # clear out the expired runs - if @runs.length > @run_limit - @runs.delete_if {|key, val| !uuids.member? key} + if @runs[user].length > ids.length + @runs[user].delete_if {|key, val| !ids.member? key} end - @runs + @runs[user] end - end + end end