# beanstalk-client/connection.rb - client library for beanstalk

# Copyright (C) 2007 Philotic Inc.

# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.

# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.

# You should have received a copy of the GNU General Public License
# along with this program.  If not, see https://www.gnu.org/licenses/.

require 'socket'
require 'fcntl'
require 'yaml'
require 'set'
require 'thread'

module Beanstalk
  # A single TCP connection to one beanstalk server.
  #
  # Every command is written to and answered on the same socket, so all
  # request/response exchanges are serialized through @mutex.
  class Connection
    # The "host:port" string this connection was created with.
    attr_reader :addr

    # Opens the socket and, when +default_tube+ is given, both uses and
    # watches that tube.
    #
    # * addr: "host:port" of the server.
    # * default_tube: optional tube name to use and watch right away.
    def initialize(addr, default_tube = nil)
      @mutex = Mutex.new
      @tube_mutex = Mutex.new
      @waiting = false
      @addr = addr
      connect
      @last_used = 'default'
      @watch_list = [@last_used]
      return unless default_tube

      use(default_tube)
      watch(default_tube)
    end

    # (Re)opens the TCP socket to +addr+.
    def connect
      host, port = addr.split(':')
      @socket = TCPSocket.new(host, port.to_i)

      # Don't leak fds when we exec.
      @socket.fcntl(Fcntl::F_SETFD, Fcntl::FD_CLOEXEC)
    end

    # Closes the socket. Calling it again on an already closed connection
    # is a no-op.
    def close
      return unless @socket

      @socket.close
      @socket = nil
    end

    # Sends a "put" command and returns the first value of the reply
    # (the job id) as an Integer.
    #
    # * body: job payload; converted to a String before sending.
    # * pri, delay, ttr: coerced with to_i before being sent.
    #
    # Raises whatever UnexpectedResponse.classify builds when the reply is
    # neither INSERTED nor BURIED.
    def put(body, pri = 65536, delay = 0, ttr = 120)
      pri = pri.to_i
      delay = delay.to_i
      ttr = ttr.to_i
      body = "#{body}" # Make sure that body.bytesize gives a useful number
      command = "put #{pri} #{delay} #{ttr} #{body.bytesize}\r\n#{body}\r\n"
      interact(command, %w(INSERTED BURIED))[0].to_i
    end

    # Like put, but serializes +obj+ with YAML first.
    def yput(obj, pri = 65536, delay = 0, ttr = 120)
      put(YAML.dump(obj), pri, delay, ttr)
    end

    # Each peek_* method returns a Job, or nil when the server's reply
    # raises NotFoundError (see found_job).
    def peek_job(id)
      interact("peek #{id}\r\n", :job)
    end

    def peek_ready
      interact("peek-ready\r\n", :job)
    end

    def peek_delayed
      interact("peek-delayed\r\n", :job)
    end

    def peek_buried
      interact("peek-buried\r\n", :job)
    end

    # Switches to +tube+ and yields this connection while holding the tube
    # mutex, so concurrent on_tube blocks cannot change the used tube under
    # each other. Returns the block's value.
    def on_tube(tube, &block)
      # synchronize only unlocks a mutex it actually acquired; a failed
      # lock no longer triggers a stray unlock.
      @tube_mutex.synchronize do
        use tube
        yield self
      end
    end

    # Reserves a job and returns it as a Job.
    #
    # * timeout: seconds for "reserve-with-timeout"; nil sends a plain
    #   "reserve".
    #
    # Raises WaitingForJobError when another reserve on this connection is
    # currently inside the Beanstalk.select hook.
    def reserve(timeout = nil)
      # Checked before taking the mutex: this thread must not touch a lock
      # that the waiting thread owns.
      raise WaitingForJobError if @waiting

      @mutex.synchronize do
        if timeout.nil?
          @socket.write("reserve\r\n")
        else
          @socket.write("reserve-with-timeout #{timeout}\r\n")
        end

        begin
          @waiting = true
          # Give the user a chance to select on multiple fds.
          Beanstalk.select.call([@socket]) if Beanstalk.select
        rescue WaitingForJobError
          # just continue
        ensure
          @waiting = false
        end

        Job.new(self, *read_job('RESERVED'))
      end
    end

    # Deletes job +id+. Returns :ok.
    def delete(id)
      interact("delete #{id}\r\n", %w(DELETED))
      :ok
    end

    # Releases job +id+ with the given priority and delay (all coerced with
    # to_i). Returns :ok.
    def release(id, pri, delay)
      id = id.to_i
      pri = pri.to_i
      delay = delay.to_i
      interact("release #{id} #{pri} #{delay}\r\n", %w(RELEASED))
      :ok
    end

    # Buries job +id+ with priority +pri+. Returns :ok.
    def bury(id, pri)
      interact("bury #{id} #{pri}\r\n", %w(BURIED))
      :ok
    end

    # Sends "touch" for job +id+. Returns :ok.
    def touch(id)
      interact("touch #{id}\r\n", %w(TOUCHED))
      :ok
    end

    # Sends "kick n" and returns the first reply value as an Integer.
    def kick(n)
      interact("kick #{n}\r\n", %w(KICKED))[0].to_i
    end

    # Makes +tube+ the tube used for subsequent puts. Skips the round trip
    # when +tube+ is already the last used tube. Returns the tube name.
    #
    # Raises InvalidTubeName when the exchange raises BadFormatError.
    def use(tube)
      return tube if tube == @last_used

      @last_used = interact("use #{tube}\r\n", %w(USING))[0]
    rescue BadFormatError
      raise InvalidTubeName.new(tube)
    end

    # Adds +tube+ to the watch list. Returns the number of watched tubes
    # (from the cache when the tube is already watched, otherwise from the
    # server's reply).
    #
    # Raises InvalidTubeName when the exchange raises BadFormatError.
    def watch(tube)
      return @watch_list.size if @watch_list.include?(tube)

      count = interact("watch #{tube}\r\n", %w(WATCHING))[0].to_i
      @watch_list += [tube]
      count
    rescue BadFormatError
      raise InvalidTubeName.new(tube)
    end

    # Removes +tube+ from the watch list. Returns the number of watched
    # tubes (from the cache when the tube was not watched).
    def ignore(tube)
      return @watch_list.size unless @watch_list.include?(tube)

      count = interact("ignore #{tube}\r\n", %w(WATCHING))[0].to_i
      @watch_list -= [tube]
      count
    end

    # The following return the YAML-decoded body of the server's reply.
    def stats
      interact("stats\r\n", :yaml)
    end

    def job_stats(id)
      interact("stats-job #{id}\r\n", :yaml)
    end

    def stats_tube(tube)
      interact("stats-tube #{tube}\r\n", :yaml)
    end

    def list_tubes
      interact("list-tubes\r\n", :yaml)
    end

    # Returns the tube name from the server's USING reply.
    def list_tube_used
      interact("list-tube-used\r\n", %w(USING))[0]
    end

    # Returns the watched tubes. With +cached+ true the locally tracked
    # list is returned without contacting the server; otherwise the list is
    # fetched and the local copy refreshed.
    def list_tubes_watched(cached = false)
      return @watch_list if cached

      @watch_list = interact("list-tubes-watched\r\n", :yaml)
    end

    private

    # Writes +cmd+ and reads the reply according to +rfmt+:
    # :yaml -> decoded YAML body, :job -> Job or nil, otherwise +rfmt+ is a
    # list of acceptable reply words and the remaining reply values are
    # returned.
    #
    # Raises WaitingForJobError while a reserve is inside the
    # Beanstalk.select hook.
    def interact(cmd, rfmt)
      # Checked before taking the mutex so a rejected call never tries to
      # unlock a mutex it does not hold.
      raise WaitingForJobError if @waiting

      @mutex.synchronize do
        @socket.write(cmd)
        case rfmt
        when :yaml then read_yaml('OK')
        when :job then found_job
        else check_resp(*rfmt)
        end
      end
    end

    # Reads one CRLF-terminated line and returns it without the CRLF.
    # Raises EOFError when the socket returns nil.
    def get_resp
      line = @socket.gets("\r\n")
      raise EOFError if line.nil?

      line[0...-2]
    end

    # Reads a reply line and returns its values (everything after the first
    # word). When +words+ is non-empty and the first word is not among
    # them, raises the error built by UnexpectedResponse.classify.
    def check_resp(*words)
      line = get_resp
      rword, *vals = line.split(/\s+/)
      if !words.empty? && !words.include?(rword)
        raise UnexpectedResponse.classify(rword, line)
      end
      vals
    end

    # Reads a FOUND reply into a Job; returns nil on NotFoundError.
    def found_job
      Job.new(self, *read_job('FOUND'))
    rescue NotFoundError
      nil
    end

    # Reads a "<word> <id> <bytes>" reply plus body and trailer. Returns
    # [id, body, reserved?] where reserved? is true only for 'RESERVED'.
    def read_job(word)
      id, bytes = check_resp(word).map { |s| s.to_i }
      body = read_bytes(bytes)
      raise 'bad trailer' if read_bytes(2) != "\r\n"

      [id, body, word == 'RESERVED']
    end

    # Reads a "<word> <bytes>" reply plus body and trailer and returns the
    # YAML-decoded body.
    def read_yaml(word)
      bytes_s, = check_resp(word)
      yaml = read_bytes(bytes_s.to_i)
      raise 'bad trailer' if read_bytes(2) != "\r\n"

      # NOTE(review): YAML.load is applied to server-supplied data;
      # consider YAML.safe_load if the server cannot be fully trusted.
      YAML::load(yaml)
    end

    # Reads exactly +n+ bytes; raises EOFError on a nil or short read.
    def read_bytes(n)
      str = @socket.read(n)
      raise EOFError, 'End of file reached' if str.nil?
      raise EOFError, 'End of file reached' if str.size < n

      str
    end
  end

  # A set of Connections, one per server address. Commands are sent either
  # to one randomly chosen connection or to all of them.
  class Pool
    # The connection most recently handed a command by call_wrap.
    attr_accessor :last_conn

    # * addrs: list of "host:port" strings.
    # * default_tube: optional tube every connection uses and watches.
    def initialize(addrs, default_tube = nil)
      @addrs = addrs
      @watch_list = ['default']
      @default_tube = default_tube
      @watch_list = [default_tube] if default_tube
      connect
    end

    # Opens a connection for every address that does not have one yet and
    # brings its watch list in line with the pool's. Returns the number of
    # open connections.
    #
    # Raises NotConnected when a server refuses the connection. Other
    # ordinary errors are printed and that address is skipped.
    def connect
      @connections ||= {}
      @addrs.each do |addr|
        begin
          unless @connections.include?(addr)
            conn = @connections[addr] = Connection.new(addr, @default_tube)
            prev_watched = conn.list_tubes_watched
            to_ignore = prev_watched - @watch_list
            @watch_list.each { |tube| conn.watch(tube) }
            to_ignore.each { |tube| conn.ignore(tube) }
          end
        rescue Errno::ECONNREFUSED
          raise NotConnected
        rescue StandardError => ex
          # StandardError rather than Exception, so Interrupt, SystemExit
          # and similar are not swallowed here.
          puts "#{ex.class}: #{ex}"
        end
      end
      @connections.size
    end

    # All currently open Connection objects.
    def open_connections
      @connections.values
    end

    # Address of the connection that handled the most recent command.
    def last_server
      @last_conn.addr
    end

    # NOTE(review): dispatches :auth to every connection, but Connection
    # defines no #auth method in this file -- confirm it is provided
    # elsewhere.
    def auth(token)
      send_to_all_conns(:auth, token)
    end

    # Put a job on the queue.
    #
    # == Parameters:
    #
    # * body: the payload of the job (use Beanstalk::Pool#yput / Beanstalk::Job#ybody to automatically serialize your payload with YAML)
    # * pri: priority. Default 65536 (lower numbers are more urgent; 0 is the most urgent)
    # * delay: how long to wait until making the job available for reservation
    # * ttr: time in seconds for the job to reappear on the queue (if beanstalk doesn't hear from a consumer within this time, assume the consumer died and make the job available for someone else to process). Default 120 seconds.
    def put(body, pri = 65536, delay = 0, ttr = 120)
      send_to_rand_conn(:put, body, pri, delay, ttr)
    end

    # Like put, but serialize the object with YAML.
    def yput(obj, pri = 65536, delay = 0, ttr = 120)
      send_to_rand_conn(:yput, obj, pri, delay, ttr)
    end

    # Runs the block against one randomly chosen connection switched to
    # +tube+.
    def on_tube(tube, &block)
      send_to_rand_conn(:on_tube, tube, &block)
    end

    # Reserve a job from the queue.
    #
    # == Parameters
    #
    # * timeout - Time (in seconds) to wait for a job to be available. If nil, wait indefinitely.
    def reserve(timeout = nil)
      send_to_rand_conn(:reserve, timeout)
    end

    # Switches every connection to +tube+.
    def use(tube)
      send_to_all_conns(:use, tube)
    end

    # Watches +tube+ on every connection and refreshes the pool's watch
    # list from one connection's cached list.
    def watch(tube)
      result = send_to_all_conns(:watch, tube)
      @watch_list = send_to_rand_conn(:list_tubes_watched, true)
      result
    end

    # Ignores +tube+ on every connection and refreshes the pool's watch
    # list from one connection's cached list.
    def ignore(tube)
      result = send_to_all_conns(:ignore, tube)
      @watch_list = send_to_rand_conn(:list_tubes_watched, true)
      result
    end

    # Per-server stats, keyed by address.
    def raw_stats
      send_to_all_conns(:stats)
    end

    # Stats combined across all servers (see combine_stats).
    def stats
      sum_hashes(raw_stats.values)
    end

    # Per-server stats for +tube+, keyed by address.
    def raw_stats_tube(tube)
      send_to_all_conns(:stats_tube, tube)
    end

    # Stats for +tube+ combined across all servers.
    def stats_tube(tube)
      sum_hashes(raw_stats_tube(tube).values)
    end

    def list_tubes
      send_to_all_conns(:list_tubes)
    end

    def list_tube_used
      send_to_all_conns(:list_tube_used)
    end

    def list_tubes_watched(*args)
      send_to_all_conns(:list_tubes_watched, *args)
    end

    # Drops the connection registered under conn.addr and closes it.
    # Returns the removed connection, or nil when there was none.
    def remove(conn)
      connection = @connections.delete(conn.addr)
      connection.close if connection
      connection
    end

    # Close all open connections for this pool
    def close
      until @connections.empty?
        addr = @connections.keys.last
        # Remove before closing so a failing close still drops the entry.
        @connections.delete(addr).close
      end
    end

    # The peek_* methods return the first non-nil result found while asking
    # the connections in turn.
    def peek_ready
      send_to_each_conn_first_res(:peek_ready)
    end

    def peek_delayed
      send_to_each_conn_first_res(:peek_delayed)
    end

    def peek_buried
      send_to_each_conn_first_res(:peek_buried)
    end

    # Hash of address => Job for every server that has job +id+.
    def peek_job(id)
      make_hash(send_to_all_conns(:peek_job, id))
    end

    private

    # Records +c+ as last_conn and forwards the call to it. On a dropped
    # connection (EOFError, ECONNRESET, EPIPE) the connection is removed
    # from the pool before the error is re-raised.
    def call_wrap(c, *args, &block)
      self.last_conn = c
      c.send(*args, &block)
    rescue UnexpectedResponse => ex
      raise ex
    rescue EOFError, Errno::ECONNRESET, Errno::EPIPE => ex
      remove(c)
      raise ex
    end

    # Runs the block, retrying after DrainingError and, after reconnecting,
    # after a dropped connection.
    #
    # NOTE(review): the retries are unbounded; termination appears to rely
    # on connect raising NotConnected once servers are unreachable.
    def retry_wrap(*args)
      yield
    rescue DrainingError
      # Don't reconnect -- we're not interested in this server
      retry
    rescue EOFError, Errno::ECONNRESET, Errno::EPIPE
      connect
      retry
    end

    def send_to_each_conn_first_res(*args)
      connect
      retry_wrap do
        open_connections.inject(nil) { |res, c| res || call_wrap(c, *args) }
      end
    end

    def send_to_rand_conn(*args, &block)
      connect
      retry_wrap { call_wrap(pick_connection, *args, &block) }
    end

    # Returns a hash of address => result with nil results dropped.
    def send_to_all_conns(*args)
      connect
      retry_wrap do
        compact_hash(map_hash(@connections) { |c| call_wrap(c, *args) })
      end
    end

    # A random open connection; raises NotConnected when there is none.
    def pick_connection
      open_connections[rand(open_connections.size)] || raise(NotConnected)
    end

    # Builds a Hash from a list of [key, value] pairs (or from a Hash).
    def make_hash(pairs)
      Hash[*pairs.inject([]) { |flat, pair| flat + pair }]
    end

    # Returns a new hash with each value replaced by the block's result.
    def map_hash(h)
      make_hash(h.map { |k, v| [k, yield(v)] })
    end

    def compact_hash(hash)
      hash.reject { |_k, v| v.nil? }
    end

    # Merges the hashes, combining colliding values with combine_stats.
    def sum_hashes(hs)
      hs.inject({}) do |acc, h|
        acc.merge(h) { |k, old, new| combine_stats(k, old, new) }
      end
    end

    # Keys whose values are collected into a Set instead of being added.
    DONT_ADD = Set['name', 'version', 'pid'].freeze

    # Combines two values for stat +k+: DONT_ADD keys are gathered into one
    # flat Set, everything else is combined with +.
    def combine_stats(k, a, b)
      return a + b unless DONT_ADD.include?(k)

      # +a+ is already a Set from the third server onwards; reuse it
      # rather than nesting it inside a new Set.
      (a.is_a?(Set) ? a : Set[a]) + (b.is_a?(Set) ? b : Set[b])
    end
  end
end