lib/beanstalk-client/connection.rb in beanstalk-client-1.0.2 vs lib/beanstalk-client/connection.rb in beanstalk-client-1.1.0

- old
+ new

@@ -17,18 +17,18 @@ require 'socket' require 'fcntl' require 'yaml' require 'set' -require 'beanstalk-client/errors' -require 'beanstalk-client/job' +require 'thread' module Beanstalk class Connection attr_reader :addr def initialize(addr, default_tube=nil) + @mutex = Mutex.new @waiting = false @addr = addr connect @last_used = 'default' @watch_list = [@last_used] @@ -48,10 +48,14 @@ @socket.close @socket = nil end def put(body, pri=65536, delay=0, ttr=120) + pri = pri.to_i + delay = delay.to_i + ttr = ttr.to_i + body = body.to_s # Make sure that body.size gives a useful number interact("put #{pri} #{delay} #{ttr} #{body.size}\r\n#{body}\r\n", %w(INSERTED BURIED))[0].to_i end def yput(obj, pri=65536, delay=0, ttr=120) @@ -74,10 +78,11 @@ interact("peek-buried\r\n", :job) end def reserve(timeout=nil) raise WaitingForJobError if @waiting + @mutex.lock if timeout.nil? @socket.write("reserve\r\n") else @socket.write("reserve-with-timeout #{timeout}\r\n") end @@ -91,27 +96,41 @@ ensure @waiting = false end Job.new(self, *read_job('RESERVED')) + ensure + @mutex.unlock end def delete(id) interact("delete #{id}\r\n", %w(DELETED)) :ok end def release(id, pri, delay) + id = id.to_i + pri = pri.to_i + delay = delay.to_i interact("release #{id} #{pri} #{delay}\r\n", %w(RELEASED)) :ok end def bury(id, pri) interact("bury #{id} #{pri}\r\n", %w(BURIED)) :ok end + def touch(id) + interact("touch #{id}\r\n", %w(TOUCHED)) + :ok + end + + def kick(n) + interact("kick #{n}\r\n", %w(KICKED))[0].to_i + end + def use(tube) return tube if tube == @last_used @last_used = interact("use #{tube}\r\n", %w(USING))[0] rescue BadFormatError raise InvalidTubeName.new(tube) @@ -160,14 +179,17 @@ private def interact(cmd, rfmt) raise WaitingForJobError if @waiting + @mutex.lock @socket.write(cmd) return read_yaml('OK') if rfmt == :yaml return found_job if rfmt == :job check_resp(*rfmt) + ensure + @mutex.unlock end def get_resp() r = @socket.gets("\r\n") raise EOFError if r == nil @@ -191,11 +213,11 @@ def read_job(word) id, bytes = check_resp(word).map{|s| s.to_i} body = read_bytes(bytes) raise 'bad trailer' if read_bytes(2) != "\r\n" - [id, body] + [id, body, word == 'RESERVED'] end def read_yaml(word) bytes_s, = check_resp(word) yaml = read_bytes(bytes_s.to_i) @@ -231,13 +253,14 @@ prev_watched = @connections[addr].list_tubes_watched() to_ignore = prev_watched - @watch_list @watch_list.each{|tube| @connections[addr].watch(tube)} to_ignore.each{|tube| @connections[addr].ignore(tube)} end + rescue Errno::ECONNREFUSED + raise NotConnected rescue Exception => ex puts "#{ex.class}: #{ex}" - #puts begin ex.fixed_backtrace rescue ex.backtrace end end end @connections.size end @@ -247,18 +270,32 @@ def last_server @last_conn.addr end + # Put a job on the queue. + # + # == Parameters: + # + # * <tt>body</tt>: the payload of the job (use Beanstalk::Pool#yput / Beanstalk::Job#ybody to automatically serialize your payload with YAML) + # * <tt>pri</tt>: priority. Default 65536 (higher numbers are higher priority) + # * <tt>delay</tt>: how long to wait until making the job available for reservation + # * <tt>ttr</tt>: time in seconds for the job to reappear on the queue (if beanstalk doesn't hear from a consumer within this time, assume the consumer died and make the job available for someone else to process). Default 120 seconds. def put(body, pri=65536, delay=0, ttr=120) send_to_rand_conn(:put, body, pri, delay, ttr) end + # Like put, but serialize the object with YAML. def yput(obj, pri=65536, delay=0, ttr=120) send_to_rand_conn(:yput, obj, pri, delay, ttr) end + # Reserve a job from the queue. + # + # == Parameters + # + # * <tt>timeout</tt> - Time (in seconds) to wait for a job to be available. If nil, wait indefinitely. def reserve(timeout=nil) send_to_rand_conn(:reserve, timeout) end def use(tube) @@ -304,13 +341,16 @@ def list_tubes_watched(*args) send_to_all_conns(:list_tubes_watched, *args) end def remove(conn) - @connections.delete(conn.addr) + connection = @connections.delete(conn.addr) + connection.close if connection + connection end + # Close all open connections for this pool def close while @connections.size > 0 addr = @connections.keys.last conn = @connections[addr] @connections.delete(addr) @@ -337,10 +377,12 @@ private def call_wrap(c, *args) self.last_conn = c c.send(*args) - rescue EOFError, Errno::ECONNRESET, Errno::EPIPE, UnexpectedResponse => ex + rescue UnexpectedResponse => ex + raise ex + rescue EOFError, Errno::ECONNRESET, Errno::EPIPE => ex self.remove(c) raise ex end def retry_wrap(*args)