lib/beanstalk-client/connection.rb in beanstalk-client-0.9.0 vs lib/beanstalk-client/connection.rb in beanstalk-client-0.10.0
- old
+ new
@@ -17,22 +17,23 @@
require 'socket'
require 'fcntl'
require 'yaml'
require 'set'
-require 'beanstalk-client/bag'
require 'beanstalk-client/errors'
require 'beanstalk-client/job'
module Beanstalk
- class RawConnection
+ class Connection
attr_reader :addr
def initialize(addr, jptr=self)
@addr = addr
@jptr = jptr
connect
+ @last_used = 'default'
+ @watch_list = ['default']
end
def connect
host, port = addr.split(':')
@socket = TCPSocket.new(host, port.to_i)
@@ -45,121 +46,142 @@
@socket.close
@socket = nil
end
def put(body, pri=65536, delay=0, ttr=120)
- @socket.write("put #{pri} #{delay} #{ttr} #{body.size}\r\n#{body}\r\n")
- check_resp('INSERTED', 'BURIED')[0].to_i
+ interact("put #{pri} #{delay} #{ttr} #{body.size}\r\n#{body}\r\n",
+ %w(INSERTED BURIED))[0].to_i
end
def yput(obj, pri=65536, delay=0, ttr=120)
put(YAML.dump(obj), pri, delay, ttr)
end
- def peek()
- @socket.write("peek\r\n")
- begin
- Job.new(@jptr, *read_job('FOUND'))
- rescue UnexpectedResponse
- nil
- end
+ def peek_job(id)
+ interact("peek #{id}\r\n", :job)
end
- def peek_job(id)
- @socket.write("peek #{id}\r\n")
- Job.new(@jptr, *read_job('FOUND'))
+ def peek_ready()
+ interact("peek-ready\r\n", :job)
end
+ def peek_delayed()
+ interact("peek-delayed\r\n", :job)
+ end
+
+ def peek_buried()
+ interact("peek-buried\r\n", :job)
+ end
+
def reserve()
+ raise WaitingForJobError if @waiting
@socket.write("reserve\r\n")
+
+ begin
+ @waiting = true
+ # Give the user a chance to select on multiple fds.
+ Beanstalk.select.call([@socket]) if Beanstalk.select
+ rescue WaitingForJobError
+ # just continue
+ ensure
+ @waiting = false
+ end
+
Job.new(@jptr, *read_job('RESERVED'))
end
def delete(id)
- @socket.write("delete #{id}\r\n")
- check_resp('DELETED')
+ interact("delete #{id}\r\n", %w(DELETED))
:ok
end
def release(id, pri, delay)
- @socket.write("release #{id} #{pri} #{delay}\r\n")
- check_resp('RELEASED')
+ interact("release #{id} #{pri} #{delay}\r\n", %w(RELEASED))
:ok
end
def bury(id, pri)
- @socket.write("bury #{id} #{pri}\r\n")
- check_resp('BURIED')
+ interact("bury #{id} #{pri}\r\n", %w(BURIED))
:ok
end
def use(tube)
- @socket.write("use #{tube}\r\n")
- check_resp('USING')[0]
+ return tube if tube == @last_used
+ @last_used = interact("use #{tube}\r\n", %w(USING))[0]
end
def watch(tube)
- @socket.write("watch #{tube}\r\n")
- check_resp('WATCHING')[0]
+ return @watch_list.size if @watch_list.include?(tube)
+ r = interact("watch #{tube}\r\n", %w(WATCHING))[0].to_i
+ @watch_list += [tube]
+ return r
end
def ignore(tube)
- @socket.write("ignore #{tube}\r\n")
- check_resp('WATCHING')[0]
+ return @watch_list.size if !@watch_list.include?(tube)
+ r = interact("ignore #{tube}\r\n", %w(WATCHING))[0].to_i
+ @watch_list -= [tube]
+ return r
end
def stats()
- @socket.write("stats\r\n")
- read_yaml('OK')
+ interact("stats\r\n", :yaml)
end
def job_stats(id)
- @socket.write("stats-job #{id}\r\n")
- read_yaml('OK')
+ interact("stats-job #{id}\r\n", :yaml)
end
def stats_tube(tube)
- @socket.write("stats-tube #{tube}\r\n")
- read_yaml('OK')
+ interact("stats-tube #{tube}\r\n", :yaml)
end
def list_tubes()
- @socket.write("list-tubes\r\n")
- read_yaml('OK')
+ interact("list-tubes\r\n", :yaml)
end
def list_tube_used()
- @socket.write("list-tube-used\r\n")
- check_resp('USING')[0]
+ interact("list-tube-used\r\n", %w(USING))[0]
end
- def list_tubes_watched()
- @socket.write("list-tubes-watched\r\n")
- read_yaml('OK')
+ def list_tubes_watched(cached=false)
+ return @watch_list if cached
+ @watch_list = interact("list-tubes-watched\r\n", :yaml)
end
private
+ def interact(cmd, rfmt)
+ raise WaitingForJobError if @waiting
+ @socket.write(cmd)
+ return read_yaml('OK') if rfmt == :yaml
+ return found_job if rfmt == :job
+ check_resp(*rfmt)
+ end
+
def get_resp()
r = @socket.gets("\r\n")
raise EOFError if r == nil
r[0...-2]
end
def check_resp(*words)
r = get_resp()
rword, *vals = r.split(/\s+/)
if (words.size > 0) and !words.include?(rword)
- raise UnexpectedResponse.new(r)
+ raise UnexpectedResponse.classify(rword, r)
end
vals
end
- def read_job(word)
- # Give the user a chance to select on multiple fds.
- Beanstalk.select.call([@socket]) if Beanstalk.select
+ def found_job()
+ Job.new(@jptr, *read_job('FOUND'))
+ rescue NotFoundError
+ nil
+ end
+ def read_job(word)
id, bytes = check_resp(word).map{|s| s.to_i}
body = read_bytes(bytes)
raise 'bad trailer' if read_bytes(2) != "\r\n"
[id, body]
end
@@ -177,51 +199,10 @@
raise EOFError, 'End of file reached' if str.size < n
str
end
end
- # Same interface as RawConnection.
- # With this you can reserve more than one job at a time.
- class Connection
- attr_reader :addr
-
- def initialize(addr, jptr=self)
- @addr = addr
- @misc = RawConnection.new(addr, jptr)
- @free = Bag.new{RawConnection.new(addr, jptr)}
- @used = {}
- end
-
- def reserve()
- c = @free.take()
- j = c.reserve()
- @used[j.id] = c
- j
- ensure
- @free.give(c) if c and not j
- end
-
- def delete(id)
- @used[id].delete(id)
- @free.give(@used.delete(id))
- end
-
- def release(id, pri, delay)
- @used[id].release(id, pri, delay)
- @free.give(@used.delete(id))
- end
-
- def bury(id, pri)
- @used[id].bury(id, pri)
- @free.give(@used.delete(id))
- end
-
- def method_missing(selector, *args, &block)
- @misc.send(selector, *args, &block)
- end
- end
-
class CleanupWrapper
def initialize(addr, multi)
@conn = Connection.new(addr, self)
@multi = multi
end
@@ -264,18 +245,10 @@
def last_server
@last_conn.addr
end
- def send_to_rand_conn(sel, *args)
- wrap(pick_connection, sel, *args)
- end
-
- def send_to_all_conns(sel, *args)
- make_hash(@connections.map{|a, c| [a, wrap(c, sel, *args)]})
- end
-
def put(body, pri=65536, delay=0, ttr=120)
send_to_rand_conn(:put, body, pri, delay, ttr)
end
def yput(obj, pri=65536, delay=0, ttr=120)
@@ -330,25 +303,51 @@
@connections.delete(conn.addr)
end
def close
while @connections.size > 0
- addr, conn = @connections.pop
+ addr = @connections.keys.last
+ conn = @connections[addr]
+ @connections.delete(addr)
conn.close
end
end
- def peek()
+ def peek_ready()
+ send_to_each_conn_first_res(:peek_ready)
+ end
+
+ def peek_delayed()
+ send_to_each_conn_first_res(:peek_delayed)
+ end
+
+ def peek_buried()
+ send_to_each_conn_first_res(:peek_buried)
+ end
+
+ def peek_job(id)
+ make_hash(send_to_all_conns(:peek_job, id))
+ end
+
+ private
+
+ def send_to_each_conn_first_res(sel, *args)
open_connections.each do |c|
- job = c.peek
- return job if job
+ x = wrap(c, sel, *args)
+ return x if x
end
nil
end
- private
+ def send_to_rand_conn(sel, *args)
+ wrap(pick_connection, sel, *args)
+ end
+ def send_to_all_conns(sel, *args)
+ compact_hash(make_hash(@connections.map{|a, c| [a, wrap(c, sel, *args)]}))
+ end
+
def pick_connection()
open_connections[rand(open_connections.size)] or raise NotConnected
end
def wrap(conn, sel, *args)
@@ -361,9 +360,13 @@
retry
end
def make_hash(pairs)
Hash[*pairs.inject([]){|a,b|a+b}]
+ end
+
+ def compact_hash(hash)
+ hash.reject{|k,v| v == nil}
end
def sum_hashes(hs)
hs.inject({}){|a,b| a.merge(b) {|k,o,n| combine_stats(k, o, n)}}
end