lib/em-jack/connection.rb in em-jack-0.1.3 vs lib/em-jack/connection.rb in em-jack-0.1.4

- old
+ new

@@ -1,7 +1,8 @@ require 'eventmachine' require 'yaml' +require 'uri' module EMJack class Connection include EM::Deferrable @@ -19,19 +20,28 @@ def self.handlers @@handlers end def initialize(opts = {}) - @host = opts[:host] || 'localhost' - @port = opts[:port] || 11300 - @tube = opts[:tube] + case opts + when Hash + @host = opts[:host] || 'localhost' + @port = opts[:port] || 11300 + @tube = opts[:tube] + when String + uri = URI.parse(opts) + @host = uri.host || 'localhost' + @port = uri.port || 11300 + @tube = uri.path.gsub(/^\//, '') # Kill the leading / + end reset_tube_state @data = "" @retries = 0 @in_reserve = false + @fiberized = false @conn = EM::connect(host, port, EMJack::BeanstalkConnection) do |conn| conn.client = self end @@ -40,31 +50,30 @@ watch(@tube) end end def reset_tube_state - prev_used = @used_tube + prev_used = @used_tube prev_watched = @watched_tubes.dup if @watched_tubes @used_tube = 'default' @watched_tubes = ['default'] @deferrables = [] return [prev_used, prev_watched] end def fiber! - eigen = (class << self - self - end) + @fiberized = true + + eigen = (class << self; self; end) eigen.instance_eval do %w(use reserve ignore watch peek stats list delete touch bury kick pause release put).each do |meth| alias_method :"a#{meth}", meth.to_sym define_method(meth.to_sym) do |*args| fib = Fiber.current ameth = :"a#{meth}" - p [ameth, *args] proc = lambda { |*result| fib.resume(*result) } send(ameth, *args, &proc) Fiber.yield end end @@ -217,30 +226,43 @@ add_deferrable(&blk) end def each_job(timeout = nil, &blk) - work = Proc.new do - r = reserve(timeout) - r.callback do |job| - blk.call(job) - EM.next_tick { work.call } + if (@fiberized) + work = Proc.new do + Fiber.new do + job = reserve(timeout) + blk.call(job) + end.resume + EM.next_tick { work.call } end - r.errback do - EM.next_tick { work.call } + else + work = Proc.new do + r = reserve(timeout) + r.callback do |job| + blk.call(job) + EM.next_tick { work.call } + end + r.errback do + EM.next_tick { work.call } + end end - end + end work.call end def connected @reconnect_proc = nil @retries = 0 succeed + @connected = true + @connected_callback.call if @connected_callback end def disconnected + @connected = false d = @deferrables.dup ## if reconnecting, need to fail ourself to remove any callbacks fail @@ -254,30 +276,37 @@ raise EMJack::Disconnected end end prev_used, prev_watched = reset_tube_state - @reconnect_proc = Proc.new { reconnect(prev_used, prev_watched) } unless @reconnect_proc + unless @reconnect_proc + recon = Proc.new { reconnect(prev_used, prev_watched) } + if @fiberized + @reconnect_proc = Proc.new { Fiber.new { recon.call }.resume } + else + @reconnect_proc = recon + end + end @retries += 1 EM.add_timer(5) { @reconnect_proc.call } end - + def reconnect(prev_used, prev_watched) @conn.reconnect(@host, @port) use(prev_used) if prev_used - [ prev_watched ].flatten.compact.each do |tube| - watch(tube) + + [prev_watched].flatten.compact.each do |tube| + @fiberized ? awatch(tube) : watch(tube) end end def reconnect! @retries = 0 prev_used, prev_watched = reset_tube_state - EM.next_tick { reconnect(prev_used, prev_watched) } end def add_deferrable(&blk) df = EM::DefaultDeferrable.new @@ -292,12 +321,20 @@ end def on_error(&blk) @error_callback = blk end - + def on_disconnect(&blk) @disconnected_callback = blk + end + + def on_connect(&blk) + @connected_callback = blk + end + + def connected? + @connected end def received(data) @data << data