lib/ruote/part/storage_participant.rb in ruote-2.1.4 vs lib/ruote/part/storage_participant.rb in ruote-2.1.5
- old
+ new
@@ -25,19 +25,33 @@
require 'ruote/part/local_participant'
module Ruote
+ #
+ # A participant that stores the workitem in the same storage used by the
+ # engine and the worker(s).
+ #
+ # Does not thread by default.
+ #
class StorageParticipant
include LocalParticipant
include Enumerable
attr_accessor :context
- def initialize (options={})
+ def initialize (engine_or_options={}, options=nil)
+ if engine_or_options.respond_to?(:context)
+ @context = engine_or_options.context
+ else
+ options = engine_or_options
+ end
+
+ options ||= {}
+
@store_name = options['store_name']
end
# No need for a separate thread when delivering to this participant.
#
@@ -45,10 +59,12 @@
def consume (workitem)
doc = workitem.to_h
+ doc.delete('_rev')
+
doc.merge!(
'type' => 'workitems',
'_id' => to_id(doc['fei']),
'participant_name' => doc['participant_name'],
'wfid' => doc['fei']['wfid'])
@@ -57,30 +73,32 @@
@context.storage.put(doc)
end
alias :update :consume
- # Makes sure to remove the workitem from the in-memory hash.
+ # Removes the document/workitem from the storage
#
def cancel (fei, flavour)
- doc = fetch(fei)
+ doc = fetch(fei.to_h)
- r = @storage.delete(doc)
+ r = @context.storage.delete(doc)
cancel(fei, flavour) if r != nil
end
def [] (fei)
doc = fetch(fei)
- doc ? Ruote::WorkItem.new(doc) : nil
+ doc ? Ruote::Workitem.new(doc) : nil
end
def fetch (fei)
+ fei = fei.to_h if fei.respond_to?(:to_h)
+
@context.storage.get('workitems', to_id(fei))
end
# Removes the workitem from the in-memory hash and replies to the engine.
#
@@ -131,39 +149,77 @@
def by_wfid( wfid )
@context.storage.get_many('workitems', /!#{wfid}$/).map { |hwi| Ruote::Workitem.new(hwi) }
end
- # Return all workitems for the specified participant
+ # Returns all workitems for the specified participant name
#
- def by_participant( part )
+ def by_participant (participant_name)
- all.select { |wi| wi.participant_name == part }
+
+ hwis = if @context.storage.respond_to?(:by_participant)
+
+ @context.storage.by_participant('workitems', participant_name)
+
+ else
+
+ fetch_all.select { |wi| wi['participant_name'] == participant_name }
+ end
+
+ hwis.collect { |hwi| Ruote::Workitem.new(hwi) }
end
- # Clean this participant out completely
+ # field : returns all the workitems with the given field name present.
#
+ # field and value : returns all the workitems with the given field name
+ # and the given value for that field.
+ #
+ # Warning : only some storages are optimized for such queries (like
+ # CouchStorage), the others will load all the workitems and then filter
+ # them.
+ #
+ def by_field (field, value=nil)
+
+ hwis = if @context.storage.respond_to?(:by_field)
+
+ @context.storage.by_field('workitems', field, value)
+
+ else
+
+ fetch_all.select { |hwi|
+ hwi['fields'].keys.include?(field) &&
+ (value.nil? || hwi['fields'][field] == value)
+ }
+ end
+
+ hwis.collect { |hwi| Ruote::Workitem.new(hwi) }
+ end
+
+ # Cleans this participant out completely
+ #
def purge!
fetch_all.each { |hwi| @context.storage.delete( hwi ) }
end
protected
def fetch_all
- key = @store_name ? /^wi\_#{@store_name}::/ : nil
+ key = @store_name ? /^wi!#{@store_name}::/ : nil
@context.storage.get_many('workitems', key)
end
def to_id (fei)
- sid = Ruote.to_storage_id(fei)
+ a = [ Ruote.to_storage_id(fei) ]
- sid = @store_name ? "#{store_name}::#{sid}" : sid
+ a.unshift(@store_name) if @store_name
- "wi_#{sid}"
+ a.unshift('wi')
+
+ a.join('!')
end
end
end