lib/openwfe/engine/engine.rb in openwferu-0.9.5 vs lib/openwfe/engine/engine.rb in openwferu-0.9.6
- old
+ new
@@ -43,10 +43,11 @@
require 'logger'
require 'openwfe/workitem'
require 'openwfe/rudefinitions'
require 'openwfe/service'
+require 'openwfe/util/scheduler'
require 'openwfe/util/schedulers'
require 'openwfe/expool/expressionpool'
require 'openwfe/expool/expstorage'
require 'openwfe/expressions/expressionmap'
require 'openwfe/participants/participantmap'
@@ -59,10 +60,13 @@
# No persistence is used, everything is stored in memory.
#
class Engine < Service
include OwfeServiceLocator
+ #
+ # Builds an OpenWFEru engine.
+ #
def initialize ()
super(S_ENGINE, {})
@application_context[@service_name] = self
@@ -109,15 +113,21 @@
def launch (launch_object, async=false)
launchitem = nil
if launch_object.kind_of? OpenWFE::LaunchItem
+
launchitem = launch_object
+
elsif launch_object.kind_of? Class
+
launchitem = LaunchItem.new(launch_object)
+
elsif launch_object.kind_of? String
+
launchitem = OpenWFE::LaunchItem.new
+
if launch_object[0] == '<'
launchitem.workflowDefinitionUrl = "field:__definition"
launchitem['definition'] = launch_object
else
launchitem.workflowDefinitionUrl = launch_object
@@ -126,17 +136,42 @@
get_expression_pool.launch(launchitem, async)
end
#
+ # Returns the list of applied expressions belonging to a given
+ # workflow instance.
+ # May be used to determine where a process instance currently is.
+ #
+ def get_flow_position (workflow_instance_id)
+
+ get_expression_pool.get_flow_position(workflow_instance_id)
+ end
+
+ #
# This method is used to feed a workitem back to the engine (after
# it got sent to a worklist or wherever by a participant).
# Participant implementations themselves do call this method usually.
#
+ # This method also accepts LaunchItem instances.
+ #
def reply (workitem)
- get_expression_pool.reply(workitem.flow_expression_id, workitem)
+ if workitem.kind_of? InFlowWorkItem
+
+ get_expression_pool.reply(workitem.flow_expression_id, workitem)
+
+ elsif workitem.kind_of? LaunchItem
+
+ get_expression_pool.launch(workitem, false)
+
+ else
+
+ raise \
+ "engine.reply() " +
+ "cannot handle instances of #{workitem.class}"
+ end
end
#
# Registers a participant in this [embedded] engine.
# This method is a shortcut to the ParticipantMap method
@@ -160,10 +195,87 @@
get_participant_map.lookup_participant(participant_name)
end
#
+ # Removes the first participant matching the given name from the
+ # participant map kept by the engine.
+ #
+ def unregister_participant (participant_name)
+
+ get_participant_map.unregister_participant(participant_name)
+ end
+
+ #
+ # Adds a workitem listener to this engine.
+ #
+ # The 'freq' parameters if present might indicate how frequently
+ # the resource should be polled for incoming workitems.
+ #
+ # engine.add_workitem_listener(listener, "3m10s")
+ # # every 3 minutes and 10 seconds
+ #
+ # engine.add_workitem_listener(listener, "0 22 * * 1-5")
+ # # every weekday at 10pm
+ #
+ # TODO : block handling...
+ #
+ def add_workitem_listener (listener, freq=nil)
+
+ name = nil
+
+ if listener.kind_of? Class
+
+ listener = init_service(nil, listener)
+
+ name = listener.service_name
+ else
+
+ name = listener.name if listener.respond_to? :name
+ name = "#{listener.class}::#{listener.object_id}" unless name
+
+ @application_context[name] = listener
+ end
+
+ result = nil
+
+ if freq
+
+ freq = freq.to_s.strip
+
+ result = if Scheduler.is_cron_string(freq)
+
+ get_scheduler.schedule(freq, nil, listener, nil)
+ else
+
+ get_scheduler.schedule_every(freq, listener, nil)
+ end
+ end
+
+ linfo { "add_workitem_listener() added '#{name}'" }
+
+ result
+ end
+
+ #
+ # Makes the current thread join the engine's scheduler thread
+ #
+ # You can thus make an engine standalone with something like :
+ #
+ # require 'openwfe/engine/engine'
+ #
+ # the_engine = OpenWFE::Engine.new
+ # the_engine.join
+ #
+ # And you'll have to hit CTRL-C to make it stop.
+ #
+ def join
+
+ get_scheduler.join
+ end
+
+ #
# Stopping the engine will stop all the services in the
# application context.
#
def stop
@@ -188,26 +300,30 @@
# the following methods may get overridden upon extension
# see for example file_persisted_engine.rb
#
def build_expression_map ()
+
init_service(S_EXPRESSION_MAP, ExpressionMap)
end
def build_expression_pool ()
+
init_service(S_EXPRESSION_POOL, ExpressionPool)
end
def build_expression_storage ()
+
init_service(S_EXPRESSION_STORAGE, InMemoryExpressionStorage)
end
def build_participant_map ()
+
init_service(S_PARTICIPANT_MAP, ParticipantMap)
end
def build_scheduler ()
- #ldebug { "build_scheduler()" }
+
init_service(S_SCHEDULER, SchedulerService)
end
end