lib/ruote/storage/base.rb in ruote-2.2.0 vs lib/ruote/storage/base.rb in ruote-2.3.0

- old
+ new

@@ -1,7 +1,7 @@ #-- -# Copyright (c) 2005-2011, John Mettraux, jmettraux@gmail.com +# Copyright (c) 2005-2012, John Mettraux, jmettraux@gmail.com # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell @@ -20,20 +20,25 @@ # THE SOFTWARE. # # Made in Japan. #++ +require 'ostruct' require 'ruote/util/time' module Ruote # # Base methods for storage implementations. # module StorageBase + #-- + # misc + #++ + def context @context ||= Ruote::Context.new(self) end @@ -48,51 +53,60 @@ def reserve(doc) delete(doc).nil? end + # A helper for the #worker method, it returns that dummy worker + # when there is no reference to the calling worker in the current + # thread's local variables. + # + DUMMY_WORKER = OpenStruct.new( + :name => 'worker', :identity => 'unknown', :state => 'running') + + # Warning, this is not equivalent to doing @context.worker, this method + # fetches the worker from the local thread variables. + # + def worker + + Thread.current['ruote_worker'] || DUMMY_WORKER + end + #-- # configurations #++ def get_configuration(key) get('configurations', key) end + def replace_engine_configuration(options) + + return if options.delete('preserve_configuration') + + conf = get('configurations', 'engine') + + doc = options.merge('type' => 'configurations', '_id' => 'engine') + doc['_rev'] = conf['_rev'] if conf + + put(doc) + end + #-- # messages #++ def put_msg(action, options) msg = prepare_msg_doc(action, options) put(msg) - - #put(msg, :update_rev => true) - #(@local_msgs ||= []) << Ruote.fulldup(msg) end - #def get_local_msgs - # p @local_msgs - # if @local_msgs - # r = @local_msgs - # @local_msgs = nil - # r - # else - # [] - # end - #end - def get_msgs - get_many( - 'msgs', nil, :limit => 300 - ).sort { |a, b| - a['put_at'] <=> b['put_at'] - } + get_many('msgs', nil, :limit => 300).sort_by { |d| d['put_at'] } end def empty?(type) (get_many(type, nil, :count => true) == 0) @@ -100,17 +114,31 @@ #-- # expressions #++ + # Given a wfid, returns all the expressions of that process instance. + # + def find_expressions(wfid) + + get_many('expressions', wfid) + end + + # For a given wfid, returns all the expressions (array of Hash instances) + # that have a nil 'parent_id'. + # + def find_root_expressions(wfid) + + find_expressions(wfid).select { |hexp| hexp['parent_id'].nil? } + end + + # For a given wfid, fetches all the root expressions, sort by expid and + # return the first. Hopefully it's the right root_expression. + # def find_root_expression(wfid) - get_many('expressions', wfid).sort_by { |fexp| - fexp['fei']['expid'] - }.select { |e| - e['parent_id'].nil? - }.first + find_root_expressions(wfid).sort_by { |hexp| hexp['fei']['expid'] }.first end # Given all the expressions stored here, returns a sorted list of unique # wfids (this is used in Engine#processes(opts). # @@ -248,10 +276,42 @@ %w[ msgs schedules errors expressions workitems ].each do |type| purge_type!(type) end end + # Removes a process by removing all its schedules, expressions, errors, + # workitems and trackers. + # + # Warning: will not trigger any cancel behaviours at all, just removes + # the process. + # + def remove_process(wfid) + + 2.times do + # two passes + + Thread.pass + + %w[ schedules expressions errors workitems ].each do |type| + get_many(type, wfid).each { |d| delete(d) } + end + + doc = get_trackers + + doc['trackers'].delete_if { |k, v| k.end_with?("!#{wfid}") } + + @context.storage.put(doc) + end + end + + def dump(type) + + require 'yaml' + + YAML.dump({ type => get_many(type) }) + end + protected # Used by put_msg # def prepare_msg_doc(action, options) @@ -301,10 +361,11 @@ 'type' => 'schedules', 'flavour' => flavour, 'original' => s, 'at' => Ruote.time_to_utc_s(at), 'owner' => owner_fei, + 'wfid' => owner_fei['wfid'], 'msg' => msg } end def get_engine_variables @@ -320,30 +381,38 @@ now = Ruote.time_to_utc_s(now) schedules.select { |sch| sch['at'] <= now } end - ## Returns true if the doc wfid is included in the wfids passed. - ## - #def wfid_match? (doc, wfids) - # wfids.find { |wfid| doc['_id'].index(wfid) } != nil - #end - # Used by #get_many. Returns true whenever one of the keys matches the # doc['_id']. Works with strings (_id ends with key) or regexes (_id matches # key). # - # It's a class method meant to be used by the various storage - # implementations. + def key_match?(type, keys, doc) + + _id = doc.is_a?(Hash) ? doc['_id'] : doc + + if keys.first.is_a?(String) && type == 'schedules' + keys.find { |key| _id.match(/#{key}-\d+$/) } + elsif keys.first.is_a?(String) + keys.find { |key| _id.end_with?(key) } + else # Regexp + keys.find { |key| _id.match(key) } + end + end + + # (Only used by ruote-couch 2.2.x) # + # TODO: remove me at some point + # def self.key_match?(keys, doc) _id = doc.is_a?(Hash) ? doc['_id'] : doc if keys.first.is_a?(String) - keys.find { |key| _id[-key.length..-1] == key } + keys.find { |key| _id.end_with?(key) } else # Regexp - keys.find { |key| key.match(_id) } + keys.find { |key| _id.match(key) } end end end end