Sha256: e3fcc5409926d29c72eca1880de795d2ad82009ecf9dcdf785daee61b561624b

Contents?: true

Size: 1.55 KB

Versions: 3

Compression:

Stored size: 1.55 KB

Contents

module Turbo::Replay
  module Repo
    class Memory < Base
      def initialize
        @mutex = Mutex.new
        @counters = {}
        @messages = {}
        @ttl = {}
      end

      def get_current_sequence_number(broadcasting:)
        synchronize(broadcasting) do
          @counters.fetch(broadcasting, 0)
        end
      end

      def get_all_messages(broadcasting:)
        synchronize(broadcasting) do
          @messages.fetch(broadcasting, [])
        end
      end

      def insert_message(broadcasting:, content:, retention:)
        synchronize(broadcasting) do
          @ttl[broadcasting] =
            Time.current + retention.ttl

          next_sequence_number =
            (@counters[broadcasting] = @counters.fetch(broadcasting, 0) + 1)

          content_with_sequence_number =
            {sequence_number: next_sequence_number, content: content}

          (@messages[broadcasting] ||= []).tap do |messages|
            messages << content_with_sequence_number
            messages.shift if messages.length > retention.size
          end

          content_with_sequence_number
        end
      end

      private

      def synchronize(broadcasting)
        @mutex.synchronize do
          delete_cached_data_if_expired(broadcasting)

          yield
        end
      end

      def delete_cached_data_if_expired(broadcasting)
        return if @ttl[broadcasting].nil? || @ttl[broadcasting].after?(Time.current)

        @ttl.delete(broadcasting)
        @counters.delete(broadcasting)
        @messages.delete(broadcasting)
      end
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
turbo-replay-0.1.2 lib/turbo/replay/repo/memory.rb
turbo-replay-0.1.1 lib/turbo/replay/repo/memory.rb
turbo-replay-0.1.0 lib/turbo/replay/repo/memory.rb