lib/message_bus/backends/postgres.rb in message_bus-4.0.0 vs lib/message_bus/backends/postgres.rb in message_bus-4.1.0
- old
+ new
@@ -130,10 +130,25 @@
else
hold { |conn| exec_prepared(conn, 'max_id', &block) }
end
end
+ def max_ids(*channels)
+ block = proc do |pg_result|
+ ids = Array.new(channels.size, 0)
+ pg_result.ntuples.times do |i|
+ channel = pg_result.getvalue(i, 0)
+ max_id = pg_result.getvalue(i, 1)
+ channel_index = channels.index(channel)
+ ids[channel_index] = max_id.to_i
+ end
+ ids
+ end
+
+ hold { |conn| exec_prepared(conn, 'max_channel_ids', [PG::TextEncoder::Array.new.encode(channels)], &block) }
+ end
+
def publish(channel, data)
hold { |conn| exec_prepared(conn, 'publish', [channel, data]) }
end
def subscribe(channel)
@@ -173,13 +188,20 @@
ensure
r.clear if r.respond_to?(:clear)
end
def create_table(conn)
- conn.exec 'CREATE TABLE message_bus (id bigserial PRIMARY KEY, channel text NOT NULL, value text NOT NULL CHECK (octet_length(value) >= 2), added_at timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL)'
- conn.exec 'CREATE INDEX table_channel_id_index ON message_bus (channel, id)'
- conn.exec 'CREATE INDEX table_added_at_index ON message_bus (added_at)'
+ sync do
+ begin
+ conn.exec("SELECT 'message_bus'::regclass")
+ rescue PG::UndefinedTable
+ conn.exec 'CREATE TABLE message_bus (id bigserial PRIMARY KEY, channel text NOT NULL, value text NOT NULL CHECK (octet_length(value) >= 2), added_at timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL)'
+ conn.exec 'CREATE INDEX table_channel_id_index ON message_bus (channel, id)'
+ conn.exec 'CREATE INDEX table_added_at_index ON message_bus (added_at)'
+ end
+ end
+
nil
end
def hold
current_pid = Process.pid
@@ -212,24 +234,21 @@
end
def new_pg_connection
conn = raw_pg_connection
- begin
- conn.exec("SELECT 'message_bus'::regclass")
- rescue PG::UndefinedTable
- create_table(conn)
- end
+ create_table(conn)
conn.exec 'PREPARE insert_message AS INSERT INTO message_bus (channel, value) VALUES ($1, $2) RETURNING id'
conn.exec 'PREPARE clear_global_backlog AS DELETE FROM message_bus WHERE (id <= $1)'
conn.exec 'PREPARE clear_channel_backlog AS DELETE FROM message_bus WHERE ((channel = $1) AND (id <= (SELECT id FROM message_bus WHERE ((channel = $1) AND (id <= $2)) ORDER BY id DESC LIMIT 1 OFFSET $3)))'
conn.exec 'PREPARE channel_backlog AS SELECT id, value FROM message_bus WHERE ((channel = $1) AND (id > $2)) ORDER BY id'
conn.exec 'PREPARE global_backlog AS SELECT id, channel, value FROM message_bus WHERE (id > $1) ORDER BY id'
conn.exec "PREPARE expire AS DELETE FROM message_bus WHERE added_at < CURRENT_TIMESTAMP - ($1::text || ' seconds')::interval"
conn.exec 'PREPARE get_message AS SELECT value FROM message_bus WHERE ((channel = $1) AND (id = $2))'
conn.exec 'PREPARE max_channel_id AS SELECT max(id) FROM message_bus WHERE (channel = $1)'
+ conn.exec 'PREPARE max_channel_ids AS SELECT channel, max(id) FROM message_bus WHERE (channel = ANY($1)) GROUP BY channel'
conn.exec 'PREPARE max_id AS SELECT max(id) FROM message_bus'
conn.exec 'PREPARE publish AS SELECT pg_notify($1, $2)'
conn
end
@@ -308,10 +327,15 @@
# (see Base#last_id)
def last_id(channel)
client.max_id(channel)
end
- # (see Base#last_id)
+ # (see Base#last_ids)
+ def last_ids(*channels)
+ client.max_ids(*channels)
+ end
+
+ # (see Base#backlog)
def backlog(channel, last_id = 0)
items = client.backlog channel, last_id.to_i
items.map! do |id, data|
MessageBus::Message.new id, id, channel, data