Sha256: 78a7372b7b0cfb724346102966341fcac1d4344fce7d6293505c5df81d6a8684

Contents?: true

Size: 1.77 KB

Versions: 14

Compression:

Stored size: 1.77 KB

Contents

# frozen_string_literal: true

module WaterDrop
  class Producer
    # Non-blocking dispatch API of the producer.
    #
    # The including object is expected to provide `ensure_active!`, `validate_message!`, `id`,
    # `client` and the `@monitor` instance variable, as all of them are used below.
    module Async
      # Hands a single message over to the client without waiting for the delivery result
      #
      # @param message [Hash] message hash that complies with the {Contracts::Message} contract
      #
      # @return [Rdkafka::Producer::DeliveryHandle] delivery handle that might return the report
      #
      # @raise [Rdkafka::RdkafkaError] When the message could not be added to rdkafka's queue
      # @raise [Errors::MessageInvalidError] When the provided message details are invalid and
      #   the message could not be sent to Kafka
      def produce_async(message)
        ensure_active!
        validate_message!(message)

        # The dispatch runs inside of the instrumentation block, so its result (the handle) is
        # what the monitor is given to return
        @monitor.instrument('message.produced_async', producer_id: id, message: message) do
          client.produce(**message)
        end
      end

      # Hands many messages over to the client without waiting for any of them to be delivered
      #
      # @param messages [Array<Hash>] array with messages that comply with the
      #   {Contracts::Message} contract
      #
      # @return [Array<Rdkafka::Producer::DeliveryHandle>] delivery handles, one per message
      #
      # @raise [Rdkafka::RdkafkaError] When adding the messages to rdkafka's queue failed
      # @raise [Errors::MessageInvalidError] When any of the provided messages details are
      #   invalid and the message could not be sent to Kafka
      def produce_many_async(messages)
        ensure_active!

        # The whole batch is validated up front, so nothing is dispatched when any of the
        # messages is invalid
        messages.each do |candidate|
          validate_message!(candidate)
        end

        @monitor.instrument('messages.produced_async', producer_id: id, messages: messages) do
          messages.map do |validated|
            client.produce(**validated)
          end
        end
      end
    end
  end
end

Version data entries

14 entries across 14 versions & 1 rubygem

Version Path
waterdrop-2.4.6 lib/waterdrop/producer/async.rb
waterdrop-2.4.5 lib/waterdrop/producer/async.rb
waterdrop-2.4.4 lib/waterdrop/producer/async.rb
waterdrop-2.4.3 lib/waterdrop/producer/async.rb
waterdrop-2.4.2 lib/waterdrop/producer/async.rb
waterdrop-2.4.1 lib/waterdrop/producer/async.rb
waterdrop-2.4.0 lib/waterdrop/producer/async.rb
waterdrop-2.3.3 lib/waterdrop/producer/async.rb
waterdrop-2.3.2 lib/waterdrop/producer/async.rb
waterdrop-2.3.1 lib/waterdrop/producer/async.rb
waterdrop-2.3.0 lib/waterdrop/producer/async.rb
waterdrop-2.2.0 lib/waterdrop/producer/async.rb
waterdrop-2.1.0 lib/water_drop/producer/async.rb
waterdrop-2.0.7 lib/water_drop/producer/async.rb