Sha256: f86008991c89646710c3d362898b64c18fcae05a7edd5b313a68f149e8d3da18

Contents?: true

Size: 922 Bytes

Versions: 1

Compression:

Stored size: 922 Bytes

Contents

# frozen_string_literal: true

# :nodoc:
module Jstreams
  ##
  # Publishes messages to the given stream.
  class Publisher
    ##
    # @param [ConnectionPool] redis_pool Redis connection pool
    # @param [Serializer] serializer Serializer
    # @param [TaggedLogging] logger Logger
    def initialize(redis_pool:, serializer:, logger:)
      @redis_pool = redis_pool
      @serializer = serializer
      @logger = logger
    end

    ##
    # Publishes a message to the given stream
    #
    # @param [String] stream Destination stream name
    # @param [Hash] message Message payload
    def publish(stream, message)
      @logger.tagged('publisher') do
        @redis_pool.with do |redis|
          redis.xadd(stream, payload: @serializer.serialize(message, stream))
        end
        @logger.debug { "published to stream #{stream}: #{message.inspect}" }
      end
    end
  end

  private_constant :Publisher
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
jstreams-0.1.0.alpha lib/jstreams/publisher.rb