lib/em-jack/connection.rb in em-jack-0.1.3 vs lib/em-jack/connection.rb in em-jack-0.1.4
- old
+ new
@@ -1,7 +1,8 @@
require 'eventmachine'
require 'yaml'
+require 'uri'
module EMJack
class Connection
include EM::Deferrable
@@ -19,19 +20,28 @@
def self.handlers
@@handlers
end
def initialize(opts = {})
- @host = opts[:host] || 'localhost'
- @port = opts[:port] || 11300
- @tube = opts[:tube]
+ case opts
+ when Hash
+ @host = opts[:host] || 'localhost'
+ @port = opts[:port] || 11300
+ @tube = opts[:tube]
+ when String
+ uri = URI.parse(opts)
+ @host = uri.host || 'localhost'
+ @port = uri.port || 11300
+ @tube = uri.path.gsub(/^\//, '') # Kill the leading /
+ end
reset_tube_state
@data = ""
@retries = 0
@in_reserve = false
+ @fiberized = false
@conn = EM::connect(host, port, EMJack::BeanstalkConnection) do |conn|
conn.client = self
end
@@ -40,31 +50,30 @@
watch(@tube)
end
end
def reset_tube_state
- prev_used = @used_tube
+ prev_used = @used_tube
prev_watched = @watched_tubes.dup if @watched_tubes
@used_tube = 'default'
@watched_tubes = ['default']
@deferrables = []
return [prev_used, prev_watched]
end
def fiber!
- eigen = (class << self
- self
- end)
+ @fiberized = true
+
+ eigen = (class << self; self; end)
eigen.instance_eval do
%w(use reserve ignore watch peek stats list delete touch bury kick pause release put).each do |meth|
alias_method :"a#{meth}", meth.to_sym
define_method(meth.to_sym) do |*args|
fib = Fiber.current
ameth = :"a#{meth}"
- p [ameth, *args]
proc = lambda { |*result| fib.resume(*result) }
send(ameth, *args, &proc)
Fiber.yield
end
end
@@ -217,30 +226,43 @@
add_deferrable(&blk)
end
def each_job(timeout = nil, &blk)
- work = Proc.new do
- r = reserve(timeout)
- r.callback do |job|
- blk.call(job)
- EM.next_tick { work.call }
+ if (@fiberized)
+ work = Proc.new do
+ Fiber.new do
+ job = reserve(timeout)
+ blk.call(job)
+ end.resume
+ EM.next_tick { work.call }
end
- r.errback do
- EM.next_tick { work.call }
+ else
+ work = Proc.new do
+ r = reserve(timeout)
+ r.callback do |job|
+ blk.call(job)
+ EM.next_tick { work.call }
+ end
+ r.errback do
+ EM.next_tick { work.call }
+ end
end
- end
+ end
work.call
end
def connected
@reconnect_proc = nil
@retries = 0
succeed
+ @connected = true
+ @connected_callback.call if @connected_callback
end
def disconnected
+ @connected = false
d = @deferrables.dup
## if reconnecting, need to fail ourself to remove any callbacks
fail
@@ -254,30 +276,37 @@
raise EMJack::Disconnected
end
end
prev_used, prev_watched = reset_tube_state
- @reconnect_proc = Proc.new { reconnect(prev_used, prev_watched) } unless @reconnect_proc
+ unless @reconnect_proc
+ recon = Proc.new { reconnect(prev_used, prev_watched) }
+ if @fiberized
+ @reconnect_proc = Proc.new { Fiber.new { recon.call }.resume }
+ else
+ @reconnect_proc = recon
+ end
+ end
@retries += 1
EM.add_timer(5) { @reconnect_proc.call }
end
-
+
def reconnect(prev_used, prev_watched)
@conn.reconnect(@host, @port)
use(prev_used) if prev_used
- [ prev_watched ].flatten.compact.each do |tube|
- watch(tube)
+
+ [prev_watched].flatten.compact.each do |tube|
+ @fiberized ? awatch(tube) : watch(tube)
end
end
def reconnect!
@retries = 0
prev_used, prev_watched = reset_tube_state
-
EM.next_tick { reconnect(prev_used, prev_watched) }
end
def add_deferrable(&blk)
df = EM::DefaultDeferrable.new
@@ -292,12 +321,20 @@
end
def on_error(&blk)
@error_callback = blk
end
-
+
def on_disconnect(&blk)
@disconnected_callback = blk
+ end
+
+ def on_connect(&blk)
+ @connected_callback = blk
+ end
+
+ def connected?
+ @connected
end
def received(data)
@data << data