Sha256: 1d0e93cf37946f7c903b1972d7776719b830f670e7b11544504ff39696e40f85

Contents?: true

Size: 1.96 KB

Versions: 25

Compression:

Stored size: 1.96 KB

Contents

# frozen_string_literal: true

module PgEventstore
  module Commands
    # @!visibility private
    class RegularStreamReadPaginated < AbstractCommand
      # @see PgEventstore::Commands::Read for docs
      def call(stream, options: {})
        revision = calc_initial_revision(stream, options)
        Enumerator.new do |yielder|
          loop do
            events = read_cmd.call(stream, options: options.merge(from_revision: revision))
            yielder << events if events.any?
            raise StopIteration if end_reached?(events, options[:max_count])

            revision = calc_next_revision(events, revision, options[:direction])
            raise StopIteration if revision.negative?
          end
        end
      end

      private

      # @param stream [PgEventstore::Stream]
      # @param options [Hash]
      # @return [Integer]
      def calc_initial_revision(stream, options)
        return options[:from_revision] if options[:from_revision]
        return 0 if forwards?(options[:direction])

        read_cmd.call(stream, options: options.merge(max_count: 1)).first.stream_revision
      end

      # @param events [Array<PgEventstore::Event>]
      # @param max_count [Integer]
      # @return [Boolean]
      def end_reached?(events, max_count)
        events.size < max_count
      end

      # @param events [Array<PgEventstore::Event>]
      # @param revision [Integer]
      # @param direction [String, Symbol, nil]
      # @return [Integer]
      def calc_next_revision(events, revision, direction)
        return revision + events.size if forwards?(direction)

        revision - events.size
      end

      # @param direction [String, Symbol, nil]
      # @return [Boolean]
      def forwards?(direction)
        QueryBuilders::EventsFiltering::SQL_DIRECTIONS[direction] == QueryBuilders::EventsFiltering::SQL_DIRECTIONS[:asc]
      end

      # @return [PgEventstore::Commands::Read]
      def read_cmd
        @read_cmd ||= Read.new(queries)
      end
    end
  end
end

Version data entries

25 entries across 25 versions & 1 rubygems

Version Path
pg_eventstore-1.3.3 lib/pg_eventstore/commands/regular_stream_read_paginated.rb
pg_eventstore-1.3.2 lib/pg_eventstore/commands/regular_stream_read_paginated.rb
pg_eventstore-1.3.1 lib/pg_eventstore/commands/regular_stream_read_paginated.rb
pg_eventstore-1.3.0 lib/pg_eventstore/commands/regular_stream_read_paginated.rb
pg_eventstore-1.2.0 lib/pg_eventstore/commands/regular_stream_read_paginated.rb
pg_eventstore-1.1.5 lib/pg_eventstore/commands/regular_stream_read_paginated.rb
pg_eventstore-1.1.4 lib/pg_eventstore/commands/regular_stream_read_paginated.rb
pg_eventstore-1.1.3 lib/pg_eventstore/commands/regular_stream_read_paginated.rb
pg_eventstore-1.1.2 lib/pg_eventstore/commands/regular_stream_read_paginated.rb
pg_eventstore-1.1.1 lib/pg_eventstore/commands/regular_stream_read_paginated.rb
pg_eventstore-1.1.0 lib/pg_eventstore/commands/regular_stream_read_paginated.rb
pg_eventstore-1.0.4 lib/pg_eventstore/commands/regular_stream_read_paginated.rb
pg_eventstore-1.0.0.rc2 lib/pg_eventstore/commands/regular_stream_read_paginated.rb
pg_eventstore-1.0.0.rc1 lib/pg_eventstore/commands/regular_stream_read_paginated.rb
pg_eventstore-0.10.2 lib/pg_eventstore/commands/regular_stream_read_paginated.rb
pg_eventstore-0.10.1 lib/pg_eventstore/commands/regular_stream_read_paginated.rb
pg_eventstore-0.9.0 lib/pg_eventstore/commands/regular_stream_read_paginated.rb
pg_eventstore-0.8.0 lib/pg_eventstore/commands/regular_stream_read_paginated.rb
pg_eventstore-0.7.2 lib/pg_eventstore/commands/regular_stream_read_paginated.rb
pg_eventstore-0.7.1 lib/pg_eventstore/commands/regular_stream_read_paginated.rb