lib/openwfe/expool/journal.rb in ruote-0.9.18 vs lib/openwfe/expool/journal.rb in ruote-0.9.19

- old
+ new

@@ -1,34 +1,34 @@ # #-- # Copyright (c) 2007-2008, John Mettraux, OpenWFE.org # All rights reserved. -# -# Redistribution and use in source and binary forms, with or without +# +# Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions are met: -# +# # . Redistributions of source code must retain the above copyright notice, this -# list of conditions and the following disclaimer. -# -# . Redistributions in binary form must reproduce the above copyright notice, -# this list of conditions and the following disclaimer in the documentation +# list of conditions and the following disclaimer. +# +# . Redistributions in binary form must reproduce the above copyright notice, +# this list of conditions and the following disclaimer in the documentation # and/or other materials provided with the distribution. -# +# # . Neither the name of the "OpenWFE" nor the names of its contributors may be # used to endorse or promote products derived from this software without # specific prior written permission. -# -# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" -# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE -# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE -# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE -# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR -# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF -# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS -# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN -# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) -# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE +# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" +# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE +# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE +# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR +# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF +# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS +# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN +# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) +# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE # POSSIBILITY OF SUCH DAMAGE. #++ # # @@ -47,178 +47,178 @@ require 'openwfe/storage/yamlcustom' require 'openwfe/expool/journal_replay' module OpenWFE - - # - # Keeping a replayable track of the events in an OpenWFEru engine - # - class Journal < Service - include MonitorMixin - include OwfeServiceLocator - include JournalReplay - include FeiMixin - attr_reader :workdir, :donedir + # + # Keeping a replayable track of the events in an OpenWFEru engine + # + class Journal < Service + include MonitorMixin + include OwfeServiceLocator + include JournalReplay + include FeiMixin - FREQ = "1m" - # - # once per minute, makes sure the buckets are flushed - - def initialize (service_name, application_context) + attr_reader :workdir, :donedir - super + FREQ = "1m" + # + # once per minute, makes sure the buckets are flushed - @buckets = {} + def initialize (service_name, application_context) - @workdir = get_work_directory + "/journal" - @donedir = @workdir + "/done" + super - FileUtils.makedirs(@donedir) unless File.exist?(@donedir) + @buckets = {} - get_expression_pool.add_observer(:all) do |event, *args| - #ldebug { ":#{event} for #{args[0].class.name}" } - queue_event(event, *args) - end + @workdir = get_work_directory + "/journal" + @donedir = @workdir + "/done" - @thread_id = get_scheduler.schedule_every(FREQ) do - flush_buckets() - end - end + FileUtils.makedirs(@donedir) unless File.exist?(@donedir) - # - # Will flush the journal of every open instance. - # - def stop - get_scheduler.unschedule(@thread_id) if @thread_id - flush_buckets() - end + get_expression_pool.add_observer(:all) do |event, *args| + #ldebug { ":#{event} for #{args[0].class.name}" } + queue_event(event, *args) + end - protected + @thread_id = get_scheduler.schedule_every(FREQ) do + flush_buckets() + end + end - # - # Queues the events before a flush. - # - # If the event is a :terminate, the individual bucket will get - # flushed. - # - def queue_event (event, *args) + # + # Will flush the journal of every open instance. + # + def stop + get_scheduler.unschedule(@thread_id) if @thread_id + flush_buckets() + end - #ldebug { "queue_event() :#{event}" } + protected - return if event == :stop - return if event == :launch - return if event == :reschedule + # + # Queues the events before a flush. + # + # If the event is a :terminate, the individual bucket will get + # flushed. + # + def queue_event (event, *args) - wfid = extract_fei(args[0]).parent_wfid - # - # maybe args[0] could be a FlowExpression instead - # of a FlowExpressionId instance - #puts "___#{event}__wfid : #{wfid}" + #ldebug { "queue_event() :#{event}" } - e = serialize_event(event, *args) + return if event == :stop + return if event == :launch + return if event == :reschedule - bucket = nil + wfid = extract_fei(args[0]).parent_wfid + # + # maybe args[0] could be a FlowExpression instead + # of a FlowExpressionId instance + #puts "___#{event}__wfid : #{wfid}" - synchronize do + e = serialize_event(event, *args) - bucket = get_bucket(wfid) - bucket << e + bucket = nil - #ldebug { "queue_event() bucket : #{bucket.object_id}" } + synchronize do - if event == :terminate + bucket = get_bucket(wfid) + bucket << e - bucket.flush - @buckets.delete(wfid) - end - end - # - # minimizing the sync block + #ldebug { "queue_event() bucket : #{bucket.object_id}" } - # TODO : spin that off this thread, to the - # flush thread... - # - if event == :terminate - if @application_context[:keep_journals] == true - # - # 'move' journal to the done/ subdir of journal/ - # - FileUtils.cp( - bucket.path, - @donedir + "/" + File.basename(bucket.path)) - end - FileUtils.rm bucket.path - end - end + if event == :terminate + bucket.flush + @buckets.delete(wfid) + end + end + # + # minimizing the sync block + + # TODO : spin that off this thread, to the + # flush thread... + # + if event == :terminate + if @application_context[:keep_journals] == true # - # Makes sure that all the buckets are persisted to disk + # 'move' journal to the done/ subdir of journal/ # - def flush_buckets + FileUtils.cp( + bucket.path, + @donedir + "/" + File.basename(bucket.path)) + end + FileUtils.rm bucket.path + end + end - count = 0 + # + # Makes sure that all the buckets are persisted to disk + # + def flush_buckets - synchronize do + count = 0 - @buckets.each do |k, v| - v.flush - count += 1 - end - @buckets.clear - end + synchronize do - linfo { "flush_buckets() flushed #{count} buckets" } \ - if count > 0 - end + @buckets.each do |k, v| + v.flush + count += 1 + end + @buckets.clear + end - def get_bucket (wfid) - @buckets[wfid] ||= Bucket.new(get_path(wfid)) - end + linfo { "flush_buckets() flushed #{count} buckets" } \ + if count > 0 + end - def serialize_event (event, *args) - args.insert(0, event) - args.insert(1, Time.now) - args.to_yaml - end + def get_bucket (wfid) + @buckets[wfid] ||= Bucket.new(get_path(wfid)) + end - def get_path (wfid) - @workdir + "/" + wfid.to_s + ".journal" - end + def serialize_event (event, *args) + args.insert(0, event) + args.insert(1, Time.now) + args.to_yaml + end - # - # for each process instance, there is one bucket holding the - # events waiting to get written in the journal - # - class Bucket + def get_path (wfid) + @workdir + "/" + wfid.to_s + ".journal" + end - attr_reader :path, :events + # + # for each process instance, there is one bucket holding the + # events waiting to get written in the journal + # + class Bucket - def initialize (path) - super() - @path = path - @events = [] - end + attr_reader :path, :events - def << (event) - @events << event - end + def initialize (path) + super() + @path = path + @events = [] + end - def size - @events.size - end - alias :length :size + def << (event) + @events << event + end - def flush - File.open(@path, "a+") do |f| - @events.each do |e| - f.puts e - end - end - @events.clear - end + def size + @events.size + end + alias :length :size + + def flush + File.open(@path, "a+") do |f| + @events.each do |e| + f.puts e end + end + @events.clear + end + end - end + end end