lib/ruote/part/storage_participant.rb in ruote-2.1.11 vs lib/ruote/part/storage_participant.rb in ruote-2.2.0
- old
+ new
@@ -1,7 +1,7 @@
#--
-# Copyright (c) 2005-2010, John Mettraux, jmettraux@gmail.com
+# Copyright (c) 2005-2011, John Mettraux, jmettraux@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
@@ -55,30 +55,30 @@
include LocalParticipant
include Enumerable
attr_accessor :context
- def initialize (engine_or_options={}, options=nil)
+ def initialize(engine_or_options={}, options=nil)
if engine_or_options.respond_to?(:context)
@context = engine_or_options.context
elsif engine_or_options.is_a?(Ruote::Context)
@context = engine_or_options
else
- options = engine_or_options
+ @options = engine_or_options
end
- options ||= {}
+ @options ||= {}
- @store_name = options['store_name']
+ @store_name = @options['store_name']
end
# No need for a separate thread when delivering to this participant.
#
def do_not_thread; true; end
- def consume (workitem)
+ def consume(workitem)
doc = workitem.to_h
doc.merge!(
'type' => 'workitems',
@@ -92,36 +92,39 @@
end
alias :update :consume
# Removes the document/workitem from the storage
#
- def cancel (fei, flavour)
+ def cancel(fei, flavour)
doc = fetch(fei)
r = @context.storage.delete(doc)
cancel(fei, flavour) if r != nil
end
- def [] (fei)
+ def [](fei)
doc = fetch(fei)
doc ? Ruote::Workitem.new(doc) : nil
end
- def fetch (fei)
+ def fetch(fei)
hfei = Ruote::FlowExpressionId.extract_h(fei)
@context.storage.get('workitems', to_id(hfei))
end
# Removes the workitem from the in-memory hash and replies to the engine.
#
- def reply (workitem)
+ # TODO : should it raise if the workitem can't be found ?
+ # TODO : should it accept just the fei ?
+ #
+ def reply(workitem)
# TODO: change method name (receiver mess cleanup)
doc = fetch(Ruote::FlowExpressionId.extract_h(workitem))
@@ -141,18 +144,18 @@
fetch_all(:count => true)
end
# Iterates over the workitems stored in here.
#
- def each (&block)
+ def each(&block)
all.each { |wi| block.call(wi) }
end
# Returns all the workitems stored in here.
#
- def all (opts={})
+ def all(opts={})
fetch_all(opts).map { |hwi| Ruote::Workitem.new(hwi) }
end
# A convenience method (especially when testing), returns the first
@@ -165,20 +168,20 @@
hwi ? Ruote::Workitem.new(hwi) : nil
end
# Return all workitems for the specified wfid
#
- def by_wfid (wfid)
+ def by_wfid(wfid)
@context.storage.get_many('workitems', wfid).collect { |hwi|
Ruote::Workitem.new(hwi)
}
end
# Returns all workitems for the specified participant name
#
- def by_participant (participant_name, opts={})
+ def by_participant(participant_name, opts={})
hwis = if @context.storage.respond_to?(:by_participant)
@context.storage.by_participant('workitems', participant_name, opts)
@@ -199,11 +202,11 @@
#
# Warning : only some storages are optimized for such queries (like
# CouchStorage), the others will load all the workitems and then filter
# them.
#
- def by_field (field, value=nil)
+ def by_field(field, value=nil)
hwis = if @context.storage.respond_to?(:by_field)
@context.storage.by_field('workitems', field, value)
@@ -235,11 +238,11 @@
# for pagination. 'skip' can be used instead of 'offset'.
#
# Note : the criteria is AND only, you'll have to do ORs (aggregation)
# by yourself.
#
- def query (criteria)
+ def query(criteria)
cr = criteria.inject({}) { |h, (k, v)| h[k.to_s] = v; h }
if @context.storage.respond_to?(:query_workitems)
return @context.storage.query_workitems(cr)
@@ -267,16 +270,16 @@
# Cleans this participant out completely
#
def purge!
- fetch_all.each { |hwi| @context.storage.delete( hwi ) }
+ fetch_all.each { |hwi| @context.storage.delete(hwi) }
end
# Used by #query when filtering workitems.
#
- def self.matches? (hwi, pname, criteria)
+ def self.matches?(hwi, pname, criteria)
return false if pname && hwi['participant_name'] != pname
fields = hwi['fields']
@@ -285,25 +288,42 @@
end
true
end
+ # Mostly a test method. Returns a Hash were keys are participant names
+ # and values are lists of workitems.
+ #
+ def per_participant
+
+ inject({}) { |h, wi| (h[wi.participant_name] ||= []) << wi; h }
+ end
+
+ # Mostly a test method. Returns a Hash were keys are participant names
+ # and values are integers, the count of workitems for a given participant
+ # name.
+ #
+ def per_participant_count
+
+ per_participant.inject({}) { |h, (k, v)| h[k] = v.size; h }
+ end
+
protected
# Fetches all the workitems. If there is a @store_name, will only fetch
# the workitems in that store.
#
- def fetch_all (opts={})
+ def fetch_all(opts={})
@context.storage.get_many(
'workitems',
@store_name ? /^wi!#{@store_name}::/ : nil,
opts)
end
# Computes the id for the document representing the document in the storage.
#
- def to_id (fei)
+ def to_id(fei)
a = [ Ruote.to_storage_id(fei) ]
a.unshift(@store_name) if @store_name