lib/message_bus/backends/postgres.rb in message_bus-3.3.6 vs lib/message_bus/backends/postgres.rb in message_bus-3.3.7
- old
+ new
@@ -42,10 +42,11 @@
def initialize(config)
@config = config
@listening_on = {}
@available = []
@allocated = {}
+ @subscribe_connection = nil
@mutex = Mutex.new
@pid = Process.pid
end
def add(channel, value)
@@ -129,11 +130,11 @@
obj = Object.new
sync { @listening_on[channel] = obj }
listener = Listener.new
yield listener
- conn = raw_pg_connection
+ conn = @subscribe_connection = raw_pg_connection
conn.exec "LISTEN #{channel}"
listener.do_sub.call
while listening_on?(channel, obj)
conn.wait_for_notify(10) do |_, _, payload|
break unless listening_on?(channel, obj)
@@ -143,10 +144,13 @@
end
listener.do_unsub.call
conn.exec "UNLISTEN #{channel}"
nil
+ ensure
+ @subscribe_connection&.close
+ @subscribe_connection = nil
end
def unsubscribe
sync { @listening_on.clear }
end
@@ -249,11 +253,11 @@
# after 7 days inactive backlogs will be removed
@max_backlog_age = 604800
@clear_every = config[:clear_every] || 1
end
- # Reconnects to Postgres; used after a process fork, typically triggerd by a forking webserver
+ # Reconnects to Postgres; used after a process fork, typically triggered by a forking webserver
# @see Base#after_fork
def after_fork
client.reconnect
end
@@ -275,10 +279,10 @@
c = client
backlog_id = c.add(channel, data)
msg = MessageBus::Message.new backlog_id, backlog_id, channel, data
payload = msg.encode
c.publish postgresql_channel_name, payload
- if backlog_id % clear_every == 0
+ if backlog_id && backlog_id % clear_every == 0
max_backlog_size = (opts && opts[:max_backlog_size]) || self.max_backlog_size
max_backlog_age = (opts && opts[:max_backlog_age]) || self.max_backlog_age
c.clear_global_backlog(backlog_id, @max_global_backlog_size)
c.expire(max_backlog_age)
c.clear_channel_backlog(channel, backlog_id, max_backlog_size)