require 'rest-client'
require 'active_support'
require 'active_support/core_ext'
require 'nokogiri'
module Dor
# Methods to create, query, update, and delete workflows (and their lifecycle milestones) via the workflow service
module WorkflowService
class << self
# REST client for the workflow service; assigned by configure and read by workflow_resource.
@@resource = nil
# Base URL of the DOR services app; assigned by configure and required by archive_workflow.
@@dor_services_url = nil
# Valid workflow step statuses, from the Workflow Service's admin/Process.java.
# Frozen so callers cannot mutate the shared list.
VALID_STATUS = %w{waiting completed error queued skipped hold}.freeze
# Creates a workflow for an object in the repository. If this workflow already exists for the object,
# it is replaced with +wf_xml+. Before sending, every <process> element directly under the root
# <workflow> element is given a laneId attribute.
# Caller must handle any exceptions raised by the HTTP request.
#
# @param [String] repo The repository the object resides in. The service recognizes "dor" and "sdr" at the moment
# @param [String] druid The id of the object
# @param [String] workflow_name The name of the workflow you want to create
# @param [String] wf_xml The xml that represents the workflow
# @param [Hash] opts optional params
# @option opts [Boolean] :create_ds sent as the create-ds query parameter; if true, a workflow datastream will be created in Fedora.
#   Only defaults to true when +opts+ is omitted entirely: an opts Hash without :create_ds sends a nil value.
# @option opts [String] :lane_id laneId written onto each process element of wf_xml. Defaults to 'default'
# @return [Boolean] always true
def create_workflow(repo, druid, workflow_name, wf_xml, opts = {:create_ds => true})
  endpoint = "#{repo}/objects/#{druid}/workflows/#{workflow_name}"
  payload = add_lane_id_to_workflow_xml(opts.fetch(:lane_id, 'default'), wf_xml)
  request_options = { :content_type => 'application/xml', :params => { 'create-ds' => opts[:create_ds] } }
  workflow_resource[endpoint].put(payload, request_options)
  true
end
# Updates the status of one step in a workflow.
# Caller must handle any exceptions raised by the HTTP request.
#
# @param [String] repo The repository the object resides in. The service recognizes "dor" and "sdr" at the moment
# @param [String] druid The id of the object
# @param [String] workflow The name of the workflow
# @param [String] process The name of the process step
# @param [String] status The status to set; checked case-insensitively against VALID_STATUS and sent downcased
# @param [Hash] opts optional values for the workflow step; all keys except :current_status are passed on to create_process_xml
# @option opts [Float] :elapsed The number of seconds it took to complete this step. Can have a decimal. Is set to 0 if not passed in.
# @option opts [String] :lifecycle Bookkeeping label for this particular workflow step. Examples are: 'registered', 'shelved'
# @option opts [String] :note Any kind of string annotation that you want to attach to the workflow
# @option opts [String] :lane_id Id of processing lane used by the job manager. Can convey priority or name of an application specific processing lane (e.g. 'high', 'critical', 'hydrus')
# @option opts [String] :current_status Setting this string tells the workflow service to compare the current status to this value. If the current value does not match this value, the update is not performed
# @return [Boolean] always true
# @raise [ArgumentError] if +status+ is not one of VALID_STATUS
#
# Http Call
# ==
# Issues an HTTP PUT through the configured workflow_resource, for example:
#
#   PUT "/dor/objects/pid:123/workflows/GoogleScannedWF/convert"
def update_workflow_status(repo, druid, workflow, process, status, opts = {})
  new_status = status.downcase
  unless VALID_STATUS.include?(new_status)
    raise ArgumentError, "Unknown status value #{status}"
  end

  step_attrs = { :elapsed => 0, :lifecycle => nil, :note => nil }.merge(opts)
  step_attrs[:elapsed] = step_attrs[:elapsed].to_s
  expected_status = step_attrs.delete(:current_status)

  body = create_process_xml({ :name => process, :status => new_status }.merge(step_attrs))
  endpoint = "#{repo}/objects/#{druid}/workflows/#{workflow}/#{process}"
  # Ask the service to apply the update only if the step is currently in the expected state.
  endpoint += "?current-status=#{expected_status.downcase}" if expected_status
  workflow_resource[endpoint].put(body, :content_type => 'application/xml')
  true
