lib/beanstalk-client/connection.rb in beanstalk-client-0.11.0 vs lib/beanstalk-client/connection.rb in beanstalk-client-1.0.0
- old
+ new
@@ -24,16 +24,19 @@
module Beanstalk
class Connection
attr_reader :addr
- def initialize(addr, jptr=self)
+ def initialize(addr, jptr=self, default_tube=nil)
+ @waiting = false
@addr = addr
@jptr = jptr
connect
@last_used = 'default'
- @watch_list = ['default']
+ @watch_list = [@last_used]
+ self.use(default_tube) if default_tube
+ self.watch(default_tube) if default_tube
end
def connect
host, port = addr.split(':')
@socket = TCPSocket.new(host, port.to_i)
@@ -105,17 +108,21 @@
end
def use(tube)
return tube if tube == @last_used
@last_used = interact("use #{tube}\r\n", %w(USING))[0]
+ rescue BadFormatError
+ raise InvalidTubeName.new(tube)
end
def watch(tube)
return @watch_list.size if @watch_list.include?(tube)
r = interact("watch #{tube}\r\n", %w(WATCHING))[0].to_i
@watch_list += [tube]
return r
+ rescue BadFormatError
+ raise InvalidTubeName.new(tube)
end
def ignore(tube)
return @watch_list.size if !@watch_list.include?(tube)
r = interact("ignore #{tube}\r\n", %w(WATCHING))[0].to_i
@@ -199,42 +206,32 @@
raise EOFError, 'End of file reached' if str.size < n
str
end
end
- class CleanupWrapper
- def initialize(addr, multi)
- @conn = Connection.new(addr, self)
- @multi = multi
- end
-
- def method_missing(selector, *args, &block)
- begin
- @multi.last_conn = @conn
- @conn.send(selector, *args, &block)
- rescue EOFError, Errno::ECONNRESET, Errno::EPIPE, UnexpectedResponse => ex
- @multi.remove(@conn)
- raise ex
- end
- end
- end
-
class Pool
attr_accessor :last_conn
- def initialize(addrs)
+ def initialize(addrs, default_tube=nil)
@addrs = addrs
+ @watch_list = ['default']
+ @default_tube=default_tube
+ @watch_list = [default_tube] if default_tube
connect()
end
def connect()
@connections ||= {}
@addrs.each do |addr|
begin
if !@connections.include?(addr)
puts "connecting to beanstalk at #{addr}"
- @connections[addr] = CleanupWrapper.new(addr, self)
+ @connections[addr] = Connection.new(addr, self, @default_tube)
+ prev_watched = @connections[addr].list_tubes_watched()
+ to_ignore = prev_watched - @watch_list
+ @watch_list.each{|tube| @connections[addr].watch(tube)}
+ to_ignore.each{|tube| @connections[addr].ignore(tube)}
end
rescue Exception => ex
puts "#{ex.class}: #{ex}"
#puts begin ex.fixed_backtrace rescue ex.backtrace end
end
@@ -265,15 +262,19 @@
def use(tube)
send_to_all_conns(:use, tube)
end
def watch(tube)
- send_to_all_conns(:watch, tube)
+ r = send_to_all_conns(:watch, tube)
+ @watch_list = send_to_rand_conn(:list_tubes_watched, true)
+ return r
end
def ignore(tube)
- send_to_all_conns(:ignore, tube)
+ r = send_to_all_conns(:ignore, tube)
+ @watch_list = send_to_rand_conn(:list_tubes_watched, true)
+ return r
end
def raw_stats()
send_to_all_conns(:stats)
end
@@ -296,12 +297,12 @@
def list_tube_used()
send_to_all_conns(:list_tube_used)
end
- def list_tubes_watched()
- send_to_all_conns(:list_tubes_watched)
+ def list_tubes_watched(*args)
+ send_to_all_conns(:list_tubes_watched, *args)
end
def remove(conn)
@connections.delete(conn.addr)
end
@@ -331,11 +332,19 @@
make_hash(send_to_all_conns(:peek_job, id))
end
private
- def wrap(*args)
+ def call_wrap(c, *args)
+ self.last_conn = c
+ c.send(*args)
+ rescue EOFError, Errno::ECONNRESET, Errno::EPIPE, UnexpectedResponse => ex
+ self.remove(c)
+ raise ex
+ end
+
+ def retry_wrap(*args)
yield
rescue DrainingError
# Don't reconnect -- we're not interested in this server
retry
rescue EOFError, Errno::ECONNRESET, Errno::EPIPE
@@ -343,20 +352,20 @@
retry
end
def send_to_each_conn_first_res(*args)
connect()
- wrap{open_connections.inject(nil) {|r,c| r or c.send(*args)}}
+ retry_wrap{open_connections.inject(nil) {|r,c| r or call_wrap(c, *args)}}
end
def send_to_rand_conn(*args)
connect()
- wrap{pick_connection.send(*args)}
+ retry_wrap{call_wrap(pick_connection, *args)}
end
def send_to_all_conns(*args)
connect()
- wrap{compact_hash(map_hash(@connections){|c| c.send(*args)})}
+ retry_wrap{compact_hash(map_hash(@connections){|c| call_wrap(c, *args)})}
end
def pick_connection()
open_connections[rand(open_connections.size)] or raise NotConnected
end