lib/deimos/producer.rb in deimos-ruby-1.8.1.pre.beta9 vs lib/deimos/producer.rb in deimos-ruby-1.8.2.pre.beta1

- old
+ new

@@ -85,21 +85,23 @@ nil end # Publish the payload to the topic. # @param payload [Hash] with an optional payload_key hash key. - def publish(payload) - publish_list([payload]) + # @param topic [String] if specifying the topic + def publish(payload, topic: self.topic) + publish_list([payload], topic: topic) end # Publish a list of messages. # @param payloads [Hash|Array<Hash>] with optional payload_key hash key. # @param sync [Boolean] if given, override the default setting of # whether to publish synchronously. # @param force_send [Boolean] if true, ignore the configured backend # and send immediately to Kafka. - def publish_list(payloads, sync: nil, force_send: false) + # @param topic [String] if specifying the topic + def publish_list(payloads, sync: nil, force_send: false, topic: self.topic) return if Deimos.config.kafka.seed_brokers.blank? || Deimos.config.producers.disabled || Deimos.producers_disabled?(self) backend_class = determine_backend_class(sync, force_send) @@ -108,11 +110,11 @@ producer: self, topic: topic, payloads: payloads ) do messages = Array(payloads).map { |p| Deimos::Message.new(p, self) } - messages.each(&method(:_process_message)) + messages.each { |m| _process_message(m, topic) } messages.in_groups_of(MAX_BATCH_SIZE, false) do |batch| self.produce_batch(backend_class, batch) end end end @@ -161,11 +163,12 @@ end private # @param message [Message] - def _process_message(message) + # @param topic [String] + def _process_message(message, topic) # this violates the Law of Demeter but it has to happen in a very # specific order and requires a bunch of methods on the producer # to work correctly. message.add_fields(encoder.schema_fields.map(&:name)) message.partition_key = self.partition_key(message.payload) @@ -173,10 +176,10 @@ # need to do this before _coerce_fields because that might result # in an empty payload which is an *error* whereas this is intended. message.payload = nil if message.payload.blank? message.coerce_fields(encoder) message.encoded_key = _encode_key(message.key) - message.topic = self.topic + message.topic = topic message.encoded_payload = if message.payload.nil? nil else encoder.encode(message.payload, topic: "#{config[:topic]}-value")