lib/em-jack/connection.rb in em-jack-0.1.1 vs lib/em-jack/connection.rb in em-jack-0.1.2
- old
+ new
@@ -23,17 +23,15 @@
def initialize(opts = {})
@host = opts[:host] || 'localhost'
@port = opts[:port] || 11300
@tube = opts[:tube]
- @used_tube = 'default'
- @watched_tubes = ['default']
+ reset_tube_state
@data = ""
@retries = 0
@in_reserve = false
- @deferrables = []
@conn = EM::connect(host, port, EMJack::BeanstalkConnection) do |conn|
conn.client = self
end
@@ -41,10 +39,21 @@
use(@tube)
watch(@tube)
end
end
+ def reset_tube_state
+ prev_used = @used_tube
+ prev_watched = @watched_tubes.dup if @watched_tubes
+
+ @used_tube = 'default'
+ @watched_tubes = ['default']
+ @deferrables = []
+
+ return [prev_used, prev_watched]
+ end
+
def fiber!
eigen = (class << self
self
end)
eigen.instance_eval do
@@ -228,21 +237,45 @@
succeed
end
def disconnected
d = @deferrables.dup
- @deferrables = []
+ prev_used, prev_watched = reset_tube_state
+
set_deferred_status(nil)
d.each { |df| df.fail(:disconnected) }
- raise EMJack::Disconnected if @retries >= RETRY_COUNT
+ if @retries >= RETRY_COUNT
+ if @disconnected_callback
+ @disconnected_callback.call
+ else
+ raise EMJack::Disconnected
+ end
+ end
@retries += 1
- EM.add_timer(5) { @conn.reconnect(@host, @port) }
+ EM.add_timer(5) { reconnect(prev_used, prev_watched) }
end
+
+ def reconnect(prev_used, prev_watched)
+ @conn.reconnect(@host, @port)
+ use(prev_used) if prev_used
+ [ prev_watched ].flatten.compact.each do |tube|
+ watch(tube)
+ end
+ end
+
+ def reconnect!
+ @retries = 0
+
+ prev_used, prev_watched = reset_tube_state
+
+ EM.next_tick { reconnect(prev_used, prev_watched) }
+ end
+
def add_deferrable(&blk)
df = EM::DefaultDeferrable.new
if @error_callback
df.errback { |err| @error_callback.call(err) }
end
@@ -253,9 +286,13 @@
df
end
def on_error(&blk)
@error_callback = blk
+ end
+
+ def on_disconnect(&blk)
+ @disconnected_callback = blk
end
def received(data)
@data << data