Sha256: b46c461a77011589eeb5bf4699e37cd897994977867e573792c8a37dc9691df2
Contents?: true
Size: 1.81 KB
Versions: 1
Compression:
Stored size: 1.81 KB
Contents
require 'jruby-kafka/namespace' require 'jruby-kafka/utility' # noinspection JRubyStringImportInspection class Kafka::KafkaProducer < Java::org.apache.kafka.clients.producer.KafkaProducer java_import 'org.apache.kafka.clients.producer.ProducerRecord' java_import 'org.apache.kafka.clients.producer.Callback' class RubyCallback include Callback def initialize(cb) @cb = cb end def onCompletion(metadata, exception) @cb.call(metadata, exception) end end attr_reader :properties # Create a Kafka producer. # # @param [Hash] config the producer configuration. # # @option config [String] :bootstrap_servers A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. Required. # @option config [String] :key_serializer Serializer class for key that implements the Deserializer interface. Required. # @option config [String] :value_serializer Serializer class for value that implements the Deserializer interface. Required. # # For other configuration properties and their default values see # http://kafka.apache.org/documentation.html#producerconfigs and # http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html. # def initialize(opts = {}) @properties = opts.clone super Kafka::Utility.java_properties @properties end java_alias :send_method , :send, [ProducerRecord] java_alias :send_cb_method, :send, [ProducerRecord, Callback.java_class] # throws FailedToSendMessageException or if not connected, StandardError. def send_msg(topic, partition, key, value, &block) if block send_cb_method ProducerRecord.new(topic, partition, key, value), RubyCallback.new(block) else send_method ProducerRecord.new(topic, partition, key, value) end end end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
jruby-kafka-4.0.0-java | lib/jruby-kafka/kafka-producer.rb |