lib/ruote/dm/storage.rb in ruote-dm-2.1.9 vs lib/ruote/dm/storage.rb in ruote-dm-2.1.10

- old
+ new

@@ -28,24 +28,48 @@ module Ruote module Dm + # + # All the ruote data is stored in a single ruote_dm_document table. + # + # The doc/data itself is stored in the 'doc' column, as JSON. + # + # Yajl-ruby is recommended for fast {de|en}coding with JSON. + # class Document include DataMapper::Resource - property :ide, String, :key => true, :length => 2048, :required => true + property :ide, String, :key => true, :length => 255, :required => true property :rev, Integer, :key => true, :required => true property :typ, String, :key => true, :required => true property :doc, Text, :length => 2**32 - 1, :required => true, :lazy => false property :participant_name, String, :length => 512 + + def to_h + Rufus::Json.decode(doc) + end end # # A datamapper-powered storage for ruote. # + # require 'rubygems' + # require 'json' # gem install json + # require 'ruote' + # require 'ruote-dm' # gem install ruote-dm + # + # #DataMapper.setup(:default, 'sqlite3::memory:') + # #DataMapper.setup(:default, 'sqlite3:ruote_test.db') + # DataMapper.setup(:default, 'postgres://localhost/ruote_test') + # + # engine = Ruote::Engine.new( + # Ruote::Worker.new( + # Ruote::Dm::DmStorage.new(:default))) + # class DmStorage include Ruote::StorageBase attr_reader :repository @@ -56,69 +80,96 @@ @repository = repository put_configuration end + def put_msg (action, options) + + # put_msg is a unique action, no need for all the complexity of put + + DataMapper.repository(@repository) do + + doc = prepare_msg_doc(action, options) + + insert(doc, 1) + end + + nil + end + + def put_schedule (flavour, owner_fei, s, msg) + + # put_schedule is a unique action, no need for all the complexity of put + + doc = prepare_schedule_doc(flavour, owner_fei, s, msg) + + return nil unless doc + + DataMapper.repository(@repository) do + + insert(doc, 1) + + doc['_id'] + end + end + def put (doc, opts={}) DataMapper.repository(@repository) do - d = Document.first(:ide => doc['_id'], :typ => doc['type']) + current = do_get(doc['type'], doc['_id']) - return Rufus::Json.decode(d.doc) if d && d.rev != doc['_rev'] + rev = doc['_rev'].to_i - if doc['_rev'].nil? + return true if current.nil? && rev > 0 + return current.to_h if current && rev != current.rev - d = Document.new( - :ide => doc['_id'], - :rev => 0, - :typ => doc['type'], - :doc => Rufus::Json.encode(doc.merge( - '_rev' => 0, 'put_at' => Ruote.now_to_utc_s)), - :participant_name => doc['participant_name'] - ).save + nrev = rev + 1 - doc['_rev'] = 0 if opts[:update_rev] + begin - else + insert(doc, nrev) - return true unless d + current.destroy! if current - d.rev = d.rev + 1 - d.doc = Rufus::Json.encode(doc.merge( - '_rev' => d.rev, 'put_at' => Ruote.now_to_utc_s)) - d.save + doc['_rev'] = nrev if opts[:update_rev] - doc['_rev'] = d.rev if opts[:update_rev] + return nil + + rescue DataObjects::IntegrityError => ie + #p :clash end - nil + get(doc['type'], doc['_id']) end end def get (type, key) DataMapper.repository(@repository) do - d = Document.first(:typ => type, :ide => key) - d ? Rufus::Json.decode(d.doc) : nil + d = do_get(type, key) + d ? d.to_h : nil end end def delete (doc) raise ArgumentError.new('no _rev for doc') unless doc['_rev'] DataMapper.repository(@repository) do - d = Document.first( - :typ => doc['type'], :ide => doc['_id'], :rev => doc['_rev']) + r = put(doc) - return true unless d + #p [ 0, true, doc['_id'], Thread.current.object_id.to_s[-3..-1] ] if r - d.destroy! + return true unless r.nil? - nil + r = Document.all(:typ => doc['type'], :ide => doc['_id']).destroy! + + #p [ 1, r ? nil : true, doc['_id'], Thread.current.object_id.to_s[-3..-1] ] + + r ? nil : true end end def get_many (type, key=nil, opts={}) @@ -137,11 +188,11 @@ "%#{key.source}%" end end DataMapper.repository(@repository) do - Document.all(q).collect { |d| Rufus::Json.decode(d.doc) } + Document.all(q).collect { |d| d.to_h } end end def ids (type) @@ -155,14 +206,20 @@ DataMapper.repository(@repository) do Document.all.destroy! end end - #def dump (type) - # @dbs[type].dump - #end + def dump (type) + s = "=== #{type} ===\n" + + get_many(type).inject(s) do |s1, h| + s1 << " #{Ruote::FlowExpressionId.to_storage_id(h['fei'])}" + s1 << " => #{h['original_tree'].first} #{h['_rev']}\n" + end + end + def shutdown #@dbs.values.each { |db| db.shutdown } end @@ -190,11 +247,11 @@ raise NotImplementedError if type != 'workitems' Document.all( :typ => type, :participant_name => participant_name ).collect { |d| - Rufus::Json.decode(d.doc) + d.to_h } end # Querying workitems by field (warning, goes deep into the JSON structure) # @@ -204,13 +261,11 @@ like = [ '%"', field, '":' ] like.push(Rufus::Json.encode(value)) if value like.push('%') - Document.all(:typ => type, :doc.like => like.join).collect { |d| - Rufus::Json.decode(d.doc) - } + Document.all(:typ => type, :doc.like => like.join).collect { |d| d.to_h } end def query_workitems (criteria) offset = criteria.delete('offset') @@ -233,15 +288,31 @@ end cr[:conditions] = [ ([ 'doc LIKE ?' ] * likes.size).join(' AND '), *likes ] unless likes.empty? - Document.all(cr).collect { |d| - Ruote::Workitem.new(Rufus::Json.decode(d.doc)) - } + Document.all(cr).collect { |d| d.to_h } end protected + + def insert (doc, rev) + + Document.new( + :ide => doc['_id'], + :rev => rev, + :typ => doc['type'], + :doc => Rufus::Json.encode(doc.merge( + '_rev' => rev, + 'put_at' => Ruote.now_to_utc_s)), + :participant_name => doc['participant_name'] + ).save! + end + + def do_get (type, key) + + Document.first(:typ => type, :ide => key, :order => :rev.desc) + end # Don't put configuration if it's already in # # (avoid storages from trashing configuration...) #