require 'active_support'
require 'active_support/core_ext'
require 'nokogiri'
require 'retries'
require 'faraday'
require 'dor/workflow_exception'
module Dor
# TODO: major version revision: change pattern of usage to be normal non-singleton class
# TODO: convert @@class_vars to regular attributes
# TODO: create normal initialize method, deprecate configure
# TODO: hardcoded 'true' returns are dumb, instead return the response object where possible
# TODO: VALID_STATUS should be just another attribute w/ default
# TODO: allow constructor/initializer to receive Faraday object(s), not just URLs (solves SSL/proxy config problem)
# TODO: allow constructor/initializer to receive logger
# Create and update workflows
class WorkflowService
class << self
# Class-level state shared by every method of this singleton-style service.
# Everything starts out nil; configure assigns the values.
@@handler = nil # handed to with_retries as its :handler in workflow_resource_method
@@logger = nil # assigned in configure; the retry handler writes warnings to it
@@resource = nil # NOTE(review): never referenced in the visible code -- possibly dead
@@dor_services_url = nil # archive_workflow refuses to run while this is nil
@@http_conn = nil # assigned in configure; workflow_resource raises while this is nil
# Status values accepted by update_workflow_status (the argument is downcased before the check).
# NOTE(review): the previous comment read "From Workflow Service's admin/" and appears truncated.
VALID_STATUS = %w{waiting completed error queued skipped hold}
# Creates a workflow for a given object in the repository. If this particular workflow already exists for the object,
# it is replaced with the wf_xml passed to this method. You have the option of creating a datastream or not.
# Returns true on success. Caller must handle any exceptions
# @param [String] repo The repository the object resides in. The service recognizes "dor" and "sdr" at the moment
# @param [String] druid The id of the object
# @param [String] workflow_name The name of the workflow you want to create
# @param [String] wf_xml The xml that represents the workflow
# @param [Hash] opts optional params
# @option opts [Boolean] :create_ds if true, a workflow datastream will be created in Fedora. Set to false if you do not want a datastream to be created
#   If you do not pass in an opts Hash, then :create_ds is set to true by default
# @option opts [String] :lane_id adds laneId attribute to all process elements in the wf_xml workflow xml. Defaults to a value of 'default'
# @return [Boolean] always true
def create_workflow(repo, druid, workflow_name, wf_xml, opts = {:create_ds => true})
  lane_id = opts.fetch(:lane_id, 'default')
  xml = add_lane_id_to_workflow_xml(lane_id, wf_xml)
  workflow_resource_method("#{repo}/objects/#{druid}/workflows/#{workflow_name}", 'put', xml,
                           :content_type => 'application/xml',
                           :params => { 'create-ds' => opts[:create_ds] })
  # The response is deliberately discarded: the documented contract is to return true.
  true
end
# Updates the status of one step in a workflow.
# Returns true on success. Caller must handle any exceptions
# @param [String] repo The repository the object resides in. The service recognizes "dor" and "sdr" at the moment
# @param [String] druid The id of the object
# @param [String] workflow The name of the workflow
# @param [String] process The name of the process step
# @param [String] status The status that you want to set -- using one of the values in VALID_STATUS
# @param [Hash] opts optional values for the workflow step
# @option opts [Float] :elapsed The number of seconds it took to complete this step. Can have a decimal. Is set to 0 if not passed in.
# @option opts [String] :lifecycle Bookkeeping label for this particular workflow step. Examples are: 'registered', 'shelved'
# @option opts [String] :note Any kind of string annotation that you want to attach to the workflow
# @option opts [String] :lane_id Id of processing lane used by the job manager. Can convey priority or name of an application specific processing lane (e.g. 'high', 'critical', 'hydrus')
# @option opts [String] :current_status Setting this string tells the workflow service to compare the current status to this value. If the current value does not match this value, the update is not performed
# @return [Boolean] always true
# @raise [ArgumentError] if status (case-insensitively) is not one of VALID_STATUS
# Http Call
# ==
# The method does an HTTP PUT with the XML from create_process_xml as the request body. As an example:
#   PUT "/dor/objects/pid:123/workflows/GoogleScannedWF/convert"
def update_workflow_status(repo, druid, workflow, process, status, opts = {})
  normalized_status = status.downcase
  raise ArgumentError, "Unknown status value #{status}" unless VALID_STATUS.include?(normalized_status)

  # Merge onto a fresh defaults Hash so the caller's opts are never mutated.
  step_opts = { :elapsed => 0, :lifecycle => nil, :note => nil }.merge(opts)
  step_opts[:elapsed] = step_opts[:elapsed].to_s
  # :current_status travels as a query parameter, not as an attribute of the process element.
  current_status = step_opts.delete(:current_status)

  xml = create_process_xml({ :name => process, :status => normalized_status }.merge(step_opts))
  uri = "#{repo}/objects/#{druid}/workflows/#{workflow}/#{process}"
  uri << "?current-status=#{current_status.downcase}" if current_status
  workflow_resource_method(uri, 'put', xml, { :content_type => 'application/xml' })
  true
