#--
# Copyright (c) 2005-2010, John Mettraux, jmettraux@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# Made in Japan.
#++

require 'cgi'


module Ruote::Couch

  #
  # A database corresponds to a Couch database (not a Couch server).
  #
  # There is one database per ruote document type (msgs, workitems,
  # expressions, ...)
  #
  class Database

    attr_reader :type

    attr_reader :couch

    # Connects to the Couch database 'name' at host:port, creates it if
    # a GET on '.' finds nothing, then lets #prepare install whatever
    # design document the concrete class needs.
    #
    # 'opts' is handed as is to Rufus::Jig::Couch.new.
    #
    def initialize(host, port, type, name, opts={})

      @couch = Rufus::Jig::Couch.new(host, port, name, opts)
      @couch.put('.') unless @couch.get('.')

      @type = type

      prepare
    end

    # Stamps the document with a 'put_at' time and puts it.
    #
    # opts[:update_rev] is passed to the underlying Couch client.
    #
    def put(doc, opts)

      doc['put_at'] = Ruote.now_to_utc_s

      @couch.put(doc, :update_rev => opts[:update_rev])
        #
        # :update_rev => true :
        # updating the current doc _rev, this trick allows
        # direct "create then apply" chaining
    end

    # Fetches the document stored under 'key', 'opts' is passed as is
    # to the underlying Couch client.
    #
    def get(key, opts={})

      @couch.get(key, opts)
    end

    # Deletes the document and returns whatever the Couch client returned.
    #
    # A Rufus::Jig::TimeoutError is swallowed and the method returns true
    # in that case.
    #
    def delete(doc)

      r = @couch.delete(doc)

      #p [ :del, doc['_id'], Thread.current.object_id.to_s[-3..-1], r.nil? ]

      Thread.pass
        # without this, test/functional/ct_0 fails after 1 to 10 runs...

      r

    rescue Rufus::Jig::TimeoutError
      true
    end

    # The get_many used by msgs, configurations and variables.
    #
    # Without a key and without options, all the documents are returned.
    # Else the ids are narrowed down by 'key' (a string, a regex or an
    # array of those), then :count, :descending, :skip and :limit are
    # applied, in that order.
    #
    # Returns an array of documents, or an integer when opts[:count] is set.
    #
    def get_many(key, opts)

      return query('_all_docs?include_docs=true') if ( ! key) && opts.size < 1

      is = ids

      if key
        keys = Array(key).map { |k| k.is_a?(String) ? "!#{k}" : k }
        is = is.select { |i| Ruote::StorageBase.key_match?(keys, i) }
      end

      # counting happens after the key filter, else a count with a key
      # would report the size of the whole database
      #
      return is.length if opts[:count]

      is = is.reverse if opts[:descending]

      skip = opts[:skip] || 0
      limit = opts[:limit] || is.length

      # Array#[] yields nil when skip lies beyond the last id
      #
      is = is[skip, limit] || []

      query_by_post('_all_docs?include_docs=true', is)

      # TODO
      # maybe _count could be of use here
      # http://wiki.apache.org/couchdb/Built-In_Reduce_Functions
    end

    # Returns the ids of all the docs in this database (design documents
    # excluded), in the order '_all_docs' lists them.
    #
    def ids

      all = @couch.get('_all_docs')['rows'].collect { |r| r['id'] }

      all.reject { |i| DESIGN_DOC_REGEX.match(i) }
    end

    # Returns a string listing all the documents in this database, one
    # "key => value" line per entry, keys sorted (debugging help).
    #
    def dump

      out = "=== #{@type} ===\n"

      get_many(nil, {}).each do |doc|
        out << "\n"
        doc.keys.sort.each { |k| out << " #{k} => #{doc[k].inspect}\n" }
      end

      out
    end

    # Makes sure to close the HTTP connection down.
    #
    def shutdown

      @couch.close
    end

    # Deletes all the documents in this database (design documents are
    # left in place).
    #
    def purge!

      @couch.http.cache.clear

      @couch.get('_all_docs')['rows'].each do |row|

        next if DESIGN_DOC_REGEX.match(row['id'])

        doc = { '_id' => row['id'], '_rev' => row['value']['rev'] }
        @couch.delete(doc)
      end
        #
        # which is faster than
        #
        #@couch.delete('.')
        #@couch.put('.')
        #@couch.http.cache.clear
    end

    protected

    def prepare

      # nothing to do for a index-less database
    end

    # These options are known and passed to CouchDB.
    #
    QUERY_OPTIONS = [ :skip, :limit, :descending ]

    # :limit and :skip support
    #
    # Turns the known, non-nil options into a query string fragment
    # ("&skip=1&limit=2"), returns an empty string if there are none.
    #
    def query_options(opts)

      known = opts.select { |k, v| QUERY_OPTIONS.include?(k) && ( ! v.nil?) }

      s = known.collect { |k, v| "#{k}=#{v}" }.join('&')

      s.empty? ? '' : "&#{s}"
    end

    # Used by #get_many and #ids when filtering design documents out of
    # '_all_docs'.
    #
    DESIGN_DOC_REGEX = /^\_design\//

    # Removes the design documents from a list of documents.
    #
    # nil entries are dropped as well : a row without a 'doc' (CouchDB
    # answers that way for a requested key it cannot serve, see the
    # '_all_docs' documentation) would else break the '_id' lookup.
    #
    def filter_design_docs(docs)

      docs.compact.reject { |d| DESIGN_DOC_REGEX.match(d['_id']) }
    end

    # Encodes a value as a view 'key' query parameter : JSON first, then
    # URL escaping, so that names holding spaces, quotes, ampersands, ...
    # don't corrupt the query string.
    #
    def view_key(value)

      CGI.escape(Rufus::Json.encode(value))
    end

    # GETs the uri and returns the documents found in the rows of the
    # answer, design documents excluded.
    #
    def query(uri)

      rs = @couch.get(uri)

      filter_design_docs(rs['rows'].collect { |e| e['doc'] })
    end

    # POSTs the keys to the uri and returns the (unique) documents found
    # in the rows of the answer, design documents excluded.
    #
    def query_by_post(uri, keys)

      rs = @couch.post(uri, { 'keys' => keys })

      filter_design_docs(rs['rows'].collect { |e| e['doc'] }.uniq)
    end
  end

  #
  # A Couch database with a by_wfid view.
  #
  class WfidIndexedDatabase < Database

    # The get_many used by errors, expressions and schedules.
    #
    # A String key is taken for a wfid and answered via the 'by_wfid'
    # view, anything else goes to Database#get_many.
    #
    def get_many(key, opts)

      return super(key, opts) unless key.is_a?(String)

      # key is a wfid

      query(
        "_design/ruote/_view/by_wfid?key=#{view_key(key)}&include_docs=true")
    end

    # Used by WorkitemDatabase#query
    #
    def by_wfid(wfid)

      #get_many(/!#{wfid}$/, {})
      get_many(wfid, {})
    end

    # Returns the design document that goes with this class of database
    #
    def self.design_doc

      self.allocate.send(:design_doc)
    end

    protected

    # The design document holding the 'by_wfid' view.
    #
    def design_doc

      {
        '_id' => '_design/ruote',
        'views' => {
          'by_wfid' => {
            'map' => %{
              function (doc) {
                if (doc.wfid) emit(doc.wfid, null);
                else if (doc.fei) emit(doc.fei.wfid, null);
              }
            }
          }
        }
      }
    end

    # Installs the design document, or refreshes its views when they
    # differ from the ones #design_doc describes.
    #
    def prepare

      views = design_doc['views']

      d = @couch.get('_design/ruote')

      return if d && d['views'] == views

      d ||= design_doc
      d['views'] = views

      # put 'd' and not a brand new design_doc : when the document already
      # exists, 'd' is the one carrying its current _rev, which CouchDB
      # requires for an update
      #
      @couch.put(d)
    end
  end

  #
  # A Couch database with a by_wfid view and a by_field view.
  #
  class WorkitemDatabase < WfidIndexedDatabase

    # This method is called by CouchStorage#by_field
    #
    # With a value, the view is queried for { field => value }, else for
    # the field name alone. Note that a nil or false value is treated as
    # "no value".
    #
    def by_field(field, value=nil, opts={})

      field = { field => value } if value

      query(
        "_design/ruote/_view/by_field?key=#{view_key(field)}" +
        "&include_docs=true#{query_options(opts)}")
    end

    # This method is called by CouchStorage#by_participant
    #
    def by_participant(name, opts={})

      query(
        "_design/ruote/_view/by_participant_name?key=#{view_key(name)}" +
        "&include_docs=true#{query_options(opts)}")
    end

    # This method is called by CouchStorage#query
    #
    # 'criteria' is a hash of field name => field value, in which the keys
    # 'offset', 'limit', 'wfid' and 'participant_name' (or 'participant')
    # have a special meaning. The hash itself is left untouched.
    #
    # Returns an array of workitems (as hashes).
    #
    def query_workitems(criteria)

      criteria = criteria.dup
        # the special keys get deleted below, don't do that to the
        # hash of the caller

      offset = criteria.delete('offset')
      limit = criteria.delete('limit')

      wfid =
        criteria.delete('wfid')
      pname =
        criteria.delete('participant_name') || criteria.delete('participant')

      if criteria.empty?

        # no field criteria, the dedicated views are enough

        return get_many(nil, {}) unless wfid || pname
        return by_participant(pname) unless wfid

        hwis = by_wfid(wfid)

        return hwis unless pname

        # wfid and participant both given : narrow the wfid lookup with the
        # matcher used for field queries (instead of returning everything)
        #
        return hwis.select { |hwi|
          Ruote::StorageParticipant.matches?(hwi, pname, criteria)
        }
      end

      cr = criteria.collect { |fname, fvalue| { fname => fvalue } }

      opts = { :skip => offset, :limit => limit }

      hwis = query_by_post(
        "_design/ruote/_view/by_field?include_docs=true#{query_options(opts)}",
        cr)

      hwis = hwis.select { |hwi| hwi['fei']['wfid'] == wfid } if wfid

      hwis.select { |hwi|
        Ruote::StorageParticipant.matches?(hwi, pname, criteria)
      }
    end

    # Returns the design document that goes with this class of database
    #
    def self.design_doc

      self.allocate.send(:design_doc)
    end

    protected

    # The design document of the parent class, plus the 'by_field' and the
    # 'by_participant_name' views.
    #
    def design_doc

      doc = super

      # NOTE : with 'by_field', for a workitem with N fields there are
      # currently 2 * N rows generated per workitem.
      #
      # Why not restrict { field => value } keys to only fields whose value
      # is a string, a boolean or null ? I have the impression that querying
      # for field whose value is 'complex' (array or hash) is not necessary
      # (though sounding crazy useful).

      doc['views']['by_field'] = {
        'map' => %{
          function(doc) {
            if (doc.fields) {
              for (var field in doc.fields) {

                emit(field, null);

                var entry = {};
                entry[field] = doc.fields[field]
                emit(entry, null);
                  //
                  // have to use that 'entry' trick...
                  // else the field is named 'field'
              }
            }
          }
        }
      }
      doc['views']['by_participant_name'] = {
        'map' => %{
          function (doc) {
            if (doc.participant_name) {
              emit(doc.participant_name, null);
            }
          }
        }
      }

      doc
    end
  end
end