Sha256: 4a750ab9fe84ac33387ffa93e1f5e5f8897a8bbad7fadc373c9e36b31bcc4bd0
Contents?: true
Size: 1.77 KB
Versions: 8
Compression:
Stored size: 1.77 KB
Contents
# frozen_string_literal: true module WaterDrop class Producer # Component for asynchronous producer operations module Async # Produces a message to Kafka and does not wait for results # # @param message [Hash] hash that complies with the {Contracts::Message} contract # # @return [Rdkafka::Producer::DeliveryHandle] delivery handle that might return the report # # @raise [Rdkafka::RdkafkaError] When adding the message to rdkafka's queue failed # @raise [Errors::MessageInvalidError] When provided message details are invalid and the # message could not be sent to Kafka def produce_async(message) ensure_active! validate_message!(message) @monitor.instrument( 'message.produced_async', producer: self, message: message ) { client.produce(**message) } end # Produces many messages to Kafka and does not wait for them to be delivered # # @param messages [Array<Hash>] array with messages that comply with the # {Contracts::Message} contract # # @return [Array<Rdkafka::Producer::DeliveryHandle>] deliveries handles # # @raise [Rdkafka::RdkafkaError] When adding the messages to rdkafka's queue failed # @raise [Errors::MessageInvalidError] When any of the provided messages details are invalid # and the message could not be sent to Kafka def produce_many_async(messages) ensure_active! messages.each { |message| validate_message!(message) } @monitor.instrument( 'messages.produced_async', producer: self, messages: messages ) do messages.map { |message| client.produce(**message) } end end end end end
Version data entries
8 entries across 8 versions & 1 rubygems