lib/ruote/log/storage_history.rb in ruote-2.2.0 vs lib/ruote/log/storage_history.rb in ruote-2.3.0
- old
+ new
@@ -1,7 +1,7 @@
#--
-# Copyright (c) 2005-2011, John Mettraux, jmettraux@gmail.com
+# Copyright (c) 2005-2012, John Mettraux, jmettraux@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
@@ -39,29 +39,26 @@
#
# # ...
#
# process_history = engine.history.by_wfid(wfid0)
#
- # Note that, by default, the history is an in-memory history (and it is
- # useless when there are multiple workers).
#
+ # == final note
+ #
+ # By default, the history is an in-memory history (see Ruote::DefaultHistory)
+ # (and it is worthless when there are multiple workers).
+ #
class StorageHistory
DATE_REGEX = /!(\d{4}-\d{2}-\d{2})!/
def initialize(context, options={})
@context = context
@options = options
- if @context.worker
-
- # only care about logging if there is a worker present
-
- @context.storage.add_type('history')
- @context.worker.subscribe(:all, self)
- end
+ @context.storage.add_type('history')
end
# Returns all the wfids for which there are history items (msgs) stored.
#
def wfids
@@ -124,15 +121,17 @@
def clear!
@context.storage.purge_type!('history')
end
- # This is the method called by the workqueue. Incoming engine events
- # are 'processed' here.
+ # This method is called by the worker via the context. Successfully
+ # processed msgs are passed here.
#
- def notify(msg)
+ def on_msg(msg)
+ return unless accept?(msg)
+
msg = msg.dup
# a shallow copy is sufficient
si = if fei = msg['fei']
Ruote::FlowExpressionId.to_storage_id(fei)
@@ -148,9 +147,20 @@
msg['original_put_at'] = msg['put_at']
msg.delete('_rev')
@context.storage.put(msg)
+ end
+
+ protected
+
+ # This default implementation lets all the messages in.
+ #
+ # Feel free to override this method in a subclass.
+ #
+ def accept?(msg)
+
+ true
end
end
end