lib/message_bus/backends/postgres.rb in message_bus-2.0.0.beta.4 vs lib/message_bus/backends/postgres.rb in message_bus-2.0.0.beta.5

- old
+ new

@@ -101,29 +101,23 @@ def subscribe(channel) obj = Object.new sync{@listening_on[channel] = obj} listener = Listener.new yield listener - pid = Process.pid conn = raw_pg_connection conn.exec "LISTEN #{channel}" listener.do_sub.call while listening_on?(channel, obj) conn.wait_for_notify(10) do |_,_,payload| + break unless listening_on?(channel, obj) listener.do_message.call(nil, payload) end - break if pid != Process.pid end listener.do_unsub.call - if pid != Process.pid - sync{INHERITED_CONNECTIONS << conn} - else - conn.exec "UNLISTEN #{channel}" - end - + conn.exec "UNLISTEN #{channel}" nil end def unsubscribe sync{@listening_on.clear} @@ -137,22 +131,21 @@ ensure r.clear if r.respond_to?(:clear) end def create_table(conn) - conn.exec 'CREATE TABLE message_bus (id bigserial PRIMARY KEY, channel text NOT NULL, value text NOT NULL, added_at timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL)' + conn.exec 'CREATE TABLE message_bus (id bigserial PRIMARY KEY, channel text NOT NULL, value text NOT NULL CHECK (octet_length(value) >= 2), added_at timestamp DEFAULT CURRENT_TIMESTAMP NOT NULL)' conn.exec 'CREATE INDEX table_channel_id_index ON message_bus (channel, id)' conn.exec 'CREATE INDEX table_added_at_index ON message_bus (added_at)' nil end def hold current_pid = Process.pid if current_pid != @pid @pid = current_pid sync do - @listening_on.clear INHERITED_CONNECTIONS.concat(@available) @available.clear end end @@ -212,13 +205,11 @@ end end class MessageBus::Postgres::ReliablePubSub attr_reader :subscribed - attr_accessor :max_publish_retries, :max_publish_wait, :max_backlog_size, - :max_global_backlog_size, :max_in_memory_publish_backlog, - :max_backlog_age + attr_accessor :max_backlog_size, :max_global_backlog_size, :max_backlog_age, :clear_every UNSUB_MESSAGE = "$$UNSUBSCRIBE" def self.reset!(config) MessageBus::Postgres::Client.new(config).reset! @@ -227,19 +218,13 @@ # max_backlog_size is per multiplexed channel def initialize(config = {}, max_backlog_size = 1000) @config = config @max_backlog_size = max_backlog_size @max_global_backlog_size = 2000 - @max_publish_retries = 10 - @max_publish_wait = 500 #ms - @max_in_memory_publish_backlog = 1000 - @in_memory_backlog = [] - @lock = Mutex.new - @flush_backlog_thread = nil # after 7 days inactive backlogs will be removed @max_backlog_age = 604800 - @h = {} + @clear_every = config[:clear_every] || 1 end def new_connection MessageBus::Postgres::Client.new(@config) end @@ -270,12 +255,14 @@ client = self.client backlog_id = client.add(channel, data) msg = MessageBus::Message.new backlog_id, backlog_id, channel, data payload = msg.encode client.publish postgresql_channel_name, payload - client.clear_global_backlog(backlog_id, @max_global_backlog_size) - client.expire(@max_backlog_age) - client.clear_channel_backlog(channel, backlog_id, @max_backlog_size) + if backlog_id % clear_every == 0 + client.clear_global_backlog(backlog_id, @max_global_backlog_size) + client.expire(@max_backlog_age) + client.clear_channel_backlog(channel, backlog_id, @max_backlog_size) + end backlog_id end def last_id(channel)