end
# Retrieves the process status of the given workflow for the given object identifier
# @param [String] repo The repository the object resides in. Currently recognizes "dor" and "sdr".
# @param [String] druid The id of the object
# @param [String] workflow The name of the workflow
# @param [String] process The name of the process step
# @return [String, nil] status for repo-workflow-process-druid; nil when no matching process element is found
# @raise [Dor::WorkflowException] if the response has no root element
def get_workflow_status(repo, druid, workflow, process)
  workflow_md = get_workflow_xml(repo, druid, workflow)
  doc = Nokogiri::XML(workflow_md)
  raise Dor::WorkflowException, "Unable to parse response:\n#{workflow_md}" if doc.root.nil?

  processes = doc.root.xpath("//process[@name='#{process}']")
  # Several elements may share the name; pick the one with the highest version attribute.
  latest = processes.max { |a, b| a.attr('version').to_i <=> b.attr('version').to_i }
  latest && latest.attr('status')
end
# Retrieves the raw XML for the given workflow
# @param [String] repo The repository the object resides in. Currently recognizes "dor" and "sdr".
# @param [String] druid The id of the object
# @param [String] workflow The name of the workflow
# @return [String] XML of the workflow
def get_workflow_xml(repo, druid, workflow)
  workflow_resource_method("#{repo}/objects/#{druid}/workflows/#{workflow}")
end
# Get workflow names into an array for given PID
# This method only works when this gem is used in a project that is configured to connect to DOR
# @param [String] pid of druid
# @param [String] repo repository for the object
# @return [Array<String>] list of workflows
# @example
#   Dor::WorkflowService.get_workflows('druid:sr100hp0609')
#   => ["accessionWF", "assemblyWF", "disseminationWF"]
def get_workflows(pid, repo = 'dor')
  # The workflow name is left empty, so the request goes to ".../workflows/".
  xml_doc = Nokogiri::XML(get_workflow_xml(repo, pid, ''))
  xml_doc.xpath('//workflow').map { |workflow| workflow['id'] }
