lib/ruote/sequel/storage.rb in ruote-sequel-2.2.0 vs lib/ruote/sequel/storage.rb in ruote-sequel-2.3.0
- old
+ new
@@ -1,7 +1,7 @@
#--
-# Copyright (c) 2005-2011, John Mettraux, jmettraux@gmail.com
+# Copyright (c) 2005-2012, John Mettraux, jmettraux@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
@@ -31,24 +31,33 @@
module Sequel
# Creates the 'documents' table necessary for this storage.
#
# If re_create is set to true, it will destroy any previous 'documents'
- # table and create it.
+ # table and create it anew. If false (the default), the table is only
+ # created if it doesn't already exist.
#
- def self.create_table(sequel, re_create=false)
+ # It's also possible to change the default table name from 'documents' to
+ # something else with the optional third parameter, table_name.
+ #
+ def self.create_table(sequel, re_create=false, table_name='documents')
- m = re_create ? :create_table! : :create_table
+ m = re_create ? :create_table! : :create_table?
- sequel.send(m, :documents) do
+ sequel.send(m, table_name.to_sym) do
+
String :ide, :size => 255, :null => false
Integer :rev, :null => false
String :typ, :size => 55, :null => false
String :doc, :text => true, :null => false
- String :wfid, :size => 255, :index => true
+ String :wfid, :size => 255
String :participant_name, :size => 512
- primary_key [ :ide, :rev, :typ ]
+
+ primary_key [ :typ, :ide, :rev ]
+
+ index :wfid
+ #index [ :typ, :wfid ]
end
end
#
# A Sequel storage implementation for ruote >= 2.2.0.
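The hunk above adds the optional table_name parameter, which pairs with the 'sequel_table_name' storage option introduced further down. A minimal setup sketch, assuming the usual ruote / ruote-sequel requires; the connection URL and the 'ruote_docs' name are placeholders:

  require 'ruote'
  require 'ruote-sequel'

  sequel = Sequel.connect('postgres://localhost/ruote_test') # placeholder URL

  # create the table under a non-default name; pass true as the second
  # argument to drop and re-create it
  Ruote::Sequel.create_table(sequel, false, 'ruote_docs')

  # point the storage at that same table
  storage = Ruote::Sequel::Storage.new(
    sequel, 'sequel_table_name' => 'ruote_docs')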
@@ -78,13 +87,14 @@
attr_reader :sequel
def initialize(sequel, options={})
@sequel = sequel
- @options = options
+ #@options = options
+ @table = (options['sequel_table_name'] || :documents).to_sym
- put_configuration
+ replace_engine_configuration(options)
end
def put_msg(action, options)
# put_msg is a unique action, no need for all the complexity of put
@@ -92,10 +102,21 @@
do_insert(prepare_msg_doc(action, options), 1)
nil
end
+ # Used to reserve 'msgs' and 'schedules'. Simply deletes the document and
+ # returns true if the delete was successful (i.e. if the reservation is
+ # valid).
+ #
+ def reserve(doc)
+
+ @sequel[@table].where(
+ :typ => doc['type'], :ide => doc['_id'], :rev => 1
+ ).delete > 0
+ end
+
def put_schedule(flavour, owner_fei, s, msg)
# put_schedule is a unique action, no need for all the complexity of put
doc = prepare_schedule_doc(flavour, owner_fei, s, msg)
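The reserve method added in the hunk above is what lets several workers share one storage: whoever manages to delete the rev 1 row owns the document. A rough sketch of that contract; storage and the resulting msg are placeholders:

  msg = storage.get_many('msgs').first

  if msg && storage.reserve(msg)
    # the delete succeeded: this worker, and only this worker,
    # may now process the msg
  else
    # another worker already reserved (deleted) the document
  end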
@@ -107,10 +128,12 @@
doc['_id']
end
def put(doc, opts={})
+ cache_clear(doc)
+
if doc['_rev']
d = get(doc['type'], doc['_id'])
return true unless d
@@ -120,98 +143,94 @@
nrev = doc['_rev'].to_i + 1
begin
- do_insert(doc, nrev)
+ do_insert(doc, nrev, opts[:update_rev])
rescue ::Sequel::DatabaseError => de
return (get(doc['type'], doc['_id']) || true)
# failure
end
- @sequel[:documents].where(
+ @sequel[@table].where(
:typ => doc['type'], :ide => doc['_id']
).filter { rev < nrev }.delete
- doc['_rev'] = nrev if opts[:update_rev]
-
nil
# success
end
def get(type, key)
- d = do_get(type, key)
-
- d ? Rufus::Json.decode(d[:doc]) : nil
+ cache_get(type, key) || do_get(type, key)
end
def delete(doc)
raise ArgumentError.new('no _rev for doc') unless doc['_rev']
- count = do_delete(doc)
+ cache_clear(doc)
+ # usually not necessary, but added here so it isn't forgotten later on
- return (get(doc['type'], doc['_id']) || true) if count != 1
+ count = @sequel[@table].where(
+ :typ => doc['type'], :ide => doc['_id'], :rev => doc['_rev'].to_i
+ ).delete
+
+ return (get(doc['type'], doc['_id']) || true) if count < 1
# failure
nil
# success
end
def get_many(type, key=nil, opts={})
- ds = @sequel[:documents].where(:typ => type)
+ cached = cache_get_many(type, key, opts)
+ return cached if cached
+ ds = @sequel[@table].where(:typ => type)
+
keys = key ? Array(key) : nil
ds = ds.filter(:wfid => keys) if keys && keys.first.is_a?(String)
- return ds.all.size if opts[:count]
+ return ds.count if opts[:count]
ds = ds.order(
- *(opts[:descending] ? [ :ide.desc, :rev.desc ] : [ :ide.asc, :rev.asc ])
+ opts[:descending] ? :ide.desc : :ide.asc, :rev.desc
+ ).limit(
+ opts[:limit], opts[:skip] || opts[:offset]
)
- ds = ds.limit(opts[:limit], opts[:skip])
-
- docs = ds.all
- docs = select_last_revs(docs, opts[:descending])
+ docs = select_last_revs(ds)
docs = docs.collect { |d| Rufus::Json.decode(d[:doc]) }
- keys && keys.first.is_a?(Regexp) ?
- docs.select { |doc| keys.find { |key| key.match(doc['_id']) } } :
+ if keys && keys.first.is_a?(Regexp)
+ docs.select { |doc| keys.find { |key| key.match(doc['_id']) } }
+ else
docs
+ end
# (pass on the dataset.filter(:wfid => /regexp/) for now
# since we have potentially multiple keys)
end
# Returns all the ids of the documents of a given type.
#
def ids(type)
- @sequel[:documents].where(:typ => type).collect { |d| d[:ide] }.uniq.sort
+ @sequel[@table].where(:typ => type).collect { |d| d[:ide] }.uniq.sort
end
# Nukes all the documents in this storage.
#
def purge!
- @sequel[:documents].delete
+ @sequel[@table].delete
end
- # Returns a string representation of the current content of the storage for
- # a given type.
- #
- def dump(type)
-
- "=== #{type} ===\n" +
- get_many(type).map { |h| " #{h['_id']} => #{h.inspect}" }.join("\n")
- end
-
# Calls #disconnect on the db. According to Sequel's doc, it closes
# all the idle connections in the pool (not the active ones).
#
def shutdown
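The reworked get_many in the hunk above delegates counting to the database (ds.count instead of fetching everything), folds :skip/:offset into the SQL limit and keeps only the last revision of each document. A few call sketches against a storage instance; wfid is a placeholder variable:

  storage.get_many('workitems', wfid, :count => true)
    # => number of matching rows (a SQL COUNT, not a full fetch)

  storage.get_many('expressions', nil, :descending => true, :limit => 10)
    # => at most 10 decoded documents, last revision of each, newest ide first

  storage.get_many('workitems', /#{wfid}$/)
    # regexp keys are not pushed down to SQL; they are matched against each
    # document's '_id' after the rows have been fetched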
@@ -235,52 +254,71 @@
# Nukes a db type and reputs it (losing all the documents that were in it).
#
def purge_type!(type)
- @sequel[:documents].where(:typ => type).delete
+ @sequel[@table].where(:typ => type).delete
end
# A provision made for workitems: allows querying them directly by
# participant name.
#
- def by_participant(type, participant_name, opts)
+ def by_participant(type, participant_name, opts={})
raise NotImplementedError if type != 'workitems'
- docs = @sequel[:documents].where(
- :typ => type, :participant_name => participant_name)
+ docs = @sequel[@table].where(
+ :typ => type, :participant_name => participant_name
+ )
- select_last_revs(docs).collect { |d| Rufus::Json.decode(d[:doc]) }
+ return docs.count if opts[:count]
+
+ docs = docs.order(
+ :ide.asc, :rev.desc
+ ).limit(
+ opts[:limit], opts[:offset] || opts[:skip]
+ )
+
+ select_last_revs(docs).collect { |d| Ruote::Workitem.from_json(d[:doc]) }
end
# Querying workitems by field (warning, goes deep into the JSON structure)
#
- def by_field(type, field, value=nil)
+ def by_field(type, field, value, opts={})
raise NotImplementedError if type != 'workitems'
lk = [ '%"', field, '":' ]
lk.push(Rufus::Json.encode(value)) if value
lk.push('%')
- docs = @sequel[:documents].where(:typ => type).filter(:doc.like(lk.join))
+ docs = @sequel[@table].where(
+ :typ => type
+ ).filter(
+ :doc.like(lk.join)
+ )
- select_last_revs(docs).collect { |d| Rufus::Json.decode(d[:doc]) }
+ return docs.count if opts[:count]
+
+ docs = docs.order(
+ :ide.asc, :rev.desc
+ ).limit(
+ opts[:limit], opts[:offset] || opts[:skip]
+ )
+
+ select_last_revs(docs).collect { |d| Ruote::Workitem.from_json(d[:doc]) }
end
def query_workitems(criteria)
- ds = @sequel[:documents].where(:typ => 'workitems')
+ ds = @sequel[@table].where(:typ => 'workitems')
- return select_last_revs(ds.all).size if criteria['count']
+ count = criteria.delete('count')
limit = criteria.delete('limit')
- offset = criteria.delete('offset')
+ offset = criteria.delete('offset') || criteria.delete('skip')
- ds = ds.limit(limit, offset)
-
wfid =
criteria.delete('wfid')
pname =
criteria.delete('participant_name') || criteria.delete('participant')
@@ -289,33 +327,39 @@
criteria.collect do |k, v|
ds = ds.filter(:doc.like("%\"#{k}\":#{Rufus::Json.encode(v)}%"))
end
- select_last_revs(ds.all).collect { |d|
- Ruote::Workitem.new(Rufus::Json.decode(d[:doc]))
- }
- end
+ return ds.count if count
- protected
+ ds = ds.order(:ide.asc, :rev.desc).limit(limit, offset)
- def do_delete(doc)
+ select_last_revs(ds).collect { |d| Ruote::Workitem.from_json(d[:doc]) }
+ end
- @sequel[:documents].where(
- :ide => doc['_id'], :typ => doc['type'], :rev => doc['_rev'].to_i
- ).delete
+ # Used by the worker to indicate that a new step begins. For ruote-sequel,
+ # it means the cache can be prepared (a single select yielding
+ # all the info necessary for one worker step, expressions excluded).
+ #
+ def begin_step
+
+ prepare_cache
end
- def do_insert(doc, rev)
+ protected
- @sequel[:documents].insert(
+ def do_insert(doc, rev, update_rev=false)
+
+ doc = doc.send(
+ update_rev ? :merge! : :merge,
+ { '_rev' => rev, 'put_at' => Ruote.now_to_utc_s })
+
+ @sequel[@table].insert(
:ide => doc['_id'],
:rev => rev,
:typ => doc['type'],
- :doc => Rufus::Json.encode(doc.merge(
- '_rev' => rev,
- 'put_at' => Ruote.now_to_utc_s)),
+ :doc => Rufus::Json.encode(doc),
:wfid => extract_wfid(doc),
:participant_name => doc['participant_name']
)
end
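With this version, by_participant and by_field accept an options hash (:count, :limit, :skip/:offset) and return Ruote::Workitem instances instead of plain hashes. A usage sketch; the participant and field names are made up:

  storage.by_participant('workitems', 'accounting', {})
    # => [ #<Ruote::Workitem ...>, ... ]

  storage.by_participant('workitems', 'accounting', :count => true)
    # => how many workitems sit with that participant

  storage.by_field('workitems', 'customer', 'acme', :limit => 10, :offset => 10)
    # => second page of the workitems whose fields contain "customer": "acme"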
@@ -324,36 +368,94 @@
doc['wfid'] || (doc['fei'] ? doc['fei']['wfid'] : nil)
end
def do_get(type, key)
- @sequel[:documents].where(
+ d = @sequel[@table].select(:doc).where(
:typ => type, :ide => key
).reverse_order(:rev).first
+
+ d ? Rufus::Json.decode(d[:doc]) : nil
end
- # Don't put configuration if it's already in
+ # Weed out older docs (same ide, smaller rev).
#
- # (avoid storages from trashing configuration...)
+ # This could all have been done via SQL, but those inconsistencies
+ # are rare and the cost of the pumped-up SQL is not constant :-(
#
- def put_configuration
+ def select_last_revs(docs)
- return if get('configurations', 'engine')
+ docs.each_with_object([]) { |doc, a|
+ a << doc if a.last.nil? || doc[:ide] != a.last[:ide]
+ }
+ end
- conf = { '_id' => 'engine', 'type' => 'configurations' }.merge(@options)
- put(conf)
+ #--
+ # worker step cache
+ #
+ # in order to cut down the number of selects, do one select with
+ # all the information the worker needs for one step of work
+ #++
+
+ CACHED_TYPES = %w[ msgs schedules configurations variables ]
+
+ # One select to grab all the info necessary for a worker step
+ # (expressions excepted).
+ #
+ def prepare_cache
+
+ CACHED_TYPES.each { |t| cache[t] = {} }
+
+ @sequel[@table].select(
+ :ide, :typ, :doc
+ ).where(
+ :typ => CACHED_TYPES
+ ).order(
+ :ide.asc, :rev.desc
+ ).each do |d|
+ (cache[d[:typ]] ||= {})[d[:ide]] ||= Rufus::Json.decode(d[:doc])
+ end
+
+ cache['variables']['trackers'] ||=
+ { '_id' => 'trackers', 'type' => 'variables', 'trackers' => {} }
end
- def select_last_revs(docs, reverse=false)
+ # Ask the cache for a doc. Returns nil if it's not cached.
+ #
+ def cache_get(type, key)
- docs = docs.inject({}) { |h, doc|
- h[doc[:ide]] = doc
- h
- }.values.sort_by { |h|
- h[:ide]
- }
+ (cache[type] || {})[key]
+ end
- reverse ? docs.reverse : docs
+ # Ask the cache for a set of documents. Returns nil if it's not cached
+ # or caching is not OK.
+ #
+ def cache_get_many(type, keys, options)
+
+ if !options[:batch] && CACHED_TYPES.include?(type) && cache[type]
+ cache[type].values
+ else
+ nil
+ end
+ end
+
+ # Removes a document from the cache.
+ #
+ def cache_clear(doc)
+
+ (cache[doc['type']] || {}).delete(doc['_id'])
+ end
+
+ # Returns the cache for the current worker thread. Returns {} if there is no
+ # cache available.
+ #
+ def cache
+
+ worker = Thread.current['ruote_worker']
+
+ return {} unless worker
+
+ (Thread.current["cache_#{worker.name}"] ||= {})
end
end
end
end
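begin_step and the cache_* helpers added above implement the per-step cache described in the comments: the worker announces each step, the storage pre-loads the latest revision of every msg, schedule, configuration and variable in a single select, and put/delete invalidate individual entries. A rough sketch of the flow, assuming it runs inside a ruote worker thread (the worker sets Thread.current['ruote_worker'] itself; outside of such a thread the cache stays empty and every call falls back to SQL):

  storage.begin_step
    # a single SELECT loads the latest revs of msgs, schedules,
    # configurations and variables into the thread-local cache

  msgs = storage.get_many('msgs')
    # answered from that cache, no further SQL for this step

  trackers = storage.get('variables', 'trackers')
    # likewise; prepare_cache guarantees at least an empty 'trackers' doc

  conf = storage.get('configurations', 'engine')

  storage.put(conf) if conf
    # put (like delete) goes through cache_clear, so the next get for this
    # document reads from the database again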