lib/openwfe/expool/yamlexpstorage.rb in openwferu-0.9.6 vs lib/openwfe/expool/yamlexpstorage.rb in openwferu-0.9.7
- old
+ new
@@ -40,10 +40,11 @@
# John Mettraux at openwfe.org
#
require 'openwfe/utils'
require 'openwfe/rudefinitions'
+require 'openwfe/storage/yamlextras'
require 'openwfe/storage/yamlfilestorage'
require 'openwfe/expressions/flowexpression'
require 'openwfe/expressions/raw_xml'
#
@@ -54,59 +55,24 @@
module OpenWFE
#
- # reopening some classes in order to facilitate their
- # yaml serialization
- #
-
- #
- # opening for tuning yaml persistence
- #
- class FlowExpression
-
- def to_yaml_properties
-
- l = super()
-
- l.delete("@application_context")
-
- l.delete("@timeout_job_id")
- l.delete("@scheduler_job_id")
- #
- # scheduler ids should not get persisted
-
- return l
- end
- end
-
- #
- # opening for tuning yaml persistence
- #
- class XmlRawExpression
- def to_yaml_properties
- l = super()
- l.delete("@raw_representation")
- return l
- end
- end
-
- #
# yaml expression storage
#
class YamlFileExpressionStorage < YamlFileStorage
include OwfeServiceLocator
def initialize (service_name, application_context)
- path = if (@application_context)
- @application_context[:work_directory]
- else
- DEFAULT_WORK_DIRECTORY
- end
- path = path + '/expool'
+ #path = if (@application_context)
+ # @application_context[:work_directory]
+ #else
+ # DEFAULT_WORK_DIRECTORY
+ #end
+ #path = path + '/expool'
+ path = OpenWFE::get_work_directory + '/expool'
super(service_name, application_context, path)
observe_expool()
end
@@ -180,15 +146,17 @@
return @basepath + "/engine_environment.yaml" \
if fei.workflow_instance_id == "0"
wfid = fei.parent_workflow_instance_id
+
+ #a_wfid = OpenWFE::split_wfid(wfid)
+ a_wfid = get_wfid_generator.split_wfid(wfid)
@basepath +
- #wfid[-3, 1] + "/" +
- wfid[-2, 1] + "/" +
- wfid[-1, 1] + "/" +
+ a_wfid[-2] + "/" +
+ a_wfid[-1] + "/" +
fei.workflow_instance_id + "__" +
fei.expression_id + "_" +
fei.expression_name + ".yaml"
end
@@ -207,28 +175,32 @@
# With this extension of YmalFileExpressionStorage, persistence occurs
# in a separate thread, for a snappier response.
#
class ThreadedYamlFileExpressionStorage < YamlFileExpressionStorage
+ FREQ = "400" # milliseconds
+ #
+ # the frequency at which the event queue should be processed
+
def initialize (service_name, application_context)
super
@events = {}
@op_count = 0
- @stopped = false
-
start_processing_thread()
+ #
+ # which sets @thread_id
end
#
# Will take care of stopping the 'queue processing' thread.
#
def stop
- @stopped = true
+ get_scheduler.unschedule(@thread_id) if @thread_id
process_queue()
#
# flush every remaining events (especially the :delete ones)
end
@@ -256,15 +228,12 @@
end
protected
def start_processing_thread
- Thread.new do
- while true
- sleep(0.33)
- break if @stopped
- process_queue()
- end
+
+ @thread_id = get_scheduler.schedule_every(FREQ) do
+ process_queue()
end
end
def queue (event, fei, fe=nil)
synchronize do