lib/openwfe/expool/yamlexpstorage.rb in openwferu-0.9.6 vs lib/openwfe/expool/yamlexpstorage.rb in openwferu-0.9.7

- old
+ new

@@ -40,10 +40,11 @@ # John Mettraux at openwfe.org # require 'openwfe/utils' require 'openwfe/rudefinitions' +require 'openwfe/storage/yamlextras' require 'openwfe/storage/yamlfilestorage' require 'openwfe/expressions/flowexpression' require 'openwfe/expressions/raw_xml' # @@ -54,59 +55,24 @@ module OpenWFE # - # reopening some classes in order to facilitate their - # yaml serialization - # - - # - # opening for tuning yaml persistence - # - class FlowExpression - - def to_yaml_properties - - l = super() - - l.delete("@application_context") - - l.delete("@timeout_job_id") - l.delete("@scheduler_job_id") - # - # scheduler ids should not get persisted - - return l - end - end - - # - # opening for tuning yaml persistence - # - class XmlRawExpression - def to_yaml_properties - l = super() - l.delete("@raw_representation") - return l - end - end - - # # yaml expression storage # class YamlFileExpressionStorage < YamlFileStorage include OwfeServiceLocator def initialize (service_name, application_context) - path = if (@application_context) - @application_context[:work_directory] - else - DEFAULT_WORK_DIRECTORY - end - path = path + '/expool' + #path = if (@application_context) + # @application_context[:work_directory] + #else + # DEFAULT_WORK_DIRECTORY + #end + #path = path + '/expool' + path = OpenWFE::get_work_directory + '/expool' super(service_name, application_context, path) observe_expool() end @@ -180,15 +146,17 @@ return @basepath + "/engine_environment.yaml" \ if fei.workflow_instance_id == "0" wfid = fei.parent_workflow_instance_id + + #a_wfid = OpenWFE::split_wfid(wfid) + a_wfid = get_wfid_generator.split_wfid(wfid) @basepath + - #wfid[-3, 1] + "/" + - wfid[-2, 1] + "/" + - wfid[-1, 1] + "/" + + a_wfid[-2] + "/" + + a_wfid[-1] + "/" + fei.workflow_instance_id + "__" + fei.expression_id + "_" + fei.expression_name + ".yaml" end @@ -207,28 +175,32 @@ # With this extension of YmalFileExpressionStorage, persistence occurs # in a separate thread, for a snappier response. # class ThreadedYamlFileExpressionStorage < YamlFileExpressionStorage + FREQ = "400" # milliseconds + # + # the frequency at which the event queue should be processed + def initialize (service_name, application_context) super @events = {} @op_count = 0 - @stopped = false - start_processing_thread() + # + # which sets @thread_id end # # Will take care of stopping the 'queue processing' thread. # def stop - @stopped = true + get_scheduler.unschedule(@thread_id) if @thread_id process_queue() # # flush every remaining events (especially the :delete ones) end @@ -256,15 +228,12 @@ end protected def start_processing_thread - Thread.new do - while true - sleep(0.33) - break if @stopped - process_queue() - end + + @thread_id = get_scheduler.schedule_every(FREQ) do + process_queue() end end def queue (event, fei, fe=nil) synchronize do