lib/em-jack/connection.rb in em-jack-0.1.1 vs lib/em-jack/connection.rb in em-jack-0.1.2

- old
+ new

@@ -23,17 +23,15 @@ def initialize(opts = {}) @host = opts[:host] || 'localhost' @port = opts[:port] || 11300 @tube = opts[:tube] - @used_tube = 'default' - @watched_tubes = ['default'] + reset_tube_state @data = "" @retries = 0 @in_reserve = false - @deferrables = [] @conn = EM::connect(host, port, EMJack::BeanstalkConnection) do |conn| conn.client = self end @@ -41,10 +39,21 @@ use(@tube) watch(@tube) end end + def reset_tube_state + prev_used = @used_tube + prev_watched = @watched_tubes.dup if @watched_tubes + + @used_tube = 'default' + @watched_tubes = ['default'] + @deferrables = [] + + return [prev_used, prev_watched] + end + def fiber! eigen = (class << self self end) eigen.instance_eval do @@ -228,21 +237,45 @@ succeed end def disconnected d = @deferrables.dup - @deferrables = [] + prev_used, prev_watched = reset_tube_state + set_deferred_status(nil) d.each { |df| df.fail(:disconnected) } - raise EMJack::Disconnected if @retries >= RETRY_COUNT + if @retries >= RETRY_COUNT + if @disconnected_callback + @disconnected_callback.call + else + raise EMJack::Disconnected + end + end @retries += 1 - EM.add_timer(5) { @conn.reconnect(@host, @port) } + EM.add_timer(5) { reconnect(prev_used, prev_watched) } end + + def reconnect(prev_used, prev_watched) + @conn.reconnect(@host, @port) + use(prev_used) if prev_used + [ prev_watched ].flatten.compact.each do |tube| + watch(tube) + end + end + + def reconnect! + @retries = 0 + + prev_used, prev_watched = reset_tube_state + + EM.next_tick { reconnect(prev_used, prev_watched) } + end + def add_deferrable(&blk) df = EM::DefaultDeferrable.new if @error_callback df.errback { |err| @error_callback.call(err) } end @@ -253,9 +286,13 @@ df end def on_error(&blk) @error_callback = blk + end + + def on_disconnect(&blk) + @disconnected_callback = blk end def received(data) @data << data