Sha256: 199602b40041955616dd2150ca1dc089bfdf83e528b638a2a96ba5ba5957a47b
Contents?: true
Size: 1.26 KB
Versions: 14
Compression:
Stored size: 1.26 KB
Contents
require 'time'

module Eventus
  # A sequence of events sharing one stream id.
  #
  # Events already known to the persistence layer are kept in
  # +committed_events+; events added locally but not yet persisted are kept
  # in +uncommitted_events+. Each event is a Hash with String keys.
  class Stream
    attr_reader :id, :committed_events, :uncommitted_events

    # Builds the stream and loads its existing events via +persistence.load(id)+.
    #
    # @param id the stream identifier; written into each event as 'sid' on commit
    # @param persistence object responding to +load(id)+, +load(id, version)+
    #   and +commit(events)+
    # @param dispatcher object responding to +dispatch(payload)+, or nil/false
    #   to skip dispatching
    def initialize(id, persistence, dispatcher)
      @id = id
      @persistence = persistence
      @committed_events = []
      @uncommitted_events = []
      @dispatcher = dispatcher
      load_events @persistence.load(id)
    end

    # Queues a new event; nothing is persisted until #commit is called.
    #
    # @param name stored under the event's 'name' key
    # @param body stored under the event's 'body' key
    # @return [Array<Hash>] the uncommitted events buffer
    def add(name, body = {})
      @uncommitted_events << { 'name' => name, 'body' => body }
    end

    # Stamps the queued events, hands them to the persistence layer, appends
    # them to +committed_events+ and passes the persistence result to the
    # dispatcher (when one was given).
    #
    # @return [Array, nil] the emptied uncommitted buffer, or nil when there
    #   was nothing to commit
    # @raise [ConcurrencyError] re-raised after events newer than the current
    #   version have been loaded from the persistence layer; the queued
    #   events stay in +uncommitted_events+ in that case
    def commit
      stamp_uncommitted_events
      Eventus.logger.debug "Committing #{@uncommitted_events.length} events to #{@id}"
      return if @uncommitted_events.empty?

      payload = @persistence.commit @uncommitted_events
      load_events @uncommitted_events
      begin
        @dispatcher.dispatch(payload) if @dispatcher
      ensure
        # The events are already persisted and recorded as committed at this
        # point, so the buffer must be emptied even when dispatching raises;
        # otherwise a later commit would persist the same events again.
        # Clearing happens after dispatch because the payload may be the very
        # same array object as the buffer.
        @uncommitted_events.clear
      end
      @uncommitted_events
    rescue ConcurrencyError
      Eventus.logger.info "ConcurrencyError, loading new events for: #{id}"
      load_events @persistence.load(id, version)
      raise
    end

    # @return [Integer] the number of committed events
    def version
      @committed_events.length
    end

    private

    # Writes the commit timestamp (UTC, ISO 8601), the stream id and a
    # sequence number (counting up from the current version) into every
    # queued event.
    def stamp_uncommitted_events
      time = Time.now.utc.iso8601
      @uncommitted_events.each.with_index(version) do |event, sequence|
        event['time'] = time
        event['sid'] = @id
        event['sequence'] = sequence
      end
    end

    # Appends each of +events+ to the committed list. Only +each+ is required
    # of +events+, so it does not have to be an Array.
    def load_events(events)
      events.each { |event| @committed_events << event }
    end
  end
end
Version data entries
14 entries across 14 versions & 1 rubygems