end
# Retrieves the process status of the given workflow for the given object identifier.
#
# @param [String] repo The repository the object resides in. The service recognizes "dor" and "sdr" at the moment
# @param [String] druid The id of the object
# @param [String] workflow The name of the workflow
# @param [String] process The name of the process step
# @return [String, nil] the status attribute of the named process step, or nil if no such step is found
# @raise [RuntimeError] if the workflow service response has no XML root element
def get_workflow_status(repo, druid, workflow, process)
  workflow_md = get_workflow_xml(repo, druid, workflow)
  doc = Nokogiri::XML(workflow_md)
  # Raise a StandardError (not Exception) so callers' plain `rescue` clauses can handle it.
  raise "Unable to parse response:\n#{workflow_md}" if doc.root.nil?
  status_attr = doc.root.at_xpath("//process[@name='#{process}']/@status")
  status_attr && status_attr.content
end
# Retrieves the raw XML for the given workflow.
#
# @param [String] repo The repository the object resides in. The service recognizes "dor" and "sdr" at the moment
# @param [String] druid The id of the object
# @param [String] workflow The name of the workflow (get_workflows and get_active_workflows pass '' to list all of them)
# @return [String] body of the HTTP GET response
def get_workflow_xml(repo, druid, workflow)
  resource = workflow_resource
  path = "#{repo}/objects/#{druid}/workflows/#{workflow}"
  resource[path].get
end
# Lists the names of all workflows recorded for an object.
# This method only works when this gem is used in a project that is configured to connect to DOR.
#
# @param [String] pid the druid of the object
# @param [String] repo repository for the object. Defaults to 'dor'
# @return [Array<String>] the id attribute of every <workflow> element in the response
# @example
#   Dor::WorkflowService.get_workflows('druid:sr100hp0609')
#   # => ["accessionWF", "assemblyWF", "disseminationWF"]
def get_workflows(pid, repo='dor')
  all_workflows_xml = get_workflow_xml(repo, pid, '')
  Nokogiri::XML(all_workflows_xml).xpath('//workflow').map { |node| node['id'] }
end
# Lists the workflows of an object that have not been archived.
# This method only works when this gem is used in a project that is configured to connect to DOR.
#
# @param [String] repo repository of the object
# @param [String] pid id of object
# @return [Array<String>] ids of workflows none of whose process elements carry an archived attribute.
#   Returns an empty Array if none are found
# @example
#   Dor::WorkflowService.get_active_workflows('dor', 'druid:sr100hp0609')
#   # => ["accessionWF", "assemblyWF", "disseminationWF"]
def get_active_workflows(repo, pid)
  active_ids_query = '//workflow[not(process/@archived)]/@id '
  Nokogiri::XML(get_workflow_xml(repo, pid, '')).xpath(active_ids_query).map(&:value)
end
# Sets one step of a workflow to 'error', recording an error message.
# Caller must handle any exceptions raised by the HTTP request.
#
# @param [String] repo The repository the object resides in. The service recognizes "dor" and "sdr" at the moment
# @param [String] druid The id of the object
# @param [String] workflow The name of the workflow
# @param [String] process The name of the process step that failed
# @param [String] error_msg The error message. Ideally, this is a brief message describing the error
# @param [Hash] opts optional values for the workflow step; merged over the defaults before building the process XML
# @option opts [String] :error_text A slot to hold more information about the error, like a full stacktrace
# @return [Boolean] always true
#
# Http Call
# ==
# Issues an HTTP PUT through the configured workflow_resource, for example:
#
#   PUT "/dor/objects/pid:123/workflows/GoogleScannedWF/convert"
def update_workflow_error_status(repo, druid, workflow, process, error_msg, opts = {})
  process_attrs = { :name => process, :status => 'error', :errorMessage => error_msg, :error_text => nil }
  process_attrs.update(opts)
  body = create_process_xml(process_attrs)
  endpoint = "#{repo}/objects/#{druid}/workflows/#{workflow}/#{process}"
  workflow_resource[endpoint].put(body, :content_type => 'application/xml')
  true
