lib/amqp-spec/rspec.rb in amqp-spec-0.0.4 vs lib/amqp-spec/rspec.rb in amqp-spec-0.1.7
- old
+ new
@@ -1,7 +1,7 @@
-require 'mq'
require 'fiber' unless Fiber.respond_to?(:current)
+require 'mq'
# You can include one of the following modules into your example groups:
# AMQP::SpecHelper
# AMQP::Spec
#
@@ -17,13 +17,41 @@
#
# In order to stop AMQP loop, you should call 'done' AFTER you are sure that your example is finished.
# For example, if you are using subscribe block that tests expectations on messages, 'done' should be
# probably called at the end of this block.
#
-# TODO: Define 'async' method wrapping async requests and returning results... 'async_loop' too for subscribe?
+# TODO: Define 'async' method wrapping async requests and returning results... 'async_loop' too for subscribe block?
# TODO: 'evented_before', 'evented_after' that will be run inside EM before the example
module AMQP
+
+ # Initializes new AMQP client/connection without starting another EM loop
+ def self.start_connection opts={}, &block
+# puts "!!!!!!!!! Existing connection: #{@conn}" if @conn
+ @conn = connect opts
+ @conn.callback(&block) if block
+ end
+
+ # Closes AMQP connection and raises optional exception AFTER the AMQP connection is 100% closed
+ def self.stop_connection
+ if AMQP.conn and not AMQP.closing
+# MQ.reset ?
+ @closing = true
+ @conn.close {
+ yield if block_given?
+ cleanup_state
+ }
+ end
+ end
+
+ def self.cleanup_state
+# MQ.reset ?
+ Thread.current[:mq] = nil
+ Thread.current[:mq_id] = nil
+ @conn = nil
+ @closing = false
+ end
+
module SpecHelper
SpecTimeoutExceededError = Class.new(RuntimeError)
def self.included(example_group)
@@ -38,10 +66,11 @@
@@_em_default_timeout = spec_timeout
else
@@_em_default_timeout
end
end
+
alias default_timeout default_spec_timeout
def self.default_options(opts=nil)
if opts
@@_em_default_options = opts
@@ -64,35 +93,41 @@
# In addition to EM and AMQP options, :spec_timeout option (in seconds) is used to force spec to timeout
# if something goes wrong and EM/AMQP loop hangs for some reason. SpecTimeoutExceededError is raised.
def amqp opts={}, &block
opts = @@_em_default_options.merge opts
- EM.run do
+ begin
+ EM.run do
# begin ?
- @_em_spec_with_amqp = true
- @_em_spec_exception = nil
- spec_timeout = opts.delete(:spec_timeout) || @@_em_default_timeout
- timeout(spec_timeout) if spec_timeout
- @_em_spec_fiber = Fiber.new do
- begin
- amqp_start opts, &block
- rescue Exception => @_em_spec_exception
- p @_em_spec_exception
- done
+ @_em_spec_with_amqp = true
+ @_em_spec_exception = nil
+ spec_timeout = opts.delete(:spec_timeout) || @@_em_default_timeout
+ timeout(spec_timeout) if spec_timeout
+ @_em_spec_fiber = Fiber.new do
+ begin
+ AMQP.start_connection opts, &block
+ rescue Exception => @_em_spec_exception
+# p "inner", @_em_spec_exception
+ done
+ end
+ Fiber.yield
end
- Fiber.yield
- end
- @_em_spec_fiber.resume
-# raise @_em_spec_exception if @_em_spec_exception
+ @_em_spec_fiber.resume
+ end
+ rescue Exception => outer_spec_exception
+# p "outer", outer_spec_exception unless outer_spec_exception.is_a? SpecTimeoutExceededError
+ # Makes sure AMQP state is cleaned even after Rspec failures
+ AMQP.cleanup_state
+ raise outer_spec_exception
end
end
# Yields to block inside EM loop, :spec_timeout option (in seconds) is used to force spec to timeout
# if something goes wrong and EM/AMQP loop hangs for some reason. SpecTimeoutExceededError is raised.
- # TODO: accept :spec_timeout =>1 as a Hash for compatibility with amqp interface
def em(spec_timeout = @@_em_default_timeout, &block)
+ spec_timeout = spec_timeout[:spec_timeout] || @@_em_default_timeout if spec_timeout.is_a?(Hash)
EM.run do
@_em_spec_with_amqp = false
@_em_spec_exception = nil
timeout(spec_timeout) if spec_timeout
@_em_spec_fiber = Fiber.new do
@@ -115,58 +150,36 @@
@_em_spec_exception = SpecTimeoutExceededError.new
done
end
end
- # Stops AMQP and EM event loop
+ # Stops EM event loop, for amqp specs stops AMQP and cleans up its state
def done
EM.next_tick do
if @_em_spec_with_amqp
- amqp_stop(@_em_spec_exception) do
- finish_em_spec_fiber
+ if AMQP.conn and not AMQP.closing
+ AMQP.stop_connection do
+ finish_em_spec_fiber { AMQP.cleanup_state }
+ end
+ else
+ finish_em_spec_fiber { AMQP.cleanup_state }
end
else
finish_em_spec_fiber
- raise @_em_spec_exception if @_em_spec_exception
end
end
end
private
+ # Stops EM loop, executes optional block, finishes off fiber and raises exception if any
def finish_em_spec_fiber
EM.stop_event_loop if EM.reactor_running?
-# p Thread.current, Thread.current[:mq], __LINE__
+ yield if block_given?
@_em_spec_fiber.resume if @_em_spec_fiber.alive?
+ raise @_em_spec_exception if @_em_spec_exception
end
-
- # Private method that initializes AMQP client/connection without starting another EM loop
- def amqp_start opts={}, &block
- AMQP.instance_exec do
-# p Thread.current, Thread.current[:mq]
- puts "!!!!!!!!! Existing connection: #{@conn}" if @conn
- @conn = connect opts
-# @conn ||= connect opts
- @conn.callback(&block) if block
- end
- end
-
- # Private method that closes AMQP connection and raises optional
- # exception AFTER the AMQP connection is 100% closed
- def amqp_stop exception
- if AMQP.conn and not AMQP.closing
- AMQP.instance_exec do #(@_em_spec_exception) do |exception|
- @closing = true
- @conn.close {
- yield if block_given?
- @conn = nil
- @closing = false
- raise exception if exception
- }
- end
- end
- end
end
module Spec
def self.included(cls)
cls.send(:include, SpecHelper)
@@ -189,7 +202,5 @@
super(&block)
end
end
end
end
-
-