end
# Get active workflow names into an array for given PID
# This method only works when this gem is used in a project that is configured to connect to DOR
# @param [String] repo repository of the object
# @param [String] pid id of object
# @return [Array<String>] list of active workflows. Returns an empty Array if none are found
# @example
#   Dor::WorkflowService.get_active_workflows('dor', 'druid:sr100hp0609')
#   => ["accessionWF", "assemblyWF", "disseminationWF"]
def get_active_workflows(repo, pid)
  doc = Nokogiri::XML(get_workflow_xml(repo, pid, ''))
  # Selects the id of every workflow element that has no process child carrying an archived attribute.
  doc.xpath(%(//workflow[not(process/@archived)]/@id )).map { |id_attr| id_attr.value }
end
# Updates the status of one step in a workflow to error.
# Returns true on success. Caller must handle any exceptions
# @param [String] repo The repository the object resides in. The service recognizes "dor" and "sdr" at the moment
# @param [String] druid The id of the object
# @param [String] workflow The name of the workflow
# @param [String] process The name of the process step
# @param [String] error_msg The error message. Ideally, this is a brief message describing the error
# @param [Hash] opts optional values for the workflow step
# @option opts [String] :error_text A slot to hold more information about the error, like a full stacktrace
# @return [Boolean] always true
# Http Call
# ==
# The method does an HTTP PUT with the XML from create_process_xml as the request body. As an example:
#   PUT "/dor/objects/pid:123/workflows/GoogleScannedWF/convert"
def update_workflow_error_status(repo, druid, workflow, process, error_msg, opts = {})
  # Merge onto a fresh Hash so the caller's opts are left untouched.
  step_opts = { :error_text => nil }.merge(opts)
  xml = create_process_xml({ :name => process, :status => 'error', :errorMessage => error_msg }.merge(step_opts))
  workflow_resource_method("#{repo}/objects/#{druid}/workflows/#{workflow}/#{process}", 'put', xml, { :content_type => 'application/xml' })
  true
end
# Deletes a workflow from a particular repository and druid
# @param [String] repo The repository the object resides in. The service recognizes "dor" and "sdr" at the moment
# @param [String] druid The id of the object to delete the workflow from
# @param [String] workflow The name of the workflow to be deleted
# @return [Boolean] always true
def delete_workflow(repo, druid, workflow)
  workflow_resource_method("#{repo}/objects/#{druid}/workflows/#{workflow}", 'delete')
  true
end
# Returns the time at which a requested milestone from the workflow lifecycle was reached
# @param [String] repo repository name
# @param [String] druid object id
# @param [String] milestone name of the milestone being queried for
# @return [Time, nil] when the milestone was achieved. Returns nil if the milestone does not exist
# @example Lifecycle XML shape, as implied by the XPath used below (element text is the milestone name)
#   <lifecycle>
#     <milestone date="...">registered</milestone>
#     <milestone date="...">inprocess</milestone>
#     <milestone date="...">released</milestone>
#   </lifecycle>
def get_lifecycle(repo, druid, milestone)
  doc = query_lifecycle(repo, druid)
  milestone_node = doc.at_xpath("//lifecycle/milestone[text() = '#{milestone}']")
  # at_xpath yields nil when nothing matches; in that case the method returns nil as well.
  Time.parse(milestone_node['date']) if milestone_node
end
# Returns the time at which a requested milestone was reached, ONLY FROM THE ACTIVE workflow table
# @param [String] repo repository name
# @param [String] druid object id
# @param [String] milestone name of the milestone being queried for
# @return [Time, nil] when the milestone was achieved. Returns nil if the milestone does not exist
# @example Lifecycle XML shape, as implied by the XPath used below (element text is the milestone name)
#   <lifecycle>
#     <milestone date="...">registered</milestone>
#     <milestone date="...">inprocess</milestone>
#     <milestone date="...">released</milestone>
#   </lifecycle>
def get_active_lifecycle(repo, druid, milestone)
  # The third argument makes query_lifecycle add the active-only=true query parameter.
  doc = query_lifecycle(repo, druid, true)
  milestone_node = doc.at_xpath("//lifecycle/milestone[text() = '#{milestone}']")
  Time.parse(milestone_node['date']) if milestone_node
end
# Lists every milestone found in the lifecycle of an object
# @param [String] repo repository name
# @param [String] druid object id
# @return [Array<Hash>] one Hash per milestone element, with the keys
#   :milestone (element text), :at (Time parsed from the date attribute) and :version (version attribute)
def get_milestones(repo, druid)
  doc = query_lifecycle(repo, druid)
  doc.xpath('//lifecycle/milestone').map do |node|
    { :milestone => node.text, :at => Time.parse(node['date']), :version => node['version'] }
  end
end
# Converts repo-workflow-step into repo:workflow:step
# @param [String] default_repository used when step does not name a repository
# @param [String] default_workflow used when step does not name a workflow
# @param [String] step if it contains a colon, the leading part(s) give the workflow and/or repository.
#   For example: 'jp2-create', 'assemblyWF:jp2-create' or 'dor:assemblyWF:jp2-create'
# @return [String] repo:workflow:step
# @example
#   qualify_step('dor', 'assemblyWF', 'jp2-create') # => "dor:assemblyWF:jp2-create"
def qualify_step(default_repository, default_workflow, step)
  parts = step.split(/:/, 3)
  # A bare step name lacks the workflow; a "workflow:step" pair already has one and only lacks the repository.
  parts.unshift(default_workflow) if parts.length < 2
  parts.unshift(default_repository) if parts.length < 3
  parts.join(':')
end
# Returns a list of druids from the WorkflowService that meet the criteria of the passed in completed and waiting params
# @param [Array<String>, String, nil] completed An array or single String of the completed steps, should use the qualified format: `repository:workflow:step-name`
# @param [String] waiting name of the waiting step
# @param [String] lane_id issue a query for a specific lane_id for the waiting step
# @param [Hash] options
# @option options [String] :default_repository repository to query for if not using the qualified format
# @option options [String] :default_workflow workflow to query for if not using the qualified format
# @option options [Integer] :limit maximum number of druids to return (nil for no limit)
# @return [Array<String>] Array of druids
# @example
#   get_objects_for_workstep(...)
#   => [
#     "druid:py156ps0477",
#     "druid:tt628cb6479",
#     "druid:ct021wp7863"
#   ]
# @example
#   get_objects_for_workstep(..., "lane1")
#   => [
#     "druid:py156ps0477",
#     "druid:tt628cb6479"
#   ]
# @example
#   get_objects_for_workstep(..., "lane1", limit: 1)
#   => [
#     "druid:py156ps0477"
#   ]
def get_objects_for_workstep(completed, waiting, lane_id = 'default', options = {})
  default_repository = options[:default_repository]
  default_workflow = options[:default_workflow]

  waiting_param = qualify_step(default_repository, default_workflow, waiting)
  uri_string = "workflow_queue?waiting=#{waiting_param}"
  if completed
    # Array() accepts either a single step or a list of steps.
    Array(completed).each do |step|
      completed_param = qualify_step(default_repository, default_workflow, step)
      uri_string << "&completed=#{completed_param}"
    end
  end
  uri_string << "&limit=#{options[:limit].to_i}" if options[:limit] && options[:limit].to_i > 0
  uri_string << "&lane-id=#{lane_id}"

  resp = workflow_resource_method(uri_string)
  # Collect the id attribute of every object element that has one, e.g.
  #   ['druid:ab123de4567', 'druid:ab123de9012']
  Nokogiri::XML(resp).xpath('//object[@id]').map { |node| node[:id] }
end
# Get a list of druids that have errored out in a particular workflow and step
# @param [String] workflow name
# @param [String] step name
# @param [String] repository -- optional, default=dor
# @return [Hash] hash of results, with the druid as key and the error message as value
# @example
#   Dor::WorkflowService.get_errored_objects_for_workstep('accessionWF','content-metadata')
#   => {"druid:qd556jq0580"=>"druid:qd556jq0580 - Item error; caused by ..."}
def get_errored_objects_for_workstep(workflow, step, repository = 'dor')
  resp = workflow_resource_method("workflow_queue?repository=#{repository}&workflow=#{workflow}&error=#{step}")
  # Build the Hash directly; a later object element with the same id overwrites an earlier one.
  Nokogiri::XML(resp).xpath('//object').each_with_object({}) do |node, errors|
    errors[node['id']] = node['errorMessage']
  end
end
# Returns the number of objects that have a status of 'error' in a particular workflow and step
# @param [String] workflow name
# @param [String] step name
# @param [String] repository -- optional, default=dor
# @return [Integer] Number of objects with this repository:workflow:step that have a status of 'error'
def count_errored_for_workstep(workflow, step, repository = 'dor')
  count_objects_in_step(workflow, step, 'error', repository)
end
# Returns the number of objects that have a status of 'queued' in a particular workflow and step
# @param [String] workflow name
# @param [String] step name
# @param [String] repository -- optional, default=dor
# @return [Integer] Number of objects with this repository:workflow:step that have a status of 'queued'
def count_queued_for_workstep(workflow, step, repository = 'dor')
  count_objects_in_step(workflow, step, 'queued', repository)
end
# Returns the number of objects that have completed a particular workflow
# @param [String] workflow name
# @param [String] repository -- optional, default=dor
# @return [Integer] Number of objects with this repository:workflow that have been archived
def count_archived_for_workflow(workflow, repository = 'dor')
  resp = workflow_resource_method("workflow_archive?repository=#{repository}&workflow=#{workflow}&count-only=true")
  # Reduce the raw response to the Integer promised above via the shared helper.
  extract_object_count(resp)
end
# Gets all of the workflow steps that have a status of 'queued' that have a last-updated timestamp older than the number of hours passed in
# This will enable re-queueing of jobs that have been lost by the job manager
# @param [String] repository name of the repository you want to query, like 'dor' or 'sdr'
# @param [Hash] opts optional values for query
# @option opts [Integer] :hours_ago steps older than this value will be returned by the query. If not passed in, the service defaults to 0 hours,
#   meaning you will get all queued workflows
# @option opts [Integer] :limit sets the maximum number of workflow steps that can be returned. Defaults to no limit
# @return [Array<Hash>] each Hash represents a workflow step. It will have the following keys:
#   :workflow, :step, :druid, :lane_id
def get_stale_queued_workflows(repository, opts = {})
  uri_string = build_queued_uri(repository, opts)
  parse_queued_workflows_response(workflow_resource_method(uri_string))
end
# Returns a count of workflow steps that have a status of 'queued' that have a last-updated timestamp older than the number of hours passed in
# @param [String] repository name of the repository you want to query, like 'dor' or 'sdr'
# @param [Hash] opts optional values for query
# @option opts [Integer] :hours_ago steps older than this value will be counted by the query. If not passed in, the service defaults to 0 hours,
#   meaning you will get all queued workflows
# @return [Integer] number of stale, queued steps
def count_stale_queued_workflows(repository, opts = {})
  uri_string = build_queued_uri(repository, opts) + '&count-only=true'
  # Use the same helper as the other count_* methods so that an Integer is returned.
  extract_object_count(workflow_resource_method(uri_string))
end
# Builds the XML payload for a single process step from the given attributes.
# Callers in this file pass keys such as :name, :status, :elapsed, :lifecycle, :note, :errorMessage and :error_text.
# NOTE(review): this method is incomplete as it stands -- the right-hand side of `builder =` and the
# receiver in front of the `{|k, v| ...}` block are missing, and nothing visible emits or returns the XML.
# Restore it from version control before relying on it.
# @param [Hash] params attributes for the process element; entries whose value is nil are dropped
# @return [String] presumably the serialized XML (callers send the result as a request body) -- TODO confirm
def create_process_xml(params)
builder = do |xml|
# nil-valued attributes are omitted entirely
attrs = params.reject { |k, v| v.nil? }
attrs = Hash[ {|k, v| [k.to_s.camelize(:lower), v]}] # lower-camelize all the keys in the attrs hash, e.g. :error_text -> "errorText"
# Fetches and parses the lifecycle document of an object
# @param [String] repo repository name
# @param [String] druid object id
# @param [Boolean] active_only when true, the active-only=true query parameter is added to the request
# @return [Nokogiri::XML::Document]
def query_lifecycle(repo, druid, active_only = false)
  req = "#{repo}/objects/#{druid}/lifecycle"
  req << '?active-only=true' if active_only
  Nokogiri::XML(workflow_resource_method(req))
end
# Archives every active workflow of an object, one archive_workflow call per workflow
# @param [String] repo The repository the object resides in. The service recognizes "dor" and "sdr" at the moment
# @param [String] druid The id of the object to archive the workflows from
# @return [Array<String>] the names of the workflows that were archived
def archive_active_workflow(repo, druid)
  get_active_workflows(repo, druid).each do |wf_name|
    archive_workflow(repo, druid, wf_name)
  end
end
# Archives one workflow of an object by POSTing to its archive endpoint
# @param [String] repo The repository the object resides in. The service recognizes "dor" and "sdr" at the moment.
#   Not used when building the request URL.
# @param [String] druid The id of the object to archive the workflow from
# @param [String] wf_name The name of the workflow to archive
# @param [String, Integer, nil] version_num appended to the URL as a trailing path segment when given
# @return [Object] whatever workflow_resource_method returns for the POST
# @raise [RuntimeError] if no :dor_services_url has been configured
def archive_workflow(repo, druid, wf_name, version_num = nil)
  raise 'Please call Dor::WorkflowService.configure(workflow_service_url, :dor_services_url => DOR_SERVIES_URL) once before archiving workflow' if @@dor_services_url.nil?

  url = "/v1/objects/#{druid}/workflows/#{wf_name}/archive"
  url << "/#{version_num}" if version_num
  workflow_resource_method(url, 'post', '')
end
# Calls the versionClose endpoint of the WorkflowService:
#   - completes the versioningWF:submit-version and versioningWF:start-accession steps
#   - initiates accessionWF
# @param [String] repo The repository the object resides in. The service recognizes "dor" and "sdr" at the moment
# @param [String] druid The id of the object whose version is being closed
# @param [Boolean] create_accession_wf Option to create accessionWF when closing a version. Defaults to true
# @return [Object] whatever workflow_resource_method returns for the POST
def close_version(repo, druid, create_accession_wf = true)
  uri = "#{repo}/objects/#{druid}/versionClose"
  # The query parameter is only sent to opt out of creating accessionWF.
  uri << '?create-accession=false' unless create_accession_wf
  workflow_resource_method(uri, 'post', '')
end
# Returns all the distinct laneIds for a given workflow step
# @param [String] repo The repository the object resides in. The service recognizes "dor" and "sdr" at the moment
# @param [String] workflow name
# @param [String] process name
# @return [Array<String>] all of the distinct laneIds. Array will be empty if no lane ids were found
def get_lane_ids(repo, workflow, process)
  uri = "workflow_queue/lane_ids?step=#{repo}:#{workflow}:#{process}"
  doc = Nokogiri::XML(workflow_resource_method(uri))
  # map is required here: a block handed straight to xpath would be ignored.
  doc.xpath('/lanes/lane').map { |lane| lane['id'] }
end
# @return [Faraday::Connection] the REST client resource created during configure()
# @raise [RuntimeError] if configure has not been called yet
def workflow_resource
  raise 'Please call Dor::WorkflowService.configure(url) once before calling any WorkflowService methods' if @@http_conn.nil?

  @@http_conn
end
# Get the configured URL for the connection
# NOTE(review): the body of this method is missing, so what it derives the URL from cannot be
# confirmed here. workflow_resource_method interpolates the result into its log and error messages.
# @return [String] presumably the base URL of the configured connection -- TODO confirm
def base_url
# Builds the logger that configure falls back to when no :logger option is given.
# Among other things, a distinct method helps tests mock default logger
# NOTE(review): the def line below is garbled -- the tail `, shift_age)` looks like the remains of a
# constructor call that once formed the method body. Restore it from version control.
# @param [String, IO] logdev The log device. This is a filename (String) or IO object (typically STDOUT, STDERR, or an open file).
# @param [String, Integer] shift_age Number of old log files to keep, or frequency of rotation (daily, weekly or monthly).
# @return [Logger] default logger object
def default_logger(logdev = 'workflow_service.log', shift_age = 'weekly'), shift_age)
# Exception classes that workflow_resource_method retries on (passed to with_retries as :rescue)
# and finally converts into Dor::WorkflowException (splatted into its rescue clause).
# NOTE(review): the body of this method is missing, so the actual list cannot be confirmed here.
# @return [Array<Class>] presumably an Array, since the result is splatted -- TODO confirm
def workflow_service_exceptions_to_catch
# Configure the workflow service. Assigns the class-level logger, retry handler and HTTP connection.
# TODO: replace with initialize
# NOTE(review): several lines of this method are garbled (the `@@handler =` assignment, the
# `when String` branch and the two commented-out SSL lines have lost part of their text) and any
# branch for non-String arguments is not visible. Restore from version control before editing.
# @param [String, Object] url_or_connection a String is treated as the URL of the workflow service;
#   presumably anything else is used as a ready-made connection -- TODO confirm
# @param [Hash] opts optional params
# @option opts [Logger] :logger defaults writing to workflow_service.log with weekly rotation
# @option opts [String] :dor_services_url uri to the DOR REST service
# @option opts [Integer] :timeout number of seconds for HTTP timeout (applied to both timeout and open_timeout)
# @option opts [Boolean] :debug when truthy, adds Faraday's :logger response middleware
# @option opts [String] :client_cert_file path to an SSL client certificate (deprecated)
# @option opts [String] :client_key_file path to an SSL key file (deprecated)
# @option opts [String] :client_key_pass password for the key file (deprecated)
# @return [Faraday::Connection] the REST client resource
def configure(url_or_connection, opts = {})
@@logger = opts[:logger] || default_logger
# Only overwrite a previously configured DOR services URL when a new one is supplied.
@@dor_services_url = opts[:dor_services_url] if opts[:dor_services_url]
# params[:ssl_client_cert] =[:client_cert_file])) if opts[:client_cert_file]
# params[:ssl_client_key] =[:client_key_file]), opts[:client_key_pass]) if opts[:client_key_file]
# Retry handler: logs a warning for each failed attempt (used by with_retries in workflow_resource_method).
@@handler = do |exception, attempt_number, total_delay|
@@logger.warn "[Attempt #{attempt_number}] #{exception.class}: #{exception.message}; #{total_delay} seconds elapsed."
@@http_conn = case url_or_connection
when String url_or_connection) do |faraday|
faraday.response :logger if opts[:debug] # logs to STDOUT
# Registers Faraday's RaiseError response middleware.
faraday.use Faraday::Response::RaiseError
faraday.adapter :net_http_persistent # use Keep-Alive connections
faraday.options.params_encoder = Faraday::FlatParamsEncoder
if opts.key? :timeout
faraday.options.timeout = opts[:timeout]
faraday.options.open_timeout = opts[:timeout]
# Counts the objects whose given step has the given status
# @param [String] workflow name
# @param [String] step name
# @param [String] type name of the query parameter that carries the step; callers in this file pass 'error' or 'queued'
# @param [String] repo repository name
# @return [Integer] the count extracted from the response by extract_object_count
def count_objects_in_step(workflow, step, type, repo)
  # The status type is the parameter NAME, e.g. "&error=content-metadata".
  resp = workflow_resource_method("workflow_queue?repository=#{repo}&workflow=#{workflow}&#{type}=#{step}")
  extract_object_count(resp)
