lib/message_bus/backends/postgres.rb in message_bus-4.0.0 vs lib/message_bus/backends/postgres.rb in message_bus-4.1.0

- old
+ new

@@ -130,10 +130,25 @@ else hold { |conn| exec_prepared(conn, 'max_id', &block) } end end + def max_ids(*channels) + block = proc do |pg_result| + ids = Array.new(channels.size, 0) + pg_result.ntuples.times do |i| + channel = pg_result.getvalue(i, 0) + max_id = pg_result.getvalue(i, 1) + channel_index = channels.index(channel) + ids[channel_index] = max_id.to_i + end + ids + end + + hold { |conn| exec_prepared(conn, 'max_channel_ids', [PG::TextEncoder::Array.new.encode(channels)], &block) } + end + def publish(channel, data) hold { |conn| exec_prepared(conn, 'publish', [channel, data]) } end def subscribe(channel) @@ -173,13 +188,20 @@ ensure r.clear if r.respond_to?(:clear) end def create_table(conn) - conn.exec 'CREATE TABLE message_bus (id bigserial PRIMARY KEY, channel text NOT NULL, value text NOT NULL CHECK (octet_length(value) >= 2), added_at timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL)' - conn.exec 'CREATE INDEX table_channel_id_index ON message_bus (channel, id)' - conn.exec 'CREATE INDEX table_added_at_index ON message_bus (added_at)' + sync do + begin + conn.exec("SELECT 'message_bus'::regclass") + rescue PG::UndefinedTable + conn.exec 'CREATE TABLE message_bus (id bigserial PRIMARY KEY, channel text NOT NULL, value text NOT NULL CHECK (octet_length(value) >= 2), added_at timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL)' + conn.exec 'CREATE INDEX table_channel_id_index ON message_bus (channel, id)' + conn.exec 'CREATE INDEX table_added_at_index ON message_bus (added_at)' + end + end + nil end def hold current_pid = Process.pid @@ -212,24 +234,21 @@ end def new_pg_connection conn = raw_pg_connection - begin - conn.exec("SELECT 'message_bus'::regclass") - rescue PG::UndefinedTable - create_table(conn) - end + create_table(conn) conn.exec 'PREPARE insert_message AS INSERT INTO message_bus (channel, value) VALUES ($1, $2) RETURNING id' conn.exec 'PREPARE clear_global_backlog AS DELETE FROM message_bus WHERE (id <= $1)' conn.exec 'PREPARE clear_channel_backlog AS DELETE FROM message_bus WHERE ((channel = $1) AND (id <= (SELECT id FROM message_bus WHERE ((channel = $1) AND (id <= $2)) ORDER BY id DESC LIMIT 1 OFFSET $3)))' conn.exec 'PREPARE channel_backlog AS SELECT id, value FROM message_bus WHERE ((channel = $1) AND (id > $2)) ORDER BY id' conn.exec 'PREPARE global_backlog AS SELECT id, channel, value FROM message_bus WHERE (id > $1) ORDER BY id' conn.exec "PREPARE expire AS DELETE FROM message_bus WHERE added_at < CURRENT_TIMESTAMP - ($1::text || ' seconds')::interval" conn.exec 'PREPARE get_message AS SELECT value FROM message_bus WHERE ((channel = $1) AND (id = $2))' conn.exec 'PREPARE max_channel_id AS SELECT max(id) FROM message_bus WHERE (channel = $1)' + conn.exec 'PREPARE max_channel_ids AS SELECT channel, max(id) FROM message_bus WHERE (channel = ANY($1)) GROUP BY channel' conn.exec 'PREPARE max_id AS SELECT max(id) FROM message_bus' conn.exec 'PREPARE publish AS SELECT pg_notify($1, $2)' conn end @@ -308,10 +327,15 @@ # (see Base#last_id) def last_id(channel) client.max_id(channel) end - # (see Base#last_id) + # (see Base#last_ids) + def last_ids(*channels) + client.max_ids(*channels) + end + + # (see Base#backlog) def backlog(channel, last_id = 0) items = client.backlog channel, last_id.to_i items.map! do |id, data| MessageBus::Message.new id, id, channel, data