lib/em-jack/connection.rb in em-jack-0.0.4 vs lib/em-jack/connection.rb in em-jack-0.0.5

- old
+ new

@@ -2,12 +2,23 @@ require 'yaml' module EMJack class Connection RETRY_COUNT = 5 - + + @@handlers = [] + attr_accessor :host, :port + + def self.register_handler(handler) + @@handlers ||= [] + @@handlers << handler + end + + def self.handlers + @@handlers + end def initialize(opts = {}) @host = opts[:host] || 'localhost' @port = opts[:port] || 11300 @tube = opts[:tube] @@ -60,22 +71,10 @@ @conn.send(:reserve) end add_deferrable(&blk) end - def each_job(&blk) - work = Proc.new do - r = reserve - r.callback do |job| - blk.call(job) - - EM.next_tick { work.call } - end - end - work.call - end - def stats(type = nil, val = nil, &blk) case(type) when nil then @conn.send(:stats) when :tube then @conn.send(:'stats-tube', val) when :job then @conn.send(:'stats-job', val.jobid) @@ -97,14 +96,18 @@ def delete(job, &blk) return if job.nil? @conn.send(:delete, job.jobid) add_deferrable(&blk) end - - def release(job, &blk) + + def release(job, opts = {}, &blk) return if job.nil? - @conn.send(:release, job.jobid, 0, 0) + + pri = (opts[:priority] || 65536).to_i + delay = (opts[:delay] || 0).to_i + + @conn.send(:release, job.jobid, pri, delay) add_deferrable(&blk) end def put(msg, opts = nil, &blk) opts = {} if opts.nil? @@ -121,18 +124,29 @@ m = msg.to_s @conn.send_with_data(:put, m, pri, delay, ttr, m.length) add_deferrable(&blk) end - + + def each_job(timeout = nil, &blk) + work = Proc.new do + r = reserve(timeout) + r.callback do |job| + blk.call(job) + + EM.next_tick { work.call } + end + end + work.call + end + def connected @retries = 0 end def disconnected - # XXX I think I need to run out the deferrables as failed here - # since the connection was dropped + @deferrables.each { |df| df.fail(:disconnected) } raise EMJack::Disconnected if @retries >= RETRY_COUNT @retries += 1 EM.add_timer(1) { @conn.reconnect(@host, @port) } end @@ -154,88 +168,43 @@ end def on_error(&blk) @error_callback = blk end - + def received(data) @data << data until @data.empty? idx = @data.index(/\r\n/) break if idx.nil? first = @data[0..(idx + 1)] - handled = false - %w(OUT_OF_MEMORY INTERNAL_ERROR DRAINING BAD_FORMAT - UNKNOWN_COMMAND EXPECTED_CRLF JOB_TOO_BIG DEADLINE_SOON - TIMED_OUT NOT_FOUND).each do |cmd| - next unless first =~ /^#{cmd}\r\n/i - df = @deferrables.shift - df.fail(cmd.downcase.to_sym) + df = @deferrables.shift + handled, skip = false, false + EMJack::Connection.handlers.each do |h| + handles, bytes = h.handles?(first) - @data = @data[(cmd.length + 2)..-1] - handled = true - break - end - next if handled + next unless handles + bytes = bytes.to_i - case (first) - when /^DELETED\r\n/ then - df = @deferrables.shift - df.succeed + # if this handler requires us to receive a body make sure we can get + # the full length of body. If not, we'll go around and wait for more + # data to be received + body, @data = extract_body(bytes, @data) unless bytes <= 0 + break if body.nil? && bytes > 0 - when /^INSERTED\s+(\d+)\r\n/ then - df = @deferrables.shift - df.succeed($1.to_i) - - when /^RELEASED\r\n/ then - df = @deferrables.shift - df.succeed - - when /^BURIED\s+(\d+)\r\n/ then - df = @deferrables.shift - df.fail(:buried, $1.to_i) - - when /^USING\s+(.*)\r\n/ then - df = @deferrables.shift - df.succeed($1) - - when /^WATCHING\s+(\d+)\r\n/ then - df = @deferrables.shift - df.succeed($1.to_i) - - when /^NOT_IGNORED\r\n/ then - df = @deferrables.shift - df.fail("Can't ignore only watched tube") - - when /^OK\s+(\d+)\r\n/ then - bytes = $1.to_i - - body, @data = extract_body(bytes, @data) - break if body.nil? - - df = @deferrables.shift - df.succeed(YAML.load(body)) - next - - when /^RESERVED\s+(\d+)\s+(\d+)\r\n/ then - id = $1.to_i - bytes = $2.to_i - - body, @data = extract_body(bytes, @data) - break if body.nil? - - df = @deferrables.shift - job = EMJack::Job.new(self, id, body) - df.succeed(job) - next - - else - break + handled = h.handle(df, first, body) + break if handled end + @deferrables.unshift(df) unless handled + + # not handled means there wasn't enough data to process a complete response + break unless handled + next unless @data.index(/\r\n/) + @data = @data[(@data.index(/\r\n/) + 2)..-1] @data = "" if @data.nil? end end