lib/em-jack/connection.rb in em-jack-0.0.4 vs lib/em-jack/connection.rb in em-jack-0.0.5
- old
+ new
@@ -2,12 +2,23 @@
require 'yaml'
module EMJack
class Connection
RETRY_COUNT = 5
-
+
+ @@handlers = []
+
attr_accessor :host, :port
+
+ def self.register_handler(handler)
+ @@handlers ||= []
+ @@handlers << handler
+ end
+
+ def self.handlers
+ @@handlers
+ end
def initialize(opts = {})
@host = opts[:host] || 'localhost'
@port = opts[:port] || 11300
@tube = opts[:tube]
@@ -60,22 +71,10 @@
@conn.send(:reserve)
end
add_deferrable(&blk)
end
- def each_job(&blk)
- work = Proc.new do
- r = reserve
- r.callback do |job|
- blk.call(job)
-
- EM.next_tick { work.call }
- end
- end
- work.call
- end
-
def stats(type = nil, val = nil, &blk)
case(type)
when nil then @conn.send(:stats)
when :tube then @conn.send(:'stats-tube', val)
when :job then @conn.send(:'stats-job', val.jobid)
@@ -97,14 +96,18 @@
def delete(job, &blk)
return if job.nil?
@conn.send(:delete, job.jobid)
add_deferrable(&blk)
end
-
- def release(job, &blk)
+
+ def release(job, opts = {}, &blk)
return if job.nil?
- @conn.send(:release, job.jobid, 0, 0)
+
+ pri = (opts[:priority] || 65536).to_i
+ delay = (opts[:delay] || 0).to_i
+
+ @conn.send(:release, job.jobid, pri, delay)
add_deferrable(&blk)
end
def put(msg, opts = nil, &blk)
opts = {} if opts.nil?
@@ -121,18 +124,29 @@
m = msg.to_s
@conn.send_with_data(:put, m, pri, delay, ttr, m.length)
add_deferrable(&blk)
end
-
+
+ def each_job(timeout = nil, &blk)
+ work = Proc.new do
+ r = reserve(timeout)
+ r.callback do |job|
+ blk.call(job)
+
+ EM.next_tick { work.call }
+ end
+ end
+ work.call
+ end
+
def connected
@retries = 0
end
def disconnected
- # XXX I think I need to run out the deferrables as failed here
- # since the connection was dropped
+ @deferrables.each { |df| df.fail(:disconnected) }
raise EMJack::Disconnected if @retries >= RETRY_COUNT
@retries += 1
EM.add_timer(1) { @conn.reconnect(@host, @port) }
end
@@ -154,88 +168,43 @@
end
def on_error(&blk)
@error_callback = blk
end
-
+
def received(data)
@data << data
until @data.empty?
idx = @data.index(/\r\n/)
break if idx.nil?
first = @data[0..(idx + 1)]
- handled = false
- %w(OUT_OF_MEMORY INTERNAL_ERROR DRAINING BAD_FORMAT
- UNKNOWN_COMMAND EXPECTED_CRLF JOB_TOO_BIG DEADLINE_SOON
- TIMED_OUT NOT_FOUND).each do |cmd|
- next unless first =~ /^#{cmd}\r\n/i
- df = @deferrables.shift
- df.fail(cmd.downcase.to_sym)
+ df = @deferrables.shift
+ handled, skip = false, false
+ EMJack::Connection.handlers.each do |h|
+ handles, bytes = h.handles?(first)
- @data = @data[(cmd.length + 2)..-1]
- handled = true
- break
- end
- next if handled
+ next unless handles
+ bytes = bytes.to_i
- case (first)
- when /^DELETED\r\n/ then
- df = @deferrables.shift
- df.succeed
+ # if this handler requires us to receive a body make sure we can get
+ # the full length of body. If not, we'll go around and wait for more
+ # data to be received
+ body, @data = extract_body(bytes, @data) unless bytes <= 0
+ break if body.nil? && bytes > 0
- when /^INSERTED\s+(\d+)\r\n/ then
- df = @deferrables.shift
- df.succeed($1.to_i)
-
- when /^RELEASED\r\n/ then
- df = @deferrables.shift
- df.succeed
-
- when /^BURIED\s+(\d+)\r\n/ then
- df = @deferrables.shift
- df.fail(:buried, $1.to_i)
-
- when /^USING\s+(.*)\r\n/ then
- df = @deferrables.shift
- df.succeed($1)
-
- when /^WATCHING\s+(\d+)\r\n/ then
- df = @deferrables.shift
- df.succeed($1.to_i)
-
- when /^NOT_IGNORED\r\n/ then
- df = @deferrables.shift
- df.fail("Can't ignore only watched tube")
-
- when /^OK\s+(\d+)\r\n/ then
- bytes = $1.to_i
-
- body, @data = extract_body(bytes, @data)
- break if body.nil?
-
- df = @deferrables.shift
- df.succeed(YAML.load(body))
- next
-
- when /^RESERVED\s+(\d+)\s+(\d+)\r\n/ then
- id = $1.to_i
- bytes = $2.to_i
-
- body, @data = extract_body(bytes, @data)
- break if body.nil?
-
- df = @deferrables.shift
- job = EMJack::Job.new(self, id, body)
- df.succeed(job)
- next
-
- else
- break
+ handled = h.handle(df, first, body)
+ break if handled
end
+ @deferrables.unshift(df) unless handled
+
+ # not handled means there wasn't enough data to process a complete response
+ break unless handled
+ next unless @data.index(/\r\n/)
+
@data = @data[(@data.index(/\r\n/) + 2)..-1]
@data = "" if @data.nil?
end
end