lib/ruote/couch/storage.rb in ruote-couch-2.1.11 vs lib/ruote/couch/storage.rb in ruote-couch-2.2.0
- old
+ new
@@ -1,7 +1,7 @@
#--
-# Copyright (c) 2005-2010, John Mettraux, jmettraux@gmail.com
+# Copyright (c) 2005-2011, John Mettraux, jmettraux@gmail.com
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
@@ -36,11 +36,11 @@
# A CouchDB storage mechanism for ruote.
#
# The storage merely 'routes' work to Ruote::Couch::Database instances,
# one per document 'type' (expressions, msgs, schedules, variables, ...)
#
- class CouchStorage
+ class Storage
include Ruote::StorageBase
attr_reader :couch
@@ -80,16 +80,18 @@
@dbs['workitems'] = WorkitemDatabase.new(
@host, @port, 'workitems', "#{@prefix}ruote_workitems", @options)
put_configuration
- @msgs_thread = nil
+ @poll_threads = {}
+
@msgs_queue = ::Queue.new
+ @msgs_last_min = nil
- @schedules_thread = nil
@schedules_queue = ::Queue.new
- @schedules = nil
+ @schedules = {}
+ @schedules_last_min = nil
end
def put (doc, opts={})
@dbs[doc['type']].put(doc, opts)
@@ -131,13 +133,11 @@
end
def shutdown
@dbs.values.each { |db| db.shutdown }
-
- @msgs_thread.kill rescue nil
- @schedules_thread.kill rescue nil
+ @poll_threads.values.each { |t| t.kill rescue nil }
end
# Mainly used by ruote's test/unit/ut_17_storage.rb
#
def add_type (type)
@@ -187,40 +187,45 @@
# Taking care of using long-polling
# (http://wiki.apache.org/couchdb/HTTP_database_API) when possible
#
def get_msgs
- mt = @msgs_thread
+ mt = @poll_threads['msgs']
ensure_msgs_thread_is_running
msgs = []
- 2.times { msgs = get_many('msgs') } if mt != @msgs_thread
+ 2.times {
+ (msgs = get_many('msgs')) rescue nil
+ } if mt != @poll_threads['msgs']
#
# seems necessary to avoid any msgs leak :-(
+ #
+ # added the "rescue nil", to rescue timeout exceptions
while @msgs_queue.size > 0
msgs << @msgs_queue.pop
end
+ if msgs.empty? && Time.now.min != @msgs_last_min
+ #
+ # once per minute, do a regular get, to avoid lost msgs
+ #
+ begin
+ msgs = get_many('msgs')
+ @msgs_last_min = Time.now.min
+ rescue Rufus::Jig::TimeoutError => te
+ end
+ end
+
msgs
end
def get_schedules (delta, now)
ensure_schedules_thread_is_running
- if @schedules.nil?
-
- # NOTE : the problem with this approach is that ALL the schedules
- # are stored in memory. Most of the time it's not a problem, but
- # for people will lots of schedules...
-
- @schedules = get_many('schedules')
- @schedules = @schedules.inject({}) { |h, s| h[s['_id']] = s; h }
- end
-
while @schedules_queue.size > 0
deleted, s = @schedules_queue.pop
next unless s
@@ -230,10 +235,19 @@
else
@schedules[s['_id']] = s
end
end
+ if Time.now.min != @schedules_last_min
+ #
+ # once per minute, do a regular get, to avoid lost schedules
+ #
+ @schedules = get_many('schedules')
+ @schedules = @schedules.inject({}) { |h, s| h[s['_id']] = s; h }
+ @schedules_last_min = Time.now.min
+ end
+
filter_schedules(@schedules.values.reject { |sch| sch['at'].nil? }, now)
end
protected
@@ -244,32 +258,42 @@
conf = { '_id' => 'engine', 'type' => 'configurations' }.merge!(@options)
put(conf)
end
- def ensure_msgs_thread_is_running
+ def ensure_poll_thread_is_running (doctype, &block)
- status = @msgs_thread ? @msgs_thread.status : -1
- return if status == 'run' || status == 'sleep'
+ if t = @poll_threads[doctype]
+ return if t.status == 'run' || t.status == 'sleep' # thread is OK
+ end
- @msgs_thread = Thread.new do
- @dbs['msgs'].couch.on_change do |_, deleted, doc|
- @msgs_queue << doc unless deleted
- end
+ # create or revive thread....
+
+ @poll_threads[doctype] = Thread.new do
+
+ @dbs[doctype].couch.on_change(&block) rescue nil
end
end
- def ensure_schedules_thread_is_running
+ def ensure_msgs_thread_is_running
- status = @schedules_thread ? @schedules_thread.status : -1
- return if status == 'run' || status == 'sleep'
+ ensure_poll_thread_is_running 'msgs' do |_, deleted, doc|
+ @msgs_queue << doc unless deleted
+ end
+ end
- @schedules_thread = Thread.new do
- @dbs['schedules'].couch.on_change do |_, deleted, doc|
- @schedules_queue << [ deleted, doc ]
- end
+ def ensure_schedules_thread_is_running
+
+ ensure_poll_thread_is_running 'schedules' do |_, deleted, doc|
+ @schedules_queue << [ deleted, doc ]
end
end
+ end
+
+ #
+ # Kept for backward compatibility.
+ #
+ class CouchStorage < Storage
end
end
end