lib/deimos/producer.rb in deimos-ruby-1.11.2 vs lib/deimos/producer.rb in deimos-ruby-1.12.0

- old
+ new

@@ -55,10 +55,11 @@ include SharedConfig MAX_BATCH_SIZE = 500 class << self + # @return [Hash] def config @config ||= { encode_key: true, namespace: Deimos.config.producers.schema_namespace @@ -84,18 +85,18 @@ def partition_key(_payload) nil end # Publish the payload to the topic. - # @param payload [Hash] with an optional payload_key hash key. + # @param payload [Hash|SchemaClass::Record] with an optional payload_key hash key. # @param topic [String] if specifying the topic def publish(payload, topic: self.topic) publish_list([payload], topic: topic) end # Publish a list of messages. - # @param payloads [Hash|Array<Hash>] with optional payload_key hash key. + # @param payloads [Array<Hash|SchemaClass::Record>] with optional payload_key hash key. # @param sync [Boolean] if given, override the default setting of # whether to publish synchronously. # @param force_send [Boolean] if true, ignore the configured backend # and send immediately to Kafka. # @param topic [String] if specifying the topic @@ -111,10 +112,10 @@ 'encode_messages', producer: self, topic: topic, payloads: payloads ) do - messages = Array(payloads).map { |p| Deimos::Message.new(p, self) } + messages = Array(payloads).map { |p| Deimos::Message.new(p.to_h, self) } messages.each { |m| _process_message(m, topic) } messages.in_groups_of(MAX_BATCH_SIZE, false) do |batch| self.produce_batch(backend_class, batch) end end