lib/ruote/storage/base.rb in ruote-2.2.0 vs lib/ruote/storage/base.rb in ruote-2.3.0
- old
+ new
@@ -1,7 +1,7 @@
#--
-# Copyright (c) 2005-2011, John Mettraux, jmettraux@gmail.com
+# Copyright (c) 2005-2012, John Mettraux, jmettraux@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
@@ -20,20 +20,25 @@
# THE SOFTWARE.
#
# Made in Japan.
#++
+require 'ostruct'
require 'ruote/util/time'
module Ruote
#
# Base methods for storage implementations.
#
module StorageBase
+ #--
+ # misc
+ #++
+
def context
@context ||= Ruote::Context.new(self)
end
@@ -48,51 +53,60 @@
def reserve(doc)
delete(doc).nil?
end
+ # A helper for the #worker method, it returns that dummy worker
+ # when there is no reference to the calling worker in the current
+ # thread's local variables.
+ #
+ DUMMY_WORKER = OpenStruct.new(
+ :name => 'worker', :identity => 'unknown', :state => 'running')
+
+ # Warning, this is not equivalent to doing @context.worker, this method
+ # fetches the worker from the local thread variables.
+ #
+ def worker
+
+ Thread.current['ruote_worker'] || DUMMY_WORKER
+ end
+
#--
# configurations
#++
def get_configuration(key)
get('configurations', key)
end
+ def replace_engine_configuration(options)
+
+ return if options.delete('preserve_configuration')
+
+ conf = get('configurations', 'engine')
+
+ doc = options.merge('type' => 'configurations', '_id' => 'engine')
+ doc['_rev'] = conf['_rev'] if conf
+
+ put(doc)
+ end
+
#--
# messages
#++
def put_msg(action, options)
msg = prepare_msg_doc(action, options)
put(msg)
-
- #put(msg, :update_rev => true)
- #(@local_msgs ||= []) << Ruote.fulldup(msg)
end
- #def get_local_msgs
- # p @local_msgs
- # if @local_msgs
- # r = @local_msgs
- # @local_msgs = nil
- # r
- # else
- # []
- # end
- #end
-
def get_msgs
- get_many(
- 'msgs', nil, :limit => 300
- ).sort { |a, b|
- a['put_at'] <=> b['put_at']
- }
+ get_many('msgs', nil, :limit => 300).sort_by { |d| d['put_at'] }
end
def empty?(type)
(get_many(type, nil, :count => true) == 0)
@@ -100,17 +114,31 @@
#--
# expressions
#++
+ # Given a wfid, returns all the expressions of that process instance.
+ #
+ def find_expressions(wfid)
+
+ get_many('expressions', wfid)
+ end
+
+ # For a given wfid, returns all the expressions (array of Hash instances)
+ # that have a nil 'parent_id'.
+ #
+ def find_root_expressions(wfid)
+
+ find_expressions(wfid).select { |hexp| hexp['parent_id'].nil? }
+ end
+
+ # For a given wfid, fetches all the root expressions, sort by expid and
+ # return the first. Hopefully it's the right root_expression.
+ #
def find_root_expression(wfid)
- get_many('expressions', wfid).sort_by { |fexp|
- fexp['fei']['expid']
- }.select { |e|
- e['parent_id'].nil?
- }.first
+ find_root_expressions(wfid).sort_by { |hexp| hexp['fei']['expid'] }.first
end
# Given all the expressions stored here, returns a sorted list of unique
# wfids (this is used in Engine#processes(opts).
#
@@ -248,10 +276,42 @@
%w[ msgs schedules errors expressions workitems ].each do |type|
purge_type!(type)
end
end
+ # Removes a process by removing all its schedules, expressions, errors,
+ # workitems and trackers.
+ #
+ # Warning: will not trigger any cancel behaviours at all, just removes
+ # the process.
+ #
+ def remove_process(wfid)
+
+ 2.times do
+ # two passes
+
+ Thread.pass
+
+ %w[ schedules expressions errors workitems ].each do |type|
+ get_many(type, wfid).each { |d| delete(d) }
+ end
+
+ doc = get_trackers
+
+ doc['trackers'].delete_if { |k, v| k.end_with?("!#{wfid}") }
+
+ @context.storage.put(doc)
+ end
+ end
+
+ def dump(type)
+
+ require 'yaml'
+
+ YAML.dump({ type => get_many(type) })
+ end
+
protected
# Used by put_msg
#
def prepare_msg_doc(action, options)
@@ -301,10 +361,11 @@
'type' => 'schedules',
'flavour' => flavour,
'original' => s,
'at' => Ruote.time_to_utc_s(at),
'owner' => owner_fei,
+ 'wfid' => owner_fei['wfid'],
'msg' => msg
}
end
def get_engine_variables
@@ -320,30 +381,38 @@
now = Ruote.time_to_utc_s(now)
schedules.select { |sch| sch['at'] <= now }
end
- ## Returns true if the doc wfid is included in the wfids passed.
- ##
- #def wfid_match? (doc, wfids)
- # wfids.find { |wfid| doc['_id'].index(wfid) } != nil
- #end
-
# Used by #get_many. Returns true whenever one of the keys matches the
# doc['_id']. Works with strings (_id ends with key) or regexes (_id matches
# key).
#
- # It's a class method meant to be used by the various storage
- # implementations.
+ def key_match?(type, keys, doc)
+
+ _id = doc.is_a?(Hash) ? doc['_id'] : doc
+
+ if keys.first.is_a?(String) && type == 'schedules'
+ keys.find { |key| _id.match(/#{key}-\d+$/) }
+ elsif keys.first.is_a?(String)
+ keys.find { |key| _id.end_with?(key) }
+ else # Regexp
+ keys.find { |key| _id.match(key) }
+ end
+ end
+
+ # (Only used by ruote-couch 2.2.x)
#
+ # TODO: remove me at some point
+ #
def self.key_match?(keys, doc)
_id = doc.is_a?(Hash) ? doc['_id'] : doc
if keys.first.is_a?(String)
- keys.find { |key| _id[-key.length..-1] == key }
+ keys.find { |key| _id.end_with?(key) }
else # Regexp
- keys.find { |key| key.match(_id) }
+ keys.find { |key| _id.match(key) }
end
end
end
end