lib/beanstalk-client/connection.rb in beanstalk-client-0.11.0 vs lib/beanstalk-client/connection.rb in beanstalk-client-1.0.0

- old
+ new

@@ -24,16 +24,19 @@ module Beanstalk class Connection attr_reader :addr - def initialize(addr, jptr=self) + def initialize(addr, jptr=self, default_tube=nil) + @waiting = false @addr = addr @jptr = jptr connect @last_used = 'default' - @watch_list = ['default'] + @watch_list = [@last_used] + self.use(default_tube) if default_tube + self.watch(default_tube) if default_tube end def connect host, port = addr.split(':') @socket = TCPSocket.new(host, port.to_i) @@ -105,17 +108,21 @@ end def use(tube) return tube if tube == @last_used @last_used = interact("use #{tube}\r\n", %w(USING))[0] + rescue BadFormatError + raise InvalidTubeName.new(tube) end def watch(tube) return @watch_list.size if @watch_list.include?(tube) r = interact("watch #{tube}\r\n", %w(WATCHING))[0].to_i @watch_list += [tube] return r + rescue BadFormatError + raise InvalidTubeName.new(tube) end def ignore(tube) return @watch_list.size if !@watch_list.include?(tube) r = interact("ignore #{tube}\r\n", %w(WATCHING))[0].to_i @@ -199,42 +206,32 @@ raise EOFError, 'End of file reached' if str.size < n str end end - class CleanupWrapper - def initialize(addr, multi) - @conn = Connection.new(addr, self) - @multi = multi - end - - def method_missing(selector, *args, &block) - begin - @multi.last_conn = @conn - @conn.send(selector, *args, &block) - rescue EOFError, Errno::ECONNRESET, Errno::EPIPE, UnexpectedResponse => ex - @multi.remove(@conn) - raise ex - end - end - end - class Pool attr_accessor :last_conn - def initialize(addrs) + def initialize(addrs, default_tube=nil) @addrs = addrs + @watch_list = ['default'] + @default_tube=default_tube + @watch_list = [default_tube] if default_tube connect() end def connect() @connections ||= {} @addrs.each do |addr| begin if !@connections.include?(addr) puts "connecting to beanstalk at #{addr}" - @connections[addr] = CleanupWrapper.new(addr, self) + @connections[addr] = Connection.new(addr, self, @default_tube) + prev_watched = @connections[addr].list_tubes_watched() + to_ignore = prev_watched - @watch_list + @watch_list.each{|tube| @connections[addr].watch(tube)} + to_ignore.each{|tube| @connections[addr].ignore(tube)} end rescue Exception => ex puts "#{ex.class}: #{ex}" #puts begin ex.fixed_backtrace rescue ex.backtrace end end @@ -265,15 +262,19 @@ def use(tube) send_to_all_conns(:use, tube) end def watch(tube) - send_to_all_conns(:watch, tube) + r = send_to_all_conns(:watch, tube) + @watch_list = send_to_rand_conn(:list_tubes_watched, true) + return r end def ignore(tube) - send_to_all_conns(:ignore, tube) + r = send_to_all_conns(:ignore, tube) + @watch_list = send_to_rand_conn(:list_tubes_watched, true) + return r end def raw_stats() send_to_all_conns(:stats) end @@ -296,12 +297,12 @@ def list_tube_used() send_to_all_conns(:list_tube_used) end - def list_tubes_watched() - send_to_all_conns(:list_tubes_watched) + def list_tubes_watched(*args) + send_to_all_conns(:list_tubes_watched, *args) end def remove(conn) @connections.delete(conn.addr) end @@ -331,11 +332,19 @@ make_hash(send_to_all_conns(:peek_job, id)) end private - def wrap(*args) + def call_wrap(c, *args) + self.last_conn = c + c.send(*args) + rescue EOFError, Errno::ECONNRESET, Errno::EPIPE, UnexpectedResponse => ex + self.remove(c) + raise ex + end + + def retry_wrap(*args) yield rescue DrainingError # Don't reconnect -- we're not interested in this server retry rescue EOFError, Errno::ECONNRESET, Errno::EPIPE @@ -343,20 +352,20 @@ retry end def send_to_each_conn_first_res(*args) connect() - wrap{open_connections.inject(nil) {|r,c| r or c.send(*args)}} + retry_wrap{open_connections.inject(nil) {|r,c| r or call_wrap(c, *args)}} end def send_to_rand_conn(*args) connect() - wrap{pick_connection.send(*args)} + retry_wrap{call_wrap(pick_connection, *args)} end def send_to_all_conns(*args) connect() - wrap{compact_hash(map_hash(@connections){|c| c.send(*args)})} + retry_wrap{compact_hash(map_hash(@connections){|c| call_wrap(c, *args)})} end def pick_connection() open_connections[rand(open_connections.size)] or raise NotConnected end