Sha256: 8b61057694e03454e59ad6c08de93b2616fc00a6fb01c3e3996eff374d5a52ce

Contents?: true

Size: 1.77 KB

Versions: 2

Compression:

Stored size: 1.77 KB

Contents

module ROM
  module EventStore
    class Dataset
      attr_reader :category

      def initialize(category, connection, options = {})
        @category = category
        @connection = connection
        @options = options
      end

      def select(aggregate)
        __new__(aggregate: aggregate)
      end

      def from(id)
        __new__(from: id)
      end

      def limit(limit)
        __new__(limit: limit)
      end

      def stream
        aggregate = @options[:aggregate]
        aggregate ? "#{category}-#{aggregate}" : "$ce-#{category}"
      end

      def events
        @connection.read(stream, @options).sync
      end

      def append(events)
        @connection.append(stream, events).sync
        events
      end

      def subscribe
        subscription = @connection.subscription(stream, @options)
        subscription.on_event { |event| yield(dehydrate(event)) }
        subscription.start
      end

      def each
        with_events { |event| yield(event) }
      end

      private

      def __new__(new_opts = {})
        self.class.new(@category, @connection, @options.merge(new_opts))
      end

      def option(option, default)
        @options.fetch(option, default)
      end

      def with_events
        events.each { |event| yield(dehydrate(event)) }
      end

      def dehydrate(wrapper)
        event = wrapper.event
        category, aggregate = event.event_stream_id.split('-', 2)

        {
          id: Estore::Package.parse_uuid(event.event_id),
          type: event.event_type,
          data: event.data,
          category: category,
          aggregate: aggregate,
          number: event.event_number,
          position: wrapper.original_event_number,
          created_at: Time.at(event.created_epoch / 1000)
        }
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
rom-event_store-0.0.7 lib/rom/event_store/dataset.rb
rom-event_store-0.0.6 lib/rom/event_store/dataset.rb