lib/ruote/couch/storage.rb in ruote-couch-2.1.9 vs lib/ruote/couch/storage.rb in ruote-couch-2.1.10
- old
+ new
@@ -20,10 +20,11 @@
# THE SOFTWARE.
#
# Made in Japan.
#++
+require 'thread'
require 'ruote/storage/base'
require 'ruote/couch/version'
require 'ruote/couch/database'
require 'rufus/jig' # gem install rufus-jig
@@ -61,11 +62,11 @@
@options = options
@prefix = options['couch_prefix'] || options['prefix'] || ''
@prefix = "#{@prefix}_" if @prefix.size > 0
- @zeroes = 21 # maybe make it an option
+ #@zeroes = 21 # maybe make it an option
@timeout = options['couch_timeout'] || 60
@dbs = {}
%w[ msgs schedules configurations variables ].each do |type|
@@ -76,18 +77,25 @@
@dbs['errors'] = WfidIndexedDatabase.new(
@host, @port, 'errors', "#{@prefix}ruote_errors")
@dbs['expressions'] = WfidIndexedDatabase.new(
- @host, @port, 'expressions', "#{@prefix}ruote_expressions", false)
+ #@host, @port, 'expressions', "#{@prefix}ruote_expressions", false)
+ @host, @port, 'expressions', "#{@prefix}ruote_expressions")
@dbs['workitems'] = WorkitemDatabase.new(
@host, @port, 'workitems', "#{@prefix}ruote_workitems")
put_configuration
- @zero_msgs_offset = @zeroes
+ #@zero_msgs_offset = @zeroes
+ @msgs_thread = nil
+ @msgs_queue = ::Queue.new
+
+ @schedules_thread = nil
+ @schedules_queue = ::Queue.new
+ @schedules = nil
end
def put (doc, opts={})
@dbs[doc['type']].put(doc, opts)
@@ -140,21 +148,25 @@
@dbs[type].dump
end
def shutdown
- @poller.kill if @poller
-
#@dbs.values.each { |db| db.shutdown }
+
+ #@poller.kill if @poller
+
+ @msgs_thread.kill rescue nil
+ @schedules_thread.kill rescue nil
end
# Mainly used by ruote's test/unit/ut_17_storage.rb
#
def add_type (type)
@dbs[type] = Database.new(
- @host, @port, type, "#{@prefix}ruote_#{type}", false)
+ #@host, @port, type, "#{@prefix}ruote_#{type}", false)
+ @host, @port, type, "#{@prefix}ruote_#{type}")
end
# Nukes a db type and reputs it (losing all the documents that were in it).
#
def purge_type! (type)
@@ -191,58 +203,47 @@
# Taking care of using long-polling
# (http://wiki.apache.org/couchdb/HTTP_database_API) when possible
#
def get_msgs
- if @zero_msgs_offset > 0
+ ensure_msgs_thread_is_running
- msgs = get_many(
- 'msgs', nil, :limit => 300
- ).sort { |a, b|
- a['put_at'] <=> b['put_at']
- }
+ msgs = []
- @zero_msgs_offset = @zero_msgs_offset - 1 if msgs.size == 0
- return msgs
+ while @msgs_queue.size > 0
+ msgs << @msgs_queue.pop
end
- @zero_msgs_offset = @zeroes
+ msgs
+ end
- schedules = get_many('schedules')
+ def get_schedules (delta, now)
- next_at = schedules.collect { |s| s['at'] }.sort.first
- delta = next_at ? (Time.parse(next_at) - Time.now) : nil
+ ensure_schedules_thread_is_running
- #p [ delta, @timeout ]
+ if @schedules.nil?
- return [] if delta && delta < 5.0
+ # NOTE : the problem with this approach is that ALL the schedules
+ # are stored in memory. Most of the time it's not a problem, but
+ # for people will lots of schedules...
- last_seq = @dbs['msgs'].get('_changes')['last_seq']
+ @schedules = get_many('schedules')
+ @schedules = @schedules.inject({}) { |h, s| h[s['_id']] = s; h }
+ end
- timeout = delta ? delta - 3.0 : -1.0
- timeout = (timeout < 0.0 || timeout > @timeout) ? @timeout : timeout
+ while @schedules_queue.size > 0
- #p [ Time.now, :last_seq, last_seq, :timeout, timeout ]
+ deleted, s = @schedules_queue.pop
- begin
-
- @poller = Thread.current
-
- @dbs['msgs'].get(
- "_changes?feed=longpoll&heartbeat=60000&since=#{last_seq}",
- :timeout => timeout)
- # block until there is a change in the 'msgs' db
-
- rescue Exception => e
- #rescue Rufus::Jig::TimeoutError => te
- # p [ :caught, e.class ]
- # e.backtrace.each { |l| puts l }
- ensure
- @poller = nil
+ if deleted
+ @schedules.delete(s['_id'])
+ else
+ @schedules[s['_id']] = s
+ end
end
- []
+ filter_schedules(@schedules.values, now)
end
protected
def put_configuration
@@ -250,9 +251,33 @@
return if get('configurations', 'engine')
conf = { '_id' => 'engine', 'type' => 'configurations' }.merge!(@options)
put(conf)
+ end
+
+ def ensure_msgs_thread_is_running
+
+ status = @msgs_thread ? @msgs_thread.status : -1
+ return if status == 'run' || status == 'sleep'
+
+ @msgs_thread = Thread.new do
+ @dbs['msgs'].couch.on_change do |_, deleted, doc|
+ @msgs_queue << doc unless deleted
+ end
+ end
+ end
+
+ def ensure_schedules_thread_is_running
+
+ status = @schedules_thread ? @schedules_thread.status : -1
+ return if status == 'run' || status == 'sleep'
+
+ @schedules_thread = Thread.new do
+ @dbs['schedules'].couch.on_change do |_, deleted, doc|
+ @schedules_queue << [ deleted, doc ]
+ end
+ end
end
end
end
end