Sha256: f86008991c89646710c3d362898b64c18fcae05a7edd5b313a68f149e8d3da18
Contents?: true
Size: 922 Bytes
Versions: 1
Compression:
Stored size: 922 Bytes
Contents
# frozen_string_literal: true

# :nodoc:
module Jstreams
  ##
  # Publishes messages to the given stream.
  #
  # Holds three collaborators: a pool that yields a Redis connection from
  # +with+, a serializer responding to +serialize(message, stream)+, and a
  # logger responding to +tagged+ and +debug+.
  class Publisher
    ##
    # @param [ConnectionPool] redis_pool Redis connection pool
    # @param [Serializer] serializer Serializer
    # @param [TaggedLogging] logger Logger
    def initialize(redis_pool:, serializer:, logger:)
      @redis_pool = redis_pool
      @serializer = serializer
      @logger = logger
    end

    ##
    # Publishes a message to the given stream.
    #
    # The message is serialized with the configured serializer and handed to
    # +xadd+ on a pooled connection as a single +payload+ field. A debug line
    # is emitted afterwards; both steps run inside the 'publisher' log tag.
    #
    # @param [String] stream Destination stream name
    # @param [Hash] message Message payload
    # @return [Object] whatever the logger's +tagged+ call returns
    #   (presumably the result of the debug call; depends on the logger)
    def publish(stream, message)
      @logger.tagged('publisher') do
        @redis_pool.with do |redis|
          # Explicit braces: the entry must reach xadd as a positional hash.
          # A braceless `payload: ...` is parsed as keyword arguments on
          # Ruby 3+ when the client's xadd declares keyword options.
          redis.xadd(stream, { payload: @serializer.serialize(message, stream) })
        end

        # Block form so the message is only inspected if debug is enabled.
        @logger.debug { "published to stream #{stream}: #{message.inspect}" }
      end
    end
  end

  private_constant :Publisher
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
jstreams-0.1.0.alpha | lib/jstreams/publisher.rb |