Sha256: 8cd3e33b06e3e044fc3eeef92578d48c892586656a4c61c509a3e7b6b7ba2ea6

Contents?: true

Size: 1.28 KB

Versions: 1

Compression:

Stored size: 1.28 KB

Contents

module Turbo::Replay
  module Message
    extend self

    def get_current_sequence_number(broadcasting:)
      Turbo::Replay.configuration.repo.get_current_sequence_number(broadcasting: broadcasting)
    end

    def get_after_sequence_number(broadcasting:, sequence_number:)
      messages =
        Turbo::Replay.configuration.repo
          .get_all_messages(broadcasting: broadcasting)
          .sort_by(&BySequenceNumber)

      return :unrecoverable if IsUnrecoverable.call(sequence_number, messages)

      messages.filter(&AfterSequenceNumber[sequence_number])
    end

    def insert(broadcasting:, content:)
      Turbo::Replay.configuration.repo.insert_message(
        broadcasting: broadcasting,
        content: content,
        retention: Turbo::Replay.configuration.retention
      )
    end

    private

    BySequenceNumber =
      ->(content_with_sequence_number) {
        content_with_sequence_number[:sequence_number]
      }

    IsUnrecoverable =
      ->(sequence_number, messages) {
        return false if messages.empty?

        sequence_number < messages.first[:sequence_number] - 1
      }

    AfterSequenceNumber =
      ->(sequence_number, content_with_sequence_number) {
        content_with_sequence_number[:sequence_number] > sequence_number
      }.curry
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
turbo-replay-0.1.0 lib/turbo/replay/message.rb