lib/ruote/couch/storage.rb in ruote-couch-2.1.7 vs lib/ruote/couch/storage.rb in ruote-couch-2.1.8
- old
+ new
@@ -43,23 +43,31 @@
attr_reader :couch
# Hooks the storage to a CouchDB instance.
#
- # The main option is 'prefix', which indicate which prefix should be
+ # The main option is 'couch_prefix', which indicate which prefix should be
# added to all the database names used by this storage.
#
+ # The option 'couch_timeout' is used what is the get_msgs timeout. This
+ # is the long-polling timeout. For functional test it is set to two seconds
+ # but for a production system, something like 10 minutes or 8 hours might
+ # be OK.
+ #
def initialize (host, port, options={})
@host = host
@port = port
@options = options
- @prefix = options['prefix'] || ''
+ @prefix = options['couch_prefix'] || options['prefix'] || ''
@prefix = "#{@prefix}_" if @prefix.size > 0
+ @zeroes = 21 # maybe make it an option
+ @timeout = options['couch_timeout'] || 60
+
@dbs = {}
%w[ msgs schedules configurations variables ].each do |type|
@dbs[type] = Database.new(
@@ -74,10 +82,12 @@
@dbs['workitems'] = WorkitemDatabase.new(
@host, @port, 'workitems', "#{@prefix}ruote_workitems")
put_configuration
+
+ @zero_msgs_offset = @zeroes
end
def put (doc, opts={})
@dbs[doc['type']].put(doc, opts)
@@ -117,10 +127,12 @@
@dbs[type].dump
end
def shutdown
+ @poller.kill if @poller
+
@dbs.values.each { |db| db.shutdown }
end
# Mainly used by ruote's test/unit/ut_17_storage.rb
#
@@ -157,9 +169,66 @@
end
def query_workitems (criteria)
@dbs['workitems'].query_workitems(criteria)
+ end
+
+ # Overwriting Ruote::StorageBase.get_msgs
+ #
+ # Taking care of using long-polling
+ # (http://wiki.apache.org/couchdb/HTTP_database_API) when possible
+ #
+ def get_msgs
+
+ if @zero_msgs_offset > 0
+
+ msgs = get_many(
+ 'msgs', nil, :limit => 300
+ ).sort { |a, b|
+ a['put_at'] <=> b['put_at']
+ }
+
+ @zero_msgs_offset = @zero_msgs_offset - 1 if msgs.size == 0
+ return msgs
+ end
+
+ @zero_msgs_offset = @zeroes
+
+ schedules = get_many('schedules')
+
+ next_at = schedules.collect { |s| s['at'] }.sort.first
+ delta = next_at ? (Time.parse(next_at) - Time.now) : nil
+
+ #p [ delta, @timeout ]
+
+ return [] if delta && delta < 5.0
+
+ last_seq = @dbs['msgs'].get('_changes')['last_seq']
+
+ timeout = delta ? delta - 3.0 : -1.0
+ timeout = (timeout < 0.0 || timeout > @timeout) ? @timeout : timeout
+
+ #p [ Time.now, :last_seq, last_seq, :timeout, timeout ]
+
+ begin
+
+ @poller = Thread.current
+
+ @dbs['msgs'].get(
+ "_changes?feed=longpoll&heartbeat=60000&since=#{last_seq}",
+ :timeout => timeout)
+ # block until there is a change in the 'msgs' db
+
+ rescue Exception => e
+ #rescue Rufus::Jig::TimeoutError => te
+ # p [ :caught, e.class ]
+ # e.backtrace.each { |l| puts l }
+ ensure
+ @poller = nil
+ end
+
+ []
end
protected
def put_configuration