lib/deimos/producer.rb in deimos-ruby-1.22.3 vs lib/deimos/producer.rb in deimos-ruby-1.22.4
- old
+ new
@@ -93,24 +93,26 @@
end
# Publish the payload to the topic.
# @param payload [Hash, SchemaClass::Record] with an optional payload_key hash key.
# @param topic [String] if specifying the topic
+ # @param headers [Hash] if specifying headers
# @return [void]
- def publish(payload, topic: self.topic)
- publish_list([payload], topic: topic)
+ def publish(payload, topic: self.topic, headers: nil)
+ publish_list([payload], topic: topic, headers: headers)
end
# Publish a list of messages.
# @param payloads [Array<Hash, SchemaClass::Record>] with optional payload_key hash key.
# @param sync [Boolean] if given, override the default setting of
# whether to publish synchronously.
# @param force_send [Boolean] if true, ignore the configured backend
# and send immediately to Kafka.
# @param topic [String] if specifying the topic
+ # @param headers [Hash] if specifying headers
# @return [void]
- def publish_list(payloads, sync: nil, force_send: false, topic: self.topic)
+ def publish_list(payloads, sync: nil, force_send: false, topic: self.topic, headers: nil)
return if Deimos.config.kafka.seed_brokers.blank? ||
Deimos.config.producers.disabled ||
Deimos.producers_disabled?(self)
raise 'Topic not specified. Please specify the topic.' if topic.blank?
@@ -120,10 +122,10 @@
'encode_messages',
producer: self,
topic: topic,
payloads: payloads
) do
- messages = Array(payloads).map { |p| Deimos::Message.new(p.to_h, self) }
+ messages = Array(payloads).map { |p| Deimos::Message.new(p.to_h, self, headers: headers) }
messages.each { |m| _process_message(m, topic) }
messages.in_groups_of(MAX_BATCH_SIZE, false) do |batch|
self.produce_batch(backend_class, batch)
end
end