lib/t2-server/server.rb in t2-server-0.9.1 vs lib/t2-server/server.rb in t2-server-0.9.2
- old
+ new
@@ -38,14 +38,15 @@
# An interface for directly communicating with one or more Taverna 2 Server
# instances.
class Server
include XML::Methods
- # The version of the remote Taverna Server instance.
- attr_reader :version
-
# :stopdoc:
+ # Internal references to the main rest and admin top-level resource
+ # endpoints.
+ REST_ENDPOINT = "rest/"
+
XPaths = {
# Server top-level XPath queries
:server => XML::Methods.xpath_compile("//nsr:serverDescription"),
:policy => XML::Methods.xpath_compile("//nsr:policy"),
:run => XML::Methods.xpath_compile("//nsr:run"),
@@ -83,17 +84,16 @@
end
# setup connection
@connection = ConnectionFactory.connect(uri, params)
- # add a slash to the end of this address to work around this bug:
- # http://www.mygrid.org.uk/dev/issues/browse/TAVSERV-113
- server_description = xml_document(get_attribute("#{uri.path}/rest/",
- "application/xml"))
- @version = get_version(server_description)
- @links = get_description(server_description)
- @links[:admin] = "#{uri.path}/admin"
+ # The following four fields hold cached data about the server that is
+ # only downloaded the first time it is requested.
+ @server_doc = nil
+ @version = nil
+ @version_components = nil
+ @links = nil
# initialize run object cache
@runs = {}
yield(self) if block_given?
@@ -147,15 +147,41 @@
def initialize_run(workflow, credentials = nil)
# set up the run object cache - this must be done per user
user = credentials.nil? ? :all : credentials.username
@runs[user] = {} unless @runs[user]
- @connection.POST_run("#{@links[:runs]}",
- XML::Fragments::WORKFLOW % workflow, credentials)
+ @connection.POST(links[:runs], workflow,
+ "application/vnd.taverna.t2flow+xml", credentials)
end
# :call-seq:
+ # version -> String
+ #
+ # The version string of the remote Taverna Server.
+ def version
+ if @version.nil?
+ @version = _get_version
+ end
+
+ @version
+ end
+
+ # :call-seq:
+ # version_components -> Array
+ #
+ # An array of the major, minor and patch version components of the remote
+ # Taverna Server.
+ def version_components
+ if @version_components.nil?
+ comps = version.split(".")
+ @version_components = comps.map { |v| v.to_i }
+ end
+
+ @version_components
+ end
+
+ # :call-seq:
# uri -> URI
#
# The URI of the connection to the remote Taverna Server.
def uri
@connection.uri
@@ -166,11 +192,11 @@
#
# The maximum number of runs that this server will allow at any one time.
# Runs in any state (+Initialized+, +Running+ and +Finished+) are counted
# against this maximum.
def run_limit(credentials = nil)
- get_attribute(@links[:runlimit], "text/plain", credentials).to_i
+ read(links[:runlimit], "text/plain", credentials).to_i
end
# :call-seq:
# runs(credentials = nil) -> [runs]
#
@@ -185,33 +211,36 @@
# Return the specified run.
def run(identifier, credentials = nil)
get_runs(credentials)[identifier]
end
- # :call-seq:
- # delete_run(run, credentials = nil) -> bool
- #
- # Delete the specified run from the server, discarding all of its state.
- # _run_ can be either a Run instance or a identifier.
+ # :stopdoc:
def delete_run(run, credentials = nil)
+ warn "[DEPRECATION] 'Server#delete_run' is deprecated and will be " +
+ "removed in 1.0. Please use 'Run#delete' to delete a run."
+
# get the identifier from the run if that is what is passed in
if run.instance_of? Run
run = run.identifier
end
- if delete_attribute("#{@links[:runs]}/#{run}", credentials)
+ run_uri = Util.append_to_uri_path(links[:runs], run)
+ if delete(run_uri, credentials)
# delete cached run object - this must be done per user
user = credentials.nil? ? :all : credentials.username
@runs[user].delete(run) if @runs[user]
true
end
end
+ # :startdoc:
# :call-seq:
# delete_all_runs(credentials = nil)
#
- # Delete all runs on this server, discarding all of their state.
+ # Delete all runs on this server, discarding all of their state. Note that
+ # only those runs that the provided credentials have permission to delete
+ # will be deleted.
def delete_all_runs(credentials = nil)
# first refresh run list
runs(credentials).each {|run| run.delete}
end
@@ -242,156 +271,62 @@
end
run.input_port(input).remote_file = filename
end
- def create_dir(run, root, dir, credentials = nil)
- # get the identifier from the run if that is what is passed in
- if run.instance_of? Run
- run = run.identifier
- end
-
- raise AccessForbiddenError.new("subdirectories (#{dir})") if dir.include? ?/
- @connection.POST_dir("#{@links[:runs]}/#{run}/#{root}",
- XML::Fragments::MKDIR % dir, run, dir, credentials)
+ def mkdir(uri, dir, credentials = nil)
+ @connection.POST(uri, XML::Fragments::MKDIR % dir, "application/xml",
+ credentials)
end
def make_run_dir(run, root, dir, credentials = nil)
warn "[DEPRECATION] 'Server#make_run_dir' is deprecated and will be " +
"removed in 1.0. Please use 'Run#mkdir' instead."
create_dir(run, root, dir, credentials)
end
- def upload_file(run, filename, location, rename, credentials = nil)
+ def upload_file(filename, uri, remote_name, credentials = nil)
contents = IO.read(filename)
- rename = filename.split('/')[-1] if rename == ""
+ remote_name = filename.split('/')[-1] if remote_name == ""
- if upload_data(run, contents, rename, location, credentials)
- rename
- end
+ upload_data(contents, remote_name, uri, credentials)
end
- def upload_data(run, data, remote_name, location, credentials = nil)
- # get the identifier from the run if that is what is passed in
- if run.instance_of? Run
- run = run.identifier
- end
+ def upload_data(data, remote_name, uri, credentials = nil)
+ # Different Server versions support different upload methods
+ (major, minor, patch) = version_components
- contents = Base64.encode64(data)
-
- @connection.POST_file("#{@links[:runs]}/#{run}/#{location}",
- XML::Fragments::UPLOAD % [remote_name, contents], run, credentials)
+ if minor == 4 && patch >= 1
+ put_uri = Util.append_to_uri_path(uri, remote_name)
+ @connection.PUT(put_uri, data, "application/octet-stream", credentials)
+ else
+ contents = Base64.encode64(data)
+ @connection.POST(uri,
+ XML::Fragments::UPLOAD % [remote_name, contents], "application/xml",
+ credentials)
+ end
end
def upload_run_file(run, filename, location, rename, credentials = nil)
warn "[DEPRECATION] 'Server#upload_run_file' is deprecated and will " +
"be removed in 1.0. Please use 'Run#upload_file' or " +
"'Run#input_port(input).file = filename' instead."
upload_file(run, filename, location, rename, credentials)
end
- def create_run_attribute(run, path, value, type, credentials = nil)
- # get the identifier from the run if that is what is passed in
- if run.instance_of? Run
- run = run.identifier
- end
-
- create_attribute("#{@links[:runs]}/#{run}/#{path}", value, type,
- credentials)
- rescue AttributeNotFoundError => e
- if get_runs(credentials).has_key? run
- raise e
- else
- raise RunNotFoundError.new(run)
- end
- end
-
- def get_run_attribute(run, path, type, credentials = nil)
- # get the identifier from the run if that is what is passed in
- if run.instance_of? Run
- run = run.identifier
- end
-
- get_attribute("#{@links[:runs]}/#{run}/#{path}", type, credentials)
- rescue AttributeNotFoundError => e
- if get_runs(credentials).has_key? run
- raise e
- else
- raise RunNotFoundError.new(run)
- end
- end
-
- def set_run_attribute(run, path, value, type, credentials = nil)
- # get the identifier from the run if that is what is passed in
- if run.instance_of? Run
- run = run.identifier
- end
-
- set_attribute("#{@links[:runs]}/#{run}/#{path}", value, type,
- credentials)
- rescue AttributeNotFoundError => e
- if get_runs(credentials).has_key? run
- raise e
- else
- raise RunNotFoundError.new(run)
- end
- end
-
- def delete_run_attribute(run, path, credentials = nil)
- # get the identifier from the run if that is what is passed in
- if run.instance_of? Run
- run = run.identifier
- end
-
- delete_attribute("#{@links[:runs]}/#{run}/#{path}", credentials)
- rescue AttributeNotFoundError => e
- if get_runs(credentials).has_key? run
- raise e
- else
- raise RunNotFoundError.new(run)
- end
- end
-
- def download_run_file(run, path, range, credentials = nil)
- # get the identifier from the run if that is what is passed in
- if run.instance_of? Run
- run = run.identifier
- end
-
- get_attribute("#{@links[:runs]}/#{run}/#{path}",
- "application/octet-stream", range, credentials)
- rescue AttributeNotFoundError => e
- if get_runs(credentials).has_key? run
- raise e
- else
- raise RunNotFoundError.new(run)
- end
- end
-
- def get_admin_attribute(path, credentials = nil)
- get_attribute("#{@links[:admin]}/#{path}", "*/*", credentials)
- end
-
- def set_admin_attribute(path, value, credentials = nil)
- set_attribute("#{@links[:admin]}/#{path}", value, "text/plain",
- credentials)
- end
-
- def admin_resource_writable?(path, credentials = nil)
- headers = @connection.OPTIONS("#{@links[:admin]}/#{path}", credentials)
+ def is_resource_writable?(uri, credentials = nil)
+ headers = @connection.OPTIONS(uri, credentials)
headers["allow"][0].split(",").include? "PUT"
end
- # :startdoc:
- private
- def create_attribute(path, value, type, credentials = nil)
- @connection.POST(path, value, type, credentials)
+ def create(uri, value, type, credentials = nil)
+ @connection.POST(uri, value, type, credentials)
end
- def get_attribute(path, type, *rest)
+ def read(uri, type, *rest)
credentials = nil
range = nil
rest.each do |param|
case param
@@ -403,80 +338,114 @@
range = param[0]..param[1]
end
end
begin
- @connection.GET(path, type, range, credentials)
+ @connection.GET(uri, type, range, credentials)
rescue ConnectionRedirectError => cre
+ # We've been redirected so save the new connection object with the new
+ # server URI and try again with the new URI.
@connection = cre.redirect
+ uri = Util.replace_uri_path(@connection.uri, uri.path)
retry
end
end
- def set_attribute(path, value, type, credentials = nil)
- @connection.PUT(path, value, type, credentials)
+ def update(uri, value, type, credentials = nil)
+ @connection.PUT(uri, value, type, credentials)
end
- def delete_attribute(path, credentials = nil)
- @connection.DELETE(path, credentials)
+ def delete(uri, credentials = nil)
+ @connection.DELETE(uri, credentials)
end
+ # :startdoc:
- def get_version(doc)
+ private
+
+ def links
+ @links = _get_server_links if @links.nil?
+
+ @links
+ end
+
+ def _get_server_description
+ if @server_doc.nil?
+ rest_uri = Util.append_to_uri_path(uri, REST_ENDPOINT)
+ @server_doc = xml_document(read(rest_uri, "application/xml"))
+ end
+
+ @server_doc
+ end
+
+ def _get_version
+ doc = _get_server_description
version = xpath_attr(doc, XPaths[:server], "serverVersion")
if version == nil
raise RuntimeError.new("Taverna Servers prior to version 2.3 " +
"are no longer supported.")
else
- return version.to_f
+ # Remove SNAPSHOT tag if it's there.
+ if version.end_with? "-SNAPSHOT"
+ version.gsub!("-SNAPSHOT", "")
+ end
+
+ # Add .0 if we only have a major and minor component.
+ if version.split(".").length == 2
+ version += ".0"
+ end
+
+ return version
end
end
- def get_description(doc)
+ def _get_server_links
+ doc = _get_server_description
links = {}
- links[:runs] = URI.parse(xpath_attr(doc, XPaths[:runs], "href")).path
+ links[:runs] = URI.parse(xpath_attr(doc, XPaths[:runs], "href"))
- links[:policy] = URI.parse(xpath_attr(doc, XPaths[:policy], "href")).path
- doc = xml_document(get_attribute(links[:policy], "application/xml"))
+ links[:policy] = URI.parse(xpath_attr(doc, XPaths[:policy], "href"))
+ doc = xml_document(read(links[:policy], "application/xml"))
links[:permlisteners] =
- URI.parse(xpath_attr(doc, XPaths[:permlstt], "href")).path
+ URI.parse(xpath_attr(doc, XPaths[:permlstt], "href"))
links[:notifications] =
- URI.parse(xpath_attr(doc, XPaths[:notify], "href")).path
+ URI.parse(xpath_attr(doc, XPaths[:notify], "href"))
links[:runlimit] =
- URI.parse(xpath_attr(doc, XPaths[:runlimit], "href")).path
+ URI.parse(xpath_attr(doc, XPaths[:runlimit], "href"))
links[:permworkflows] =
- URI.parse(xpath_attr(doc, XPaths[:permwkf], "href")).path
+ URI.parse(xpath_attr(doc, XPaths[:permwkf], "href"))
links
end
def get_runs(credentials = nil)
- run_list = get_attribute("#{@links[:runs]}", "application/xml",
- credentials)
+ run_list = read(links[:runs], "application/xml", credentials)
doc = xml_document(run_list)
# get list of run identifiers
- ids = []
+ run_list = {}
xpath_find(doc, XPaths[:run]).each do |run|
- ids << xml_node_attribute(run, "href").split('/')[-1]
+ uri = URI.parse(xml_node_attribute(run, "href"))
+ id = xml_node_content(run)
+ run_list[id] = uri
end
# cache run objects - this must be done per user
user = credentials.nil? ? :all : credentials.username
@runs[user] = {} unless @runs[user]
- # add new runs
- ids.each do |id|
+ # add new runs to the user cache
+ run_list.each_key do |id|
if !@runs[user].has_key? id
- @runs[user][id] = Run.create(self, "", credentials, id)
+ @runs[user][id] = Run.create(self, "", credentials, run_list[id])
end
end
# clear out the expired runs
- if @runs[user].length > ids.length
- @runs[user].delete_if {|key, val| !ids.member? key}
+ if @runs[user].length > run_list.length
+ @runs[user].delete_if {|key, val| !run_list.member? key}
end
@runs[user]
end
end