lib/ruote/sequel/storage.rb in ruote-sequel-2.3.0 vs lib/ruote/sequel/storage.rb in ruote-sequel-2.3.0.2

- old
+ new

@@ -1,7 +1,7 @@ #-- -# Copyright (c) 2005-2012, John Mettraux, jmettraux@gmail.com +# Copyright (c) 2005-2013, John Mettraux, jmettraux@gmail.com # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell @@ -195,17 +195,17 @@ ds = ds.filter(:wfid => keys) if keys && keys.first.is_a?(String) return ds.count if opts[:count] ds = ds.order( - opts[:descending] ? :ide.desc : :ide.asc, :rev.desc + opts[:descending] ? ::Sequel.desc(:ide) : ::Sequel.asc(:ide), ::Sequel.desc(:rev) ).limit( opts[:limit], opts[:skip] || opts[:offset] ) docs = select_last_revs(ds) - docs = docs.collect { |d| Rufus::Json.decode(d[:doc]) } + docs = docs.collect { |d| decode_doc(d) } if keys && keys.first.is_a?(Regexp) docs.select { |doc| keys.find { |key| key.match(doc['_id']) } } else docs @@ -271,11 +271,11 @@ ) return docs.count if opts[:count] docs = docs.order( - :ide.asc, :rev.desc + ::Sequel.asc(:ide), ::Sequel.desc(:rev) ).limit( opts[:limit], opts[:offset] || opts[:skip] ) select_last_revs(docs).collect { |d| Ruote::Workitem.from_json(d[:doc]) } @@ -292,17 +292,17 @@ lk.push('%') docs = @sequel[@table].where( :typ => type ).filter( - :doc.like(lk.join) + ::Sequel.like(:doc, lk.join) ) return docs.count if opts[:count] docs = docs.order( - :ide.asc, :rev.desc + ::Sequel.asc(:ide), ::Sequel.desc(:rev) ).limit( opts[:limit], opts[:offset] || opts[:skip] ) select_last_revs(docs).collect { |d| Ruote::Workitem.from_json(d[:doc]) } @@ -320,20 +320,20 @@ wfid = criteria.delete('wfid') pname = criteria.delete('participant_name') || criteria.delete('participant') - ds = ds.filter(:ide.like("%!#{wfid}")) if wfid + ds = ds.filter(::Sequel.like(:ide, "%!#{wfid}")) if wfid ds = ds.filter(:participant_name => pname) if pname criteria.collect do |k, v| - ds = ds.filter(:doc.like("%\"#{k}\":#{Rufus::Json.encode(v)}%")) + ds = ds.filter(::Sequel.like(:doc, "%\"#{k}\":#{Rufus::Json.encode(v)}%")) end return ds.count if count - ds = ds.order(:ide.asc, :rev.desc).limit(limit, offset) + ds = ds.order(::Sequel.asc(:ide), ::Sequel.desc(:rev)).limit(limit, offset) select_last_revs(ds).collect { |d| Ruote::Workitem.from_json(d[:doc]) } end # Used by the worker to indicate a new step begins. For ruote-sequel, @@ -345,24 +345,49 @@ prepare_cache end protected + def decode_doc(doc) + + return nil if doc.nil? + + doc = doc[:doc] + doc = doc.read if doc.respond_to?(:read) + + Rufus::Json.decode(doc) + end + def do_insert(doc, rev, update_rev=false) doc = doc.send( update_rev ? :merge! : :merge, { '_rev' => rev, 'put_at' => Ruote.now_to_utc_s }) - @sequel[@table].insert( - :ide => doc['_id'], - :rev => rev, - :typ => doc['type'], - :doc => Rufus::Json.encode(doc), - :wfid => extract_wfid(doc), - :participant_name => doc['participant_name'] - ) + # Use bound variables + # http://sequel.rubyforge.org/rdoc/files/doc/prepared_statements_rdoc.html + # + # That makes Oracle happy (the doc field might > 4000 characters) + # + # Thanks Geoff Herney + # + @sequel[@table].call( + :insert, { + :ide => (doc['_id'] || ''), + :rev => (rev || ''), + :typ => (doc['type'] || ''), + :doc => (Rufus::Json.encode(doc) || ''), + :wfid => (extract_wfid(doc) || ''), + :participant_name => (doc['participant_name'] || '') + }, { + :ide => :$ide, + :rev => :$rev, + :typ => :$typ, + :doc => :$doc, + :wfid => :$wfid, + :participant_name => :$participant_name + }) end def extract_wfid(doc) doc['wfid'] || (doc['fei'] ? doc['fei']['wfid'] : nil) @@ -372,11 +397,11 @@ d = @sequel[@table].select(:doc).where( :typ => type, :ide => key ).reverse_order(:rev).first - d ? Rufus::Json.decode(d[:doc]) : nil + decode_doc(d) end # Weed out older docs (same ide, smaller rev). # # This could all have been done via SQL, but those inconsistencies @@ -408,13 +433,13 @@ @sequel[@table].select( :ide, :typ, :doc ).where( :typ => CACHED_TYPES ).order( - :ide.asc, :rev.desc + ::Sequel.asc(:ide), ::Sequel.desc(:rev) ).each do |d| - (cache[d[:typ]] ||= {})[d[:ide]] ||= Rufus::Json.decode(d[:doc]) + (cache[d[:typ]] ||= {})[d[:ide]] ||= decode_doc(d) end cache['variables']['trackers'] ||= { '_id' => 'trackers', 'type' => 'variables', 'trackers' => {} } end @@ -457,6 +482,5 @@ (Thread.current["cache_#{worker.name}"] ||= {}) end end end end -