Sha256: 4a750ab9fe84ac33387ffa93e1f5e5f8897a8bbad7fadc373c9e36b31bcc4bd0

Contents?: true

Size: 1.77 KB

Versions: 8

Compression:

Stored size: 1.77 KB

Contents

# frozen_string_literal: true

module WaterDrop
  class Producer
    # Non-blocking dispatch API of the producer: messages are handed over to the client and the
    # caller gets delivery handles back instead of waiting for delivery results
    module Async
      # Hands a single message over to the client without waiting for the delivery result
      #
      # The producer state check and the message validation both run before the instrumented
      # dispatch, so a message that fails either of them is never passed to the client.
      #
      # @param message [Hash] message details compliant with the {Contracts::Message} contract
      #
      # @return [Rdkafka::Producer::DeliveryHandle] handle through which the delivery report
      #   might be obtained
      #
      # @raise [Rdkafka::RdkafkaError] When the message could not be added to rdkafka's queue
      # @raise [Errors::MessageInvalidError] When the message details are invalid and because of
      #   that the message could not be sent to Kafka
      def produce_async(message)
        ensure_active!
        validate_message!(message)

        @monitor.instrument('message.produced_async', producer: self, message: message) do
          client.produce(**message)
        end
      end

      # Hands many messages over to the client without waiting for any of them to be delivered
      #
      # Every message is validated up front, before the instrumented dispatch starts. The whole
      # batch is dispatched under one 'messages.produced_async' event.
      #
      # @param messages [Array<Hash>] messages details, each compliant with the
      #   {Contracts::Message} contract
      #
      # @return [Array<Rdkafka::Producer::DeliveryHandle>] delivery handles, one per message, in
      #   the order in which the messages were provided
      #
      # @raise [Rdkafka::RdkafkaError] When the messages could not be added to rdkafka's queue
      # @raise [Errors::MessageInvalidError] When details of any of the messages are invalid and
      #   because of that the message could not be sent to Kafka
      def produce_many_async(messages)
        ensure_active!
        messages.each { |candidate| validate_message!(candidate) }

        @monitor.instrument('messages.produced_async', producer: self, messages: messages) do
          messages.map { |details| client.produce(**details) }
        end
      end
    end
  end
end

Version data entries

8 entries across 8 versions & 1 rubygems

Version Path
waterdrop-2.0.6 lib/water_drop/producer/async.rb
waterdrop-2.0.5 lib/water_drop/producer/async.rb
waterdrop-2.0.4 lib/water_drop/producer/async.rb
waterdrop-2.0.3 lib/water_drop/producer/async.rb
waterdrop-2.0.2 lib/water_drop/producer/async.rb
waterdrop-2.0.1 lib/water_drop/producer/async.rb
waterdrop-2.0.0 lib/water_drop/producer/async.rb
waterdrop-2.0.0.rc1 lib/water_drop/producer/async.rb