lib/openwfe/expool/journal.rb in openwferu-0.9.11 vs lib/openwfe/expool/journal.rb in openwferu-0.9.12

- old
+ new

@@ -103,46 +103,55 @@ # # If the event is a :terminate, the individual bucket will get # flushed. # def queue_event (event, *args) - synchronize do - #ldebug { "queue_event() :#{event}" } + #ldebug { "queue_event() :#{event}" } - return if event == :stop - return if event == :launch - return if event == :reschedule + return if event == :stop + return if event == :launch + return if event == :reschedule - wfid = extract_fei(args[0]).parent_wfid - # - # maybe args[0] could be a FlowExpression instead - # of a FlowExpressionId instance - #puts "___#{event}__wfid : #{wfid}" + wfid = extract_fei(args[0]).parent_wfid + # + # maybe args[0] could be a FlowExpression instead + # of a FlowExpressionId instance + #puts "___#{event}__wfid : #{wfid}" - e = serialize_event(event, *args) + e = serialize_event(event, *args) + bucket = nil + + synchronize do + bucket = get_bucket(wfid) bucket << e #ldebug { "queue_event() bucket : #{bucket.object_id}" } if event == :terminate bucket.flush @buckets.delete(wfid) + end + end + # + # minimizing the sync block - if @application_context[:keep_journals] == true - # - # 'move' journal to the done/ subdir of journal/ - # - FileUtils.cp( - bucket.path, - @donedir + "/" + File.basename(bucket.path)) - end - - FileUtils.rm bucket.path + # TODO : spin that off this thread, to the + # flush thread... + # + if event == :terminate + if @application_context[:keep_journals] == true + # + # 'move' journal to the done/ subdir of journal/ + # + FileUtils.cp( + bucket.path, + @donedir + "/" + File.basename(bucket.path)) end + FileUtils.rm bucket.path end end # # Makes sure that all the buckets are persisted to disk