lib/em-jack/connection.rb in em-jack-0.0.6 vs lib/em-jack/connection.rb in em-jack-0.0.7
- old
+ new
@@ -15,49 +15,49 @@
end
def self.handlers
@@handlers
end
-
+
def initialize(opts = {})
@host = opts[:host] || 'localhost'
@port = opts[:port] || 11300
@tube = opts[:tube]
-
+
@used_tube = 'default'
@watched_tubes = ['default']
-
+
@data = ""
@retries = 0
@in_reserve = false
@deferrables = []
-
+
@conn = EM::connect(host, port, EMJack::BeanstalkConnection) do |conn|
conn.client = self
end
-
+
unless @tube.nil?
use(@tube)
watch(@tube)
end
end
-
+
def use(tube, &blk)
return if @used_tube == tube
@used_tube = tube
@conn.send(:use, tube)
add_deferrable(&blk)
end
-
+
def watch(tube, &blk)
return if @watched_tubes.include?(tube)
@conn.send(:watch, tube)
df = add_deferrable(&blk)
df.callback { @watched_tubes.push(tube) }
df
end
-
+
def ignore(tube, &blk)
return unless @watched_tubes.include?(tube)
@conn.send(:ignore, tube)
df = add_deferrable(&blk)
df.callback { @watched_tubes.delete(tube) }
@@ -112,19 +112,19 @@
def put(msg, opts = nil, &blk)
opts = {} if opts.nil?
pri = (opts[:priority] || 65536).to_i
pri = 65536 if pri< 0
pri = 2 ** 32 if pri > (2 ** 32)
-
+
delay = (opts[:delay] || 0).to_i
delay = 0 if delay < 0
-
+
ttr = (opts[:ttr] || 300).to_i
ttr = 300 if ttr < 0
-
+
m = msg.to_s
-
+
@conn.send_with_data(:put, m, pri, delay, ttr, m.length)
add_deferrable(&blk)
end
def each_job(timeout = nil, &blk)
@@ -158,17 +158,17 @@
@error_callback.call(err)
else
puts "ERROR: #{err}"
end
end
-
+
df.callback &blk if block_given?
@deferrables.push(df)
df
end
-
+
def on_error(&blk)
@error_callback = blk
end
def received(data)
@@ -192,20 +192,20 @@
# the full length of body. If not, we'll go around and wait for more
# data to be received
body, @data = extract_body(bytes, @data) unless bytes <= 0
break if body.nil? && bytes > 0
- handled = h.handle(df, first, body)
+ handled = h.handle(df, first, body, self)
break if handled
end
@deferrables.unshift(df) unless handled
# not handled means there wasn't enough data to process a complete response
break unless handled
next unless @data.index(/\r\n/)
-
+
@data = @data[(@data.index(/\r\n/) + 2)..-1]
@data = "" if @data.nil?
end
end
@@ -217,7 +217,7 @@
data = rem[(bytes + 2)..-1]
data = "" if data.nil?
[body, data]
end
- end
+ end
end