# frozen_string_literal: true module WaterDrop class Producer # Component for synchronous producer operations module Sync # Produces a message to Kafka and waits for it to be delivered # # @param message [Hash] hash that complies with the {Contracts::Message} contract # # @return [Rdkafka::Producer::DeliveryReport] delivery report # # @raise [Rdkafka::RdkafkaError] When adding the message to rdkafka's queue failed # @raise [Rdkafka::Producer::WaitTimeoutError] When the timeout has been reached and the # handle is still pending # @raise [Errors::MessageInvalidError] When provided message details are invalid and the # message could not be sent to Kafka def produce_sync(message) message = middleware.run(message) validate_message!(message) @monitor.instrument( 'message.produced_sync', producer_id: id, message: message ) do wait(produce(message)) end rescue *SUPPORTED_FLOW_ERRORS => e # We use this syntax here because we want to preserve the original `#cause` when we # instrument the error and there is no way to manually assign `#cause` value begin raise Errors::ProduceError, e.inspect rescue Errors::ProduceError => ex @monitor.instrument( 'error.occurred', producer_id: id, message: message, error: ex, type: 'message.produce_sync' ) raise ex end end # Produces many messages to Kafka and waits for them to be delivered # # @param messages [Array] array with messages that comply with the # {Contracts::Message} contract # # @return [Array] delivery reports # # @raise [Rdkafka::RdkafkaError] When adding the messages to rdkafka's queue failed # @raise [Rdkafka::Producer::WaitTimeoutError] When the timeout has been reached and some # handles are still pending # @raise [Errors::MessageInvalidError] When any of the provided messages details are invalid # and the message could not be sent to Kafka def produce_many_sync(messages) messages = middleware.run_many(messages) messages.each { |message| validate_message!(message) } dispatched = [] @monitor.instrument('messages.produced_sync', producer_id: id, messages: messages) do with_transaction_if_transactional do messages.each do |message| dispatched << produce(message) end end dispatched.map! do |handler| wait(handler) end dispatched end rescue *SUPPORTED_FLOW_ERRORS => e re_raised = Errors::ProduceManyError.new(dispatched, e.inspect) @monitor.instrument( 'error.occurred', producer_id: id, messages: messages, dispatched: dispatched, error: re_raised, type: 'messages.produce_many_sync' ) raise re_raised end end end end