end
# Deletes a workflow from a particular repository and druid.
#
# @param [String] repo The repository the object resides in. The service recognizes "dor" and "sdr" at the moment
# @param [String] druid The id of the object to delete the workflow from
# @param [String] workflow The name of the workflow to be deleted
# @return [Boolean] always true; errors raised by the HTTP DELETE propagate to the caller
def delete_workflow(repo, druid, workflow)
  resource = workflow_resource
  resource["#{repo}/objects/#{druid}/workflows/#{workflow}"].delete
  true
end
# Returns the time a lifecycle milestone was reached for an object.
#
# @param [String] repo repository name
# @param [String] druid object id
# @param [String] milestone name of the milestone being queried for (matched against the milestone element's text)
# @return [Time, nil] the milestone's date attribute parsed with Time.parse, or nil if the milestone does not exist
# @example Shape of the lifecycle XML this reads (dates illustrative)
#   <lifecycle>
#     <milestone date="2010-04-27T11:34:17-0700">registered</milestone>
#     <milestone date="2010-04-29T10:12:51-0700">inprocess</milestone>
#     <milestone date="2010-06-15T16:08:58-0700">released</milestone>
#   </lifecycle>
def get_lifecycle(repo, druid, milestone)
  node = query_lifecycle(repo, druid).at_xpath("//lifecycle/milestone[text() = '#{milestone}']")
  node ? Time.parse(node['date']) : nil
end
# Returns the time a lifecycle milestone was reached, consulting ONLY the active workflow table
# (the lifecycle query is made with active-only=true).
#
# @param [String] repo repository name
# @param [String] druid object id
# @param [String] milestone name of the milestone being queried for (matched against the milestone element's text)
# @return [Time, nil] the milestone's date attribute parsed with Time.parse, or nil if the milestone does not exist
# @example Shape of the lifecycle XML this reads (dates illustrative)
#   <lifecycle>
#     <milestone date="2010-04-27T11:34:17-0700">registered</milestone>
#     <milestone date="2010-04-29T10:12:51-0700">inprocess</milestone>
#     <milestone date="2010-06-15T16:08:58-0700">released</milestone>
#   </lifecycle>
def get_active_lifecycle(repo, druid, milestone)
  lifecycle = query_lifecycle(repo, druid, true)
  reached = lifecycle.at_xpath("//lifecycle/milestone[text() = '#{milestone}']")
  return nil unless reached
  Time.parse(reached['date'])
end
# Lists every milestone in an object's lifecycle.
#
# @param [String] repo repository name
# @param [String] druid object id
# @return [Array<Hash>] one Hash per //lifecycle/milestone element, with keys
#   :milestone (element text), :at (date attribute parsed with Time.parse) and :version (version attribute)
def get_milestones(repo, druid)
  milestone_nodes = query_lifecycle(repo, druid).xpath("//lifecycle/milestone")
  milestone_nodes.map do |node|
    {
      :milestone => node.text,
      :at => Time.parse(node['date']),
      :version => node['version']
    }
  end
end
# Converts a possibly-partial step name into a fully qualified repo:workflow:step string.
#
# A bare step name gets both defaults prepended. A workflow:step name gets only the default
# repository prepended. A repo:workflow:step name is returned unchanged. Anything after the
# second colon stays part of the step name.
#
# @param [String] default_repository repository used when +step+ does not name one
# @param [String] default_workflow workflow used when +step+ does not name one
# @param [String] step e.g. jp2-create, assemblyWF:jp2-create, or dor:assemblyWF:jp2-create
# @return [String] repo:workflow:step
# @example
#   qualify_step('dor', 'assemblyWF', 'jp2-create')             # => "dor:assemblyWF:jp2-create"
#   qualify_step('dor', 'accessionWF', 'assemblyWF:jp2-create') # => "dor:assemblyWF:jp2-create"
def qualify_step(default_repository, default_workflow, step)
  current = step.split(':', 3)
  # Fill in from the left: the workflow is only missing when a single segment was given,
  # while the repository is missing whenever fewer than three segments were given.
  current.unshift(default_workflow) if current.length < 2
  current.unshift(default_repository) if current.length < 3
  current.join(':')
