lib/beanstalk-client/connection.rb in beanstalk-client-0.9.0 vs lib/beanstalk-client/connection.rb in beanstalk-client-0.10.0

- old
+ new

@@ -17,22 +17,23 @@ require 'socket' require 'fcntl' require 'yaml' require 'set' -require 'beanstalk-client/bag' require 'beanstalk-client/errors' require 'beanstalk-client/job' module Beanstalk - class RawConnection + class Connection attr_reader :addr def initialize(addr, jptr=self) @addr = addr @jptr = jptr connect + @last_used = 'default' + @watch_list = ['default'] end def connect host, port = addr.split(':') @socket = TCPSocket.new(host, port.to_i) @@ -45,121 +46,142 @@ @socket.close @socket = nil end def put(body, pri=65536, delay=0, ttr=120) - @socket.write("put #{pri} #{delay} #{ttr} #{body.size}\r\n#{body}\r\n") - check_resp('INSERTED', 'BURIED')[0].to_i + interact("put #{pri} #{delay} #{ttr} #{body.size}\r\n#{body}\r\n", + %w(INSERTED BURIED))[0].to_i end def yput(obj, pri=65536, delay=0, ttr=120) put(YAML.dump(obj), pri, delay, ttr) end - def peek() - @socket.write("peek\r\n") - begin - Job.new(@jptr, *read_job('FOUND')) - rescue UnexpectedResponse - nil - end + def peek_job(id) + interact("peek #{id}\r\n", :job) end - def peek_job(id) - @socket.write("peek #{id}\r\n") - Job.new(@jptr, *read_job('FOUND')) + def peek_ready() + interact("peek-ready\r\n", :job) end + def peek_delayed() + interact("peek-delayed\r\n", :job) + end + + def peek_buried() + interact("peek-buried\r\n", :job) + end + def reserve() + raise WaitingForJobError if @waiting @socket.write("reserve\r\n") + + begin + @waiting = true + # Give the user a chance to select on multiple fds. + Beanstalk.select.call([@socket]) if Beanstalk.select + rescue WaitingForJobError + # just continue + ensure + @waiting = false + end + Job.new(@jptr, *read_job('RESERVED')) end def delete(id) - @socket.write("delete #{id}\r\n") - check_resp('DELETED') + interact("delete #{id}\r\n", %w(DELETED)) :ok end def release(id, pri, delay) - @socket.write("release #{id} #{pri} #{delay}\r\n") - check_resp('RELEASED') + interact("release #{id} #{pri} #{delay}\r\n", %w(RELEASED)) :ok end def bury(id, pri) - @socket.write("bury #{id} #{pri}\r\n") - check_resp('BURIED') + interact("bury #{id} #{pri}\r\n", %w(BURIED)) :ok end def use(tube) - @socket.write("use #{tube}\r\n") - check_resp('USING')[0] + return tube if tube == @last_used + @last_used = interact("use #{tube}\r\n", %w(USING))[0] end def watch(tube) - @socket.write("watch #{tube}\r\n") - check_resp('WATCHING')[0] + return @watch_list.size if @watch_list.include?(tube) + r = interact("watch #{tube}\r\n", %w(WATCHING))[0].to_i + @watch_list += [tube] + return r end def ignore(tube) - @socket.write("ignore #{tube}\r\n") - check_resp('WATCHING')[0] + return @watch_list.size if !@watch_list.include?(tube) + r = interact("ignore #{tube}\r\n", %w(WATCHING))[0].to_i + @watch_list -= [tube] + return r end def stats() - @socket.write("stats\r\n") - read_yaml('OK') + interact("stats\r\n", :yaml) end def job_stats(id) - @socket.write("stats-job #{id}\r\n") - read_yaml('OK') + interact("stats-job #{id}\r\n", :yaml) end def stats_tube(tube) - @socket.write("stats-tube #{tube}\r\n") - read_yaml('OK') + interact("stats-tube #{tube}\r\n", :yaml) end def list_tubes() - @socket.write("list-tubes\r\n") - read_yaml('OK') + interact("list-tubes\r\n", :yaml) end def list_tube_used() - @socket.write("list-tube-used\r\n") - check_resp('USING')[0] + interact("list-tube-used\r\n", %w(USING))[0] end - def list_tubes_watched() - @socket.write("list-tubes-watched\r\n") - read_yaml('OK') + def list_tubes_watched(cached=false) + return @watch_list if cached + @watch_list = interact("list-tubes-watched\r\n", :yaml) end private + def interact(cmd, rfmt) + raise WaitingForJobError if @waiting + @socket.write(cmd) + return read_yaml('OK') if rfmt == :yaml + return found_job if rfmt == :job + check_resp(*rfmt) + end + def get_resp() r = @socket.gets("\r\n") raise EOFError if r == nil r[0...-2] end def check_resp(*words) r = get_resp() rword, *vals = r.split(/\s+/) if (words.size > 0) and !words.include?(rword) - raise UnexpectedResponse.new(r) + raise UnexpectedResponse.classify(rword, r) end vals end - def read_job(word) - # Give the user a chance to select on multiple fds. - Beanstalk.select.call([@socket]) if Beanstalk.select + def found_job() + Job.new(@jptr, *read_job('FOUND')) + rescue NotFoundError + nil + end + def read_job(word) id, bytes = check_resp(word).map{|s| s.to_i} body = read_bytes(bytes) raise 'bad trailer' if read_bytes(2) != "\r\n" [id, body] end @@ -177,51 +199,10 @@ raise EOFError, 'End of file reached' if str.size < n str end end - # Same interface as RawConnection. - # With this you can reserve more than one job at a time. - class Connection - attr_reader :addr - - def initialize(addr, jptr=self) - @addr = addr - @misc = RawConnection.new(addr, jptr) - @free = Bag.new{RawConnection.new(addr, jptr)} - @used = {} - end - - def reserve() - c = @free.take() - j = c.reserve() - @used[j.id] = c - j - ensure - @free.give(c) if c and not j - end - - def delete(id) - @used[id].delete(id) - @free.give(@used.delete(id)) - end - - def release(id, pri, delay) - @used[id].release(id, pri, delay) - @free.give(@used.delete(id)) - end - - def bury(id, pri) - @used[id].bury(id, pri) - @free.give(@used.delete(id)) - end - - def method_missing(selector, *args, &block) - @misc.send(selector, *args, &block) - end - end - class CleanupWrapper def initialize(addr, multi) @conn = Connection.new(addr, self) @multi = multi end @@ -264,18 +245,10 @@ def last_server @last_conn.addr end - def send_to_rand_conn(sel, *args) - wrap(pick_connection, sel, *args) - end - - def send_to_all_conns(sel, *args) - make_hash(@connections.map{|a, c| [a, wrap(c, sel, *args)]}) - end - def put(body, pri=65536, delay=0, ttr=120) send_to_rand_conn(:put, body, pri, delay, ttr) end def yput(obj, pri=65536, delay=0, ttr=120) @@ -330,25 +303,51 @@ @connections.delete(conn.addr) end def close while @connections.size > 0 - addr, conn = @connections.pop + addr = @connections.keys.last + conn = @connections[addr] + @connections.delete(addr) conn.close end end - def peek() + def peek_ready() + send_to_each_conn_first_res(:peek_ready) + end + + def peek_delayed() + send_to_each_conn_first_res(:peek_delayed) + end + + def peek_buried() + send_to_each_conn_first_res(:peek_buried) + end + + def peek_job(id) + make_hash(send_to_all_conns(:peek_job, id)) + end + + private + + def send_to_each_conn_first_res(sel, *args) open_connections.each do |c| - job = c.peek - return job if job + x = wrap(c, sel, *args) + return x if x end nil end - private + def send_to_rand_conn(sel, *args) + wrap(pick_connection, sel, *args) + end + def send_to_all_conns(sel, *args) + compact_hash(make_hash(@connections.map{|a, c| [a, wrap(c, sel, *args)]})) + end + def pick_connection() open_connections[rand(open_connections.size)] or raise NotConnected end def wrap(conn, sel, *args) @@ -361,9 +360,13 @@ retry end def make_hash(pairs) Hash[*pairs.inject([]){|a,b|a+b}] + end + + def compact_hash(hash) + hash.reject{|k,v| v == nil} end def sum_hashes(hs) hs.inject({}){|a,b| a.merge(b) {|k,o,n| combine_stats(k, o, n)}} end