lib/em-jack/connection.rb in em-jack-0.0.6 vs lib/em-jack/connection.rb in em-jack-0.0.7

- old
+ new

@@ -15,49 +15,49 @@ end def self.handlers @@handlers end - + def initialize(opts = {}) @host = opts[:host] || 'localhost' @port = opts[:port] || 11300 @tube = opts[:tube] - + @used_tube = 'default' @watched_tubes = ['default'] - + @data = "" @retries = 0 @in_reserve = false @deferrables = [] - + @conn = EM::connect(host, port, EMJack::BeanstalkConnection) do |conn| conn.client = self end - + unless @tube.nil? use(@tube) watch(@tube) end end - + def use(tube, &blk) return if @used_tube == tube @used_tube = tube @conn.send(:use, tube) add_deferrable(&blk) end - + def watch(tube, &blk) return if @watched_tubes.include?(tube) @conn.send(:watch, tube) df = add_deferrable(&blk) df.callback { @watched_tubes.push(tube) } df end - + def ignore(tube, &blk) return unless @watched_tubes.include?(tube) @conn.send(:ignore, tube) df = add_deferrable(&blk) df.callback { @watched_tubes.delete(tube) } @@ -112,19 +112,19 @@ def put(msg, opts = nil, &blk) opts = {} if opts.nil? pri = (opts[:priority] || 65536).to_i pri = 65536 if pri< 0 pri = 2 ** 32 if pri > (2 ** 32) - + delay = (opts[:delay] || 0).to_i delay = 0 if delay < 0 - + ttr = (opts[:ttr] || 300).to_i ttr = 300 if ttr < 0 - + m = msg.to_s - + @conn.send_with_data(:put, m, pri, delay, ttr, m.length) add_deferrable(&blk) end def each_job(timeout = nil, &blk) @@ -158,17 +158,17 @@ @error_callback.call(err) else puts "ERROR: #{err}" end end - + df.callback &blk if block_given? @deferrables.push(df) df end - + def on_error(&blk) @error_callback = blk end def received(data) @@ -192,20 +192,20 @@ # the full length of body. If not, we'll go around and wait for more # data to be received body, @data = extract_body(bytes, @data) unless bytes <= 0 break if body.nil? && bytes > 0 - handled = h.handle(df, first, body) + handled = h.handle(df, first, body, self) break if handled end @deferrables.unshift(df) unless handled # not handled means there wasn't enough data to process a complete response break unless handled next unless @data.index(/\r\n/) - + @data = @data[(@data.index(/\r\n/) + 2)..-1] @data = "" if @data.nil? end end @@ -217,7 +217,7 @@ data = rem[(bytes + 2)..-1] data = "" if data.nil? [body, data] end - end + end end