lib/ruote/sequel/storage.rb in ruote-sequel-2.3.0 vs lib/ruote/sequel/storage.rb in ruote-sequel-2.3.0.2
- old
+ new
@@ -1,7 +1,7 @@
#--
-# Copyright (c) 2005-2012, John Mettraux, jmettraux@gmail.com
+# Copyright (c) 2005-2013, John Mettraux, jmettraux@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
@@ -195,17 +195,17 @@
ds = ds.filter(:wfid => keys) if keys && keys.first.is_a?(String)
return ds.count if opts[:count]
ds = ds.order(
- opts[:descending] ? :ide.desc : :ide.asc, :rev.desc
+ opts[:descending] ? ::Sequel.desc(:ide) : ::Sequel.asc(:ide), ::Sequel.desc(:rev)
).limit(
opts[:limit], opts[:skip] || opts[:offset]
)
docs = select_last_revs(ds)
- docs = docs.collect { |d| Rufus::Json.decode(d[:doc]) }
+ docs = docs.collect { |d| decode_doc(d) }
if keys && keys.first.is_a?(Regexp)
docs.select { |doc| keys.find { |key| key.match(doc['_id']) } }
else
docs
@@ -271,11 +271,11 @@
)
return docs.count if opts[:count]
docs = docs.order(
- :ide.asc, :rev.desc
+ ::Sequel.asc(:ide), ::Sequel.desc(:rev)
).limit(
opts[:limit], opts[:offset] || opts[:skip]
)
select_last_revs(docs).collect { |d| Ruote::Workitem.from_json(d[:doc]) }
@@ -292,17 +292,17 @@
lk.push('%')
docs = @sequel[@table].where(
:typ => type
).filter(
- :doc.like(lk.join)
+ ::Sequel.like(:doc, lk.join)
)
return docs.count if opts[:count]
docs = docs.order(
- :ide.asc, :rev.desc
+ ::Sequel.asc(:ide), ::Sequel.desc(:rev)
).limit(
opts[:limit], opts[:offset] || opts[:skip]
)
select_last_revs(docs).collect { |d| Ruote::Workitem.from_json(d[:doc]) }
@@ -320,20 +320,20 @@
wfid =
criteria.delete('wfid')
pname =
criteria.delete('participant_name') || criteria.delete('participant')
- ds = ds.filter(:ide.like("%!#{wfid}")) if wfid
+ ds = ds.filter(::Sequel.like(:ide, "%!#{wfid}")) if wfid
ds = ds.filter(:participant_name => pname) if pname
criteria.collect do |k, v|
- ds = ds.filter(:doc.like("%\"#{k}\":#{Rufus::Json.encode(v)}%"))
+ ds = ds.filter(::Sequel.like(:doc, "%\"#{k}\":#{Rufus::Json.encode(v)}%"))
end
return ds.count if count
- ds = ds.order(:ide.asc, :rev.desc).limit(limit, offset)
+ ds = ds.order(::Sequel.asc(:ide), ::Sequel.desc(:rev)).limit(limit, offset)
select_last_revs(ds).collect { |d| Ruote::Workitem.from_json(d[:doc]) }
end
# Used by the worker to indicate a new step begins. For ruote-sequel,
@@ -345,24 +345,49 @@
prepare_cache
end
protected
+ def decode_doc(doc)
+
+ return nil if doc.nil?
+
+ doc = doc[:doc]
+ doc = doc.read if doc.respond_to?(:read)
+
+ Rufus::Json.decode(doc)
+ end
+
def do_insert(doc, rev, update_rev=false)
doc = doc.send(
update_rev ? :merge! : :merge,
{ '_rev' => rev, 'put_at' => Ruote.now_to_utc_s })
- @sequel[@table].insert(
- :ide => doc['_id'],
- :rev => rev,
- :typ => doc['type'],
- :doc => Rufus::Json.encode(doc),
- :wfid => extract_wfid(doc),
- :participant_name => doc['participant_name']
- )
+ # Use bound variables
+ # http://sequel.rubyforge.org/rdoc/files/doc/prepared_statements_rdoc.html
+ #
+ # That makes Oracle happy (the doc field might > 4000 characters)
+ #
+ # Thanks Geoff Herney
+ #
+ @sequel[@table].call(
+ :insert, {
+ :ide => (doc['_id'] || ''),
+ :rev => (rev || ''),
+ :typ => (doc['type'] || ''),
+ :doc => (Rufus::Json.encode(doc) || ''),
+ :wfid => (extract_wfid(doc) || ''),
+ :participant_name => (doc['participant_name'] || '')
+ }, {
+ :ide => :$ide,
+ :rev => :$rev,
+ :typ => :$typ,
+ :doc => :$doc,
+ :wfid => :$wfid,
+ :participant_name => :$participant_name
+ })
end
def extract_wfid(doc)
doc['wfid'] || (doc['fei'] ? doc['fei']['wfid'] : nil)
@@ -372,11 +397,11 @@
d = @sequel[@table].select(:doc).where(
:typ => type, :ide => key
).reverse_order(:rev).first
- d ? Rufus::Json.decode(d[:doc]) : nil
+ decode_doc(d)
end
# Weed out older docs (same ide, smaller rev).
#
# This could all have been done via SQL, but those inconsistencies
@@ -408,13 +433,13 @@
@sequel[@table].select(
:ide, :typ, :doc
).where(
:typ => CACHED_TYPES
).order(
- :ide.asc, :rev.desc
+ ::Sequel.asc(:ide), ::Sequel.desc(:rev)
).each do |d|
- (cache[d[:typ]] ||= {})[d[:ide]] ||= Rufus::Json.decode(d[:doc])
+ (cache[d[:typ]] ||= {})[d[:ide]] ||= decode_doc(d)
end
cache['variables']['trackers'] ||=
{ '_id' => 'trackers', 'type' => 'variables', 'trackers' => {} }
end
@@ -457,6 +482,5 @@
(Thread.current["cache_#{worker.name}"] ||= {})
end
end
end
end
-