lib/openwfe/expool/journal.rb in openwferu-0.9.7 vs lib/openwfe/expool/journal.rb in openwferu-0.9.8

- old
+ new

@@ -57,11 +57,11 @@ # class Journal < Service include MonitorMixin, OwfeServiceLocator include JournalReplay - attr_reader :workdir + attr_reader :workdir, :donedir FREQ = "1m" # # once per minute, makes sure the buckets are flushed @@ -70,20 +70,22 @@ super @buckets = {} @workdir = OpenWFE::get_work_directory + "/journal" - FileUtils.makedirs(@workdir) unless File.exist?(@workdir) + @donedir = @workdir + "/done" + FileUtils.makedirs(@donedir) unless File.exist?(@donedir) + get_expression_pool.add_observer(:all) do |event, *args| #ldebug { ":#{event} for #{args[0].to_debug_s}" } queue_event(event, *args) end - #@thread_id = get_scheduler.schedule_every(FREQ) do - # flush_buckets() - #end + @thread_id = get_scheduler.schedule_every(FREQ) do + flush_buckets() + end end # # Will flush the journal of every open instance. # @@ -92,32 +94,68 @@ flush_buckets() end protected + # + # Queues the events before a flush. + # + # If the event is a :terminate, the individual bucket will get + # flushed. + # def queue_event (event, *args) + synchronize do - return if event == :stop - return if event == :launch + return if event == :stop + return if event == :launch + return if event == :reschedule - wfid = args[0].parent_wfid - #puts "__#{wfid} : #{event}" + wfid = args[0].parent_wfid + #puts "__#{wfid} : #{event}" - e = serialize_event(event, *args) + e = serialize_event(event, *args) - bucket = get_bucket(wfid) - bucket << e + bucket = get_bucket(wfid) + bucket << e - if event == :terminate - bucket.flush - @buckets.delete(wfid) + if event == :terminate + + bucket.flush + @buckets.delete(wfid) + + if @application_context[:keep_journals] == true + # + # 'move' journal to the done/ subdir of journal/ + # + FileUtils.cp( + bucket.get_path, + @donedir + "/" + bucket.get_file_name) + end + + FileUtils.rm bucket.get_path + end end end + # + # Makes sure that all the buckets are persisted to disk + # def flush_buckets + + count = 0 + synchronize do + + @buckets.each do |k, v| + v.flush + count += 1 + end + @buckets.clear end + + linfo { "flush_buckets() flushed #{count} buckets" } \ + if count > 0 end def get_bucket (wfid) @buckets[wfid] ||= Bucket.new(@workdir, wfid) end @@ -131,11 +169,10 @@ # # for each process instance, there is one bucket holding the # events waiting to get written in the journal # class Bucket - include MonitorMixin attr_reader :wfid, :events def initialize (workdir, wfid) super() @@ -143,31 +180,32 @@ @wfid = wfid @events = [] end def << (event) - synchronize do - @events << event - end + @events << event end def size @events.size end alias :length :size def flush - synchronize do - - fn = @workdir + "/" + @wfid.to_s + ".journal" - - File.open(fn, "w+") do |f| - @events.each do |e| - f.puts e - end + File.open(get_path, "w+") do |f| + @events.each do |e| + f.puts e end - @events.clear end + @events.clear + end + + def get_path + @workdir + "/" + get_file_name + end + + def get_file_name + @wfid.to_s + ".journal" end end end