lib/vx/consumer/session.rb in vx-consumer-0.1.2 vs lib/vx/consumer/session.rb in vx-consumer-0.1.3

- old
+ new

@@ -38,13 +38,13 @@ end def close if open? @@session_lock.synchronize do - instrument("closing_collection", info: conn_info) + instrument("closing_connection", info: conn_info) - instrument("close_collection", info: conn_info) do + instrument("close_connection", info: conn_info) do begin conn.close while conn.status != :closed sleep 0.01 end @@ -96,14 +96,27 @@ else "not connected" end end - def with_channel + def pub_channel assert_connection_is_open - conn.with_channel { |ch| yield ch } + key = :vx_consumer_session_pub_channel + ch = Thread.current[key] + + if ch and ch.closed? + ch = nil + end + + unless ch + ch = conn.create_channel + assign_error_handlers_to_channel(ch) + ch + end + + ch end def declare_exchange(ch, name, options = nil) assert_connection_is_open @@ -114,9 +127,14 @@ def declare_queue(ch, name, options = nil) assert_connection_is_open options ||= {} ch.queue name, options + end + + def assign_error_handlers_to_channel(ch) + ch.on_uncaught_exception {|e, c| ::Vx::Consumer.exception_handler(e, consumer: c) } + ch.on_error {|e, c| ::Vx::Consumer.exception_handler(e, consumer: c) } end private def assert_connection_is_open