lib/openwfe/expool/journal.rb in ruote-0.9.18 vs lib/openwfe/expool/journal.rb in ruote-0.9.19
- old
+ new
@@ -1,34 +1,34 @@
#
#--
# Copyright (c) 2007-2008, John Mettraux, OpenWFE.org
# All rights reserved.
-#
-# Redistribution and use in source and binary forms, with or without
+#
+# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
-#
+#
# . Redistributions of source code must retain the above copyright notice, this
-# list of conditions and the following disclaimer.
-#
-# . Redistributions in binary form must reproduce the above copyright notice,
-# this list of conditions and the following disclaimer in the documentation
+# list of conditions and the following disclaimer.
+#
+# . Redistributions in binary form must reproduce the above copyright notice,
+# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution.
-#
+#
# . Neither the name of the "OpenWFE" nor the names of its contributors may be
# used to endorse or promote products derived from this software without
# specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
-# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
-# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
-# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
-# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
-# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
+# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
#++
#
#
@@ -47,178 +47,178 @@
require 'openwfe/storage/yamlcustom'
require 'openwfe/expool/journal_replay'
module OpenWFE
-
- #
- # Keeping a replayable track of the events in an OpenWFEru engine
- #
- class Journal < Service
- include MonitorMixin
- include OwfeServiceLocator
- include JournalReplay
- include FeiMixin
- attr_reader :workdir, :donedir
+ #
+ # Keeping a replayable track of the events in an OpenWFEru engine
+ #
+ class Journal < Service
+ include MonitorMixin
+ include OwfeServiceLocator
+ include JournalReplay
+ include FeiMixin
- FREQ = "1m"
- #
- # once per minute, makes sure the buckets are flushed
-
- def initialize (service_name, application_context)
+ attr_reader :workdir, :donedir
- super
+ FREQ = "1m"
+ #
+ # once per minute, makes sure the buckets are flushed
- @buckets = {}
+ def initialize (service_name, application_context)
- @workdir = get_work_directory + "/journal"
- @donedir = @workdir + "/done"
+ super
- FileUtils.makedirs(@donedir) unless File.exist?(@donedir)
+ @buckets = {}
- get_expression_pool.add_observer(:all) do |event, *args|
- #ldebug { ":#{event} for #{args[0].class.name}" }
- queue_event(event, *args)
- end
+ @workdir = get_work_directory + "/journal"
+ @donedir = @workdir + "/done"
- @thread_id = get_scheduler.schedule_every(FREQ) do
- flush_buckets()
- end
- end
+ FileUtils.makedirs(@donedir) unless File.exist?(@donedir)
- #
- # Will flush the journal of every open instance.
- #
- def stop
- get_scheduler.unschedule(@thread_id) if @thread_id
- flush_buckets()
- end
+ get_expression_pool.add_observer(:all) do |event, *args|
+ #ldebug { ":#{event} for #{args[0].class.name}" }
+ queue_event(event, *args)
+ end
- protected
+ @thread_id = get_scheduler.schedule_every(FREQ) do
+ flush_buckets()
+ end
+ end
- #
- # Queues the events before a flush.
- #
- # If the event is a :terminate, the individual bucket will get
- # flushed.
- #
- def queue_event (event, *args)
+ #
+ # Will flush the journal of every open instance.
+ #
+ def stop
+ get_scheduler.unschedule(@thread_id) if @thread_id
+ flush_buckets()
+ end
- #ldebug { "queue_event() :#{event}" }
+ protected
- return if event == :stop
- return if event == :launch
- return if event == :reschedule
+ #
+ # Queues the events before a flush.
+ #
+ # If the event is a :terminate, the individual bucket will get
+ # flushed.
+ #
+ def queue_event (event, *args)
- wfid = extract_fei(args[0]).parent_wfid
- #
- # maybe args[0] could be a FlowExpression instead
- # of a FlowExpressionId instance
- #puts "___#{event}__wfid : #{wfid}"
+ #ldebug { "queue_event() :#{event}" }
- e = serialize_event(event, *args)
+ return if event == :stop
+ return if event == :launch
+ return if event == :reschedule
- bucket = nil
+ wfid = extract_fei(args[0]).parent_wfid
+ #
+ # maybe args[0] could be a FlowExpression instead
+ # of a FlowExpressionId instance
+ #puts "___#{event}__wfid : #{wfid}"
- synchronize do
+ e = serialize_event(event, *args)
- bucket = get_bucket(wfid)
- bucket << e
+ bucket = nil
- #ldebug { "queue_event() bucket : #{bucket.object_id}" }
+ synchronize do
- if event == :terminate
+ bucket = get_bucket(wfid)
+ bucket << e
- bucket.flush
- @buckets.delete(wfid)
- end
- end
- #
- # minimizing the sync block
+ #ldebug { "queue_event() bucket : #{bucket.object_id}" }
- # TODO : spin that off this thread, to the
- # flush thread...
- #
- if event == :terminate
- if @application_context[:keep_journals] == true
- #
- # 'move' journal to the done/ subdir of journal/
- #
- FileUtils.cp(
- bucket.path,
- @donedir + "/" + File.basename(bucket.path))
- end
- FileUtils.rm bucket.path
- end
- end
+ if event == :terminate
+ bucket.flush
+ @buckets.delete(wfid)
+ end
+ end
+ #
+ # minimizing the sync block
+
+ # TODO : spin that off this thread, to the
+ # flush thread...
+ #
+ if event == :terminate
+ if @application_context[:keep_journals] == true
#
- # Makes sure that all the buckets are persisted to disk
+ # 'move' journal to the done/ subdir of journal/
#
- def flush_buckets
+ FileUtils.cp(
+ bucket.path,
+ @donedir + "/" + File.basename(bucket.path))
+ end
+ FileUtils.rm bucket.path
+ end
+ end
- count = 0
+ #
+ # Makes sure that all the buckets are persisted to disk
+ #
+ def flush_buckets
- synchronize do
+ count = 0
- @buckets.each do |k, v|
- v.flush
- count += 1
- end
- @buckets.clear
- end
+ synchronize do
- linfo { "flush_buckets() flushed #{count} buckets" } \
- if count > 0
- end
+ @buckets.each do |k, v|
+ v.flush
+ count += 1
+ end
+ @buckets.clear
+ end
- def get_bucket (wfid)
- @buckets[wfid] ||= Bucket.new(get_path(wfid))
- end
+ linfo { "flush_buckets() flushed #{count} buckets" } \
+ if count > 0
+ end
- def serialize_event (event, *args)
- args.insert(0, event)
- args.insert(1, Time.now)
- args.to_yaml
- end
+ def get_bucket (wfid)
+ @buckets[wfid] ||= Bucket.new(get_path(wfid))
+ end
- def get_path (wfid)
- @workdir + "/" + wfid.to_s + ".journal"
- end
+ def serialize_event (event, *args)
+ args.insert(0, event)
+ args.insert(1, Time.now)
+ args.to_yaml
+ end
- #
- # for each process instance, there is one bucket holding the
- # events waiting to get written in the journal
- #
- class Bucket
+ def get_path (wfid)
+ @workdir + "/" + wfid.to_s + ".journal"
+ end
- attr_reader :path, :events
+ #
+ # for each process instance, there is one bucket holding the
+ # events waiting to get written in the journal
+ #
+ class Bucket
- def initialize (path)
- super()
- @path = path
- @events = []
- end
+ attr_reader :path, :events
- def << (event)
- @events << event
- end
+ def initialize (path)
+ super()
+ @path = path
+ @events = []
+ end
- def size
- @events.size
- end
- alias :length :size
+ def << (event)
+ @events << event
+ end
- def flush
- File.open(@path, "a+") do |f|
- @events.each do |e|
- f.puts e
- end
- end
- @events.clear
- end
+ def size
+ @events.size
+ end
+ alias :length :size
+
+ def flush
+ File.open(@path, "a+") do |f|
+ @events.each do |e|
+ f.puts e
end
+ end
+ @events.clear
+ end
+ end
- end
+ end
end