lib/vx/consumer/session.rb in vx-consumer-0.1.2 vs lib/vx/consumer/session.rb in vx-consumer-0.1.3
- old
+ new
@@ -38,13 +38,13 @@
end
def close
if open?
@@session_lock.synchronize do
- instrument("closing_collection", info: conn_info)
+ instrument("closing_connection", info: conn_info)
- instrument("close_collection", info: conn_info) do
+ instrument("close_connection", info: conn_info) do
begin
conn.close
while conn.status != :closed
sleep 0.01
end
@@ -96,14 +96,27 @@
else
"not connected"
end
end
- def with_channel
+ def pub_channel
assert_connection_is_open
- conn.with_channel { |ch| yield ch }
+ key = :vx_consumer_session_pub_channel
+ ch = Thread.current[key]
+
+ if ch and ch.closed?
+ ch = nil
+ end
+
+ unless ch
+ ch = conn.create_channel
+ assign_error_handlers_to_channel(ch)
+ ch
+ end
+
+ ch
end
def declare_exchange(ch, name, options = nil)
assert_connection_is_open
@@ -114,9 +127,14 @@
def declare_queue(ch, name, options = nil)
assert_connection_is_open
options ||= {}
ch.queue name, options
+ end
+
+ def assign_error_handlers_to_channel(ch)
+ ch.on_uncaught_exception {|e, c| ::Vx::Consumer.exception_handler(e, consumer: c) }
+ ch.on_error {|e, c| ::Vx::Consumer.exception_handler(e, consumer: c) }
end
private
def assert_connection_is_open