lib/ruote/dm/storage.rb in ruote-dm-2.1.11 vs lib/ruote/dm/storage.rb in ruote-dm-2.2.0

- old
+ new

@@ -1,7 +1,7 @@ #-- -# Copyright (c) 2005-2010, John Mettraux, jmettraux@gmail.com +# Copyright (c) 2005-2011, John Mettraux, jmettraux@gmail.com # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell @@ -49,10 +49,14 @@ property :participant_name, String, :length => 512 def to_h Rufus::Json.decode(doc) end + + def <=>(other) + self.ide <=> other.ide + end end # # A datamapper-powered storage for ruote. # @@ -65,216 +69,219 @@ # #DataMapper.setup(:default, 'sqlite3:ruote_test.db') # DataMapper.setup(:default, 'postgres://localhost/ruote_test') # # engine = Ruote::Engine.new( # Ruote::Worker.new( - # Ruote::Dm::DmStorage.new(:default))) + # Ruote::Dm::Storage.new(:default))) # - class DmStorage + class Storage include Ruote::StorageBase attr_reader :repository - def initialize (repository=nil, options={}) + def initialize(repository=nil, options={}) @options = options @repository = repository put_configuration end - def put_msg (action, options) + def put_msg(action, options) # put_msg is a unique action, no need for all the complexity of put DataMapper.repository(@repository) do - doc = prepare_msg_doc(action, options) - - insert(doc, 1) + do_insert(prepare_msg_doc(action, options) , 1) end nil end - def put_schedule (flavour, owner_fei, s, msg) + def put_schedule(flavour, owner_fei, s, msg) # put_schedule is a unique action, no need for all the complexity of put doc = prepare_schedule_doc(flavour, owner_fei, s, msg) return nil unless doc DataMapper.repository(@repository) do - insert(doc, 1) + do_insert(doc, 1) doc['_id'] end end - def put (doc, opts={}) + def put(doc, opts={}) DataMapper.repository(@repository) do - current = do_get(doc['type'], doc['_id']) + if doc['_rev'] - rev = doc['_rev'].to_i + d = get(doc['type'], doc['_id']) - return true if current.nil? && rev > 0 - return current.to_h if current && rev != current.rev + return true unless d + return d if d['_rev'] != doc['_rev'] + # failures + end - nrev = rev + 1 + nrev = doc['_rev'].to_i + 1 begin - insert(doc, nrev) + do_insert(doc, nrev) - current.destroy! if current + rescue DataObjects::IntegrityError => ie - doc['_rev'] = nrev if opts[:update_rev] + return (get(doc['type'], doc['_id']) || true) + # failure + end - return nil + Document.all( + :typ => doc['type'], :ide => doc['_id'], :rev.lt => nrev + ).destroy - rescue DataObjects::IntegrityError => ie - #p :clash - end + doc['_rev'] = nrev if opts[:update_rev] - get(doc['type'], doc['_id']) + nil + # success end end - def get (type, key) + def get(type, key) DataMapper.repository(@repository) do + d = do_get(type, key) + d ? d.to_h : nil end end - def delete (doc) + def delete(doc) raise ArgumentError.new('no _rev for doc') unless doc['_rev'] - DataMapper.repository(@repository) do + count = DataMapper.repository(@repository).adapter.delete( + Document.all(:typ => doc['type'], :ide => doc['_id'])) - r = put(doc) + return (get(doc['type'], doc['_id']) || true) if count < 1 - #p [ 0, true, doc['_id'], Thread.current.object_id.to_s[-3..-1] ] if r - - return true unless r.nil? - - r = Document.all(:typ => doc['type'], :ide => doc['_id']).destroy! - - #p [ 1, r ? nil : true, doc['_id'], Thread.current.object_id.to_s[-3..-1] ] - - r ? nil : true - end + nil + # success end - def get_many (type, key=nil, opts={}) + def get_many(type, key=nil, opts={}) q = { :typ => type } if l = opts[:limit]; q[:limit] = l; end if s = opts[:skip]; q[:offset] = s; end keys = key ? Array(key) : nil q[:wfid] = keys if keys && keys.first.is_a?(String) + q[:order] = ( + opts[:descending] ? [ :ide.desc, :rev.desc ] : [ :ide.asc, :rev.asc ] + ) unless opts[:count] + DataMapper.repository(@repository) do - return Document.all(q).count if opts[:count] + return select_last_revs(Document.all(q)).size if opts[:count] docs = Document.all(q) - docs = docs.reverse if opts[:descending] + docs = select_last_revs(docs, opts[:descending]) docs = docs.collect { |d| d.to_h } keys && keys.first.is_a?(Regexp) ? docs.select { |doc| keys.find { |key| key.match(doc['_id']) } } : docs end end - def ids (type) + def ids(type) DataMapper.repository(@repository) do - Document.all(:typ => type).collect { |d| d.ide } + + Document.all(:typ => type).collect { |d| d.ide }.uniq.sort end end def purge! DataMapper.repository(@repository) do + Document.all.destroy! end end - def dump (type) + def dump(type) - s = "=== #{type} ===\n" - - get_many(type).inject(s) do |s1, h| - s1 << " #{Ruote::FlowExpressionId.to_storage_id(h['fei'])}" - s1 << " => #{h['original_tree'].first} #{h['_rev']}\n" - end + "=== #{type} ===\n" + + get_many(type).map { |h| " #{h['_id']} => #{h.inspect}" }.join("\n") end def shutdown #@dbs.values.each { |db| db.shutdown } end # Mainly used by ruote's test/unit/ut_17_storage.rb # - def add_type (type) + def add_type(type) # does nothing, types are differentiated by the 'typ' column end # Nukes a db type and reputs it (losing all the documents that were in it). # - def purge_type! (type) + def purge_type!(type) DataMapper.repository(@repository) do + Document.all(:typ => type).destroy! end end # A provision made for workitems, allow to query them directly by # participant name. # - def by_participant (type, participant_name, opts) + def by_participant(type, participant_name, opts) raise NotImplementedError if type != 'workitems' query = { :typ => type, :participant_name => participant_name }.merge(opts) - Document.all(query).collect { |d| d.to_h } + select_last_revs(Document.all(query)).collect { |d| d.to_h } end # Querying workitems by field (warning, goes deep into the JSON structure) # - def by_field (type, field, value=nil) + def by_field(type, field, value=nil) raise NotImplementedError if type != 'workitems' like = [ '%"', field, '":' ] like.push(Rufus::Json.encode(value)) if value like.push('%') - Document.all(:typ => type, :doc.like => like.join).collect { |d| d.to_h } + select_last_revs( + Document.all(:typ => type, :doc.like => like.join) + ).collect { |d| d.to_h } end - def query_workitems (criteria) + def query_workitems(criteria) cr = { :typ => 'workitems' } - return Document.all(cr).count if criteria['count'] + return select_last_revs(Document.all(cr)).size if criteria['count'] offset = criteria.delete('offset') limit = criteria.delete('limit') wfid = @@ -292,16 +299,18 @@ end cr[:conditions] = [ ([ 'doc LIKE ?' ] * likes.size).join(' AND '), *likes ] unless likes.empty? - Document.all(cr).collect { |d| Ruote::Workitem.new(d.to_h) } + select_last_revs( + Document.all(cr) + ).collect { |d| Ruote::Workitem.new(d.to_h) } end protected - def insert (doc, rev) + def do_insert(doc, rev) Document.new( :ide => doc['_id'], :rev => rev, :typ => doc['type'], @@ -311,16 +320,16 @@ :wfid => extract_wfid(doc), :participant_name => doc['participant_name'] ).save! end - def extract_wfid (doc) + def extract_wfid(doc) doc['wfid'] || (doc['fei'] ? doc['fei']['wfid'] : nil) end - def do_get (type, key) + def do_get(type, key) Document.first(:typ => type, :ide => key, :order => :rev.desc) end # Don't put configuration if it's already in @@ -332,9 +341,23 @@ return if get('configurations', 'engine') conf = { '_id' => 'engine', 'type' => 'configurations' }.merge(@options) put(conf) end + + def select_last_revs(docs, reverse=false) + + docs = docs.inject({}) { |h, doc| h[doc.ide] = doc; h }.values.sort + + reverse ? docs.reverse : docs + end + end + + # + # Ruote::Dm::Storage should be used, but until ruote-dm 2.1.12, it + # was Ruote::Dm::DmStorage. This class is here for 'backward compatibility'. + # + class DmStorage < Storage end end end