lib/openwfe/expool/journal.rb in openwferu-0.9.11 vs lib/openwfe/expool/journal.rb in openwferu-0.9.12
- old
+ new
@@ -103,46 +103,55 @@
#
# If the event is a :terminate, the individual bucket will get
# flushed.
#
def queue_event (event, *args)
- synchronize do
- #ldebug { "queue_event() :#{event}" }
+ #ldebug { "queue_event() :#{event}" }
- return if event == :stop
- return if event == :launch
- return if event == :reschedule
+ return if event == :stop
+ return if event == :launch
+ return if event == :reschedule
- wfid = extract_fei(args[0]).parent_wfid
- #
- # maybe args[0] could be a FlowExpression instead
- # of a FlowExpressionId instance
- #puts "___#{event}__wfid : #{wfid}"
+ wfid = extract_fei(args[0]).parent_wfid
+ #
+ # maybe args[0] could be a FlowExpression instead
+ # of a FlowExpressionId instance
+ #puts "___#{event}__wfid : #{wfid}"
- e = serialize_event(event, *args)
+ e = serialize_event(event, *args)
+ bucket = nil
+
+ synchronize do
+
bucket = get_bucket(wfid)
bucket << e
#ldebug { "queue_event() bucket : #{bucket.object_id}" }
if event == :terminate
bucket.flush
@buckets.delete(wfid)
+ end
+ end
+ #
+ # minimizing the sync block
- if @application_context[:keep_journals] == true
- #
- # 'move' journal to the done/ subdir of journal/
- #
- FileUtils.cp(
- bucket.path,
- @donedir + "/" + File.basename(bucket.path))
- end
-
- FileUtils.rm bucket.path
+ # TODO : spin that off this thread, to the
+ # flush thread...
+ #
+ if event == :terminate
+ if @application_context[:keep_journals] == true
+ #
+ # 'move' journal to the done/ subdir of journal/
+ #
+ FileUtils.cp(
+ bucket.path,
+ @donedir + "/" + File.basename(bucket.path))
end
+ FileUtils.rm bucket.path
end
end
#
# Makes sure that all the buckets are persisted to disk