Sha256: ad2a7d07e6b5b1bd6c22471cea817c735704d60b3f056309383271b5ee9495b8

Contents?: true

Size: 1.47 KB

Versions: 5

Compression:

Stored size: 1.47 KB

Contents

require 'jruby-kafka/namespace'
require 'jruby-kafka/error'
require 'jruby-kafka/utility'

# Thin JRuby wrapper around the Java Kafka producer client
# (org.apache.kafka.clients.producer.KafkaProducer).
#
# Lifecycle: KafkaProducer.new(opts) -> #connect -> #send_msg ... -> #close.
# No Java producer is created until #connect is called.
#
# noinspection JRubyStringImportInspection
class Kafka::KafkaProducer
  java_import 'org.apache.kafka.clients.producer.ProducerRecord'
  java_import 'org.apache.kafka.clients.producer.Callback'
  KAFKA_PRODUCER = Java::org.apache.kafka.clients.producer.KafkaProducer

  # Option names passed to Kafka::Utility.validate_arguments by #initialize.
  REQUIRED = [
    :bootstrap_servers, :key_serializer
  ]

  # Adapts a Ruby callable to the Java Callback interface so that a block
  # given to #send_msg can be handed to the two-argument Java `send`.
  class RubyCallback
    include Callback

    # cb: any object responding to #call(metadata, exception).
    def initialize(cb)
      @cb = cb
    end

    # Java Callback hook; forwards both arguments unchanged to the callable.
    def onCompletion(metadata, exception)
      @cb.call(metadata, exception)
    end
  end

  attr_reader :producer, :send_method, :send_cb_method, :options

  # opts: producer options; validated against REQUIRED and later converted
  # with Kafka::Utility.java_properties in #connect.
  def initialize(opts = {})
    Kafka::Utility.validate_arguments REQUIRED, opts
    @options = opts
    # Until #connect binds the real Java methods, every send attempt must fail
    # loudly. `raise` (not `throw`) is required here: an unmatched `throw`
    # surfaces as UncaughtThrowError instead of a StandardError carrying this
    # message. The proc declares no parameters, so it accepts the arguments
    # of either send variant.
    not_connected = proc { raise StandardError, 'Producer is not connected' }
    @send_method = @send_cb_method = not_connected
  end

  # Creates the Java producer from the stored options and binds the two
  # overloads of its `send` method (with and without a completion callback).
  def connect
    @producer = KAFKA_PRODUCER.new(Kafka::Utility.java_properties(@options))
    @send_method = producer.java_method :send, [ProducerRecord]
    @send_cb_method = producer.java_method :send, [ProducerRecord, Callback.java_class]
  end

  # Sends a single record built from the given topic, partition, key and value
  # (all four are forwarded unchanged to ProducerRecord.new).
  #
  # When a block is given it is wrapped in a RubyCallback and invoked with
  # (metadata, exception) from Callback#onCompletion.
  #
  # Returns whatever the bound Java `send` returns.
  # Raises StandardError if #connect has not been called.
  # NOTE(review): an earlier comment also listed FailedToSendMessageException;
  # confirm whether this client can still raise it.
  def send_msg(topic, partition, key, value, &block)
    record = ProducerRecord.new(topic, partition, key, value)
    return send_method.call(record) unless block

    send_cb_method.call(record, RubyCallback.new(block))
  end

  # Closes the underlying Java producer. Does nothing when #connect was never
  # called, so it is safe to use from an `ensure` block.
  def close
    @producer.close if @producer
  end
end

Version data entries

5 entries across 5 versions & 1 rubygem

Version Path
jruby-kafka-2.2.2-java lib/jruby-kafka/kafka-producer.rb
jruby-kafka-3.6.0-java lib/jruby-kafka/kafka-producer.rb
jruby-kafka-3.5.0-java lib/jruby-kafka/kafka-producer.rb
jruby-kafka-3.4-java lib/jruby-kafka/kafka-producer.rb
jruby-kafka-3.3-java lib/jruby-kafka/kafka-producer.rb