end
# Returns the druids the workflow service reports as waiting on +waiting+ with every +completed+ step done.
#
# Step names may be fully qualified (`repository:workflow:step-name`) or partial. Partial names are
# expanded with qualify_step using options[:default_repository] and options[:default_workflow].
#
# @param [Array<String>, String, nil] completed one or more steps that must be completed; nil skips this filter
# @param [String] waiting name of the waiting step
# @param [String] lane_id lane to query for the waiting step. Defaults to 'default'
# @param [Hash] options
# @option options [String] :default_repository repository to use for steps not in the qualified format
# @option options [String] :default_workflow workflow to use for steps not in the qualified format
# @option options [Integer] :limit maximum number of druids to return (nil, or a value whose to_i is not positive, for no limit)
# @return [Array<String>] Array of druids
#
# @example
#   get_objects_for_workstep(...)
#   # => ["druid:py156ps0477", "druid:tt628cb6479", "druid:ct021wp7863"]
#
# @example
#   get_objects_for_workstep(..., "lane1", :limit => 1)
#   # => ["druid:py156ps0477"]
def get_objects_for_workstep(completed, waiting, lane_id = 'default', options = {})
  default_repo = options[:default_repository]
  default_wf = options[:default_workflow]

  uri_string = "workflow_queue?waiting=#{qualify_step(default_repo, default_wf, waiting)}"
  if completed
    Array(completed).each do |step|
      uri_string << "&completed=#{qualify_step(default_repo, default_wf, step)}"
    end
  end
  if options[:limit] && options[:limit].to_i > 0
    uri_string << "&limit=#{options[:limit].to_i}"
  end
  uri_string << "&lane-id=#{lane_id}"

  resource = workflow_resource[uri_string]
  # Queue queries get a 5 minute timeout when none was configured. Build a one-off resource instead
  # of mutating the shared resource's options, which would change the timeout of every later request.
  unless resource.options.include?(:timeout)
    resource = RestClient::Resource.new(resource.url, resource.options.merge(:timeout => 5 * 60), &resource.block)
  end

  # The response contains <object id="..."/> elements; return just the ids.
  Nokogiri::XML(resource.get).xpath('//object[@id]').map { |node| node['id'] }
end
# Get the druids that have errored out in a particular workflow and step.
#
# @param [String] workflow name
# @param [String] step name
# @param [String] repository -- optional, default=dor
#
# @return [Hash{String => String}] druid (object id attribute) => errorMessage attribute; later duplicates win
# @example
#   Dor::WorkflowService.get_errored_objects_for_workstep('accessionWF','content-metadata')
#   # => {"druid:qd556jq0580"=>"druid:qd556jq0580 - Item error; caused by ..."}
def get_errored_objects_for_workstep(workflow, step, repository='dor')
  uri_string = "workflow_queue?repository=#{repository}&workflow=#{workflow}&error=#{step}"
  resp = workflow_resource[uri_string].get
  Nokogiri::XML(resp).xpath('//object').each_with_object({}) do |node, errors|
    errors[node['id']] = node['errorMessage']
  end
end
# Returns the number of objects that have a status of 'error' in a particular workflow and step.
#
# @param [String] workflow name
# @param [String] step name
# @param [String] repository -- optional, default=dor
#
# @return [Integer] Number of objects with this repository:workflow:step that have a status of 'error'
def count_errored_for_workstep(workflow, step, repository='dor')
  # count_objects_in_step's signature is (workflow, step, type, repo): the status type precedes the repository.
  count_objects_in_step(workflow, step, 'error', repository)
end
# Returns the number of objects that have a status of 'queued' in a particular workflow and step.
#
# @param [String] workflow name
# @param [String] step name
# @param [String] repository -- optional, default=dor
#
# @return [Integer] Number of objects with this repository:workflow:step that have a status of 'queued'
def count_queued_for_workstep(workflow, step, repository='dor')
  # count_objects_in_step's signature is (workflow, step, type, repo): the status type precedes the repository.
  count_objects_in_step(workflow, step, 'queued', repository)
end
# Gets all of the workflow steps that have a status of 'queued' and a last-updated timestamp older
# than the number of hours passed in. This enables re-queueing of jobs lost by the job manager.
#
# @param [String] repository name of the repository you want to query, like 'dor' or 'sdr'
# @param [Hash] opts optional values for query
# @option opts [Integer] :hours_ago steps older than this value will be returned by the query. If not passed in, the service defaults to 0 hours,
#   meaning you will get all queued workflows
# @option opts [Integer] :limit sets the maximum number of workflow steps that can be returned. Defaults to no limit
# @return [Array<Hash>] each Hash represents a workflow step, with keys :workflow, :step, :druid, :lane_id
def get_stale_queued_workflows(repository, opts = {})
  query = build_queued_uri(repository, opts)
  parse_queued_workflows_response(workflow_resource[query].get)
