lib/beanstalk-client/connection.rb in beanstalk-client-0.7.0 vs lib/beanstalk-client/connection.rb in beanstalk-client-0.9.0
- old
+ new
@@ -16,10 +16,11 @@
# along with this program. If not, see <http://www.gnu.org/licenses/>.
require 'socket'
require 'fcntl'
require 'yaml'
+require 'set'
require 'beanstalk-client/bag'
require 'beanstalk-client/errors'
require 'beanstalk-client/job'
module Beanstalk
@@ -89,20 +90,55 @@
@socket.write("bury #{id} #{pri}\r\n")
check_resp('BURIED')
:ok
end
+ def use(tube)
+ @socket.write("use #{tube}\r\n")
+ check_resp('USING')[0]
+ end
+
+ def watch(tube)
+ @socket.write("watch #{tube}\r\n")
+ check_resp('WATCHING')[0]
+ end
+
+ def ignore(tube)
+ @socket.write("ignore #{tube}\r\n")
+ check_resp('WATCHING')[0]
+ end
+
def stats()
@socket.write("stats\r\n")
read_yaml('OK')
end
def job_stats(id)
- @socket.write("stats #{id}\r\n")
+ @socket.write("stats-job #{id}\r\n")
read_yaml('OK')
end
+ def stats_tube(tube)
+ @socket.write("stats-tube #{tube}\r\n")
+ read_yaml('OK')
+ end
+
+ def list_tubes()
+ @socket.write("list-tubes\r\n")
+ read_yaml('OK')
+ end
+
+ def list_tube_used()
+ @socket.write("list-tube-used\r\n")
+ check_resp('USING')[0]
+ end
+
+ def list_tubes_watched()
+ @socket.write("list-tubes-watched\r\n")
+ read_yaml('OK')
+ end
+
private
def get_resp()
r = @socket.gets("\r\n")
raise EOFError if r == nil
@@ -120,14 +156,14 @@
def read_job(word)
# Give the user a chance to select on multiple fds.
Beanstalk.select.call([@socket]) if Beanstalk.select
- id, pri, bytes = check_resp(word).map{|s| s.to_i}
+ id, bytes = check_resp(word).map{|s| s.to_i}
body = read_bytes(bytes)
raise 'bad trailer' if read_bytes(2) != "\r\n"
- [id, pri, body]
+ [id, body]
end
def read_yaml(word)
bytes_s, = check_resp(word)
yaml = read_bytes(bytes_s.to_i)
@@ -228,40 +264,70 @@
def last_server
@last_conn.addr
end
- def send_to_conn(sel, *args)
- (@last_conn = pick_connection).send(sel, *args)
- rescue DrainingError
- # Don't reconnect -- we're not interested in this server
- retry
- rescue EOFError, Errno::ECONNRESET, Errno::EPIPE
- connect()
- retry
+ def send_to_rand_conn(sel, *args)
+ wrap(pick_connection, sel, *args)
end
+ def send_to_all_conns(sel, *args)
+ make_hash(@connections.map{|a, c| [a, wrap(c, sel, *args)]})
+ end
+
def put(body, pri=65536, delay=0, ttr=120)
- send_to_conn(:put, body, pri, delay, ttr)
+ send_to_rand_conn(:put, body, pri, delay, ttr)
end
def yput(obj, pri=65536, delay=0, ttr=120)
- send_to_conn(:yput, obj, pri, delay, ttr)
+ send_to_rand_conn(:yput, obj, pri, delay, ttr)
end
def reserve()
- send_to_conn(:reserve)
+ send_to_rand_conn(:reserve)
end
+ def use(tube)
+ send_to_all_conns(:use, tube)
+ end
+
+ def watch(tube)
+ send_to_all_conns(:watch, tube)
+ end
+
+ def ignore(tube)
+ send_to_all_conns(:ignore, tube)
+ end
+
def raw_stats()
- Hash[*@connections.map{|a,c| [a, c.stats()]}.inject([]){|a,b|a+b}]
+ send_to_all_conns(:stats)
end
def stats()
- raw_stats.values.inject({}){|sums,h| sums.merge(h) {|k,o,n| o + n}}
+ sum_hashes(raw_stats.values)
end
+ def raw_stats_tube(tube)
+ send_to_all_conns(:stats_tube, tube)
+ end
+
+ def stats_tube(tube)
+ sum_hashes(raw_stats_tube(tube).values)
+ end
+
+ def list_tubes()
+ send_to_all_conns(:list_tubes)
+ end
+
+ def list_tube_used()
+ send_to_all_conns(:list_tube_used)
+ end
+
+ def list_tubes_watched()
+ send_to_all_conns(:list_tubes_watched)
+ end
+
def remove(conn)
@connections.delete(conn.addr)
end
def close
@@ -281,8 +347,31 @@
private
def pick_connection()
open_connections[rand(open_connections.size)] or raise NotConnected
+ end
+
+ def wrap(conn, sel, *args)
+ (@last_conn = conn).send(sel, *args)
+ rescue DrainingError
+ # Don't reconnect -- we're not interested in this server
+ retry
+ rescue EOFError, Errno::ECONNRESET, Errno::EPIPE
+ connect()
+ retry
+ end
+
+ def make_hash(pairs)
+ Hash[*pairs.inject([]){|a,b|a+b}]
+ end
+
+ def sum_hashes(hs)
+ hs.inject({}){|a,b| a.merge(b) {|k,o,n| combine_stats(k, o, n)}}
+ end
+
+ DONT_ADD = Set['name', 'version', 'pid']
+ def combine_stats(k, a, b)
+ DONT_ADD.include?(k) ? Set[a] + Set[b] : a + b
end
end
end