lib/em-jack/connection.rb in em-jack-0.0.3 vs lib/em-jack/connection.rb in em-jack-0.0.4
- old
+ new
@@ -28,101 +28,102 @@
use(@tube)
watch(@tube)
end
end
- def use(tube)
+ def use(tube, &blk)
return if @used_tube == tube
@used_tube = tube
@conn.send(:use, tube)
- add_deferrable
+ add_deferrable(&blk)
end
- def watch(tube)
+ def watch(tube, &blk)
return if @watched_tubes.include?(tube)
- @watched_tubes.push(tube)
@conn.send(:watch, tube)
- add_deferrable
+ df = add_deferrable(&blk)
+ df.callback { @watched_tubes.push(tube) }
+ df
end
- def ignore(tube)
- return if not @watched_tubes.include?(tube)
- @watched_tubes.delete(tube)
+ def ignore(tube, &blk)
+ return unless @watched_tubes.include?(tube)
@conn.send(:ignore, tube)
- add_deferrable
+ df = add_deferrable(&blk)
+ df.callback { @watched_tubes.delete(tube) }
+ df
end
- def reserve(timeout = nil)
+ def reserve(timeout = nil, &blk)
if timeout
@conn.send(:'reserve-with-timeout', timeout)
else
@conn.send(:reserve)
end
- add_deferrable
+ add_deferrable(&blk)
end
- def each_job(&block)
+ def each_job(&blk)
work = Proc.new do
r = reserve
r.callback do |job|
- block.call(job)
+ blk.call(job)
+
EM.next_tick { work.call }
end
end
work.call
end
- def stats(type = nil, val = nil)
+ def stats(type = nil, val = nil, &blk)
case(type)
when nil then @conn.send(:stats)
when :tube then @conn.send(:'stats-tube', val)
when :job then @conn.send(:'stats-job', val.jobid)
else raise EMJack::InvalidCommand.new
end
- add_deferrable
+ add_deferrable(&blk)
end
- def list(type = nil)
+ def list(type = nil, &blk)
case(type)
when nil then @conn.send(:'list-tubes')
when :used then @conn.send(:'list-tube-used')
when :watched then @conn.send(:'list-tubes-watched')
else raise EMJack::InvalidCommand.new
end
- add_deferrable
+ add_deferrable(&blk)
end
- def delete(job)
+ def delete(job, &blk)
return if job.nil?
@conn.send(:delete, job.jobid)
- add_deferrable
+ add_deferrable(&blk)
end
- def release(job)
+ def release(job, &blk)
return if job.nil?
@conn.send(:release, job.jobid, 0, 0)
- add_deferrable
+ add_deferrable(&blk)
end
-
- def put(msg, opts = {})
+
+ def put(msg, opts = nil, &blk)
+ opts = {} if opts.nil?
pri = (opts[:priority] || 65536).to_i
- if pri< 0
- pri = 65536
- elsif pri > (2 ** 32)
- pri = 2 ** 32
- end
+ pri = 65536 if pri< 0
+ pri = 2 ** 32 if pri > (2 ** 32)
delay = (opts[:delay] || 0).to_i
delay = 0 if delay < 0
ttr = (opts[:ttr] || 300).to_i
ttr = 300 if ttr < 0
m = msg.to_s
@conn.send_with_data(:put, m, pri, delay, ttr, m.length)
- add_deferrable
+ add_deferrable(&blk)
end
def connected
@retries = 0
end
@@ -134,18 +135,30 @@
raise EMJack::Disconnected if @retries >= RETRY_COUNT
@retries += 1
EM.add_timer(1) { @conn.reconnect(@host, @port) }
end
- def add_deferrable
+ def add_deferrable(&blk)
df = EM::DefaultDeferrable.new
- df.errback { |err| puts "ERROR: #{err}" }
+ df.errback do |err|
+ if @error_callback
+ @error_callback.call(err)
+ else
+ puts "ERROR: #{err}"
+ end
+ end
+ df.callback &blk if block_given?
+
@deferrables.push(df)
df
end
+ def on_error(&blk)
+ @error_callback = blk
+ end
+
def received(data)
@data << data
until @data.empty?
idx = @data.index(/\r\n/)
@@ -174,10 +187,14 @@
when /^INSERTED\s+(\d+)\r\n/ then
df = @deferrables.shift
df.succeed($1.to_i)
+ when /^RELEASED\r\n/ then
+ df = @deferrables.shift
+ df.succeed
+
when /^BURIED\s+(\d+)\r\n/ then
df = @deferrables.shift
df.fail(:buried, $1.to_i)
when /^USING\s+(.*)\r\n/ then
@@ -186,10 +203,14 @@
when /^WATCHING\s+(\d+)\r\n/ then
df = @deferrables.shift
df.succeed($1.to_i)
+ when /^NOT_IGNORED\r\n/ then
+ df = @deferrables.shift
+ df.fail("Can't ignore only watched tube")
+
when /^OK\s+(\d+)\r\n/ then
bytes = $1.to_i
body, @data = extract_body(bytes, @data)
break if body.nil?
@@ -220,12 +241,14 @@
end
def extract_body(bytes, data)
rem = data[(data.index(/\r\n/) + 2)..-1]
return [nil, data] if rem.length < bytes
+
body = rem[0..(bytes - 1)]
data = rem[(bytes + 2)..-1]
data = "" if data.nil?
+
[body, data]
end
end
end