end
# Returns a count of workflow steps that have a status of 'queued' and a last-updated timestamp
# older than the number of hours passed in.
#
# @param [String] repository name of the repository you want to query, like 'dor' or 'sdr'
# @param [Hash] opts optional values for query
# @option opts [Integer] :hours_ago steps older than this value are counted. If not passed in, the service defaults to 0 hours,
#   meaning you will get a count of all queued workflows
# @return [Integer] number of stale, queued steps
# @raise [RuntimeError] if the response has no /objects/@count attribute
def count_stale_queued_workflows(repository, opts = {})
  uri_string = build_queued_uri(repository, opts) + '&count-only=true'
  xml = workflow_resource[uri_string].get
  count_attr = Nokogiri::XML(xml).at_xpath('/objects/@count')
  raise "Unable to determine count from response" if count_attr.nil?
  count_attr.value.to_i
end
# Builds the XML for a single <process> element.
# Entries of +params+ whose value is nil are dropped; the remaining keys become attribute names
# converted to lowerCamelCase strings (e.g. :lane_id => "laneId").
#
# @param [Hash] params attribute values for the process element
# @return [String] the serialized XML document
def create_process_xml(params)
  attributes = {}
  params.each do |key, value|
    next if value.nil?
    attributes[key.to_s.camelize(:lower)] = value
  end
  Nokogiri::XML::Builder.new { |xml| xml.process(attributes) }.to_xml
end
# Fetches and parses the lifecycle XML for an object.
#
# @param [String] repo repository name
# @param [String] druid object id
# @param [Boolean] active_only when truthy, the request adds active-only=true
# @return [Nokogiri::XML::Document] the parsed lifecycle response
def query_lifecycle(repo, druid, active_only = false)
  path = "#{repo}/objects/#{druid}/lifecycle"
  path += '?active-only=true' if active_only
  Nokogiri::XML(workflow_resource[path].get)
end
# Archives every workflow that get_active_workflows reports for an object.
#
# @param [String] repo repository of the object
# @param [String] druid id of the object
# @return [Array<String>] the workflow names returned by get_active_workflows
def archive_active_workflow(repo, druid)
  get_active_workflows(repo, druid).each { |wf_name| archive_workflow(repo, druid, wf_name) }
end
# Archives one workflow of an object by POSTing to the DOR services app (not the workflow service).
#
# @param [String] repo repository of the object (not used when building the request)
# @param [String] druid id of the object
# @param [String] wf_name name of the workflow to archive
# @param [String, Integer, nil] version_num when given, appended to the archive URL
# @return whatever RestClient's #post returns for /v1/objects/:druid/workflows/:wf_name/archive[/:version_num]
# @raise [RuntimeError] if configure was never given a DOR services URL
def archive_workflow(repo, druid, wf_name, version_num=nil)
  raise "Please call Dor::WorkflowService.configure(workflow_service_url, :dor_services_url => DOR_SERVICES_URL) once before archiving workflow" if @@dor_services_url.nil?
  dor_services = RestClient::Resource.new(@@dor_services_url)
  url = "/v1/objects/#{druid}/workflows/#{wf_name}/archive"
  url << "/#{version_num}" if version_num
  dor_services[url].post ''
end
# Calls the versionClose endpoint of the WorkflowService, which:
#
# - completes the versioningWF:submit-version and versioningWF:start-accession steps
# - initiates accessionWF
#
# @param [String] repo The repository the object resides in. The service recognizes "dor" and "sdr" at the moment
# @param [String] druid The id of the object whose version is being closed
# @param [Boolean] create_accession_wf Option to create accessionWF when closing a version. Defaults to true;
#   any falsy value sends create-accession=false
# @return [Boolean] always true
def close_version(repo, druid, create_accession_wf = true)
  endpoint = "#{repo}/objects/#{druid}/versionClose"
  endpoint += "?create-accession=false" unless create_accession_wf
  workflow_resource[endpoint].post ''
  true
