lib/ruote/sequel/storage.rb in ruote-sequel-2.2.0 vs lib/ruote/sequel/storage.rb in ruote-sequel-2.3.0

- old
+ new

@@ -1,7 +1,7 @@
 #--
-# Copyright (c) 2005-2011, John Mettraux, jmettraux@gmail.com
+# Copyright (c) 2005-2012, John Mettraux, jmettraux@gmail.com
 #
 # Permission is hereby granted, free of charge, to any person obtaining a copy
 # of this software and associated documentation files (the "Software"), to deal
 # in the Software without restriction, including without limitation the rights
 # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
@@ -31,24 +31,33 @@
 module Sequel
 
   # Creates the 'documents' table necessary for this storage.
   #
   # If re_create is set to true, it will destroy any previous 'documents'
-  # table and create it.
+  # table and create it. If false (default) then the table will be created
+  # if it doesn't already exist.
   #
-  def self.create_table(sequel, re_create=false)
+  # It's also possible to change the default table_name from 'documents' to
+  # something else with the optional third parameter
+  #
+  def self.create_table(sequel, re_create=false, table_name='documents')
 
-    m = re_create ? :create_table! : :create_table
+    m = re_create ? :create_table! : :create_table?
 
-    sequel.send(m, :documents) do
+    sequel.send(m, table_name.to_sym) do
+
       String :ide, :size => 255, :null => false
       Integer :rev, :null => false
       String :typ, :size => 55, :null => false
       String :doc, :text => true, :null => false
-      String :wfid, :size => 255, :index => true
+      String :wfid, :size => 255
       String :participant_name, :size => 512
-      primary_key [ :ide, :rev, :typ ]
+
+      primary_key [ :typ, :ide, :rev ]
+
+      index :wfid
+      #index [ :typ, :wfid ]
     end
   end
 
   #
   # A Sequel storage implementation for ruote >= 2.2.0.
@@ -78,13 +87,14 @@
     attr_reader :sequel
 
     def initialize(sequel, options={})
 
       @sequel = sequel
-      @options = options
+      #@options = options
+      @table = (options['sequel_table_name'] || :documents).to_sym
 
-      put_configuration
+      replace_engine_configuration(options)
     end
 
     def put_msg(action, options)
 
       # put_msg is a unique action, no need for all the complexity of put
@@ -92,10 +102,21 @@
       do_insert(prepare_msg_doc(action, options), 1)
 
       nil
     end
 
+    # Used to reserve 'msgs' and 'schedules'. Simply deletes the document,
+    # return true if the delete was successful (ie if the reservation is
+    # valid).
+    #
+    def reserve(doc)
+
+      @sequel[@table].where(
+        :typ => doc['type'], :ide => doc['_id'], :rev => 1
+      ).delete > 0
+    end
+
     def put_schedule(flavour, owner_fei, s, msg)
 
       # put_schedule is a unique action, no need for all the complexity of put
 
       doc = prepare_schedule_doc(flavour, owner_fei, s, msg)
@@ -107,10 +128,12 @@
       doc['_id']
     end
 
     def put(doc, opts={})
 
+      cache_clear(doc)
+
       if doc['_rev']
 
         d = get(doc['type'], doc['_id'])
 
         return true unless d
@@ -120,98 +143,94 @@
       nrev = doc['_rev'].to_i + 1
 
       begin
 
-        do_insert(doc, nrev)
+        do_insert(doc, nrev, opts[:update_rev])
 
       rescue ::Sequel::DatabaseError => de
 
        return (get(doc['type'], doc['_id']) || true)
          # failure
      end
 
-      @sequel[:documents].where(
+      @sequel[@table].where(
        :typ => doc['type'], :ide => doc['_id']
      ).filter { rev < nrev }.delete
 
-      doc['_rev'] = nrev if opts[:update_rev]
-
      nil
        # success
    end
 
    def get(type, key)
 
-      d = do_get(type, key)
-
-      d ? Rufus::Json.decode(d[:doc]) : nil
+      cache_get(type, key) || do_get(type, key)
    end
 
    def delete(doc)
 
      raise ArgumentError.new('no _rev for doc') unless doc['_rev']
 
-      count = do_delete(doc)
+      cache_clear(doc)
+        # usually not necessary, adding it not to forget it later on
 
-      return (get(doc['type'], doc['_id']) || true) if count != 1
+      count = @sequel[@table].where(
+        :typ => doc['type'], :ide => doc['_id'], :rev => doc['_rev'].to_i
+      ).delete
+
+      return (get(doc['type'], doc['_id']) || true) if count < 1
        # failure
 
      nil
        # success
    end
 
    def get_many(type, key=nil, opts={})
 
-      ds = @sequel[:documents].where(:typ => type)
+      cached = cache_get_many(type, key, opts)
+      return cached if cached
 
+      ds = @sequel[@table].where(:typ => type)
+
      keys = key ? Array(key) : nil
      ds = ds.filter(:wfid => keys) if keys && keys.first.is_a?(String)
 
-      return ds.all.size if opts[:count]
+      return ds.count if opts[:count]
 
      ds = ds.order(
-        *(opts[:descending] ? [ :ide.desc, :rev.desc ] : [ :ide.asc, :rev.asc ])
+        opts[:descending] ? :ide.desc : :ide.asc, :rev.desc
+      ).limit(
+        opts[:limit], opts[:skip] || opts[:offset]
      )
 
-      ds = ds.limit(opts[:limit], opts[:skip])
-
-      docs = ds.all
-      docs = select_last_revs(docs, opts[:descending])
+      docs = select_last_revs(ds)
      docs = docs.collect { |d| Rufus::Json.decode(d[:doc]) }
 
-      keys && keys.first.is_a?(Regexp) ?
-        docs.select { |doc| keys.find { |key| key.match(doc['_id']) } } :
+      if keys && keys.first.is_a?(Regexp)
+        docs.select { |doc| keys.find { |key| key.match(doc['_id']) } }
+      else
        docs
+      end
 
      # (pass on the dataset.filter(:wfid => /regexp/) for now
      # since we have potentially multiple keys)
    end
 
    # Returns all the ids of the documents of a given type.
    #
    def ids(type)
 
-      @sequel[:documents].where(:typ => type).collect { |d| d[:ide] }.uniq.sort
+      @sequel[@table].where(:typ => type).collect { |d| d[:ide] }.uniq.sort
    end
 
    # Nukes all the documents in this storage.
    #
    def purge!
 
-      @sequel[:documents].delete
+      @sequel[@table].delete
    end
 
-    # Returns a string representation the current content of the storage for
-    # a given type.
-    #
-    def dump(type)
-
-      "=== #{type} ===\n" +
-      get_many(type).map { |h| " #{h['_id']} => #{h.inspect}" }.join("\n")
-    end
-
    # Calls #disconnect on the db. According to Sequel's doc, it closes
    # all the idle connections in the pool (not the active ones).
    #
    def shutdown
@@ -235,52 +254,71 @@
    # Nukes a db type and reputs it (losing all the documents that were in it).
    #
    def purge_type!(type)
 
-      @sequel[:documents].where(:typ => type).delete
+      @sequel[@table].where(:typ => type).delete
    end
 
    # A provision made for workitems, allow to query them directly by
    # participant name.
    #
-    def by_participant(type, participant_name, opts)
+    def by_participant(type, participant_name, opts={})
 
      raise NotImplementedError if type != 'workitems'
 
-      docs = @sequel[:documents].where(
-        :typ => type, :participant_name => participant_name)
+      docs = @sequel[@table].where(
+        :typ => type, :participant_name => participant_name
+      )
 
-      select_last_revs(docs).collect { |d| Rufus::Json.decode(d[:doc]) }
+      return docs.count if opts[:count]
+
+      docs = docs.order(
+        :ide.asc, :rev.desc
+      ).limit(
+        opts[:limit], opts[:offset] || opts[:skip]
+      )
+
+      select_last_revs(docs).collect { |d| Ruote::Workitem.from_json(d[:doc]) }
    end
 
    # Querying workitems by field (warning, goes deep into the JSON structure)
    #
-    def by_field(type, field, value=nil)
+    def by_field(type, field, value, opts={})
 
      raise NotImplementedError if type != 'workitems'
 
      lk = [ '%"', field, '":' ]
      lk.push(Rufus::Json.encode(value)) if value
      lk.push('%')
 
-      docs = @sequel[:documents].where(:typ => type).filter(:doc.like(lk.join))
+      docs = @sequel[@table].where(
+        :typ => type
+      ).filter(
+        :doc.like(lk.join)
+      )
 
-      select_last_revs(docs).collect { |d| Rufus::Json.decode(d[:doc]) }
+      return docs.count if opts[:count]
+
+      docs = docs.order(
+        :ide.asc, :rev.desc
+      ).limit(
+        opts[:limit], opts[:offset] || opts[:skip]
+      )
+
+      select_last_revs(docs).collect { |d| Ruote::Workitem.from_json(d[:doc]) }
    end
 
    def query_workitems(criteria)
 
-      ds = @sequel[:documents].where(:typ => 'workitems')
+      ds = @sequel[@table].where(:typ => 'workitems')
 
-      return select_last_revs(ds.all).size if criteria['count']
+      count = criteria.delete('count')
 
      limit = criteria.delete('limit')
-      offset = criteria.delete('offset')
+      offset = criteria.delete('offset') || criteria.delete('skip')
 
-      ds = ds.limit(limit, offset)
-
      wfid = criteria.delete('wfid')
      pname = criteria.delete('participant_name') || criteria.delete('participant')
@@ -289,33 +327,39 @@
      criteria.collect do |k, v|
        ds = ds.filter(:doc.like("%\"#{k}\":#{Rufus::Json.encode(v)}%"))
      end
 
-      select_last_revs(ds.all).collect { |d|
-        Ruote::Workitem.new(Rufus::Json.decode(d[:doc]))
-      }
-    end
+      return ds.count if count
 
-    protected
+      ds = ds.order(:ide.asc, :rev.desc).limit(limit, offset)
 
-    def do_delete(doc)
+      select_last_revs(ds).collect { |d| Ruote::Workitem.from_json(d[:doc]) }
+    end
 
-      @sequel[:documents].where(
-        :ide => doc['_id'], :typ => doc['type'], :rev => doc['_rev'].to_i
-      ).delete
+    # Used by the worker to indicate a new step begins. For ruote-sequel,
+    # it means the cache can be prepared (a unique select yielding
+    # all the info necessary for one worker step (expressions excluded)).
+    #
+    def begin_step
+
+      prepare_cache
    end
 
-    def do_insert(doc, rev)
+    protected
 
-      @sequel[:documents].insert(
+    def do_insert(doc, rev, update_rev=false)
+
+      doc = doc.send(
+        update_rev ? :merge! : :merge,
+        { '_rev' => rev, 'put_at' => Ruote.now_to_utc_s })
+
+      @sequel[@table].insert(
        :ide => doc['_id'],
        :rev => rev,
        :typ => doc['type'],
-        :doc => Rufus::Json.encode(doc.merge(
-          '_rev' => rev,
-          'put_at' => Ruote.now_to_utc_s)),
+        :doc => Rufus::Json.encode(doc),
        :wfid => extract_wfid(doc),
        :participant_name => doc['participant_name']
      )
    end
@@ -324,36 +368,94 @@
      doc['wfid'] || (doc['fei'] ? doc['fei']['wfid'] : nil)
    end
 
    def do_get(type, key)
 
-      @sequel[:documents].where(
+      d = @sequel[@table].select(:doc).where(
        :typ => type, :ide => key
      ).reverse_order(:rev).first
+
+      d ? Rufus::Json.decode(d[:doc]) : nil
    end
 
-    # Don't put configuration if it's already in
+    # Weed out older docs (same ide, smaller rev).
    #
-    # (avoid storages from trashing configuration...)
+    # This could all have been done via SQL, but those inconsistencies
+    # are rare, the cost of the pumped SQL is not constant :-(
    #
-    def put_configuration
+    def select_last_revs(docs)
 
-      return if get('configurations', 'engine')
+      docs.each_with_object([]) { |doc, a|
+        a << doc if a.last.nil? || doc[:ide] != a.last[:ide]
+      }
+    end
 
-      conf = { '_id' => 'engine', 'type' => 'configurations' }.merge(@options)
-      put(conf)
+    #--
+    # worker step cache
+    #
+    # in order to cut down the number of selects, do one select with
+    # all the information the worker needs for one step of work
+    #++
+
+    CACHED_TYPES = %w[ msgs schedules configurations variables ]
+
+    # One select to grab in all the info necessary for a worker step
+    # (expressions excepted).
+    #
+    def prepare_cache
+
+      CACHED_TYPES.each { |t| cache[t] = {} }
+
+      @sequel[@table].select(
+        :ide, :typ, :doc
+      ).where(
+        :typ => CACHED_TYPES
+      ).order(
+        :ide.asc, :rev.desc
+      ).each do |d|
+        (cache[d[:typ]] ||= {})[d[:ide]] ||= Rufus::Json.decode(d[:doc])
+      end
+
+      cache['variables']['trackers'] ||=
+        { '_id' => 'trackers', 'type' => 'variables', 'trackers' => {} }
    end
 
-    def select_last_revs(docs, reverse=false)
+    # Ask the cache for a doc. Returns nil if it's not cached.
+    #
+    def cache_get(type, key)
 
-      docs = docs.inject({}) { |h, doc|
-        h[doc[:ide]] = doc
-        h
-      }.values.sort_by { |h|
-        h[:ide]
-      }
+      (cache[type] || {})[key]
    end
 
-      reverse ? docs.reverse : docs
+    # Ask the cache for a set of documents. Returns nil if it's not cached
+    # or caching is not OK.
+    #
+    def cache_get_many(type, keys, options)
+
+      if !options[:batch] && CACHED_TYPES.include?(type) && cache[type]
+        cache[type].values
+      else
+        nil
+      end
+    end
+
+    # Removes a document from the cache.
+    #
+    def cache_clear(doc)
+
+      (cache[doc['type']] || {}).delete(doc['_id'])
+    end
+
+    # Returns the cache for the given thread. Returns {} if there is no
+    # cache available.
+    #
+    def cache
+
+      worker = Thread.current['ruote_worker']
+
+      return {} unless worker
+
+      (Thread.current["cache_#{worker.name}"] ||= {})
    end
  end
 end
 end
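The most visible change above is that the storage is no longer pinned to a 'documents' table: create_table accepts an optional table name and Storage#initialize reads a 'sequel_table_name' option. A minimal usage sketch, not part of the diff; the connection URL, the 'ruote_docs' table name and the engine wiring are illustrative and follow the usual ruote 2.x setup:

  require 'rubygems'
  require 'json'          # gem install json
  require 'ruote'         # gem install ruote
  require 'ruote-sequel'  # gem install ruote-sequel

  sequel = Sequel.connect('postgres://localhost/ruote_test')

  # with re_create=false (second argument), 2.3.0 uses :create_table?
  # and only creates the table if it doesn't already exist;
  # passing true drops and re-creates it
  Ruote::Sequel.create_table(sequel, false, 'ruote_docs')

  # point the storage at the custom table (defaults to 'documents')
  storage = Ruote::Sequel::Storage.new(
    sequel, 'sequel_table_name' => 'ruote_docs')

  engine = Ruote::Engine.new(Ruote::Worker.new(storage))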
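The workitem query methods also change: by_participant and by_field now accept an options hash honouring :count, :limit and :skip/:offset, and return Ruote::Workitem instances instead of raw hashes, while get_many keeps its options but counts via ds.count. A sketch of the new calls, assuming the storage set up above; the participant and field names are made up:

  storage.by_participant('workitems', 'alice', :count => true)
    # => number of workitems assigned to participant 'alice'

  storage.by_participant('workitems', 'alice', :limit => 10, :skip => 20)
    # => at most 10 Ruote::Workitem instances, skipping the first 20

  storage.by_field('workitems', 'customer', 'acme', :count => true)
  storage.get_many('workitems', nil, :count => true)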