lib/deimos/producer.rb in deimos-ruby-1.8.1.pre.beta9 vs lib/deimos/producer.rb in deimos-ruby-1.8.2.pre.beta1
- old
+ new
@@ -85,21 +85,23 @@
nil
end
# Publish the payload to the topic.
# @param payload [Hash] with an optional payload_key hash key.
- def publish(payload)
- publish_list([payload])
+ # @param topic [String] if specifying the topic
+ def publish(payload, topic: self.topic)
+ publish_list([payload], topic: topic)
end
# Publish a list of messages.
# @param payloads [Hash|Array<Hash>] with optional payload_key hash key.
# @param sync [Boolean] if given, override the default setting of
# whether to publish synchronously.
# @param force_send [Boolean] if true, ignore the configured backend
# and send immediately to Kafka.
- def publish_list(payloads, sync: nil, force_send: false)
+ # @param topic [String] if specifying the topic
+ def publish_list(payloads, sync: nil, force_send: false, topic: self.topic)
return if Deimos.config.kafka.seed_brokers.blank? ||
Deimos.config.producers.disabled ||
Deimos.producers_disabled?(self)
backend_class = determine_backend_class(sync, force_send)
@@ -108,11 +110,11 @@
producer: self,
topic: topic,
payloads: payloads
) do
messages = Array(payloads).map { |p| Deimos::Message.new(p, self) }
- messages.each(&method(:_process_message))
+ messages.each { |m| _process_message(m, topic) }
messages.in_groups_of(MAX_BATCH_SIZE, false) do |batch|
self.produce_batch(backend_class, batch)
end
end
end
@@ -161,11 +163,12 @@
end
private
# @param message [Message]
- def _process_message(message)
+ # @param topic [String]
+ def _process_message(message, topic)
# this violates the Law of Demeter but it has to happen in a very
# specific order and requires a bunch of methods on the producer
# to work correctly.
message.add_fields(encoder.schema_fields.map(&:name))
message.partition_key = self.partition_key(message.payload)
@@ -173,10 +176,10 @@
# need to do this before _coerce_fields because that might result
# in an empty payload which is an *error* whereas this is intended.
message.payload = nil if message.payload.blank?
message.coerce_fields(encoder)
message.encoded_key = _encode_key(message.key)
- message.topic = self.topic
+ message.topic = topic
message.encoded_payload = if message.payload.nil?
nil
else
encoder.encode(message.payload,
topic: "#{config[:topic]}-value")