lib/message_bus/backends/postgres.rb in message_bus-3.3.7 vs lib/message_bus/backends/postgres.rb in message_bus-3.3.8

- old
+ new

@@ -84,14 +84,16 @@ def get_value(channel, id) hold { |conn| exec_prepared(conn, 'get_message', [channel, id]) { |r| r.getvalue(0, 0) } } end - def reconnect + def after_fork sync do - @listening_on.clear + @pid = Process.pid + INHERITED_CONNECTIONS.concat(@available) @available.clear + @listening_on.clear end end # Dangerous, drops the message_bus table containing the backlog if it exists. def reset! @@ -99,10 +101,17 @@ conn.exec 'DROP TABLE IF EXISTS message_bus' create_table(conn) end end + def destroy + sync do + @available.each(&:close) + @available.clear + end + end + # use with extreme care, will nuke all of the data def expire_all_backlogs! reset! end @@ -172,15 +181,11 @@ end def hold current_pid = Process.pid if current_pid != @pid - @pid = current_pid - sync do - INHERITED_CONNECTIONS.concat(@available) - @available.clear - end + after_fork end if conn = sync { @allocated[Thread.current] } return yield(conn) end @@ -251,23 +256,29 @@ @max_backlog_size = max_backlog_size @max_global_backlog_size = 2000 # after 7 days inactive backlogs will be removed @max_backlog_age = 604800 @clear_every = config[:clear_every] || 1 + @mutex = Mutex.new end # Reconnects to Postgres; used after a process fork, typically triggered by a forking webserver # @see Base#after_fork def after_fork - client.reconnect + client.after_fork end # (see Base#reset!) def reset! client.reset! end + # (see Base#destroy) + def destroy + client.destroy + end + # (see Base#expire_all_backlogs!) def expire_all_backlogs! client.expire_all_backlogs! end @@ -399,14 +410,10 @@ end private def client - @client ||= new_connection - end - - def new_connection - Client.new(@config) + @client || @mutex.synchronize { @client ||= Client.new(@config) } end def postgresql_channel_name db = @config[:db] || 0 "_message_bus_#{db}"