lib/message_bus/backends/postgres.rb in message_bus-2.0.0.beta.6 vs lib/message_bus/backends/postgres.rb in message_bus-2.0.0.beta.7

- old
+ new

@@ -50,15 +50,15 @@ hold{|conn| exec_prepared(conn, 'expire', [max_backlog_age])} nil end def backlog(channel, backlog_id) - hold{|conn| exec_prepared(conn, 'channel_backlog', [channel, backlog_id]){|r| r.values.each{|a| a[0] = a[0].to_i}}} + hold{|conn| exec_prepared(conn, 'channel_backlog', [channel, backlog_id]){|r| r.values.each{|a| a[0] = a[0].to_i}}} || [] end def global_backlog(backlog_id) - hold{|conn| exec_prepared(conn, 'global_backlog', [backlog_id]){|r| r.values.each{|a| a[0] = a[0].to_i}}} + hold{|conn| exec_prepared(conn, 'global_backlog', [backlog_id]){|r| r.values.each{|a| a[0] = a[0].to_i}}} || [] end def get_value(channel, id) hold{|conn| exec_prepared(conn, 'get_message', [channel, id]){|r| r.getvalue(0,0)}} end @@ -155,11 +155,11 @@ begin conn = sync{@available.shift} || new_pg_connection sync{@allocated[Thread.current] = conn} yield conn - rescue PG::ConnectionBad => e + rescue PG::ConnectionBad, PG::UnableToSend => e # don't add this connection back to the pool ensure sync{@allocated.delete(Thread.current)} if Process.pid != current_pid sync{INHERITED_CONNECTIONS << conn} @@ -277,11 +277,9 @@ MessageBus::Message.new id, id, channel, data end end def global_backlog(last_id = nil) - last_id = last_id.to_i - items = client.global_backlog last_id.to_i items.map! do |id, channel, data| MessageBus::Message.new id, id, channel, data end