lib/ruote/couch/storage.rb in ruote-couch-2.1.11 vs lib/ruote/couch/storage.rb in ruote-couch-2.2.0

- old
+ new

@@ -1,7 +1,7 @@ #-- -# Copyright (c) 2005-2010, John Mettraux, jmettraux@gmail.com +# Copyright (c) 2005-2011, John Mettraux, jmettraux@gmail.com # # Permission is hereby granted, free of charge, to any person obtaining a copy # of this software and associated documentation files (the "Software"), to deal # in the Software without restriction, including without limitation the rights # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell @@ -36,11 +36,11 @@ # A CouchDB storage mechanism for ruote. # # The storage merely 'routes' work to Ruote::Couch::Database instances, # one per document 'type' (expressions, msgs, schedules, variables, ...) # - class CouchStorage + class Storage include Ruote::StorageBase attr_reader :couch @@ -80,16 +80,18 @@ @dbs['workitems'] = WorkitemDatabase.new( @host, @port, 'workitems', "#{@prefix}ruote_workitems", @options) put_configuration - @msgs_thread = nil + @poll_threads = {} + @msgs_queue = ::Queue.new + @msgs_last_min = nil - @schedules_thread = nil @schedules_queue = ::Queue.new - @schedules = nil + @schedules = {} + @schedules_last_min = nil end def put (doc, opts={}) @dbs[doc['type']].put(doc, opts) @@ -131,13 +133,11 @@ end def shutdown @dbs.values.each { |db| db.shutdown } - - @msgs_thread.kill rescue nil - @schedules_thread.kill rescue nil + @poll_threads.values.each { |t| t.kill rescue nil } end # Mainly used by ruote's test/unit/ut_17_storage.rb # def add_type (type) @@ -187,40 +187,45 @@ # Taking care of using long-polling # (http://wiki.apache.org/couchdb/HTTP_database_API) when possible # def get_msgs - mt = @msgs_thread + mt = @poll_threads['msgs'] ensure_msgs_thread_is_running msgs = [] - 2.times { msgs = get_many('msgs') } if mt != @msgs_thread + 2.times { + (msgs = get_many('msgs')) rescue nil + } if mt != @poll_threads['msgs'] # # seems necessary to avoid any msgs leak :-( + # + # added the "rescue nil", to rescue timeout exceptions while @msgs_queue.size > 0 msgs << @msgs_queue.pop end + if msgs.empty? && Time.now.min != @msgs_last_min + # + # once per minute, do a regular get, to avoid lost msgs + # + begin + msgs = get_many('msgs') + @msgs_last_min = Time.now.min + rescue Rufus::Jig::TimeoutError => te + end + end + msgs end def get_schedules (delta, now) ensure_schedules_thread_is_running - if @schedules.nil? - - # NOTE : the problem with this approach is that ALL the schedules - # are stored in memory. Most of the time it's not a problem, but - # for people will lots of schedules... - - @schedules = get_many('schedules') - @schedules = @schedules.inject({}) { |h, s| h[s['_id']] = s; h } - end - while @schedules_queue.size > 0 deleted, s = @schedules_queue.pop next unless s @@ -230,10 +235,19 @@ else @schedules[s['_id']] = s end end + if Time.now.min != @schedules_last_min + # + # once per minute, do a regular get, to avoid lost schedules + # + @schedules = get_many('schedules') + @schedules = @schedules.inject({}) { |h, s| h[s['_id']] = s; h } + @schedules_last_min = Time.now.min + end + filter_schedules(@schedules.values.reject { |sch| sch['at'].nil? }, now) end protected @@ -244,32 +258,42 @@ conf = { '_id' => 'engine', 'type' => 'configurations' }.merge!(@options) put(conf) end - def ensure_msgs_thread_is_running + def ensure_poll_thread_is_running (doctype, &block) - status = @msgs_thread ? @msgs_thread.status : -1 - return if status == 'run' || status == 'sleep' + if t = @poll_threads[doctype] + return if t.status == 'run' || t.status == 'sleep' # thread is OK + end - @msgs_thread = Thread.new do - @dbs['msgs'].couch.on_change do |_, deleted, doc| - @msgs_queue << doc unless deleted - end + # create or revive thread.... + + @poll_threads[doctype] = Thread.new do + + @dbs[doctype].couch.on_change(&block) rescue nil end end - def ensure_schedules_thread_is_running + def ensure_msgs_thread_is_running - status = @schedules_thread ? @schedules_thread.status : -1 - return if status == 'run' || status == 'sleep' + ensure_poll_thread_is_running 'msgs' do |_, deleted, doc| + @msgs_queue << doc unless deleted + end + end - @schedules_thread = Thread.new do - @dbs['schedules'].couch.on_change do |_, deleted, doc| - @schedules_queue << [ deleted, doc ] - end + def ensure_schedules_thread_is_running + + ensure_poll_thread_is_running 'schedules' do |_, deleted, doc| + @schedules_queue << [ deleted, doc ] end end + end + + # + # Kept for backward compatibility. + # + class CouchStorage < Storage end end end