Sha256: 5ee714676327e05f1cdefc79244adcffbd0773d46aad95e230739190db2c7ee0

Contents?: true

Size: 1.92 KB

Versions: 25

Compression:

Stored size: 1.92 KB

Contents

# frozen_string_literal: true

module PgEventstore
  module Commands
    # @!visibility private
    class SystemStreamReadPaginated < AbstractCommand
      # @see PgEventstore::Commands::Read for docs
      def call(stream, options: {})
        position = calc_initial_position(stream, options)
        Enumerator.new do |yielder|
          loop do
            events = read_cmd.call(stream, options: options.merge(from_position: position))
            yielder << events if events.any?
            raise StopIteration if end_reached?(events, options[:max_count])

            position = calc_next_position(events, options[:direction])
            raise StopIteration if position <= 0
          end
        end
      end

      private

      # @param stream [PgEventstore::Stream]
      # @param options [Hash]
      # @return [Integer]
      def calc_initial_position(stream, options)
        return options[:from_position] if options[:from_position]
        return 1 if forwards?(options[:direction])

        read_cmd.call(stream, options: options.merge(max_count: 1)).first.global_position
      end

      # @param events [Array<PgEventstore::Event>]
      # @param max_count [Integer]
      # @return [Boolean]
      def end_reached?(events, max_count)
        events.size < max_count
      end

      # @param events [Array<PgEventstore::Event>]
      # @param direction [String, Symbol, nil]
      # @return [Integer]
      def calc_next_position(events, direction)
        return events.last.global_position + 1 if forwards?(direction)

        events.last.global_position - 1
      end

      # @param direction [String, Symbol, nil]
      # @return [Boolean]
      def forwards?(direction)
        QueryBuilders::EventsFiltering::SQL_DIRECTIONS[direction] == QueryBuilders::EventsFiltering::SQL_DIRECTIONS[:asc]
      end

      # @return [PgEventstore::Commands::Read]
      def read_cmd
        @read_cmd ||= Read.new(queries)
      end
    end
  end
end

Version data entries

25 entries across 25 versions & 1 rubygems

Version Path
pg_eventstore-1.3.3 lib/pg_eventstore/commands/system_stream_read_paginated.rb
pg_eventstore-1.3.2 lib/pg_eventstore/commands/system_stream_read_paginated.rb
pg_eventstore-1.3.1 lib/pg_eventstore/commands/system_stream_read_paginated.rb
pg_eventstore-1.3.0 lib/pg_eventstore/commands/system_stream_read_paginated.rb
pg_eventstore-1.2.0 lib/pg_eventstore/commands/system_stream_read_paginated.rb
pg_eventstore-1.1.5 lib/pg_eventstore/commands/system_stream_read_paginated.rb
pg_eventstore-1.1.4 lib/pg_eventstore/commands/system_stream_read_paginated.rb
pg_eventstore-1.1.3 lib/pg_eventstore/commands/system_stream_read_paginated.rb
pg_eventstore-1.1.2 lib/pg_eventstore/commands/system_stream_read_paginated.rb
pg_eventstore-1.1.1 lib/pg_eventstore/commands/system_stream_read_paginated.rb
pg_eventstore-1.1.0 lib/pg_eventstore/commands/system_stream_read_paginated.rb
pg_eventstore-1.0.4 lib/pg_eventstore/commands/system_stream_read_paginated.rb
pg_eventstore-1.0.0.rc2 lib/pg_eventstore/commands/system_stream_read_paginated.rb
pg_eventstore-1.0.0.rc1 lib/pg_eventstore/commands/system_stream_read_paginated.rb
pg_eventstore-0.10.2 lib/pg_eventstore/commands/system_stream_read_paginated.rb
pg_eventstore-0.10.1 lib/pg_eventstore/commands/system_stream_read_paginated.rb
pg_eventstore-0.9.0 lib/pg_eventstore/commands/system_stream_read_paginated.rb
pg_eventstore-0.8.0 lib/pg_eventstore/commands/system_stream_read_paginated.rb
pg_eventstore-0.7.2 lib/pg_eventstore/commands/system_stream_read_paginated.rb
pg_eventstore-0.7.1 lib/pg_eventstore/commands/system_stream_read_paginated.rb