lib/beanstalk-client/connection.rb in beanstalk-client-0.6.1 vs lib/beanstalk-client/connection.rb in beanstalk-client-0.7.0
- old
+ new
@@ -27,17 +27,26 @@
attr_reader :addr
def initialize(addr, jptr=self)
@addr = addr
@jptr = jptr
+ connect
+ end
+
+ def connect
host, port = addr.split(':')
@socket = TCPSocket.new(host, port.to_i)
# Don't leak fds when we exec.
@socket.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
end
+ def close
+ @socket.close
+ @socket = nil
+ end
+
def put(body, pri=65536, delay=0, ttr=120)
@socket.write("put #{pri} #{delay} #{ttr} #{body.size}\r\n#{body}\r\n")
check_resp('INSERTED', 'BURIED')[0].to_i
end
@@ -215,12 +224,16 @@
def open_connections()
@connections.values()
end
+ def last_server
+ @last_conn.addr
+ end
+
def send_to_conn(sel, *args)
- pick_connection.send(sel, *args)
+ (@last_conn = pick_connection).send(sel, *args)
rescue DrainingError
# Don't reconnect -- we're not interested in this server
retry
rescue EOFError, Errno::ECONNRESET, Errno::EPIPE
connect()
@@ -247,9 +260,16 @@
raw_stats.values.inject({}){|sums,h| sums.merge(h) {|k,o,n| o + n}}
end
def remove(conn)
@connections.delete(conn.addr)
+ end
+
+ def close
+ while @connections.size > 0
+ addr, conn = @connections.pop
+ conn.close
+ end
end
def peek()
open_connections.each do |c|
job = c.peek