lib/ruote/beanstalk/storage.rb in ruote-beanstalk-2.1.10 vs lib/ruote/beanstalk/storage.rb in ruote-beanstalk-2.1.11

- old
+ new

@@ -31,11 +31,11 @@ module Beanstalk # # An error class just for BsStorage. # - class BsStorageError < RuntimeError + class StorageError < RuntimeError end # # This ruote storage can be used in two modes : client and server. # @@ -45,34 +45,34 @@ # # The storage is pointed at a beanstalk queue # # engine = Ruote::Engine.new( # Ruote::Worker.new( - # Ruote::Beanstalk::BsStorage.new('127.0.0.1:11300', opts))) + # Ruote::Beanstalk::Storage.new('127.0.0.1:11300', opts))) # - # All the operations (put, get, get_many, ...) of the storage are done + # All the operations(put, get, get_many, ...) of the storage are done # by a server, connected to the same beanstalk queue. # # == server # # The storage point to a beanstalk queue and receives orders from clients # via the queue. # - # Ruote::Beanstalk::BsStorage.new(':11300', 'ruote_work', :fork => true) + # Ruote::Beanstalk::Storage.new(':11300', 'ruote_work', :fork => true) # # Note the directory passed as a string. When in server mode, this storage # uses an embedded Ruote::FsStorage for the actual storage. # # The :fork => true lets the storage start and adjacent OS process containing # the Beanstalk server. The storage takes care of stopping the beanstalk # server when the Ruby process exits. # - class BsStorage + class Storage include Ruote::StorageBase - def initialize (uri, directory=nil, options=nil) + def initialize(uri, directory=nil, options=nil) @uri, address, port = split_uri(uri) directory, @options = if directory.nil? [ nil, {} ] @@ -111,47 +111,68 @@ put_configuration serve if @cloche end - def put (doc, opts={}) + # One catch : will return [] in case of [network] error + # + def get_msgs + super rescue [] + end + + # One catch : will return true (failure) in case of [network] error + # + def reserve(doc) + + super(doc) rescue true + end + + def put(doc, opts={}) + doc.merge!('put_at' => Ruote.now_to_utc_s) return @cloche.put(doc, opts) if @cloche r = operate('put', [ doc ]) return r unless r.nil? - doc['_rev'] = (doc['_rev'] || -1) + 1 if opts[:update_rev] + doc['_rev'] =(doc['_rev'] || -1) + 1 if opts[:update_rev] nil end - def get (type, key) + def get(type, key) return @cloche.get(type, key) if @cloche operate('get', [ type, key ]) end - def delete (doc) + def delete(doc) return @cloche.delete(doc) if @cloche operate('delete', [ doc ]) end - def get_many (type, key=nil, opts={}) + def get_many(type, key=nil, opts={}) - return @cloche.get_many(type, key, opts) if @cloche + return operate('get_many', [ type, key, opts ]) unless @cloche - operate('get_many', [ type, key, opts ]) + if key + key = Array(key).collect { |k| + k[0..6] == '(?-mix:' ? Regexp.new(k[7..-2]) : "!#{k}" + } if key + end + # assuming /!#{wfid}$/... + + @cloche.get_many(type, key, opts) end - def ids (type) + def ids(type) return @cloche.ids(type) if @cloche operate('ids', [ type ]) end @@ -163,50 +184,55 @@ else operate('purge!', []) end end - def dump (type) + def dump(type) get_many(type) end def shutdown Thread.list.each do |t| t.keys.each do |k| - next unless k.match(/^BeanstalkConnection\_/) + next unless k.to_s.match(CONN_KEY) t[k].close t[k] = nil end end end + def close + + shutdown + end + # Mainly used by ruote's test/unit/ut_17_storage.rb # - def add_type (type) + def add_type(type) # nothing to do end - # Nukes a db type and reputs it (losing all the documents that were in it). + # Nukes a db type and reputs it(losing all the documents that were in it). # - def purge_type! (type) + def purge_type!(type) if @cloche @cloche.purge_type!(type) else operate('purge_type!', [ type ]) end end protected - CONN_KEY = '__ruote_beanstalk_connection' + CONN_KEY = 'ruote-beanstalk-connection' TUBE_NAME = 'ruote-storage-commands' - def split_uri (uri) + def split_uri(uri) uri = ':' if uri == '' address, port = uri.split(':') address = '127.0.0.1' if address.strip == '' @@ -216,10 +242,19 @@ end def connection c = Thread.current[CONN_KEY] + + #begin + # c.stats + # return c + #rescue Exception => e + # c = nil + #end if c + # keeping around the idea around + return c if c c = ::Beanstalk::Connection.new(@uri, TUBE_NAME) c.ignore('default') @@ -235,11 +270,11 @@ return if get('configurations', 'engine') put({ '_id' => 'engine', 'type' => 'configurations' }.merge(@options)) end - def operate (command, params) + def operate(command, params) client_id = "BsStorage-#{Thread.current.object_id}-#{$$}" timestamp = Time.now.to_f.to_s con = connection @@ -249,24 +284,28 @@ con.watch(client_id) con.ignore(TUBE_NAME) result = nil - # NOTE : what about a timeout ? - loop do - job = con.reserve + job = nil + begin + job = con.reserve + rescue Exception => e + # probably our timeout + break + end job.delete result, ts = Rufus::Json.decode(job.body) break if ts == timestamp # hopefully end if result.is_a?(Array) && result.first == 'error' raise ArgumentError.new(result.last) if result[1] == 'ArgumentError' - raise BsStorageError.new(result.last) + raise StorageError.new(result.last) end result end