lib/message_bus/backends/postgres.rb in message_bus-3.3.7 vs lib/message_bus/backends/postgres.rb in message_bus-3.3.8
- old
+ new
@@ -84,14 +84,16 @@
def get_value(channel, id)
hold { |conn| exec_prepared(conn, 'get_message', [channel, id]) { |r| r.getvalue(0, 0) } }
end
- def reconnect
+ def after_fork
sync do
- @listening_on.clear
+ @pid = Process.pid
+ INHERITED_CONNECTIONS.concat(@available)
@available.clear
+ @listening_on.clear
end
end
# Dangerous, drops the message_bus table containing the backlog if it exists.
def reset!
@@ -99,10 +101,17 @@
conn.exec 'DROP TABLE IF EXISTS message_bus'
create_table(conn)
end
end
+ def destroy
+ sync do
+ @available.each(&:close)
+ @available.clear
+ end
+ end
+
# use with extreme care, will nuke all of the data
def expire_all_backlogs!
reset!
end
@@ -172,15 +181,11 @@
end
def hold
current_pid = Process.pid
if current_pid != @pid
- @pid = current_pid
- sync do
- INHERITED_CONNECTIONS.concat(@available)
- @available.clear
- end
+ after_fork
end
if conn = sync { @allocated[Thread.current] }
return yield(conn)
end
@@ -251,23 +256,29 @@
@max_backlog_size = max_backlog_size
@max_global_backlog_size = 2000
# after 7 days inactive backlogs will be removed
@max_backlog_age = 604800
@clear_every = config[:clear_every] || 1
+ @mutex = Mutex.new
end
# Reconnects to Postgres; used after a process fork, typically triggered by a forking webserver
# @see Base#after_fork
def after_fork
- client.reconnect
+ client.after_fork
end
# (see Base#reset!)
def reset!
client.reset!
end
+ # (see Base#destroy)
+ def destroy
+ client.destroy
+ end
+
# (see Base#expire_all_backlogs!)
def expire_all_backlogs!
client.expire_all_backlogs!
end
@@ -399,14 +410,10 @@
end
private
def client
- @client ||= new_connection
- end
-
- def new_connection
- Client.new(@config)
+ @client || @mutex.synchronize { @client ||= Client.new(@config) }
end
def postgresql_channel_name
db = @config[:db] || 0
"_message_bus_#{db}"