lib/beanstalk-client/connection.rb in beanstalk-client-1.1.0 vs lib/beanstalk-client/connection.rb in beanstalk-client-1.1.1
- old
+ new
@@ -25,10 +25,11 @@
class Connection
attr_reader :addr
def initialize(addr, default_tube=nil)
@mutex = Mutex.new
+ @tube_mutex = Mutex.new
@waiting = false
@addr = addr
connect
@last_used = 'default'
@watch_list = [@last_used]
@@ -51,12 +52,12 @@
def put(body, pri=65536, delay=0, ttr=120)
pri = pri.to_i
delay = delay.to_i
ttr = ttr.to_i
- body = body.to_s # Make sure that body.size gives a useful number
- interact("put #{pri} #{delay} #{ttr} #{body.size}\r\n#{body}\r\n",
+ body = "#{body}" # Make sure that body.bytesize gives a useful number
+ interact("put #{pri} #{delay} #{ttr} #{body.bytesize}\r\n#{body}\r\n",
%w(INSERTED BURIED))[0].to_i
end
def yput(obj, pri=65536, delay=0, ttr=120)
put(YAML.dump(obj), pri, delay, ttr)
@@ -76,10 +77,18 @@
def peek_buried()
interact("peek-buried\r\n", :job)
end
+ def on_tube(tube, &block)
+ @tube_mutex.lock
+ use tube
+ yield self
+ ensure
+ @tube_mutex.unlock
+ end
+
def reserve(timeout=nil)
raise WaitingForJobError if @waiting
@mutex.lock
if timeout.nil?
@socket.write("reserve\r\n")
@@ -287,10 +296,14 @@
# Like put, but serialize the object with YAML.
def yput(obj, pri=65536, delay=0, ttr=120)
send_to_rand_conn(:yput, obj, pri, delay, ttr)
end
+ def on_tube(tube, &block)
+ send_to_rand_conn(:on_tube, tube, &block)
+ end
+
# Reserve a job from the queue.
#
# == Parameters
#
# * <tt>timeout</tt> - Time (in seconds) to wait for a job to be available. If nil, wait indefinitely.
@@ -374,13 +387,13 @@
make_hash(send_to_all_conns(:peek_job, id))
end
private
- def call_wrap(c, *args)
+ def call_wrap(c, *args, &block)
self.last_conn = c
- c.send(*args)
+ c.send(*args, &block)
rescue UnexpectedResponse => ex
raise ex
rescue EOFError, Errno::ECONNRESET, Errno::EPIPE => ex
self.remove(c)
raise ex
@@ -399,12 +412,12 @@
def send_to_each_conn_first_res(*args)
connect()
retry_wrap{open_connections.inject(nil) {|r,c| r or call_wrap(c, *args)}}
end
- def send_to_rand_conn(*args)
+ def send_to_rand_conn(*args, &block)
connect()
- retry_wrap{call_wrap(pick_connection, *args)}
+ retry_wrap{call_wrap(pick_connection, *args, &block)}
end
def send_to_all_conns(*args)
connect()
retry_wrap{compact_hash(map_hash(@connections){|c| call_wrap(c, *args)})}