lib/t2-server/server.rb in t2-server-0.9.1 vs lib/t2-server/server.rb in t2-server-0.9.2

- old
+ new

@@ -38,14 +38,15 @@ # An interface for directly communicating with one or more Taverna 2 Server # instances. class Server include XML::Methods - # The version of the remote Taverna Server instance. - attr_reader :version - # :stopdoc: + # Internal references to the main rest and admin top-level resource + # endpoints. + REST_ENDPOINT = "rest/" + XPaths = { # Server top-level XPath queries :server => XML::Methods.xpath_compile("//nsr:serverDescription"), :policy => XML::Methods.xpath_compile("//nsr:policy"), :run => XML::Methods.xpath_compile("//nsr:run"), @@ -83,17 +84,16 @@ end # setup connection @connection = ConnectionFactory.connect(uri, params) - # add a slash to the end of this address to work around this bug: - # http://www.mygrid.org.uk/dev/issues/browse/TAVSERV-113 - server_description = xml_document(get_attribute("#{uri.path}/rest/", - "application/xml")) - @version = get_version(server_description) - @links = get_description(server_description) - @links[:admin] = "#{uri.path}/admin" + # The following four fields hold cached data about the server that is + # only downloaded the first time it is requested. + @server_doc = nil + @version = nil + @version_components = nil + @links = nil # initialize run object cache @runs = {} yield(self) if block_given? @@ -147,15 +147,41 @@ def initialize_run(workflow, credentials = nil) # set up the run object cache - this must be done per user user = credentials.nil? ? :all : credentials.username @runs[user] = {} unless @runs[user] - @connection.POST_run("#{@links[:runs]}", - XML::Fragments::WORKFLOW % workflow, credentials) + @connection.POST(links[:runs], workflow, + "application/vnd.taverna.t2flow+xml", credentials) end # :call-seq: + # version -> String + # + # The version string of the remote Taverna Server. + def version + if @version.nil? + @version = _get_version + end + + @version + end + + # :call-seq: + # version_components -> Array + # + # An array of the major, minor and patch version components of the remote + # Taverna Server. + def version_components + if @version_components.nil? + comps = version.split(".") + @version_components = comps.map { |v| v.to_i } + end + + @version_components + end + + # :call-seq: # uri -> URI # # The URI of the connection to the remote Taverna Server. def uri @connection.uri @@ -166,11 +192,11 @@ # # The maximum number of runs that this server will allow at any one time. # Runs in any state (+Initialized+, +Running+ and +Finished+) are counted # against this maximum. def run_limit(credentials = nil) - get_attribute(@links[:runlimit], "text/plain", credentials).to_i + read(links[:runlimit], "text/plain", credentials).to_i end # :call-seq: # runs(credentials = nil) -> [runs] # @@ -185,33 +211,36 @@ # Return the specified run. def run(identifier, credentials = nil) get_runs(credentials)[identifier] end - # :call-seq: - # delete_run(run, credentials = nil) -> bool - # - # Delete the specified run from the server, discarding all of its state. - # _run_ can be either a Run instance or a identifier. + # :stopdoc: def delete_run(run, credentials = nil) + warn "[DEPRECATION] 'Server#delete_run' is deprecated and will be " + + "removed in 1.0. Please use 'Run#delete' to delete a run." + # get the identifier from the run if that is what is passed in if run.instance_of? Run run = run.identifier end - if delete_attribute("#{@links[:runs]}/#{run}", credentials) + run_uri = Util.append_to_uri_path(links[:runs], run) + if delete(run_uri, credentials) # delete cached run object - this must be done per user user = credentials.nil? ? :all : credentials.username @runs[user].delete(run) if @runs[user] true end end + # :startdoc: # :call-seq: # delete_all_runs(credentials = nil) # - # Delete all runs on this server, discarding all of their state. + # Delete all runs on this server, discarding all of their state. Note that + # only those runs that the provided credentials have permission to delete + # will be deleted. def delete_all_runs(credentials = nil) # first refresh run list runs(credentials).each {|run| run.delete} end @@ -242,156 +271,62 @@ end run.input_port(input).remote_file = filename end - def create_dir(run, root, dir, credentials = nil) - # get the identifier from the run if that is what is passed in - if run.instance_of? Run - run = run.identifier - end - - raise AccessForbiddenError.new("subdirectories (#{dir})") if dir.include? ?/ - @connection.POST_dir("#{@links[:runs]}/#{run}/#{root}", - XML::Fragments::MKDIR % dir, run, dir, credentials) + def mkdir(uri, dir, credentials = nil) + @connection.POST(uri, XML::Fragments::MKDIR % dir, "application/xml", + credentials) end def make_run_dir(run, root, dir, credentials = nil) warn "[DEPRECATION] 'Server#make_run_dir' is deprecated and will be " + "removed in 1.0. Please use 'Run#mkdir' instead." create_dir(run, root, dir, credentials) end - def upload_file(run, filename, location, rename, credentials = nil) + def upload_file(filename, uri, remote_name, credentials = nil) contents = IO.read(filename) - rename = filename.split('/')[-1] if rename == "" + remote_name = filename.split('/')[-1] if remote_name == "" - if upload_data(run, contents, rename, location, credentials) - rename - end + upload_data(contents, remote_name, uri, credentials) end - def upload_data(run, data, remote_name, location, credentials = nil) - # get the identifier from the run if that is what is passed in - if run.instance_of? Run - run = run.identifier - end + def upload_data(data, remote_name, uri, credentials = nil) + # Different Server versions support different upload methods + (major, minor, patch) = version_components - contents = Base64.encode64(data) - - @connection.POST_file("#{@links[:runs]}/#{run}/#{location}", - XML::Fragments::UPLOAD % [remote_name, contents], run, credentials) + if minor == 4 && patch >= 1 + put_uri = Util.append_to_uri_path(uri, remote_name) + @connection.PUT(put_uri, data, "application/octet-stream", credentials) + else + contents = Base64.encode64(data) + @connection.POST(uri, + XML::Fragments::UPLOAD % [remote_name, contents], "application/xml", + credentials) + end end def upload_run_file(run, filename, location, rename, credentials = nil) warn "[DEPRECATION] 'Server#upload_run_file' is deprecated and will " + "be removed in 1.0. Please use 'Run#upload_file' or " + "'Run#input_port(input).file = filename' instead." upload_file(run, filename, location, rename, credentials) end - def create_run_attribute(run, path, value, type, credentials = nil) - # get the identifier from the run if that is what is passed in - if run.instance_of? Run - run = run.identifier - end - - create_attribute("#{@links[:runs]}/#{run}/#{path}", value, type, - credentials) - rescue AttributeNotFoundError => e - if get_runs(credentials).has_key? run - raise e - else - raise RunNotFoundError.new(run) - end - end - - def get_run_attribute(run, path, type, credentials = nil) - # get the identifier from the run if that is what is passed in - if run.instance_of? Run - run = run.identifier - end - - get_attribute("#{@links[:runs]}/#{run}/#{path}", type, credentials) - rescue AttributeNotFoundError => e - if get_runs(credentials).has_key? run - raise e - else - raise RunNotFoundError.new(run) - end - end - - def set_run_attribute(run, path, value, type, credentials = nil) - # get the identifier from the run if that is what is passed in - if run.instance_of? Run - run = run.identifier - end - - set_attribute("#{@links[:runs]}/#{run}/#{path}", value, type, - credentials) - rescue AttributeNotFoundError => e - if get_runs(credentials).has_key? run - raise e - else - raise RunNotFoundError.new(run) - end - end - - def delete_run_attribute(run, path, credentials = nil) - # get the identifier from the run if that is what is passed in - if run.instance_of? Run - run = run.identifier - end - - delete_attribute("#{@links[:runs]}/#{run}/#{path}", credentials) - rescue AttributeNotFoundError => e - if get_runs(credentials).has_key? run - raise e - else - raise RunNotFoundError.new(run) - end - end - - def download_run_file(run, path, range, credentials = nil) - # get the identifier from the run if that is what is passed in - if run.instance_of? Run - run = run.identifier - end - - get_attribute("#{@links[:runs]}/#{run}/#{path}", - "application/octet-stream", range, credentials) - rescue AttributeNotFoundError => e - if get_runs(credentials).has_key? run - raise e - else - raise RunNotFoundError.new(run) - end - end - - def get_admin_attribute(path, credentials = nil) - get_attribute("#{@links[:admin]}/#{path}", "*/*", credentials) - end - - def set_admin_attribute(path, value, credentials = nil) - set_attribute("#{@links[:admin]}/#{path}", value, "text/plain", - credentials) - end - - def admin_resource_writable?(path, credentials = nil) - headers = @connection.OPTIONS("#{@links[:admin]}/#{path}", credentials) + def is_resource_writable?(uri, credentials = nil) + headers = @connection.OPTIONS(uri, credentials) headers["allow"][0].split(",").include? "PUT" end - # :startdoc: - private - def create_attribute(path, value, type, credentials = nil) - @connection.POST(path, value, type, credentials) + def create(uri, value, type, credentials = nil) + @connection.POST(uri, value, type, credentials) end - def get_attribute(path, type, *rest) + def read(uri, type, *rest) credentials = nil range = nil rest.each do |param| case param @@ -403,80 +338,114 @@ range = param[0]..param[1] end end begin - @connection.GET(path, type, range, credentials) + @connection.GET(uri, type, range, credentials) rescue ConnectionRedirectError => cre + # We've been redirected so save the new connection object with the new + # server URI and try again with the new URI. @connection = cre.redirect + uri = Util.replace_uri_path(@connection.uri, uri.path) retry end end - def set_attribute(path, value, type, credentials = nil) - @connection.PUT(path, value, type, credentials) + def update(uri, value, type, credentials = nil) + @connection.PUT(uri, value, type, credentials) end - def delete_attribute(path, credentials = nil) - @connection.DELETE(path, credentials) + def delete(uri, credentials = nil) + @connection.DELETE(uri, credentials) end + # :startdoc: - def get_version(doc) + private + + def links + @links = _get_server_links if @links.nil? + + @links + end + + def _get_server_description + if @server_doc.nil? + rest_uri = Util.append_to_uri_path(uri, REST_ENDPOINT) + @server_doc = xml_document(read(rest_uri, "application/xml")) + end + + @server_doc + end + + def _get_version + doc = _get_server_description version = xpath_attr(doc, XPaths[:server], "serverVersion") if version == nil raise RuntimeError.new("Taverna Servers prior to version 2.3 " + "are no longer supported.") else - return version.to_f + # Remove SNAPSHOT tag if it's there. + if version.end_with? "-SNAPSHOT" + version.gsub!("-SNAPSHOT", "") + end + + # Add .0 if we only have a major and minor component. + if version.split(".").length == 2 + version += ".0" + end + + return version end end - def get_description(doc) + def _get_server_links + doc = _get_server_description links = {} - links[:runs] = URI.parse(xpath_attr(doc, XPaths[:runs], "href")).path + links[:runs] = URI.parse(xpath_attr(doc, XPaths[:runs], "href")) - links[:policy] = URI.parse(xpath_attr(doc, XPaths[:policy], "href")).path - doc = xml_document(get_attribute(links[:policy], "application/xml")) + links[:policy] = URI.parse(xpath_attr(doc, XPaths[:policy], "href")) + doc = xml_document(read(links[:policy], "application/xml")) links[:permlisteners] = - URI.parse(xpath_attr(doc, XPaths[:permlstt], "href")).path + URI.parse(xpath_attr(doc, XPaths[:permlstt], "href")) links[:notifications] = - URI.parse(xpath_attr(doc, XPaths[:notify], "href")).path + URI.parse(xpath_attr(doc, XPaths[:notify], "href")) links[:runlimit] = - URI.parse(xpath_attr(doc, XPaths[:runlimit], "href")).path + URI.parse(xpath_attr(doc, XPaths[:runlimit], "href")) links[:permworkflows] = - URI.parse(xpath_attr(doc, XPaths[:permwkf], "href")).path + URI.parse(xpath_attr(doc, XPaths[:permwkf], "href")) links end def get_runs(credentials = nil) - run_list = get_attribute("#{@links[:runs]}", "application/xml", - credentials) + run_list = read(links[:runs], "application/xml", credentials) doc = xml_document(run_list) # get list of run identifiers - ids = [] + run_list = {} xpath_find(doc, XPaths[:run]).each do |run| - ids << xml_node_attribute(run, "href").split('/')[-1] + uri = URI.parse(xml_node_attribute(run, "href")) + id = xml_node_content(run) + run_list[id] = uri end # cache run objects - this must be done per user user = credentials.nil? ? :all : credentials.username @runs[user] = {} unless @runs[user] - # add new runs - ids.each do |id| + # add new runs to the user cache + run_list.each_key do |id| if !@runs[user].has_key? id - @runs[user][id] = Run.create(self, "", credentials, id) + @runs[user][id] = Run.create(self, "", credentials, run_list[id]) end end # clear out the expired runs - if @runs[user].length > ids.length - @runs[user].delete_if {|key, val| !ids.member? key} + if @runs[user].length > run_list.length + @runs[user].delete_if {|key, val| !run_list.member? key} end @runs[user] end end