lib/ruote/beanstalk/storage.rb in ruote-beanstalk-2.1.10 vs lib/ruote/beanstalk/storage.rb in ruote-beanstalk-2.1.11
- old
+ new
@@ -31,11 +31,11 @@
module Beanstalk
#
# An error class just for BsStorage.
#
- class BsStorageError < RuntimeError
+ class StorageError < RuntimeError
end
#
# This ruote storage can be used in two modes : client and server.
#
@@ -45,34 +45,34 @@
#
# The storage is pointed at a beanstalk queue
#
# engine = Ruote::Engine.new(
# Ruote::Worker.new(
- # Ruote::Beanstalk::BsStorage.new('127.0.0.1:11300', opts)))
+ # Ruote::Beanstalk::Storage.new('127.0.0.1:11300', opts)))
#
- # All the operations (put, get, get_many, ...) of the storage are done
+ # All the operations(put, get, get_many, ...) of the storage are done
# by a server, connected to the same beanstalk queue.
#
# == server
#
# The storage point to a beanstalk queue and receives orders from clients
# via the queue.
#
- # Ruote::Beanstalk::BsStorage.new(':11300', 'ruote_work', :fork => true)
+ # Ruote::Beanstalk::Storage.new(':11300', 'ruote_work', :fork => true)
#
# Note the directory passed as a string. When in server mode, this storage
# uses an embedded Ruote::FsStorage for the actual storage.
#
# The :fork => true lets the storage start and adjacent OS process containing
# the Beanstalk server. The storage takes care of stopping the beanstalk
# server when the Ruby process exits.
#
- class BsStorage
+ class Storage
include Ruote::StorageBase
- def initialize (uri, directory=nil, options=nil)
+ def initialize(uri, directory=nil, options=nil)
@uri, address, port = split_uri(uri)
directory, @options = if directory.nil?
[ nil, {} ]
@@ -111,47 +111,68 @@
put_configuration
serve if @cloche
end
- def put (doc, opts={})
+ # One catch : will return [] in case of [network] error
+ #
+ def get_msgs
+ super rescue []
+ end
+
+ # One catch : will return true (failure) in case of [network] error
+ #
+ def reserve(doc)
+
+ super(doc) rescue true
+ end
+
+ def put(doc, opts={})
+
doc.merge!('put_at' => Ruote.now_to_utc_s)
return @cloche.put(doc, opts) if @cloche
r = operate('put', [ doc ])
return r unless r.nil?
- doc['_rev'] = (doc['_rev'] || -1) + 1 if opts[:update_rev]
+ doc['_rev'] =(doc['_rev'] || -1) + 1 if opts[:update_rev]
nil
end
- def get (type, key)
+ def get(type, key)
return @cloche.get(type, key) if @cloche
operate('get', [ type, key ])
end
- def delete (doc)
+ def delete(doc)
return @cloche.delete(doc) if @cloche
operate('delete', [ doc ])
end
- def get_many (type, key=nil, opts={})
+ def get_many(type, key=nil, opts={})
- return @cloche.get_many(type, key, opts) if @cloche
+ return operate('get_many', [ type, key, opts ]) unless @cloche
- operate('get_many', [ type, key, opts ])
+ if key
+ key = Array(key).collect { |k|
+ k[0..6] == '(?-mix:' ? Regexp.new(k[7..-2]) : "!#{k}"
+ } if key
+ end
+ # assuming /!#{wfid}$/...
+
+ @cloche.get_many(type, key, opts)
end
- def ids (type)
+ def ids(type)
return @cloche.ids(type) if @cloche
operate('ids', [ type ])
end
@@ -163,50 +184,55 @@
else
operate('purge!', [])
end
end
- def dump (type)
+ def dump(type)
get_many(type)
end
def shutdown
Thread.list.each do |t|
t.keys.each do |k|
- next unless k.match(/^BeanstalkConnection\_/)
+ next unless k.to_s.match(CONN_KEY)
t[k].close
t[k] = nil
end
end
end
+ def close
+
+ shutdown
+ end
+
# Mainly used by ruote's test/unit/ut_17_storage.rb
#
- def add_type (type)
+ def add_type(type)
# nothing to do
end
- # Nukes a db type and reputs it (losing all the documents that were in it).
+ # Nukes a db type and reputs it(losing all the documents that were in it).
#
- def purge_type! (type)
+ def purge_type!(type)
if @cloche
@cloche.purge_type!(type)
else
operate('purge_type!', [ type ])
end
end
protected
- CONN_KEY = '__ruote_beanstalk_connection'
+ CONN_KEY = 'ruote-beanstalk-connection'
TUBE_NAME = 'ruote-storage-commands'
- def split_uri (uri)
+ def split_uri(uri)
uri = ':' if uri == ''
address, port = uri.split(':')
address = '127.0.0.1' if address.strip == ''
@@ -216,10 +242,19 @@
end
def connection
c = Thread.current[CONN_KEY]
+
+ #begin
+ # c.stats
+ # return c
+ #rescue Exception => e
+ # c = nil
+ #end if c
+ # keeping around the idea around
+
return c if c
c = ::Beanstalk::Connection.new(@uri, TUBE_NAME)
c.ignore('default')
@@ -235,11 +270,11 @@
return if get('configurations', 'engine')
put({ '_id' => 'engine', 'type' => 'configurations' }.merge(@options))
end
- def operate (command, params)
+ def operate(command, params)
client_id = "BsStorage-#{Thread.current.object_id}-#{$$}"
timestamp = Time.now.to_f.to_s
con = connection
@@ -249,24 +284,28 @@
con.watch(client_id)
con.ignore(TUBE_NAME)
result = nil
- # NOTE : what about a timeout ?
-
loop do
- job = con.reserve
+ job = nil
+ begin
+ job = con.reserve
+ rescue Exception => e
+ # probably our timeout
+ break
+ end
job.delete
result, ts = Rufus::Json.decode(job.body)
break if ts == timestamp # hopefully
end
if result.is_a?(Array) && result.first == 'error'
raise ArgumentError.new(result.last) if result[1] == 'ArgumentError'
- raise BsStorageError.new(result.last)
+ raise StorageError.new(result.last)
end
result
end