lib/message_bus/backends/postgres.rb in message_bus-2.0.0.beta.4 vs lib/message_bus/backends/postgres.rb in message_bus-2.0.0.beta.5
- old
+ new
@@ -101,29 +101,23 @@
def subscribe(channel)
obj = Object.new
sync{@listening_on[channel] = obj}
listener = Listener.new
yield listener
- pid = Process.pid
conn = raw_pg_connection
conn.exec "LISTEN #{channel}"
listener.do_sub.call
while listening_on?(channel, obj)
conn.wait_for_notify(10) do |_,_,payload|
+ break unless listening_on?(channel, obj)
listener.do_message.call(nil, payload)
end
- break if pid != Process.pid
end
listener.do_unsub.call
- if pid != Process.pid
- sync{INHERITED_CONNECTIONS << conn}
- else
- conn.exec "UNLISTEN #{channel}"
- end
-
+ conn.exec "UNLISTEN #{channel}"
nil
end
def unsubscribe
sync{@listening_on.clear}
@@ -137,22 +131,21 @@
ensure
r.clear if r.respond_to?(:clear)
end
def create_table(conn)
- conn.exec 'CREATE TABLE message_bus (id bigserial PRIMARY KEY, channel text NOT NULL, value text NOT NULL, added_at timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL)'
+ conn.exec 'CREATE TABLE message_bus (id bigserial PRIMARY KEY, channel text NOT NULL, value text NOT NULL CHECK (octet_length(value) >= 2), added_at timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL)'
conn.exec 'CREATE INDEX table_channel_id_index ON message_bus (channel, id)'
conn.exec 'CREATE INDEX table_added_at_index ON message_bus (added_at)'
nil
end
def hold
current_pid = Process.pid
if current_pid != @pid
@pid = current_pid
sync do
- @listening_on.clear
INHERITED_CONNECTIONS.concat(@available)
@available.clear
end
end
@@ -212,13 +205,11 @@
end
end
class MessageBus::Postgres::ReliablePubSub
attr_reader :subscribed
- attr_accessor :max_publish_retries, :max_publish_wait, :max_backlog_size,
- :max_global_backlog_size, :max_in_memory_publish_backlog,
- :max_backlog_age
+ attr_accessor :max_backlog_size, :max_global_backlog_size, :max_backlog_age, :clear_every
UNSUB_MESSAGE = "$$UNSUBSCRIBE"
def self.reset!(config)
MessageBus::Postgres::Client.new(config).reset!
@@ -227,19 +218,13 @@
# max_backlog_size is per multiplexed channel
def initialize(config = {}, max_backlog_size = 1000)
@config = config
@max_backlog_size = max_backlog_size
@max_global_backlog_size = 2000
- @max_publish_retries = 10
- @max_publish_wait = 500 #ms
- @max_in_memory_publish_backlog = 1000
- @in_memory_backlog = []
- @lock = Mutex.new
- @flush_backlog_thread = nil
# after 7 days inactive backlogs will be removed
@max_backlog_age = 604800
- @h = {}
+ @clear_every = config[:clear_every] || 1
end
def new_connection
MessageBus::Postgres::Client.new(@config)
end
@@ -270,12 +255,14 @@
client = self.client
backlog_id = client.add(channel, data)
msg = MessageBus::Message.new backlog_id, backlog_id, channel, data
payload = msg.encode
client.publish postgresql_channel_name, payload
- client.clear_global_backlog(backlog_id, @max_global_backlog_size)
- client.expire(@max_backlog_age)
- client.clear_channel_backlog(channel, backlog_id, @max_backlog_size)
+ if backlog_id % clear_every == 0
+ client.clear_global_backlog(backlog_id, @max_global_backlog_size)
+ client.expire(@max_backlog_age)
+ client.clear_channel_backlog(channel, backlog_id, @max_backlog_size)
+ end
backlog_id
end
def last_id(channel)