Sha256: e7c5a4a94f92dee1b013968c74b6487aacb4651e67109b904e2ad4bff36c367d

Contents?: true

Size: 1.86 KB

Versions: 5

Compression:

Stored size: 1.86 KB

Contents

# frozen_string_literal: true

module WaterDrop
  class Producer
    # Non-blocking part of the producer API: messages are handed over to the underlying client
    # and the call returns without waiting for the delivery outcome
    module Async
      # Dispatches a single message to Kafka without waiting for its delivery result
      #
      # @param message [Hash] message attributes compliant with the {Contracts::Message} contract
      #
      # @return [Rdkafka::Producer::DeliveryHandle] handle through which the delivery report
      #   may later be obtained
      #
      # @raise [Rdkafka::RdkafkaError] when the message could not be added to the rdkafka queue
      # @raise [Errors::MessageInvalidError] when the message details are invalid, in which case
      #   the message is not sent to Kafka
      def produce_async(message)
        ensure_active!

        # Middleware runs first, so validation and dispatch both operate on its result
        prepared = middleware.run(message)
        validate_message!(prepared)

        @monitor.instrument('message.produced_async', producer_id: id, message: prepared) do
          client.produce(**prepared)
        end
      end

      # Dispatches a batch of messages to Kafka without waiting for any of them to be delivered
      #
      # @param messages [Array<Hash>] messages, each compliant with the {Contracts::Message}
      #   contract
      #
      # @return [Array<Rdkafka::Producer::DeliveryHandle>] one delivery handle per message
      #
      # @raise [Rdkafka::RdkafkaError] when adding any of the messages to the rdkafka queue failed
      # @raise [Errors::MessageInvalidError] when the details of any message are invalid and it
      #   could not be sent to Kafka
      def produce_many_async(messages)
        ensure_active!

        prepared = middleware.run_many(messages)

        # The whole batch is validated before the first message is handed to the client
        prepared.each do |candidate|
          validate_message!(candidate)
        end

        @monitor.instrument('messages.produced_async', producer_id: id, messages: prepared) do
          prepared.map { |candidate| client.produce(**candidate) }
        end
      end
    end
  end
end

Version data entries

5 entries across 5 versions & 1 rubygems

Version Path
waterdrop-2.4.11 lib/waterdrop/producer/async.rb
waterdrop-2.4.10 lib/waterdrop/producer/async.rb
waterdrop-2.4.9 lib/waterdrop/producer/async.rb
waterdrop-2.4.8 lib/waterdrop/producer/async.rb
waterdrop-2.4.7 lib/waterdrop/producer/async.rb