lib/beanstalk-client/connection.rb in beanstalk-client-0.10.0 vs lib/beanstalk-client/connection.rb in beanstalk-client-0.11.0
- old
+ new
@@ -207,19 +207,22 @@
@multi = multi
end
def method_missing(selector, *args, &block)
begin
+ @multi.last_conn = @conn
@conn.send(selector, *args, &block)
rescue EOFError, Errno::ECONNRESET, Errno::EPIPE, UnexpectedResponse => ex
@multi.remove(@conn)
raise ex
end
end
end
class Pool
+ attr_accessor :last_conn
+
def initialize(addrs)
@addrs = addrs
connect()
end
@@ -328,41 +331,44 @@
make_hash(send_to_all_conns(:peek_job, id))
end
private
- def send_to_each_conn_first_res(sel, *args)
- open_connections.each do |c|
- x = wrap(c, sel, *args)
- return x if x
- end
- nil
+ def wrap(*args)
+ yield
+ rescue DrainingError
+ # Don't reconnect -- we're not interested in this server
+ retry
+ rescue EOFError, Errno::ECONNRESET, Errno::EPIPE
+ connect()
+ retry
end
- def send_to_rand_conn(sel, *args)
- wrap(pick_connection, sel, *args)
+ def send_to_each_conn_first_res(*args)
+ connect()
+ wrap{open_connections.inject(nil) {|r,c| r or c.send(*args)}}
end
- def send_to_all_conns(sel, *args)
- compact_hash(make_hash(@connections.map{|a, c| [a, wrap(c, sel, *args)]}))
+ def send_to_rand_conn(*args)
+ connect()
+ wrap{pick_connection.send(*args)}
end
+ def send_to_all_conns(*args)
+ connect()
+ wrap{compact_hash(map_hash(@connections){|c| c.send(*args)})}
+ end
+
def pick_connection()
open_connections[rand(open_connections.size)] or raise NotConnected
end
- def wrap(conn, sel, *args)
- (@last_conn = conn).send(sel, *args)
- rescue DrainingError
- # Don't reconnect -- we're not interested in this server
- retry
- rescue EOFError, Errno::ECONNRESET, Errno::EPIPE
- connect()
- retry
- end
-
def make_hash(pairs)
Hash[*pairs.inject([]){|a,b|a+b}]
+ end
+
+ def map_hash(h)
+ make_hash(h.map{|k,v| [k, yield(v)]})
end
def compact_hash(hash)
hash.reject{|k,v| v == nil}
end