lib/em-jack/connection.rb in em-jack-0.0.8 vs lib/em-jack/connection.rb in em-jack-0.0.9

- old
+ new

@@ -1,10 +1,12 @@ require 'eventmachine' require 'yaml' module EMJack class Connection + include EM::Deferrable + RETRY_COUNT = 5 @@handlers = [] attr_accessor :host, :port @@ -41,78 +43,138 @@ end end def use(tube, &blk) return if @used_tube == tube - @used_tube = tube - @conn.send(:use, tube) + + callback { + @used_tube = tube + @conn.send(:use, tube) + } + add_deferrable(&blk) end def watch(tube, &blk) return if @watched_tubes.include?(tube) - @conn.send(:watch, tube) + + callback { @conn.send(:watch, tube) } + df = add_deferrable(&blk) df.callback { @watched_tubes.push(tube) } df end def ignore(tube, &blk) return unless @watched_tubes.include?(tube) - @conn.send(:ignore, tube) + + callback { @conn.send(:ignore, tube) } + df = add_deferrable(&blk) df.callback { @watched_tubes.delete(tube) } df end def reserve(timeout = nil, &blk) - if timeout - @conn.send(:'reserve-with-timeout', timeout) - else - @conn.send(:reserve) - end + callback { + if timeout + @conn.send(:'reserve-with-timeout', timeout) + else + @conn.send(:reserve) + end + } + add_deferrable(&blk) end + def peek(type = nil, &blk) + callback { + case(type.to_s) + when /^\d+$/ then @conn.send(:peek, type) + when "ready" then @conn.send(:'peek-ready') + when "delayed" then @conn.send(:'peek-delayed') + when "buried" then @conn.send(:'peek-buried') + else raise EMJack::InvalidCommand.new + end + } + + add_deferrable(&blk) + end + def stats(type = nil, val = nil, &blk) - case(type) - when nil then @conn.send(:stats) - when :tube then @conn.send(:'stats-tube', val) - when :job then @conn.send(:'stats-job', val.jobid) - else raise EMJack::InvalidCommand.new - end + callback { + case(type) + when nil then @conn.send(:stats) + when :tube then @conn.send(:'stats-tube', val) + when :job then @conn.send(:'stats-job', val.jobid) + else raise EMJack::InvalidCommand.new + end + } + add_deferrable(&blk) end def list(type = nil, &blk) - case(type) - when nil then @conn.send(:'list-tubes') - when :used then @conn.send(:'list-tube-used') - when :watched then @conn.send(:'list-tubes-watched') - else raise EMJack::InvalidCommand.new - end + callback { + case(type) + when nil then @conn.send(:'list-tubes') + when :used then @conn.send(:'list-tube-used') + when :watched then @conn.send(:'list-tubes-watched') + else raise EMJack::InvalidCommand.new + end + } add_deferrable(&blk) end def delete(job, &blk) return if job.nil? - @conn.send(:delete, job.jobid) + + callback { @conn.send(:delete, job.jobid) } + add_deferrable(&blk) end + def touch(job, &blk) + return if job.nil? + + callback { @conn.send(:touch, job.jobid) } + + add_deferrable(&blk) + end + + def bury(job, pri, &blk) + callback { @conn.send(:bury, job.jobid, pri) } + + add_deferrable(&blk) + end + + def kick(count = 1, &blk) + callback { @conn.send(:kick, count) } + + add_deferrable(&blk) + end + + def pause(tube, delay, &blk) + callback { @conn.send(:'pause-tube', delay) } + + add_deferrable(&blk) + end + def release(job, opts = {}, &blk) return if job.nil? pri = (opts[:priority] || 65536).to_i delay = (opts[:delay] || 0).to_i - @conn.send(:release, job.jobid, pri, delay) + callback { @conn.send(:release, job.jobid, pri, delay) } + add_deferrable(&blk) end def put(msg, opts = nil, &blk) opts = {} if opts.nil? + pri = (opts[:priority] || 65536).to_i pri = 65536 if pri< 0 pri = 2 ** 32 if pri > (2 ** 32) delay = (opts[:delay] || 0).to_i @@ -121,11 +183,12 @@ ttr = (opts[:ttr] || 300).to_i ttr = 300 if ttr < 0 m = msg.to_s - @conn.send_with_data(:put, m, pri, delay, ttr, m.length) + callback { @conn.send_with_data(:put, m, pri, delay, ttr, m.length) } + add_deferrable(&blk) end def each_job(timeout = nil, &blk) work = Proc.new do @@ -139,29 +202,30 @@ work.call end def connected @retries = 0 + succeed end def disconnected - @deferrables.each { |df| df.fail(:disconnected) } + d = @deferrables.dup @deferrables = [] + set_deferred_status(nil) + d.each { |df| df.fail(:disconnected) } + raise EMJack::Disconnected if @retries >= RETRY_COUNT + @retries += 1 - EM.add_timer(1) { @conn.reconnect(@host, @port) } + EM.add_timer(5) { @conn.reconnect(@host, @port) } end def add_deferrable(&blk) df = EM::DefaultDeferrable.new - df.errback do |err| - if @error_callback - @error_callback.call(err) - else - puts "ERROR: #{err}" - end + if @error_callback + df.errback { |err| @error_callback.call(err) } end df.callback &blk if block_given? @deferrables.push(df) @@ -190,11 +254,11 @@ bytes = bytes.to_i # if this handler requires us to receive a body make sure we can get # the full length of body. If not, we'll go around and wait for more # data to be received - body, @data = extract_body(bytes, @data) unless bytes <= 0 + body, @data = extract_body!(bytes, @data) unless bytes <= 0 break if body.nil? && bytes > 0 handled = h.handle(df, first, body, self) break if handled end @@ -208,10 +272,10 @@ @data = @data[(@data.index(/\r\n/) + 2)..-1] @data = "" if @data.nil? end end - def extract_body(bytes, data) + def extract_body!(bytes, data) rem = data[(data.index(/\r\n/) + 2)..-1] return [nil, data] if rem.length < bytes body = rem[0..(bytes - 1)] data = rem[(bytes + 2)..-1]