lib/beanstalk-client/connection.rb in beanstalk-client-1.0.2 vs lib/beanstalk-client/connection.rb in beanstalk-client-1.1.0
- old
+ new
@@ -17,18 +17,18 @@
require 'socket'
require 'fcntl'
require 'yaml'
require 'set'
-require 'beanstalk-client/errors'
-require 'beanstalk-client/job'
+require 'thread'
module Beanstalk
class Connection
attr_reader :addr
def initialize(addr, default_tube=nil)
+ @mutex = Mutex.new
@waiting = false
@addr = addr
connect
@last_used = 'default'
@watch_list = [@last_used]
@@ -48,10 +48,14 @@
@socket.close
@socket = nil
end
def put(body, pri=65536, delay=0, ttr=120)
+ pri = pri.to_i
+ delay = delay.to_i
+ ttr = ttr.to_i
+ body = body.to_s # Make sure that body.size gives a useful number
interact("put #{pri} #{delay} #{ttr} #{body.size}\r\n#{body}\r\n",
%w(INSERTED BURIED))[0].to_i
end
def yput(obj, pri=65536, delay=0, ttr=120)
@@ -74,10 +78,11 @@
interact("peek-buried\r\n", :job)
end
def reserve(timeout=nil)
raise WaitingForJobError if @waiting
+ @mutex.lock
if timeout.nil?
@socket.write("reserve\r\n")
else
@socket.write("reserve-with-timeout #{timeout}\r\n")
end
@@ -91,27 +96,41 @@
ensure
@waiting = false
end
Job.new(self, *read_job('RESERVED'))
+ ensure
+ @mutex.unlock
end
def delete(id)
interact("delete #{id}\r\n", %w(DELETED))
:ok
end
def release(id, pri, delay)
+ id = id.to_i
+ pri = pri.to_i
+ delay = delay.to_i
interact("release #{id} #{pri} #{delay}\r\n", %w(RELEASED))
:ok
end
def bury(id, pri)
interact("bury #{id} #{pri}\r\n", %w(BURIED))
:ok
end
+ def touch(id)
+ interact("touch #{id}\r\n", %w(TOUCHED))
+ :ok
+ end
+
+ def kick(n)
+ interact("kick #{n}\r\n", %w(KICKED))[0].to_i
+ end
+
def use(tube)
return tube if tube == @last_used
@last_used = interact("use #{tube}\r\n", %w(USING))[0]
rescue BadFormatError
raise InvalidTubeName.new(tube)
@@ -160,14 +179,17 @@
private
def interact(cmd, rfmt)
raise WaitingForJobError if @waiting
+ @mutex.lock
@socket.write(cmd)
return read_yaml('OK') if rfmt == :yaml
return found_job if rfmt == :job
check_resp(*rfmt)
+ ensure
+ @mutex.unlock
end
def get_resp()
r = @socket.gets("\r\n")
raise EOFError if r == nil
@@ -191,11 +213,11 @@
def read_job(word)
id, bytes = check_resp(word).map{|s| s.to_i}
body = read_bytes(bytes)
raise 'bad trailer' if read_bytes(2) != "\r\n"
- [id, body]
+ [id, body, word == 'RESERVED']
end
def read_yaml(word)
bytes_s, = check_resp(word)
yaml = read_bytes(bytes_s.to_i)
@@ -231,13 +253,14 @@
prev_watched = @connections[addr].list_tubes_watched()
to_ignore = prev_watched - @watch_list
@watch_list.each{|tube| @connections[addr].watch(tube)}
to_ignore.each{|tube| @connections[addr].ignore(tube)}
end
+ rescue Errno::ECONNREFUSED
+ raise NotConnected
rescue Exception => ex
puts "#{ex.class}: #{ex}"
- #puts begin ex.fixed_backtrace rescue ex.backtrace end
end
end
@connections.size
end
@@ -247,18 +270,32 @@
def last_server
@last_conn.addr
end
+ # Put a job on the queue.
+ #
+ # == Parameters:
+ #
+ # * <tt>body</tt>: the payload of the job (use Beanstalk::Pool#yput / Beanstalk::Job#ybody to automatically serialize your payload with YAML)
+ # * <tt>pri</tt>: priority. Default 65536 (higher numbers are higher priority)
+ # * <tt>delay</tt>: how long to wait until making the job available for reservation
+ # * <tt>ttr</tt>: time in seconds for the job to reappear on the queue (if beanstalk doesn't hear from a consumer within this time, assume the consumer died and make the job available for someone else to process). Default 120 seconds.
def put(body, pri=65536, delay=0, ttr=120)
send_to_rand_conn(:put, body, pri, delay, ttr)
end
+ # Like put, but serialize the object with YAML.
def yput(obj, pri=65536, delay=0, ttr=120)
send_to_rand_conn(:yput, obj, pri, delay, ttr)
end
+ # Reserve a job from the queue.
+ #
+ # == Parameters
+ #
+ # * <tt>timeout</tt> - Time (in seconds) to wait for a job to be available. If nil, wait indefinitely.
def reserve(timeout=nil)
send_to_rand_conn(:reserve, timeout)
end
def use(tube)
@@ -304,13 +341,16 @@
def list_tubes_watched(*args)
send_to_all_conns(:list_tubes_watched, *args)
end
def remove(conn)
- @connections.delete(conn.addr)
+ connection = @connections.delete(conn.addr)
+ connection.close if connection
+ connection
end
+ # Close all open connections for this pool
def close
while @connections.size > 0
addr = @connections.keys.last
conn = @connections[addr]
@connections.delete(addr)
@@ -337,10 +377,12 @@
private
def call_wrap(c, *args)
self.last_conn = c
c.send(*args)
- rescue EOFError, Errno::ECONNRESET, Errno::EPIPE, UnexpectedResponse => ex
+ rescue UnexpectedResponse => ex
+ raise ex
+ rescue EOFError, Errno::ECONNRESET, Errno::EPIPE => ex
self.remove(c)
raise ex
end
def retry_wrap(*args)