end
# Builds the relative URI used to query for queued workflow steps
# @param [String] repository name of the repository to query, like 'dor' or 'sdr'
# @param [Hash] opts optional values for query
# @option opts [Integer] :hours_ago added as the hours-ago query parameter when present
# @option opts [Integer] :limit added as the limit query parameter when present
# @return [String] the URI; always a String, even when no option is given
def build_queued_uri(repository, opts = {})
  uri_string = "workflow_queue/all_queued?repository=#{repository}"
  uri_string << "&hours-ago=#{opts[:hours_ago]}" if opts[:hours_ago]
  uri_string << "&limit=#{opts[:limit]}" if opts[:limit]
  # Return the URI explicitly: a trailing `... if opts[:limit]` evaluates to nil when :limit is absent.
  uri_string
end
# Converts the XML listing of queued workflow steps into plain Hashes
# @param [String] xml response body containing workflow elements under a workflows root
# @return [Array<Hash>] one Hash per workflow element with the keys :workflow, :step, :druid and :lane_id,
#   taken from the name, process, druid and laneId attributes respectively
def parse_queued_workflows_response(xml)
  doc = Nokogiri::XML(xml)
  doc.xpath('/workflows/workflow').map do |wf_node|
    {
      :workflow => wf_node['name'],
      :step => wf_node['process'],
      :druid => wf_node['druid'],
      :lane_id => wf_node['laneId']
    }
  end
