lib/ruote/couch/storage.rb in ruote-couch-2.1.9 vs lib/ruote/couch/storage.rb in ruote-couch-2.1.10

- old
+ new

@@ -20,10 +20,11 @@ # THE SOFTWARE. # # Made in Japan. #++ +require 'thread' require 'ruote/storage/base' require 'ruote/couch/version' require 'ruote/couch/database' require 'rufus/jig' # gem install rufus-jig @@ -61,11 +62,11 @@ @options = options @prefix = options['couch_prefix'] || options['prefix'] || '' @prefix = "#{@prefix}_" if @prefix.size > 0 - @zeroes = 21 # maybe make it an option + #@zeroes = 21 # maybe make it an option @timeout = options['couch_timeout'] || 60 @dbs = {} %w[ msgs schedules configurations variables ].each do |type| @@ -76,18 +77,25 @@ @dbs['errors'] = WfidIndexedDatabase.new( @host, @port, 'errors', "#{@prefix}ruote_errors") @dbs['expressions'] = WfidIndexedDatabase.new( - @host, @port, 'expressions', "#{@prefix}ruote_expressions", false) + #@host, @port, 'expressions', "#{@prefix}ruote_expressions", false) + @host, @port, 'expressions', "#{@prefix}ruote_expressions") @dbs['workitems'] = WorkitemDatabase.new( @host, @port, 'workitems', "#{@prefix}ruote_workitems") put_configuration - @zero_msgs_offset = @zeroes + #@zero_msgs_offset = @zeroes + @msgs_thread = nil + @msgs_queue = ::Queue.new + + @schedules_thread = nil + @schedules_queue = ::Queue.new + @schedules = nil end def put (doc, opts={}) @dbs[doc['type']].put(doc, opts) @@ -140,21 +148,25 @@ @dbs[type].dump end def shutdown - @poller.kill if @poller - #@dbs.values.each { |db| db.shutdown } + + #@poller.kill if @poller + + @msgs_thread.kill rescue nil + @schedules_thread.kill rescue nil end # Mainly used by ruote's test/unit/ut_17_storage.rb # def add_type (type) @dbs[type] = Database.new( - @host, @port, type, "#{@prefix}ruote_#{type}", false) + #@host, @port, type, "#{@prefix}ruote_#{type}", false) + @host, @port, type, "#{@prefix}ruote_#{type}") end # Nukes a db type and reputs it (losing all the documents that were in it). # def purge_type! (type) @@ -191,58 +203,47 @@ # Taking care of using long-polling # (http://wiki.apache.org/couchdb/HTTP_database_API) when possible # def get_msgs - if @zero_msgs_offset > 0 + ensure_msgs_thread_is_running - msgs = get_many( - 'msgs', nil, :limit => 300 - ).sort { |a, b| - a['put_at'] <=> b['put_at'] - } + msgs = [] - @zero_msgs_offset = @zero_msgs_offset - 1 if msgs.size == 0 - return msgs + while @msgs_queue.size > 0 + msgs << @msgs_queue.pop end - @zero_msgs_offset = @zeroes + msgs + end - schedules = get_many('schedules') + def get_schedules (delta, now) - next_at = schedules.collect { |s| s['at'] }.sort.first - delta = next_at ? (Time.parse(next_at) - Time.now) : nil + ensure_schedules_thread_is_running - #p [ delta, @timeout ] + if @schedules.nil? - return [] if delta && delta < 5.0 + # NOTE : the problem with this approach is that ALL the schedules + # are stored in memory. Most of the time it's not a problem, but + # for people will lots of schedules... - last_seq = @dbs['msgs'].get('_changes')['last_seq'] + @schedules = get_many('schedules') + @schedules = @schedules.inject({}) { |h, s| h[s['_id']] = s; h } + end - timeout = delta ? delta - 3.0 : -1.0 - timeout = (timeout < 0.0 || timeout > @timeout) ? @timeout : timeout + while @schedules_queue.size > 0 - #p [ Time.now, :last_seq, last_seq, :timeout, timeout ] + deleted, s = @schedules_queue.pop - begin - - @poller = Thread.current - - @dbs['msgs'].get( - "_changes?feed=longpoll&heartbeat=60000&since=#{last_seq}", - :timeout => timeout) - # block until there is a change in the 'msgs' db - - rescue Exception => e - #rescue Rufus::Jig::TimeoutError => te - # p [ :caught, e.class ] - # e.backtrace.each { |l| puts l } - ensure - @poller = nil + if deleted + @schedules.delete(s['_id']) + else + @schedules[s['_id']] = s + end end - [] + filter_schedules(@schedules.values, now) end protected def put_configuration @@ -250,9 +251,33 @@ return if get('configurations', 'engine') conf = { '_id' => 'engine', 'type' => 'configurations' }.merge!(@options) put(conf) + end + + def ensure_msgs_thread_is_running + + status = @msgs_thread ? @msgs_thread.status : -1 + return if status == 'run' || status == 'sleep' + + @msgs_thread = Thread.new do + @dbs['msgs'].couch.on_change do |_, deleted, doc| + @msgs_queue << doc unless deleted + end + end + end + + def ensure_schedules_thread_is_running + + status = @schedules_thread ? @schedules_thread.status : -1 + return if status == 'run' || status == 'sleep' + + @schedules_thread = Thread.new do + @dbs['schedules'].couch.on_change do |_, deleted, doc| + @schedules_queue << [ deleted, doc ] + end + end end end end end