lib/em-jack/connection.rb in em-jack-0.0.8 vs lib/em-jack/connection.rb in em-jack-0.0.9
- old
+ new
@@ -1,10 +1,12 @@
require 'eventmachine'
require 'yaml'
module EMJack
class Connection
+ include EM::Deferrable
+
RETRY_COUNT = 5
@@handlers = []
attr_accessor :host, :port
@@ -41,78 +43,138 @@
end
end
def use(tube, &blk)
return if @used_tube == tube
- @used_tube = tube
- @conn.send(:use, tube)
+
+ callback {
+ @used_tube = tube
+ @conn.send(:use, tube)
+ }
+
add_deferrable(&blk)
end
def watch(tube, &blk)
return if @watched_tubes.include?(tube)
- @conn.send(:watch, tube)
+
+ callback { @conn.send(:watch, tube) }
+
df = add_deferrable(&blk)
df.callback { @watched_tubes.push(tube) }
df
end
def ignore(tube, &blk)
return unless @watched_tubes.include?(tube)
- @conn.send(:ignore, tube)
+
+ callback { @conn.send(:ignore, tube) }
+
df = add_deferrable(&blk)
df.callback { @watched_tubes.delete(tube) }
df
end
def reserve(timeout = nil, &blk)
- if timeout
- @conn.send(:'reserve-with-timeout', timeout)
- else
- @conn.send(:reserve)
- end
+ callback {
+ if timeout
+ @conn.send(:'reserve-with-timeout', timeout)
+ else
+ @conn.send(:reserve)
+ end
+ }
+
add_deferrable(&blk)
end
+ def peek(type = nil, &blk)
+ callback {
+ case(type.to_s)
+ when /^\d+$/ then @conn.send(:peek, type)
+ when "ready" then @conn.send(:'peek-ready')
+ when "delayed" then @conn.send(:'peek-delayed')
+ when "buried" then @conn.send(:'peek-buried')
+ else raise EMJack::InvalidCommand.new
+ end
+ }
+
+ add_deferrable(&blk)
+ end
+
def stats(type = nil, val = nil, &blk)
- case(type)
- when nil then @conn.send(:stats)
- when :tube then @conn.send(:'stats-tube', val)
- when :job then @conn.send(:'stats-job', val.jobid)
- else raise EMJack::InvalidCommand.new
- end
+ callback {
+ case(type)
+ when nil then @conn.send(:stats)
+ when :tube then @conn.send(:'stats-tube', val)
+ when :job then @conn.send(:'stats-job', val.jobid)
+ else raise EMJack::InvalidCommand.new
+ end
+ }
+
add_deferrable(&blk)
end
def list(type = nil, &blk)
- case(type)
- when nil then @conn.send(:'list-tubes')
- when :used then @conn.send(:'list-tube-used')
- when :watched then @conn.send(:'list-tubes-watched')
- else raise EMJack::InvalidCommand.new
- end
+ callback {
+ case(type)
+ when nil then @conn.send(:'list-tubes')
+ when :used then @conn.send(:'list-tube-used')
+ when :watched then @conn.send(:'list-tubes-watched')
+ else raise EMJack::InvalidCommand.new
+ end
+ }
add_deferrable(&blk)
end
def delete(job, &blk)
return if job.nil?
- @conn.send(:delete, job.jobid)
+
+ callback { @conn.send(:delete, job.jobid) }
+
add_deferrable(&blk)
end
+ def touch(job, &blk)
+ return if job.nil?
+
+ callback { @conn.send(:touch, job.jobid) }
+
+ add_deferrable(&blk)
+ end
+
+ def bury(job, pri, &blk)
+ callback { @conn.send(:bury, job.jobid, pri) }
+
+ add_deferrable(&blk)
+ end
+
+ def kick(count = 1, &blk)
+ callback { @conn.send(:kick, count) }
+
+ add_deferrable(&blk)
+ end
+
+ def pause(tube, delay, &blk)
+ callback { @conn.send(:'pause-tube', delay) }
+
+ add_deferrable(&blk)
+ end
+
def release(job, opts = {}, &blk)
return if job.nil?
pri = (opts[:priority] || 65536).to_i
delay = (opts[:delay] || 0).to_i
- @conn.send(:release, job.jobid, pri, delay)
+ callback { @conn.send(:release, job.jobid, pri, delay) }
+
add_deferrable(&blk)
end
def put(msg, opts = nil, &blk)
opts = {} if opts.nil?
+
pri = (opts[:priority] || 65536).to_i
pri = 65536 if pri< 0
pri = 2 ** 32 if pri > (2 ** 32)
delay = (opts[:delay] || 0).to_i
@@ -121,11 +183,12 @@
ttr = (opts[:ttr] || 300).to_i
ttr = 300 if ttr < 0
m = msg.to_s
- @conn.send_with_data(:put, m, pri, delay, ttr, m.length)
+ callback { @conn.send_with_data(:put, m, pri, delay, ttr, m.length) }
+
add_deferrable(&blk)
end
def each_job(timeout = nil, &blk)
work = Proc.new do
@@ -139,29 +202,30 @@
work.call
end
def connected
@retries = 0
+ succeed
end
def disconnected
- @deferrables.each { |df| df.fail(:disconnected) }
+ d = @deferrables.dup
@deferrables = []
+ set_deferred_status(nil)
+ d.each { |df| df.fail(:disconnected) }
+
raise EMJack::Disconnected if @retries >= RETRY_COUNT
+
@retries += 1
- EM.add_timer(1) { @conn.reconnect(@host, @port) }
+ EM.add_timer(5) { @conn.reconnect(@host, @port) }
end
def add_deferrable(&blk)
df = EM::DefaultDeferrable.new
- df.errback do |err|
- if @error_callback
- @error_callback.call(err)
- else
- puts "ERROR: #{err}"
- end
+ if @error_callback
+ df.errback { |err| @error_callback.call(err) }
end
df.callback &blk if block_given?
@deferrables.push(df)
@@ -190,11 +254,11 @@
bytes = bytes.to_i
# if this handler requires us to receive a body make sure we can get
# the full length of body. If not, we'll go around and wait for more
# data to be received
- body, @data = extract_body(bytes, @data) unless bytes <= 0
+ body, @data = extract_body!(bytes, @data) unless bytes <= 0
break if body.nil? && bytes > 0
handled = h.handle(df, first, body, self)
break if handled
end
@@ -208,10 +272,10 @@
@data = @data[(@data.index(/\r\n/) + 2)..-1]
@data = "" if @data.nil?
end
end
- def extract_body(bytes, data)
+ def extract_body!(bytes, data)
rem = data[(data.index(/\r\n/) + 2)..-1]
return [nil, data] if rem.length < bytes
body = rem[0..(bytes - 1)]
data = rem[(bytes + 2)..-1]