Sha256: 6011cd6241eab3a0ba46c73dffdfe8225aa05ad811ebdfbe026c3628528d502a
Contents?: true
Size: 1.02 KB
Versions: 3
Compression:
Stored size: 1.02 KB
Contents
# frozen_string_literal: true require 'forwardable' module KafkaCommand class Broker extend Forwardable attr_reader :broker def_delegators :@broker, :port, :host, :node_id, :fetch_metadata, :fetch_offsets alias_method :kafka_broker_id, :node_id alias_method :hostname, :host def initialize(broker) @broker = broker end def host_with_port "#{host}:#{port}" end def as_json(*) { id: node_id, host: host_with_port } end # needs to be the group coordinator to work def offsets_for(group, topic) offsets = @broker.fetch_offsets( group_id: group.group_id, topics: { topic.name => topic.partitions.map(&:partition_id) } ).topics[topic.name] offsets.keys.each { |partition_id| offsets[partition_id] = offsets[partition_id].offset } offsets end def connected? @broker.api_versions # simple request to check connections true rescue Kafka::ConnectionError false end end end
Version data entries
3 entries across 3 versions & 1 rubygems
Version | Path |
---|---|
kafka_command-0.0.3 | app/models/kafka_command/broker.rb |
kafka_command-0.0.2 | app/models/kafka_command/broker.rb |
kafka_command-0.0.1 | app/models/kafka_command/broker.rb |