lib/message_bus/backends/postgres.rb in message_bus-3.3.6 vs lib/message_bus/backends/postgres.rb in message_bus-3.3.7

- old
+ new

@@ -42,10 +42,11 @@ def initialize(config) @config = config @listening_on = {} @available = [] @allocated = {} + @subscribe_connection = nil @mutex = Mutex.new @pid = Process.pid end def add(channel, value) @@ -129,11 +130,11 @@ obj = Object.new sync { @listening_on[channel] = obj } listener = Listener.new yield listener - conn = raw_pg_connection + conn = @subscribe_connection = raw_pg_connection conn.exec "LISTEN #{channel}" listener.do_sub.call while listening_on?(channel, obj) conn.wait_for_notify(10) do |_, _, payload| break unless listening_on?(channel, obj) @@ -143,10 +144,13 @@ end listener.do_unsub.call conn.exec "UNLISTEN #{channel}" nil + ensure + @subscribe_connection&.close + @subscribe_connection = nil end def unsubscribe sync { @listening_on.clear } end @@ -249,11 +253,11 @@ # after 7 days inactive backlogs will be removed @max_backlog_age = 604800 @clear_every = config[:clear_every] || 1 end - # Reconnects to Postgres; used after a process fork, typically triggerd by a forking webserver + # Reconnects to Postgres; used after a process fork, typically triggered by a forking webserver # @see Base#after_fork def after_fork client.reconnect end @@ -275,10 +279,10 @@ c = client backlog_id = c.add(channel, data) msg = MessageBus::Message.new backlog_id, backlog_id, channel, data payload = msg.encode c.publish postgresql_channel_name, payload - if backlog_id % clear_every == 0 + if backlog_id && backlog_id % clear_every == 0 max_backlog_size = (opts && opts[:max_backlog_size]) || self.max_backlog_size max_backlog_age = (opts && opts[:max_backlog_age]) || self.max_backlog_age c.clear_global_backlog(backlog_id, @max_global_backlog_size) c.expire(max_backlog_age) c.clear_channel_backlog(channel, backlog_id, max_backlog_size)