lib/ruote/part/storage_participant.rb in ruote-2.2.0 vs lib/ruote/part/storage_participant.rb in ruote-2.3.0

- old
+ new

@@ -1,7 +1,7 @@ #-- -# Copyright (c) 2005-2011, John Mettraux, jmettraux@gmail.com +# Copyright (c) 2005-2012, John Mettraux, jmettraux@gmail.com # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell @@ -53,12 +53,10 @@ class StorageParticipant include LocalParticipant include Enumerable - attr_accessor :context - def initialize(engine_or_options={}, options=nil) if engine_or_options.respond_to?(:context) @context = engine_or_options.context elsif engine_or_options.is_a?(Ruote::Context) @@ -74,11 +72,14 @@ # No need for a separate thread when delivering to this participant. # def do_not_thread; true; end - def consume(workitem) + # This is the method called by ruote when passing a workitem to + # this participant. + # + def on_workitem doc = workitem.to_h doc.merge!( 'type' => 'workitems', @@ -88,55 +89,90 @@ doc['store_name'] = @store_name if @store_name @context.storage.put(doc) end - alias :update :consume - # Removes the document/workitem from the storage + # Used by client code when "saving" a workitem (fields may have changed). + # Calling #update won't proceed the workitem. # - def cancel(fei, flavour) + # Returns nil in case of success, true if the workitem is already gone and + # the newer version of the workitem if the workitem changed in the mean + # time. + # + def update(workitem) + r = @context.storage.put(workitem.h) + + r.is_a?(Hash) ? Ruote::Workitem.new(r) : r + end + + # Removes the document/workitem from the storage. + # + # Warning: this method is called by the engine (worker), i.e. not by you. + # + def on_cancel + doc = fetch(fei) + return unless doc + r = @context.storage.delete(doc) - cancel(fei, flavour) if r != nil + on_cancel(fei, flavour) if r != nil end + # Given a fei (or its string version, a sid), returns the corresponding + # workitem (or nil). + # def [](fei) doc = fetch(fei) doc ? Ruote::Workitem.new(doc) : nil end - def fetch(fei) + alias by_fei [] - hfei = Ruote::FlowExpressionId.extract_h(fei) + # Removes the workitem from the storage and replies to the engine. + # + def proceed(workitem) - @context.storage.get('workitems', to_id(hfei)) + r = remove_workitem('proceed', workitem) + + return proceed(workitem) if r != nil + + workitem.h.delete('_rev') + + reply_to_engine(workitem) end - # Removes the workitem from the in-memory hash and replies to the engine. + # Removes the workitem and hands it back to the flow with an error to + # raise for the participant expression that emitted the workitem. # - # TODO : should it raise if the workitem can't be found ? - # TODO : should it accept just the fei ? - # - def reply(workitem) + def flunk(workitem, err_class_or_instance, *err_arguments) - # TODO: change method name (receiver mess cleanup) + r = remove_workitem('reject', workitem) - doc = fetch(Ruote::FlowExpressionId.extract_h(workitem)) + return flunk(workitem) if r != nil - r = @context.storage.delete(doc) + workitem.h.delete('_rev') - return reply(workitem) if r != nil + super(workitem, err_class_or_instance, *err_arguments) + end - workitem.h.delete('_rev') + # (soon to be removed) + # + def reply(workitem) - reply_to_engine(workitem) + puts '-' * 80 + puts '*** WARNING : please use the Ruote::StorageParticipant#proceed(wi)' + puts ' instead of #reply(wi) which is deprecated' + #caller.each { |l| puts l } + puts '-' * 80 + + proceed(workitem) end # Returns the count of workitems stored in this participant. # def size @@ -153,48 +189,45 @@ # Returns all the workitems stored in here. # def all(opts={}) - fetch_all(opts).map { |hwi| Ruote::Workitem.new(hwi) } + res = fetch_all(opts) + + res.is_a?(Array) ? res.map { |hwi| Ruote::Workitem.new(hwi) } : res end # A convenience method (especially when testing), returns the first # (only ?) workitem in the participant. # def first - hwi = fetch_all.first - - hwi ? Ruote::Workitem.new(hwi) : nil + wi(fetch_all({}).first) end # Return all workitems for the specified wfid # - def by_wfid(wfid) + def by_wfid(wfid, opts={}) - @context.storage.get_many('workitems', wfid).collect { |hwi| - Ruote::Workitem.new(hwi) - } + if @context.storage.respond_to?(:by_wfid) + return @context.storage.by_wfid('workitems', wfid, opts) + end + + wis(@context.storage.get_many('workitems', wfid, opts)) end # Returns all workitems for the specified participant name # def by_participant(participant_name, opts={}) - hwis = if @context.storage.respond_to?(:by_participant) + return @context.storage.by_participant( + 'workitems', participant_name, opts + ) if @context.storage.respond_to?(:by_participant) - @context.storage.by_participant('workitems', participant_name, opts) - - else - - fetch_all(opts).select { |wi| - wi['participant_name'] == participant_name - } + do_select(opts) do |hwi| + hwi['participant_name'] == participant_name end - - hwis.collect { |hwi| Ruote::Workitem.new(hwi) } end # field : returns all the workitems with the given field name present. # # field and value : returns all the workitems with the given field name @@ -202,25 +235,22 @@ # # Warning : only some storages are optimized for such queries (like # CouchStorage), the others will load all the workitems and then filter # them. # - def by_field(field, value=nil) + def by_field(field, value=nil, opts={}) - hwis = if @context.storage.respond_to?(:by_field) + (value, opts = nil, value) if value.is_a?(Hash) - @context.storage.by_field('workitems', field, value) - - else - - fetch_all.select { |hwi| - hwi['fields'].keys.include?(field) && - (value.nil? || hwi['fields'][field] == value) - } + if @context.storage.respond_to?(:by_field) + return @context.storage.by_field('workitems', field, value, opts) end - hwis.collect { |hwi| Ruote::Workitem.new(hwi) } + do_select(opts) do |hwi| + hwi['fields'].keys.include?(field) && + (value.nil? || hwi['fields'][field] == value) + end end # Queries the store participant for workitems. # # Some examples : @@ -240,11 +270,11 @@ # Note : the criteria is AND only, you'll have to do ORs (aggregation) # by yourself. # def query(criteria) - cr = criteria.inject({}) { |h, (k, v)| h[k.to_s] = v; h } + cr = Ruote.keys_to_s(criteria) if @context.storage.respond_to?(:query_workitems) return @context.storage.query_workitems(cr) end @@ -252,29 +282,33 @@ opts[:skip] = cr.delete('offset') || cr.delete('skip') opts[:limit] = cr.delete('limit') opts[:count] = cr.delete('count') wfid = cr.delete('wfid') + + count = opts[:count] + pname = cr.delete('participant_name') || cr.delete('participant') + opts.delete(:count) if pname hwis = wfid ? @context.storage.get_many('workitems', wfid, opts) : fetch_all(opts) - return hwis if opts[:count] + return hwis unless hwis.is_a?(Array) - hwis.select { |hwi| + hwis = hwis.select { |hwi| Ruote::StorageParticipant.matches?(hwi, pname, cr) - }.collect { |hwi| - Ruote::Workitem.new(hwi) } + + count ? hwis.size : wis(hwis) end # Cleans this participant out completely # def purge! - fetch_all.each { |hwi| @context.storage.delete(hwi) } + fetch_all({}).each { |hwi| @context.storage.delete(hwi) } end # Used by #query when filtering workitems. # def self.matches?(hwi, pname, criteria) @@ -293,35 +327,135 @@ # Mostly a test method. Returns a Hash were keys are participant names # and values are lists of workitems. # def per_participant - inject({}) { |h, wi| (h[wi.participant_name] ||= []) << wi; h } + each_with_object({}) { |wi, h| (h[wi.participant_name] ||= []) << wi } end # Mostly a test method. Returns a Hash were keys are participant names # and values are integers, the count of workitems for a given participant # name. # def per_participant_count - per_participant.inject({}) { |h, (k, v)| h[k] = v.size; h } + per_participant.remap { |(k, v), h| h[k] = v.size } end + # Claims a workitem. Returns the [updated] workitem if successful. + # + # Returns nil if the workitem is already reserved. + # + # Fails if the workitem can't be found, is gone, or got modified + # elsewhere. + # + # Here is a mini-diagram explaining the reserve/delegate/proceed flow: + # + # in delegate(nil) delegate(other) + # | +---------------+ +------+ + # v v | | v + # +-------+ reserve +----------+ proceed + # | ready | ---------> | reserved | ---------> out + # +-------+ +----------+ + # + def reserve(workitem_or_fei, owner) + + hwi = fetch(workitem_or_fei) + + fail ArgumentError.new("workitem not found") if hwi.nil? + + return nil if hwi['owner'] && hwi['owner'] != owner + + hwi['owner'] = owner + + r = @context.storage.put(hwi, :update_rev => true) + + fail ArgumentError.new("workitem is gone") if r == true + fail ArgumentError.new("workitem got modified meanwhile") if r != nil + + Workitem.new(hwi) + end + + # Delegates a currently owned workitem to a new owner. + # + # Fails if the workitem can't be found, belongs to noone, or if the + # workitem passed as argument is out of date (got modified in the mean + # time). + # + # It's OK to delegate to nil, thus freeing the workitem. + # + # See #reserve for an an explanation of the reserve/delegate/proceed flow. + # + def delegate(workitem, new_owner) + + hwi = fetch(workitem) + + fail ArgumentError.new( + "workitem not found" + ) if hwi == nil + + fail ArgumentError.new( + "cannot delegate, workitem doesn't belong to anyone" + ) if hwi['owner'] == nil + + fail ArgumentError.new( + "cannot delegate, " + + "workitem owned by '#{hwi['owner']}', not '#{workitem.owner}'" + ) if hwi['owner'] != workitem.owner + + hwi['owner'] = new_owner + + r = @context.storage.put(hwi, :update_rev => true) + + fail ArgumentError.new("workitem is gone") if r == true + fail ArgumentError.new("workitem got modified meanwhile") if r != nil + + Workitem.new(hwi) + end + protected + # Fetches a workitem in its raw form (Hash). + # + def fetch(workitem_or_fei) + + hfei = Ruote::FlowExpressionId.extract_h(workitem_or_fei) + + @context.storage.get('workitems', to_id(hfei)) + end + # Fetches all the workitems. If there is a @store_name, will only fetch # the workitems in that store. # - def fetch_all(opts={}) + def fetch_all(opts) @context.storage.get_many( 'workitems', @store_name ? /^wi!#{@store_name}::/ : nil, opts) end + # Given a few options and a block, returns all the workitems that match + # the block + # + def do_select(opts, &block) + + skip = opts[:offset] || opts[:skip] + limit = opts[:limit] + count = opts[:count] + + hwis = fetch_all({}) + hwis = hwis.select(&block) + + hwis = hwis[skip..-1] if skip + hwis = hwis[0, limit] if limit + + return hwis.size if count + + hwis.collect { |hwi| Ruote::Workitem.new(hwi) } + end + # Computes the id for the document representing the document in the storage. # def to_id(fei) a = [ Ruote.to_storage_id(fei) ] @@ -329,9 +463,43 @@ a.unshift(@store_name) if @store_name a.unshift('wi') a.join('!') + end + + def wi(hwi) + + hwi ? Ruote::Workitem.new(hwi) : nil + end + + def wis(workitems_or_count) + + workitems_or_count.is_a?(Array) ? + workitems_or_count.collect { |wi| Ruote::Workitem.new(wi) } : + workitems_or_count + end + + def remove_workitem(action, workitem) + + hwi = fetch(workitem) + + fail ArgumentError.new( + "cannot #{action}, workitem not found" + ) if hwi == nil + + fail ArgumentError.new( + "cannot #{action}, " + + "workitem is owned by '#{hwi['owner']}', not '#{workitem.owner}'" + ) if hwi['owner'] && hwi['owner'] != workitem.owner + + r = @context.storage.delete(hwi) + + fail ArgumentError.new( + "cannot #{action}, workitem is gone" + ) if r == true + + r end end end