lib/em-jack/connection.rb in em-jack-0.0.3 vs lib/em-jack/connection.rb in em-jack-0.0.4

- old
+ new

@@ -28,101 +28,102 @@ use(@tube) watch(@tube) end end - def use(tube) + def use(tube, &blk) return if @used_tube == tube @used_tube = tube @conn.send(:use, tube) - add_deferrable + add_deferrable(&blk) end - def watch(tube) + def watch(tube, &blk) return if @watched_tubes.include?(tube) - @watched_tubes.push(tube) @conn.send(:watch, tube) - add_deferrable + df = add_deferrable(&blk) + df.callback { @watched_tubes.push(tube) } + df end - def ignore(tube) - return if not @watched_tubes.include?(tube) - @watched_tubes.delete(tube) + def ignore(tube, &blk) + return unless @watched_tubes.include?(tube) @conn.send(:ignore, tube) - add_deferrable + df = add_deferrable(&blk) + df.callback { @watched_tubes.delete(tube) } + df end - def reserve(timeout = nil) + def reserve(timeout = nil, &blk) if timeout @conn.send(:'reserve-with-timeout', timeout) else @conn.send(:reserve) end - add_deferrable + add_deferrable(&blk) end - def each_job(&block) + def each_job(&blk) work = Proc.new do r = reserve r.callback do |job| - block.call(job) + blk.call(job) + EM.next_tick { work.call } end end work.call end - def stats(type = nil, val = nil) + def stats(type = nil, val = nil, &blk) case(type) when nil then @conn.send(:stats) when :tube then @conn.send(:'stats-tube', val) when :job then @conn.send(:'stats-job', val.jobid) else raise EMJack::InvalidCommand.new end - add_deferrable + add_deferrable(&blk) end - def list(type = nil) + def list(type = nil, &blk) case(type) when nil then @conn.send(:'list-tubes') when :used then @conn.send(:'list-tube-used') when :watched then @conn.send(:'list-tubes-watched') else raise EMJack::InvalidCommand.new end - add_deferrable + add_deferrable(&blk) end - def delete(job) + def delete(job, &blk) return if job.nil? @conn.send(:delete, job.jobid) - add_deferrable + add_deferrable(&blk) end - def release(job) + def release(job, &blk) return if job.nil? @conn.send(:release, job.jobid, 0, 0) - add_deferrable + add_deferrable(&blk) end - - def put(msg, opts = {}) + + def put(msg, opts = nil, &blk) + opts = {} if opts.nil? pri = (opts[:priority] || 65536).to_i - if pri< 0 - pri = 65536 - elsif pri > (2 ** 32) - pri = 2 ** 32 - end + pri = 65536 if pri< 0 + pri = 2 ** 32 if pri > (2 ** 32) delay = (opts[:delay] || 0).to_i delay = 0 if delay < 0 ttr = (opts[:ttr] || 300).to_i ttr = 300 if ttr < 0 m = msg.to_s @conn.send_with_data(:put, m, pri, delay, ttr, m.length) - add_deferrable + add_deferrable(&blk) end def connected @retries = 0 end @@ -134,18 +135,30 @@ raise EMJack::Disconnected if @retries >= RETRY_COUNT @retries += 1 EM.add_timer(1) { @conn.reconnect(@host, @port) } end - def add_deferrable + def add_deferrable(&blk) df = EM::DefaultDeferrable.new - df.errback { |err| puts "ERROR: #{err}" } + df.errback do |err| + if @error_callback + @error_callback.call(err) + else + puts "ERROR: #{err}" + end + end + df.callback &blk if block_given? + @deferrables.push(df) df end + def on_error(&blk) + @error_callback = blk + end + def received(data) @data << data until @data.empty? idx = @data.index(/\r\n/) @@ -174,10 +187,14 @@ when /^INSERTED\s+(\d+)\r\n/ then df = @deferrables.shift df.succeed($1.to_i) + when /^RELEASED\r\n/ then + df = @deferrables.shift + df.succeed + when /^BURIED\s+(\d+)\r\n/ then df = @deferrables.shift df.fail(:buried, $1.to_i) when /^USING\s+(.*)\r\n/ then @@ -186,10 +203,14 @@ when /^WATCHING\s+(\d+)\r\n/ then df = @deferrables.shift df.succeed($1.to_i) + when /^NOT_IGNORED\r\n/ then + df = @deferrables.shift + df.fail("Can't ignore only watched tube") + when /^OK\s+(\d+)\r\n/ then bytes = $1.to_i body, @data = extract_body(bytes, @data) break if body.nil? @@ -220,12 +241,14 @@ end def extract_body(bytes, data) rem = data[(data.index(/\r\n/) + 2)..-1] return [nil, data] if rem.length < bytes + body = rem[0..(bytes - 1)] data = rem[(bytes + 2)..-1] data = "" if data.nil? + [body, data] end end end