lib/beanstalk-client/connection.rb in beanstalk-client-0.7.0 vs lib/beanstalk-client/connection.rb in beanstalk-client-0.9.0

- old
+ new

@@ -16,10 +16,11 @@ # along with this program. If not, see <http://www.gnu.org/licenses/>. require 'socket' require 'fcntl' require 'yaml' +require 'set' require 'beanstalk-client/bag' require 'beanstalk-client/errors' require 'beanstalk-client/job' module Beanstalk @@ -89,20 +90,55 @@ @socket.write("bury #{id} #{pri}\r\n") check_resp('BURIED') :ok end + def use(tube) + @socket.write("use #{tube}\r\n") + check_resp('USING')[0] + end + + def watch(tube) + @socket.write("watch #{tube}\r\n") + check_resp('WATCHING')[0] + end + + def ignore(tube) + @socket.write("ignore #{tube}\r\n") + check_resp('WATCHING')[0] + end + def stats() @socket.write("stats\r\n") read_yaml('OK') end def job_stats(id) - @socket.write("stats #{id}\r\n") + @socket.write("stats-job #{id}\r\n") read_yaml('OK') end + def stats_tube(tube) + @socket.write("stats-tube #{tube}\r\n") + read_yaml('OK') + end + + def list_tubes() + @socket.write("list-tubes\r\n") + read_yaml('OK') + end + + def list_tube_used() + @socket.write("list-tube-used\r\n") + check_resp('USING')[0] + end + + def list_tubes_watched() + @socket.write("list-tubes-watched\r\n") + read_yaml('OK') + end + private def get_resp() r = @socket.gets("\r\n") raise EOFError if r == nil @@ -120,14 +156,14 @@ def read_job(word) # Give the user a chance to select on multiple fds. Beanstalk.select.call([@socket]) if Beanstalk.select - id, pri, bytes = check_resp(word).map{|s| s.to_i} + id, bytes = check_resp(word).map{|s| s.to_i} body = read_bytes(bytes) raise 'bad trailer' if read_bytes(2) != "\r\n" - [id, pri, body] + [id, body] end def read_yaml(word) bytes_s, = check_resp(word) yaml = read_bytes(bytes_s.to_i) @@ -228,40 +264,70 @@ def last_server @last_conn.addr end - def send_to_conn(sel, *args) - (@last_conn = pick_connection).send(sel, *args) - rescue DrainingError - # Don't reconnect -- we're not interested in this server - retry - rescue EOFError, Errno::ECONNRESET, Errno::EPIPE - connect() - retry + def send_to_rand_conn(sel, *args) + wrap(pick_connection, sel, *args) end + def send_to_all_conns(sel, *args) + make_hash(@connections.map{|a, c| [a, wrap(c, sel, *args)]}) + end + def put(body, pri=65536, delay=0, ttr=120) - send_to_conn(:put, body, pri, delay, ttr) + send_to_rand_conn(:put, body, pri, delay, ttr) end def yput(obj, pri=65536, delay=0, ttr=120) - send_to_conn(:yput, obj, pri, delay, ttr) + send_to_rand_conn(:yput, obj, pri, delay, ttr) end def reserve() - send_to_conn(:reserve) + send_to_rand_conn(:reserve) end + def use(tube) + send_to_all_conns(:use, tube) + end + + def watch(tube) + send_to_all_conns(:watch, tube) + end + + def ignore(tube) + send_to_all_conns(:ignore, tube) + end + def raw_stats() - Hash[*@connections.map{|a,c| [a, c.stats()]}.inject([]){|a,b|a+b}] + send_to_all_conns(:stats) end def stats() - raw_stats.values.inject({}){|sums,h| sums.merge(h) {|k,o,n| o + n}} + sum_hashes(raw_stats.values) end + def raw_stats_tube(tube) + send_to_all_conns(:stats_tube, tube) + end + + def stats_tube(tube) + sum_hashes(raw_stats_tube(tube).values) + end + + def list_tubes() + send_to_all_conns(:list_tubes) + end + + def list_tube_used() + send_to_all_conns(:list_tube_used) + end + + def list_tubes_watched() + send_to_all_conns(:list_tubes_watched) + end + def remove(conn) @connections.delete(conn.addr) end def close @@ -281,8 +347,31 @@ private def pick_connection() open_connections[rand(open_connections.size)] or raise NotConnected + end + + def wrap(conn, sel, *args) + (@last_conn = conn).send(sel, *args) + rescue DrainingError + # Don't reconnect -- we're not interested in this server + retry + rescue EOFError, Errno::ECONNRESET, Errno::EPIPE + connect() + retry + end + + def make_hash(pairs) + Hash[*pairs.inject([]){|a,b|a+b}] + end + + def sum_hashes(hs) + hs.inject({}){|a,b| a.merge(b) {|k,o,n| combine_stats(k, o, n)}} + end + + DONT_ADD = Set['name', 'version', 'pid'] + def combine_stats(k, a, b) + DONT_ADD.include?(k) ? Set[a] + Set[b] : a + b end end end