Sha256: 9842ca33ee446159564d4f9c4fb4299305f87fa537d4d0ffda2c9f06d015e917

Contents?: true

Size: 1.96 KB

Versions: 7

Compression:

Stored size: 1.96 KB

Contents

require 'java'
require 'jruby-kafka/namespace'
require 'concurrent'
require 'jruby-kafka/utility'

class Kafka::KafkaConsumer < Java::org.apache.kafka.clients.consumer.KafkaConsumer

  REQUIRED = [
    :bootstrap_servers, :key_deserializer, :value_deserializer
  ]

  # Create a Kafka high-level consumer.
  #
  # @param [Hash] config the consumer configuration.
  #
  # @option config [String]  :bootstrap_servers A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form host1:port1,host2:port2,.... Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down). Required.
  # @option config [String]  :key_deserializer Deserializer class for key that implements the Deserializer interface. Required.
  # @option config [String]  :value_deserializer Deserializer class for value that implements the Deserializer interface. Required.
  # @option config [Array]   :topics The topic to consume from. Required.
  #
  #
  # For other configuration properties and their default values see 
  # http://kafka.apache.org/documentation.html#newconsumerconfigs and
  # https://kafka.apache.org/090/javadoc/org/apache/kafka/clients/consumer/ConsumerConfig.html.
  #
  def initialize(config={})
    Kafka::Utility.validate_arguments REQUIRED, config
    @properties      =  config.clone
    @stop_called     =  Concurrent::AtomicBoolean.new(false)
    super Kafka::Utility.java_properties @properties
  end

  attr_reader :properties

  def stop
    @stop_called.make_true
    self.wakeup
  end

  # stop? should never be overriden
  def stop?
    @stop_called.value
  end
end

Version data entries

7 entries across 7 versions & 1 rubygems

Version Path
jruby-kafka-4.2.0-java lib/jruby-kafka/kafka-consumer.rb
jruby-kafka-4.1.1-java lib/jruby-kafka/kafka-consumer.rb
jruby-kafka-4.0.0.ci.1.g0cd872b-java lib/jruby-kafka/kafka-consumer.rb
jruby-kafka-4.0.0-java lib/jruby-kafka/kafka-consumer.rb
jruby-kafka-3.6.0-java lib/jruby-kafka/kafka-consumer.rb
jruby-kafka-3.5.0-java lib/jruby-kafka/kafka-consumer.rb
jruby-kafka-3.4-java lib/jruby-kafka/kafka-consumer.rb