lib/amqp-spec/rspec.rb in amqp-spec-0.0.4 vs lib/amqp-spec/rspec.rb in amqp-spec-0.1.7

- old
+ new

@@ -1,7 +1,7 @@ -require 'mq' require 'fiber' unless Fiber.respond_to?(:current) +require 'mq' # You can include one of the following modules into your example groups: # AMQP::SpecHelper # AMQP::Spec # @@ -17,13 +17,41 @@ # # In order to stop AMQP loop, you should call 'done' AFTER you are sure that your example is finished. # For example, if you are using subscribe block that tests expectations on messages, 'done' should be # probably called at the end of this block. # -# TODO: Define 'async' method wrapping async requests and returning results... 'async_loop' too for subscribe? +# TODO: Define 'async' method wrapping async requests and returning results... 'async_loop' too for subscribe block? # TODO: 'evented_before', 'evented_after' that will be run inside EM before the example module AMQP + + # Initializes new AMQP client/connection without starting another EM loop + def self.start_connection opts={}, &block +# puts "!!!!!!!!! Existing connection: #{@conn}" if @conn + @conn = connect opts + @conn.callback(&block) if block + end + + # Closes AMQP connection and raises optional exception AFTER the AMQP connection is 100% closed + def self.stop_connection + if AMQP.conn and not AMQP.closing +# MQ.reset ? + @closing = true + @conn.close { + yield if block_given? + cleanup_state + } + end + end + + def self.cleanup_state +# MQ.reset ? + Thread.current[:mq] = nil + Thread.current[:mq_id] = nil + @conn = nil + @closing = false + end + module SpecHelper SpecTimeoutExceededError = Class.new(RuntimeError) def self.included(example_group) @@ -38,10 +66,11 @@ @@_em_default_timeout = spec_timeout else @@_em_default_timeout end end + alias default_timeout default_spec_timeout def self.default_options(opts=nil) if opts @@_em_default_options = opts @@ -64,35 +93,41 @@ # In addition to EM and AMQP options, :spec_timeout option (in seconds) is used to force spec to timeout # if something goes wrong and EM/AMQP loop hangs for some reason. SpecTimeoutExceededError is raised. def amqp opts={}, &block opts = @@_em_default_options.merge opts - EM.run do + begin + EM.run do # begin ? - @_em_spec_with_amqp = true - @_em_spec_exception = nil - spec_timeout = opts.delete(:spec_timeout) || @@_em_default_timeout - timeout(spec_timeout) if spec_timeout - @_em_spec_fiber = Fiber.new do - begin - amqp_start opts, &block - rescue Exception => @_em_spec_exception - p @_em_spec_exception - done + @_em_spec_with_amqp = true + @_em_spec_exception = nil + spec_timeout = opts.delete(:spec_timeout) || @@_em_default_timeout + timeout(spec_timeout) if spec_timeout + @_em_spec_fiber = Fiber.new do + begin + AMQP.start_connection opts, &block + rescue Exception => @_em_spec_exception +# p "inner", @_em_spec_exception + done + end + Fiber.yield end - Fiber.yield - end - @_em_spec_fiber.resume -# raise @_em_spec_exception if @_em_spec_exception + @_em_spec_fiber.resume + end + rescue Exception => outer_spec_exception +# p "outer", outer_spec_exception unless outer_spec_exception.is_a? SpecTimeoutExceededError + # Makes sure AMQP state is cleaned even after Rspec failures + AMQP.cleanup_state + raise outer_spec_exception end end # Yields to block inside EM loop, :spec_timeout option (in seconds) is used to force spec to timeout # if something goes wrong and EM/AMQP loop hangs for some reason. SpecTimeoutExceededError is raised. - # TODO: accept :spec_timeout =>1 as a Hash for compatibility with amqp interface def em(spec_timeout = @@_em_default_timeout, &block) + spec_timeout = spec_timeout[:spec_timeout] || @@_em_default_timeout if spec_timeout.is_a?(Hash) EM.run do @_em_spec_with_amqp = false @_em_spec_exception = nil timeout(spec_timeout) if spec_timeout @_em_spec_fiber = Fiber.new do @@ -115,58 +150,36 @@ @_em_spec_exception = SpecTimeoutExceededError.new done end end - # Stops AMQP and EM event loop + # Stops EM event loop, for amqp specs stops AMQP and cleans up its state def done EM.next_tick do if @_em_spec_with_amqp - amqp_stop(@_em_spec_exception) do - finish_em_spec_fiber + if AMQP.conn and not AMQP.closing + AMQP.stop_connection do + finish_em_spec_fiber { AMQP.cleanup_state } + end + else + finish_em_spec_fiber { AMQP.cleanup_state } end else finish_em_spec_fiber - raise @_em_spec_exception if @_em_spec_exception end end end private + # Stops EM loop, executes optional block, finishes off fiber and raises exception if any def finish_em_spec_fiber EM.stop_event_loop if EM.reactor_running? -# p Thread.current, Thread.current[:mq], __LINE__ + yield if block_given? @_em_spec_fiber.resume if @_em_spec_fiber.alive? + raise @_em_spec_exception if @_em_spec_exception end - - # Private method that initializes AMQP client/connection without starting another EM loop - def amqp_start opts={}, &block - AMQP.instance_exec do -# p Thread.current, Thread.current[:mq] - puts "!!!!!!!!! Existing connection: #{@conn}" if @conn - @conn = connect opts -# @conn ||= connect opts - @conn.callback(&block) if block - end - end - - # Private method that closes AMQP connection and raises optional - # exception AFTER the AMQP connection is 100% closed - def amqp_stop exception - if AMQP.conn and not AMQP.closing - AMQP.instance_exec do #(@_em_spec_exception) do |exception| - @closing = true - @conn.close { - yield if block_given? - @conn = nil - @closing = false - raise exception if exception - } - end - end - end end module Spec def self.included(cls) cls.send(:include, SpecHelper) @@ -189,7 +202,5 @@ super(&block) end end end end - -