lib/ruote/dm/storage.rb in ruote-dm-2.1.9 vs lib/ruote/dm/storage.rb in ruote-dm-2.1.10
- old
+ new
@@ -28,24 +28,48 @@
module Ruote
module Dm
+ #
+ # All the ruote data is stored in a single ruote_dm_document table.
+ #
+ # The doc/data itself is stored in the 'doc' column, as JSON.
+ #
+ # Yajl-ruby is recommended for fast {de|en}coding with JSON.
+ #
class Document
include DataMapper::Resource
- property :ide, String, :key => true, :length => 2048, :required => true
+ property :ide, String, :key => true, :length => 255, :required => true
property :rev, Integer, :key => true, :required => true
property :typ, String, :key => true, :required => true
property :doc, Text, :length => 2**32 - 1, :required => true, :lazy => false
property :participant_name, String, :length => 512
+
+ def to_h
+ Rufus::Json.decode(doc)
+ end
end
#
# A datamapper-powered storage for ruote.
#
+ # require 'rubygems'
+ # require 'json' # gem install json
+ # require 'ruote'
+ # require 'ruote-dm' # gem install ruote-dm
+ #
+ # #DataMapper.setup(:default, 'sqlite3::memory:')
+ # #DataMapper.setup(:default, 'sqlite3:ruote_test.db')
+ # DataMapper.setup(:default, 'postgres://localhost/ruote_test')
+ #
+ # engine = Ruote::Engine.new(
+ # Ruote::Worker.new(
+ # Ruote::Dm::DmStorage.new(:default)))
+ #
class DmStorage
include Ruote::StorageBase
attr_reader :repository
@@ -56,69 +80,96 @@
@repository = repository
put_configuration
end
+ def put_msg (action, options)
+
+ # put_msg is a unique action, no need for all the complexity of put
+
+ DataMapper.repository(@repository) do
+
+ doc = prepare_msg_doc(action, options)
+
+ insert(doc, 1)
+ end
+
+ nil
+ end
+
+ def put_schedule (flavour, owner_fei, s, msg)
+
+ # put_schedule is a unique action, no need for all the complexity of put
+
+ doc = prepare_schedule_doc(flavour, owner_fei, s, msg)
+
+ return nil unless doc
+
+ DataMapper.repository(@repository) do
+
+ insert(doc, 1)
+
+ doc['_id']
+ end
+ end
+
def put (doc, opts={})
DataMapper.repository(@repository) do
- d = Document.first(:ide => doc['_id'], :typ => doc['type'])
+ current = do_get(doc['type'], doc['_id'])
- return Rufus::Json.decode(d.doc) if d && d.rev != doc['_rev']
+ rev = doc['_rev'].to_i
- if doc['_rev'].nil?
+ return true if current.nil? && rev > 0
+ return current.to_h if current && rev != current.rev
- d = Document.new(
- :ide => doc['_id'],
- :rev => 0,
- :typ => doc['type'],
- :doc => Rufus::Json.encode(doc.merge(
- '_rev' => 0, 'put_at' => Ruote.now_to_utc_s)),
- :participant_name => doc['participant_name']
- ).save
+ nrev = rev + 1
- doc['_rev'] = 0 if opts[:update_rev]
+ begin
- else
+ insert(doc, nrev)
- return true unless d
+ current.destroy! if current
- d.rev = d.rev + 1
- d.doc = Rufus::Json.encode(doc.merge(
- '_rev' => d.rev, 'put_at' => Ruote.now_to_utc_s))
- d.save
+ doc['_rev'] = nrev if opts[:update_rev]
- doc['_rev'] = d.rev if opts[:update_rev]
+ return nil
+
+ rescue DataObjects::IntegrityError => ie
+ #p :clash
end
- nil
+ get(doc['type'], doc['_id'])
end
end
def get (type, key)
DataMapper.repository(@repository) do
- d = Document.first(:typ => type, :ide => key)
- d ? Rufus::Json.decode(d.doc) : nil
+ d = do_get(type, key)
+ d ? d.to_h : nil
end
end
def delete (doc)
raise ArgumentError.new('no _rev for doc') unless doc['_rev']
DataMapper.repository(@repository) do
- d = Document.first(
- :typ => doc['type'], :ide => doc['_id'], :rev => doc['_rev'])
+ r = put(doc)
- return true unless d
+ #p [ 0, true, doc['_id'], Thread.current.object_id.to_s[-3..-1] ] if r
- d.destroy!
+ return true unless r.nil?
- nil
+ r = Document.all(:typ => doc['type'], :ide => doc['_id']).destroy!
+
+ #p [ 1, r ? nil : true, doc['_id'], Thread.current.object_id.to_s[-3..-1] ]
+
+ r ? nil : true
end
end
def get_many (type, key=nil, opts={})
@@ -137,11 +188,11 @@
"%#{key.source}%"
end
end
DataMapper.repository(@repository) do
- Document.all(q).collect { |d| Rufus::Json.decode(d.doc) }
+ Document.all(q).collect { |d| d.to_h }
end
end
def ids (type)
@@ -155,14 +206,20 @@
DataMapper.repository(@repository) do
Document.all.destroy!
end
end
- #def dump (type)
- # @dbs[type].dump
- #end
+ def dump (type)
+ s = "=== #{type} ===\n"
+
+ get_many(type).inject(s) do |s1, h|
+ s1 << " #{Ruote::FlowExpressionId.to_storage_id(h['fei'])}"
+ s1 << " => #{h['original_tree'].first} #{h['_rev']}\n"
+ end
+ end
+
def shutdown
#@dbs.values.each { |db| db.shutdown }
end
@@ -190,11 +247,11 @@
raise NotImplementedError if type != 'workitems'
Document.all(
:typ => type, :participant_name => participant_name
).collect { |d|
- Rufus::Json.decode(d.doc)
+ d.to_h
}
end
# Querying workitems by field (warning, goes deep into the JSON structure)
#
@@ -204,13 +261,11 @@
like = [ '%"', field, '":' ]
like.push(Rufus::Json.encode(value)) if value
like.push('%')
- Document.all(:typ => type, :doc.like => like.join).collect { |d|
- Rufus::Json.decode(d.doc)
- }
+ Document.all(:typ => type, :doc.like => like.join).collect { |d| d.to_h }
end
def query_workitems (criteria)
offset = criteria.delete('offset')
@@ -233,15 +288,31 @@
end
cr[:conditions] = [
([ 'doc LIKE ?' ] * likes.size).join(' AND '), *likes
] unless likes.empty?
- Document.all(cr).collect { |d|
- Ruote::Workitem.new(Rufus::Json.decode(d.doc))
- }
+ Document.all(cr).collect { |d| d.to_h }
end
protected
+
+ def insert (doc, rev)
+
+ Document.new(
+ :ide => doc['_id'],
+ :rev => rev,
+ :typ => doc['type'],
+ :doc => Rufus::Json.encode(doc.merge(
+ '_rev' => rev,
+ 'put_at' => Ruote.now_to_utc_s)),
+ :participant_name => doc['participant_name']
+ ).save!
+ end
+
+ def do_get (type, key)
+
+ Document.first(:typ => type, :ide => key, :order => :rev.desc)
+ end
# Don't put configuration if it's already in
#
# (avoid storages from trashing configuration...)
#