lib/message_bus/backends/postgres.rb in message_bus-2.0.0.beta.6 vs lib/message_bus/backends/postgres.rb in message_bus-2.0.0.beta.7
- old
+ new
@@ -50,15 +50,15 @@
hold{|conn| exec_prepared(conn, 'expire', [max_backlog_age])}
nil
end
def backlog(channel, backlog_id)
- hold{|conn| exec_prepared(conn, 'channel_backlog', [channel, backlog_id]){|r| r.values.each{|a| a[0] = a[0].to_i}}}
+ hold{|conn| exec_prepared(conn, 'channel_backlog', [channel, backlog_id]){|r| r.values.each{|a| a[0] = a[0].to_i}}} || []
end
def global_backlog(backlog_id)
- hold{|conn| exec_prepared(conn, 'global_backlog', [backlog_id]){|r| r.values.each{|a| a[0] = a[0].to_i}}}
+ hold{|conn| exec_prepared(conn, 'global_backlog', [backlog_id]){|r| r.values.each{|a| a[0] = a[0].to_i}}} || []
end
def get_value(channel, id)
hold{|conn| exec_prepared(conn, 'get_message', [channel, id]){|r| r.getvalue(0,0)}}
end
@@ -155,11 +155,11 @@
begin
conn = sync{@available.shift} || new_pg_connection
sync{@allocated[Thread.current] = conn}
yield conn
- rescue PG::ConnectionBad => e
+ rescue PG::ConnectionBad, PG::UnableToSend => e
# don't add this connection back to the pool
ensure
sync{@allocated.delete(Thread.current)}
if Process.pid != current_pid
sync{INHERITED_CONNECTIONS << conn}
@@ -277,11 +277,9 @@
MessageBus::Message.new id, id, channel, data
end
end
def global_backlog(last_id = nil)
- last_id = last_id.to_i
-
items = client.global_backlog last_id.to_i
items.map! do |id, channel, data|
MessageBus::Message.new id, id, channel, data
end