Sha256: 6eca822f1c3f07ec19e1823abbd8dfc16980904f4706ee4eab8ab5bb03e4bd07

Contents?: true

Size: 1.22 KB

Versions: 3

Compression:

Stored size: 1.22 KB

Contents

require 'time'

module Eventus
  class Stream

    attr_reader :id, :committed_events, :uncommitted_events

    def initialize(id, persistence, dispatcher)
      @id = id
      @persistence = persistence
      @committed_events = []
      @uncommitted_events = []
      @dispatcher = dispatcher
      load_events @persistence.load(id)
    end

    def add(name, body={})
      @uncommitted_events << {'name' => name, 'body' => body}
    end

    def commit
      time = Time.now.utc.iso8601
      @uncommitted_events.each.with_index(version) do |e, i|
        e['time'] = time
        e['sid'] = @id
        e['sequence'] = i
      end
      Eventus::logger.debug "Committing #{@uncommitted_events.length} events to #{@id}"
      @persistence.commit @uncommitted_events
      load_events @uncommitted_events
      @dispatcher.dispatch @uncommitted_events if @dispatcher
      @uncommitted_events.clear
    rescue ConcurrencyError => e
      Eventus.logger.info "ConcurrencyError, loading new events for: #{id}"
      load_events @persistence.load(id, version)
      raise e
    end

    def version
      @committed_events.length
    end

    private

    def load_events(events)
      events.each { |e| @committed_events << e }
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
eventus-0.3.6 lib/eventus/stream.rb
eventus-0.3.5 lib/eventus/stream.rb
eventus-0.3.4 lib/eventus/stream.rb