end
# Returns all the distinct laneIds for a given workflow step.
#
# @param [String] repo The repository the object resides in. The service recognizes "dor" and "sdr" at the moment
# @param [String] workflow name
# @param [String] process name
# @return [Array<String>] the id attribute of each /lanes/lane element; empty if no lane ids were found
def get_lane_ids(repo, workflow, process)
  response = workflow_resource["workflow_queue/lane_ids?step=#{repo}:#{workflow}:#{process}"].get
  Nokogiri::XML(response).xpath('/lanes/lane').map { |lane| lane['id'] }
end
# Returns the shared workflow service client created by configure.
#
# @return [RestClient::Resource] the REST client resource
# @raise [RuntimeError] if configure has not been called yet
def workflow_resource
  resource = @@resource
  if resource.nil?
    raise "Please call Dor::WorkflowService.configure(url) once before calling any WorkflowService methods"
  end
  resource
end
# Configure the workflow service.
#
# @param [String] url points to the workflow service
# @param [Hash] opts optional params
# @option opts [String] :dor_services_url url of the DOR REST service; needed before calling archive_workflow.
#   :dor_services_uri (the key these docs previously advertised) is accepted as an alias.
# @option opts [Integer] :timeout number of seconds for RestClient timeout
# @option opts [String] :client_cert_file deprecated and ignored
# @option opts [String] :client_key_file deprecated and ignored
# @option opts [String] :client_key_pass deprecated and ignored
# @return [RestClient::Resource] the REST client resource
def configure(url, opts={})
  params = {}
  params[:timeout] = opts[:timeout] if opts[:timeout]
  # Accept both the key the code has always read and the one the docs used to name.
  dor_services_url = opts[:dor_services_url] || opts[:dor_services_uri]
  @@dor_services_url = dor_services_url if dor_services_url
  @@resource = RestClient::Resource.new(url, params)
end
protected
# Builds the relative all_queued query URI for a repository.
# :hours_ago and :limit are appended as hours-ago and limit when present (truthy).
#
# @param [String] repository repository to query
# @param [Hash] opts may contain :hours_ago and :limit
# @return [String] a new, mutable URI string
def build_queued_uri(repository, opts = {})
  query = ["repository=#{repository}"]
  query << "hours-ago=#{opts[:hours_ago]}" if opts[:hours_ago]
  query << "limit=#{opts[:limit]}" if opts[:limit]
  "workflow_queue/all_queued?" + query.join('&')
end
# Converts an all_queued response into one Hash per /workflows/workflow element.
#
# @param [String] xml the raw response body
# @return [Array<Hash>] keys :workflow (name attr), :step (process attr), :druid (druid attr), :lane_id (laneId attr)
def parse_queued_workflows_response(xml)
  Nokogiri::XML(xml).xpath('/workflows/workflow').map do |node|
    {
      :workflow => node['name'],
      :step     => node['process'],
      :druid    => node['druid'],
      :lane_id  => node['laneId']
    }
  end
end
# Sets a laneId attribute on each <process> element directly under the root <workflow> element,
# replacing any laneId already present.
#
# @param [String] lane_id value for every laneId attribute
# @param [String] wf_xml the workflow xml
# @return [String] the re-serialized workflow xml
def add_lane_id_to_workflow_xml(lane_id, wf_xml)
  workflow_doc = Nokogiri::XML(wf_xml)
  workflow_doc.xpath('/workflow/process').each do |process_node|
    process_node['laneId'] = lane_id
  end
  workflow_doc.to_xml
end
# Counts the objects in repo:workflow:step that have status +type+.
#
# @param [String] workflow name
# @param [String] step name
# @param [String] type status to count (e.g. 'error' or 'queued'); sent as the query parameter name, as in error=step
# @param [String] repo repository to query
# @return [Integer] the count attribute of the /objects element in the response
# @raise [RuntimeError] if the response has no /objects element
def count_objects_in_step(workflow, step, type, repo)
  uri_string = "workflow_queue?repository=#{repo}&workflow=#{workflow}&#{type}=#{step}"
  # Use the configured client via workflow_resource; the @workflow_resource ivar is never assigned.
  resp = workflow_resource[uri_string].get
  node = Nokogiri::XML(resp).at_xpath('/objects')
  raise "Unable to determine count from response" if node.nil?
  node['count'].to_i
end
end
end
end