lib/ruote/couch/storage.rb in ruote-couch-2.1.7 vs lib/ruote/couch/storage.rb in ruote-couch-2.1.8

- old
+ new

@@ -43,23 +43,31 @@ attr_reader :couch # Hooks the storage to a CouchDB instance. # - # The main option is 'prefix', which indicate which prefix should be + # The main option is 'couch_prefix', which indicate which prefix should be # added to all the database names used by this storage. # + # The option 'couch_timeout' is used what is the get_msgs timeout. This + # is the long-polling timeout. For functional test it is set to two seconds + # but for a production system, something like 10 minutes or 8 hours might + # be OK. + # def initialize (host, port, options={}) @host = host @port = port @options = options - @prefix = options['prefix'] || '' + @prefix = options['couch_prefix'] || options['prefix'] || '' @prefix = "#{@prefix}_" if @prefix.size > 0 + @zeroes = 21 # maybe make it an option + @timeout = options['couch_timeout'] || 60 + @dbs = {} %w[ msgs schedules configurations variables ].each do |type| @dbs[type] = Database.new( @@ -74,10 +82,12 @@ @dbs['workitems'] = WorkitemDatabase.new( @host, @port, 'workitems', "#{@prefix}ruote_workitems") put_configuration + + @zero_msgs_offset = @zeroes end def put (doc, opts={}) @dbs[doc['type']].put(doc, opts) @@ -117,10 +127,12 @@ @dbs[type].dump end def shutdown + @poller.kill if @poller + @dbs.values.each { |db| db.shutdown } end # Mainly used by ruote's test/unit/ut_17_storage.rb # @@ -157,9 +169,66 @@ end def query_workitems (criteria) @dbs['workitems'].query_workitems(criteria) + end + + # Overwriting Ruote::StorageBase.get_msgs + # + # Taking care of using long-polling + # (http://wiki.apache.org/couchdb/HTTP_database_API) when possible + # + def get_msgs + + if @zero_msgs_offset > 0 + + msgs = get_many( + 'msgs', nil, :limit => 300 + ).sort { |a, b| + a['put_at'] <=> b['put_at'] + } + + @zero_msgs_offset = @zero_msgs_offset - 1 if msgs.size == 0 + return msgs + end + + @zero_msgs_offset = @zeroes + + schedules = get_many('schedules') + + next_at = schedules.collect { |s| s['at'] }.sort.first + delta = next_at ? (Time.parse(next_at) - Time.now) : nil + + #p [ delta, @timeout ] + + return [] if delta && delta < 5.0 + + last_seq = @dbs['msgs'].get('_changes')['last_seq'] + + timeout = delta ? delta - 3.0 : -1.0 + timeout = (timeout < 0.0 || timeout > @timeout) ? @timeout : timeout + + #p [ Time.now, :last_seq, last_seq, :timeout, timeout ] + + begin + + @poller = Thread.current + + @dbs['msgs'].get( + "_changes?feed=longpoll&heartbeat=60000&since=#{last_seq}", + :timeout => timeout) + # block until there is a change in the 'msgs' db + + rescue Exception => e + #rescue Rufus::Jig::TimeoutError => te + # p [ :caught, e.class ] + # e.backtrace.each { |l| puts l } + ensure + @poller = nil + end + + [] end protected def put_configuration