lib/ruote/dm/storage.rb in ruote-dm-2.1.11 vs lib/ruote/dm/storage.rb in ruote-dm-2.2.0
- old
+ new
@@ -1,7 +1,7 @@
#--
-# Copyright (c) 2005-2010, John Mettraux, jmettraux@gmail.com
+# Copyright (c) 2005-2011, John Mettraux, jmettraux@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
@@ -49,10 +49,14 @@
property :participant_name, String, :length => 512
def to_h
Rufus::Json.decode(doc)
end
+
+ def <=>(other)
+ self.ide <=> other.ide
+ end
end
#
# A datamapper-powered storage for ruote.
#
@@ -65,216 +69,219 @@
# #DataMapper.setup(:default, 'sqlite3:ruote_test.db')
# DataMapper.setup(:default, 'postgres://localhost/ruote_test')
#
# engine = Ruote::Engine.new(
# Ruote::Worker.new(
- # Ruote::Dm::DmStorage.new(:default)))
+ # Ruote::Dm::Storage.new(:default)))
#
- class DmStorage
+ class Storage
include Ruote::StorageBase
attr_reader :repository
- def initialize (repository=nil, options={})
+ def initialize(repository=nil, options={})
@options = options
@repository = repository
put_configuration
end
- def put_msg (action, options)
+ def put_msg(action, options)
# put_msg is a unique action, no need for all the complexity of put
DataMapper.repository(@repository) do
- doc = prepare_msg_doc(action, options)
-
- insert(doc, 1)
+ do_insert(prepare_msg_doc(action, options) , 1)
end
nil
end
- def put_schedule (flavour, owner_fei, s, msg)
+ def put_schedule(flavour, owner_fei, s, msg)
# put_schedule is a unique action, no need for all the complexity of put
doc = prepare_schedule_doc(flavour, owner_fei, s, msg)
return nil unless doc
DataMapper.repository(@repository) do
- insert(doc, 1)
+ do_insert(doc, 1)
doc['_id']
end
end
- def put (doc, opts={})
+ def put(doc, opts={})
DataMapper.repository(@repository) do
- current = do_get(doc['type'], doc['_id'])
+ if doc['_rev']
- rev = doc['_rev'].to_i
+ d = get(doc['type'], doc['_id'])
- return true if current.nil? && rev > 0
- return current.to_h if current && rev != current.rev
+ return true unless d
+ return d if d['_rev'] != doc['_rev']
+ # failures
+ end
- nrev = rev + 1
+ nrev = doc['_rev'].to_i + 1
begin
- insert(doc, nrev)
+ do_insert(doc, nrev)
- current.destroy! if current
+ rescue DataObjects::IntegrityError => ie
- doc['_rev'] = nrev if opts[:update_rev]
+ return (get(doc['type'], doc['_id']) || true)
+ # failure
+ end
- return nil
+ Document.all(
+ :typ => doc['type'], :ide => doc['_id'], :rev.lt => nrev
+ ).destroy
- rescue DataObjects::IntegrityError => ie
- #p :clash
- end
+ doc['_rev'] = nrev if opts[:update_rev]
- get(doc['type'], doc['_id'])
+ nil
+ # success
end
end
- def get (type, key)
+ def get(type, key)
DataMapper.repository(@repository) do
+
d = do_get(type, key)
+
d ? d.to_h : nil
end
end
- def delete (doc)
+ def delete(doc)
raise ArgumentError.new('no _rev for doc') unless doc['_rev']
- DataMapper.repository(@repository) do
+ count = DataMapper.repository(@repository).adapter.delete(
+ Document.all(:typ => doc['type'], :ide => doc['_id']))
- r = put(doc)
+ return (get(doc['type'], doc['_id']) || true) if count < 1
- #p [ 0, true, doc['_id'], Thread.current.object_id.to_s[-3..-1] ] if r
-
- return true unless r.nil?
-
- r = Document.all(:typ => doc['type'], :ide => doc['_id']).destroy!
-
- #p [ 1, r ? nil : true, doc['_id'], Thread.current.object_id.to_s[-3..-1] ]
-
- r ? nil : true
- end
+ nil
+ # success
end
- def get_many (type, key=nil, opts={})
+ def get_many(type, key=nil, opts={})
q = { :typ => type }
if l = opts[:limit]; q[:limit] = l; end
if s = opts[:skip]; q[:offset] = s; end
keys = key ? Array(key) : nil
q[:wfid] = keys if keys && keys.first.is_a?(String)
+ q[:order] = (
+ opts[:descending] ? [ :ide.desc, :rev.desc ] : [ :ide.asc, :rev.asc ]
+ ) unless opts[:count]
+
DataMapper.repository(@repository) do
- return Document.all(q).count if opts[:count]
+ return select_last_revs(Document.all(q)).size if opts[:count]
docs = Document.all(q)
- docs = docs.reverse if opts[:descending]
+ docs = select_last_revs(docs, opts[:descending])
docs = docs.collect { |d| d.to_h }
keys && keys.first.is_a?(Regexp) ?
docs.select { |doc| keys.find { |key| key.match(doc['_id']) } } :
docs
end
end
- def ids (type)
+ def ids(type)
DataMapper.repository(@repository) do
- Document.all(:typ => type).collect { |d| d.ide }
+
+ Document.all(:typ => type).collect { |d| d.ide }.uniq.sort
end
end
def purge!
DataMapper.repository(@repository) do
+
Document.all.destroy!
end
end
- def dump (type)
+ def dump(type)
- s = "=== #{type} ===\n"
-
- get_many(type).inject(s) do |s1, h|
- s1 << " #{Ruote::FlowExpressionId.to_storage_id(h['fei'])}"
- s1 << " => #{h['original_tree'].first} #{h['_rev']}\n"
- end
+ "=== #{type} ===\n" +
+ get_many(type).map { |h| " #{h['_id']} => #{h.inspect}" }.join("\n")
end
def shutdown
#@dbs.values.each { |db| db.shutdown }
end
# Mainly used by ruote's test/unit/ut_17_storage.rb
#
- def add_type (type)
+ def add_type(type)
# does nothing, types are differentiated by the 'typ' column
end
# Nukes a db type and reputs it (losing all the documents that were in it).
#
- def purge_type! (type)
+ def purge_type!(type)
DataMapper.repository(@repository) do
+
Document.all(:typ => type).destroy!
end
end
# A provision made for workitems, allow to query them directly by
# participant name.
#
- def by_participant (type, participant_name, opts)
+ def by_participant(type, participant_name, opts)
raise NotImplementedError if type != 'workitems'
query = {
:typ => type, :participant_name => participant_name
}.merge(opts)
- Document.all(query).collect { |d| d.to_h }
+ select_last_revs(Document.all(query)).collect { |d| d.to_h }
end
# Querying workitems by field (warning, goes deep into the JSON structure)
#
- def by_field (type, field, value=nil)
+ def by_field(type, field, value=nil)
raise NotImplementedError if type != 'workitems'
like = [ '%"', field, '":' ]
like.push(Rufus::Json.encode(value)) if value
like.push('%')
- Document.all(:typ => type, :doc.like => like.join).collect { |d| d.to_h }
+ select_last_revs(
+ Document.all(:typ => type, :doc.like => like.join)
+ ).collect { |d| d.to_h }
end
- def query_workitems (criteria)
+ def query_workitems(criteria)
cr = { :typ => 'workitems' }
- return Document.all(cr).count if criteria['count']
+ return select_last_revs(Document.all(cr)).size if criteria['count']
offset = criteria.delete('offset')
limit = criteria.delete('limit')
wfid =
@@ -292,16 +299,18 @@
end
cr[:conditions] = [
([ 'doc LIKE ?' ] * likes.size).join(' AND '), *likes
] unless likes.empty?
- Document.all(cr).collect { |d| Ruote::Workitem.new(d.to_h) }
+ select_last_revs(
+ Document.all(cr)
+ ).collect { |d| Ruote::Workitem.new(d.to_h) }
end
protected
- def insert (doc, rev)
+ def do_insert(doc, rev)
Document.new(
:ide => doc['_id'],
:rev => rev,
:typ => doc['type'],
@@ -311,16 +320,16 @@
:wfid => extract_wfid(doc),
:participant_name => doc['participant_name']
).save!
end
- def extract_wfid (doc)
+ def extract_wfid(doc)
doc['wfid'] || (doc['fei'] ? doc['fei']['wfid'] : nil)
end
- def do_get (type, key)
+ def do_get(type, key)
Document.first(:typ => type, :ide => key, :order => :rev.desc)
end
# Don't put configuration if it's already in
@@ -332,9 +341,23 @@
return if get('configurations', 'engine')
conf = { '_id' => 'engine', 'type' => 'configurations' }.merge(@options)
put(conf)
end
+
+ def select_last_revs(docs, reverse=false)
+
+ docs = docs.inject({}) { |h, doc| h[doc.ide] = doc; h }.values.sort
+
+ reverse ? docs.reverse : docs
+ end
+ end
+
+ #
+ # Ruote::Dm::Storage should be used, but until ruote-dm 2.1.12, it
+ # was Ruote::Dm::DmStorage. This class is here for 'backward compatibility'.
+ #
+ class DmStorage < Storage
end
end
end