lib/openwfe/expool/journal.rb in openwferu-0.9.7 vs lib/openwfe/expool/journal.rb in openwferu-0.9.8
- old
+ new
@@ -57,11 +57,11 @@
#
class Journal < Service
include MonitorMixin, OwfeServiceLocator
include JournalReplay
- attr_reader :workdir
+ attr_reader :workdir, :donedir
FREQ = "1m"
#
# once per minute, makes sure the buckets are flushed
@@ -70,20 +70,22 @@
super
@buckets = {}
@workdir = OpenWFE::get_work_directory + "/journal"
- FileUtils.makedirs(@workdir) unless File.exist?(@workdir)
+ @donedir = @workdir + "/done"
+ FileUtils.makedirs(@donedir) unless File.exist?(@donedir)
+
get_expression_pool.add_observer(:all) do |event, *args|
#ldebug { ":#{event} for #{args[0].to_debug_s}" }
queue_event(event, *args)
end
- #@thread_id = get_scheduler.schedule_every(FREQ) do
- # flush_buckets()
- #end
+ @thread_id = get_scheduler.schedule_every(FREQ) do
+ flush_buckets()
+ end
end
#
# Will flush the journal of every open instance.
#
@@ -92,32 +94,68 @@
flush_buckets()
end
protected
+ #
+ # Queues the events before a flush.
+ #
+ # If the event is a :terminate, the individual bucket will get
+ # flushed.
+ #
def queue_event (event, *args)
+ synchronize do
- return if event == :stop
- return if event == :launch
+ return if event == :stop
+ return if event == :launch
+ return if event == :reschedule
- wfid = args[0].parent_wfid
- #puts "__#{wfid} : #{event}"
+ wfid = args[0].parent_wfid
+ #puts "__#{wfid} : #{event}"
- e = serialize_event(event, *args)
+ e = serialize_event(event, *args)
- bucket = get_bucket(wfid)
- bucket << e
+ bucket = get_bucket(wfid)
+ bucket << e
- if event == :terminate
- bucket.flush
- @buckets.delete(wfid)
+ if event == :terminate
+
+ bucket.flush
+ @buckets.delete(wfid)
+
+ if @application_context[:keep_journals] == true
+ #
+ # 'move' journal to the done/ subdir of journal/
+ #
+ FileUtils.cp(
+ bucket.get_path,
+ @donedir + "/" + bucket.get_file_name)
+ end
+
+ FileUtils.rm bucket.get_path
+ end
end
end
+ #
+ # Makes sure that all the buckets are persisted to disk
+ #
def flush_buckets
+
+ count = 0
+
synchronize do
+
+ @buckets.each do |k, v|
+ v.flush
+ count += 1
+ end
+ @buckets.clear
end
+
+ linfo { "flush_buckets() flushed #{count} buckets" } \
+ if count > 0
end
def get_bucket (wfid)
@buckets[wfid] ||= Bucket.new(@workdir, wfid)
end
@@ -131,11 +169,10 @@
#
# for each process instance, there is one bucket holding the
# events waiting to get written in the journal
#
class Bucket
- include MonitorMixin
attr_reader :wfid, :events
def initialize (workdir, wfid)
super()
@@ -143,31 +180,32 @@
@wfid = wfid
@events = []
end
def << (event)
- synchronize do
- @events << event
- end
+ @events << event
end
def size
@events.size
end
alias :length :size
def flush
- synchronize do
-
- fn = @workdir + "/" + @wfid.to_s + ".journal"
-
- File.open(fn, "w+") do |f|
- @events.each do |e|
- f.puts e
- end
+ File.open(get_path, "w+") do |f|
+ @events.each do |e|
+ f.puts e
end
- @events.clear
end
+ @events.clear
+ end
+
+ def get_path
+ @workdir + "/" + get_file_name
+ end
+
+ def get_file_name
+ @wfid.to_s + ".journal"
end
end
end