end
# Adds laneId attributes to each process of workflow xml
# @param [String] lane_id to add to each process element
# @param [String] wf_xml the workflow xml
# @return [String] wf_xml with lane_id attributes
def add_lane_id_to_workflow_xml(lane_id, wf_xml)
  doc = Nokogiri::XML(wf_xml)
  doc.xpath('/workflow/process').each { |process_node| process_node['laneId'] = lane_id }
  # Serialize the modified document; the value of #each alone would be the node set, not XML text.
  doc.to_xml
end
# Pulls the object count out of a count-only style response by locating its /objects root element.
# NOTE(review): this method is incomplete as it stands -- the raise line has lost the exception
# class in front of its message, and the expression that reads the count off the node is missing.
# @param [String] resp raw response body
# @return [Integer] presumably the count carried by the objects element -- TODO confirm
def extract_object_count(resp)
node = Nokogiri::XML(resp).at_xpath('/objects')
# A response without an objects root cannot be interpreted.
raise'Unable to determine count from response') if node.nil?
# Sends a request through send_workflow_resource_request with a variable number of optional arguments.
# The point of this is to wrap ALL remote calls with consistent error handling and logging:
# the request is wrapped in with_retries (:max_tries => 2), and any exception listed by
# workflow_service_exceptions_to_catch that still escapes is re-raised as Dor::WorkflowException.
# NOTE(review): the with_retries line below is garbled -- a log message string is fused onto the end
# of the `do |attempt|` line -- and the value finally returned from the block is not visible.
# @param [String] uri_string resource to request
# @param [String] meth REST method to use on resource (get, put, post, delete, etc.)
# @param [String] payload body for (e.g. put) request
# @param [Hash] opts additional headers options
# @return [Object] response from method
# @raise [Dor::WorkflowException] when the request still fails after the retries
def workflow_resource_method(uri_string, meth = 'get', payload = '', opts = {})
with_retries(:max_tries => 2, :handler => @@handler, :rescue => workflow_service_exceptions_to_catch) do |attempt| "[Attempt #{attempt}] #{meth} #{base_url}/#{uri_string}"
response = send_workflow_resource_request(uri_string, meth, payload, opts)
rescue *workflow_service_exceptions_to_catch => e
msg = "Failed to retrieve resource: #{meth} #{base_url}#{uri_string}"
# Append the HTTP status only when the exception exposes a response.
msg += " (HTTP status #{e.response[:status]})" if e.respond_to?(:response) && e.response
raise Dor::WorkflowException, msg
# Issues a single HTTP request on the configured connection, without retries or error translation
# @param [String] uri_string resource to request
# @param [String] meth name of the connection method to invoke (get, put, post, delete, etc.)
# @param [String] payload request body; not set for 'delete' requests
# @param [Hash] opts :params is merged into the query parameters, every other entry is sent as a header
# @return [Object] whatever the connection returns for the request
def send_workflow_resource_request(uri_string, meth = 'get', payload = '', opts = {})
  workflow_resource.send(meth, uri_string) do |req|
    req.body = payload unless meth == 'delete'
    req.params.update(opts[:params]) if opts[:params]
    # Everything except :params is treated as a header (e.g. :content_type).
    req.headers.update(opts.except(:params))
  end
end