Sha256: 78edd031dd856673509cbe3e4e0d27e948c1e48e0b107f8f2b209b1cff01ea5a
Contents?: true
Size: 1.49 KB
Versions: 1
Compression:
Stored size: 1.49 KB
Contents
require_relative './page' module EsReadModel class Stream def Stream.open(name, connection, listener) Stream.new("/streams/#{name}", connection, listener) end def initialize(head_uri, connection, listener) @connection = connection @listener = listener @current_etag = nil @listener.call({ level: 'info', tag: 'connecting', msg: "Connecting to #{head_uri} on #{connection}" }) fetch_first_page(head_uri) end def wait_for_new_events while @current_page.empty? sleep 1 fetch(@current_uri) end end def each_event(&blk) while !@current_page.empty? @current_page.each_event(&blk) fetch(@current_page.newer_events_uri) if @current_page.newer_events_uri end end private def fetch_first_page(uri) back_off = 1 loop do begin fetch(uri) last = @current_page.first_event_uri fetch(last) if last return rescue Exception => ex @listener.call({ level: 'error', tag: 'connection.error', msg: "#{ex.class}: #{ex.message}. Retry in #{back_off}s." }) sleep back_off back_off *= 2 end end end def fetch(uri) response = @connection.get(uri, @current_etag) @current_page = Page.new(response.body) @current_uri = uri @current_etag = response.headers['etag'] end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
es-readmodel-0.0.1 | lib/es_readmodel/stream.rb |