lib/beanstalk-client/connection.rb in beanstalk-client-1.1.0 vs lib/beanstalk-client/connection.rb in beanstalk-client-1.1.1

- old
+ new

@@ -25,10 +25,11 @@ class Connection attr_reader :addr def initialize(addr, default_tube=nil) @mutex = Mutex.new + @tube_mutex = Mutex.new @waiting = false @addr = addr connect @last_used = 'default' @watch_list = [@last_used] @@ -51,12 +52,12 @@ def put(body, pri=65536, delay=0, ttr=120) pri = pri.to_i delay = delay.to_i ttr = ttr.to_i - body = body.to_s # Make sure that body.size gives a useful number - interact("put #{pri} #{delay} #{ttr} #{body.size}\r\n#{body}\r\n", + body = "#{body}" # Make sure that body.bytesize gives a useful number + interact("put #{pri} #{delay} #{ttr} #{body.bytesize}\r\n#{body}\r\n", %w(INSERTED BURIED))[0].to_i end def yput(obj, pri=65536, delay=0, ttr=120) put(YAML.dump(obj), pri, delay, ttr) @@ -76,10 +77,18 @@ def peek_buried() interact("peek-buried\r\n", :job) end + def on_tube(tube, &block) + @tube_mutex.lock + use tube + yield self + ensure + @tube_mutex.unlock + end + def reserve(timeout=nil) raise WaitingForJobError if @waiting @mutex.lock if timeout.nil? @socket.write("reserve\r\n") @@ -287,10 +296,14 @@ # Like put, but serialize the object with YAML. def yput(obj, pri=65536, delay=0, ttr=120) send_to_rand_conn(:yput, obj, pri, delay, ttr) end + def on_tube(tube, &block) + send_to_rand_conn(:on_tube, tube, &block) + end + # Reserve a job from the queue. # # == Parameters # # * <tt>timeout</tt> - Time (in seconds) to wait for a job to be available. If nil, wait indefinitely. @@ -374,13 +387,13 @@ make_hash(send_to_all_conns(:peek_job, id)) end private - def call_wrap(c, *args) + def call_wrap(c, *args, &block) self.last_conn = c - c.send(*args) + c.send(*args, &block) rescue UnexpectedResponse => ex raise ex rescue EOFError, Errno::ECONNRESET, Errno::EPIPE => ex self.remove(c) raise ex @@ -399,12 +412,12 @@ def send_to_each_conn_first_res(*args) connect() retry_wrap{open_connections.inject(nil) {|r,c| r or call_wrap(c, *args)}} end - def send_to_rand_conn(*args) + def send_to_rand_conn(*args, &block) connect() - retry_wrap{call_wrap(pick_connection, *args)} + retry_wrap{call_wrap(pick_connection, *args, &block)} end def send_to_all_conns(*args) connect() retry_wrap{compact_hash(map_hash(@connections){|c| call_wrap(c, *args)})}