Sha256: cb79391a66204ff0a5467ea566e82183a6113436275e0be59201ef5c36fced6e

Contents?: true

Size: 1.32 KB

Versions: 2

Compression:

Stored size: 1.32 KB

Contents

module Turbo::Replay
  module Message
    extend self

    def get_current_sequence_number(broadcasting:)
      Turbo::Replay.configuration.repo.get_current_sequence_number(broadcasting: broadcasting)
    end

    def get_after_sequence_number(broadcasting:, sequence_number:)
      return [] if sequence_number.nil?

      messages =
        Turbo::Replay.configuration.repo
          .get_all_messages(broadcasting: broadcasting)
          .sort_by(&BySequenceNumber)

      return :unrecoverable if IsUnrecoverable.call(sequence_number, messages)

      messages.filter(&AfterSequenceNumber[sequence_number])
    end

    def insert(broadcasting:, content:)
      Turbo::Replay.configuration.repo.insert_message(
        broadcasting: broadcasting,
        content: content,
        retention: Turbo::Replay.configuration.retention
      )
    end

    private

    BySequenceNumber =
      ->(content_with_sequence_number) {
        content_with_sequence_number[:sequence_number]
      }

    IsUnrecoverable =
      ->(sequence_number, messages) {
        return false if messages.empty?

        sequence_number < messages.first[:sequence_number] - 1
      }

    AfterSequenceNumber =
      ->(sequence_number, content_with_sequence_number) {
        content_with_sequence_number[:sequence_number] > sequence_number
      }.curry
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
turbo-replay-0.1.2 lib/turbo/replay/message.rb
turbo-replay-0.1.1 lib/turbo/replay/message.rb