lib/deimos/producer.rb in deimos-ruby-1.22.3 vs lib/deimos/producer.rb in deimos-ruby-1.22.4

- old
+ new

@@ -93,24 +93,26 @@ end # Publish the payload to the topic. # @param payload [Hash, SchemaClass::Record] with an optional payload_key hash key. # @param topic [String] if specifying the topic + # @param headers [Hash] if specifying headers # @return [void] - def publish(payload, topic: self.topic) - publish_list([payload], topic: topic) + def publish(payload, topic: self.topic, headers: nil) + publish_list([payload], topic: topic, headers: headers) end # Publish a list of messages. # @param payloads [Array<Hash, SchemaClass::Record>] with optional payload_key hash key. # @param sync [Boolean] if given, override the default setting of # whether to publish synchronously. # @param force_send [Boolean] if true, ignore the configured backend # and send immediately to Kafka. # @param topic [String] if specifying the topic + # @param headers [Hash] if specifying headers # @return [void] - def publish_list(payloads, sync: nil, force_send: false, topic: self.topic) + def publish_list(payloads, sync: nil, force_send: false, topic: self.topic, headers: nil) return if Deimos.config.kafka.seed_brokers.blank? || Deimos.config.producers.disabled || Deimos.producers_disabled?(self) raise 'Topic not specified. Please specify the topic.' if topic.blank? @@ -120,10 +122,10 @@ 'encode_messages', producer: self, topic: topic, payloads: payloads ) do - messages = Array(payloads).map { |p| Deimos::Message.new(p.to_h, self) } + messages = Array(payloads).map { |p| Deimos::Message.new(p.to_h, self, headers: headers) } messages.each { |m| _process_message(m, topic) } messages.in_groups_of(MAX_BATCH_SIZE, false) do |batch| self.produce_batch(backend_class, batch) end end