lib/karafka/web/ui/models/status.rb in karafka-web-0.6.3 vs lib/karafka/web/ui/models/status.rb in karafka-web-0.7.0
- old
+ new
@@ -23,10 +23,11 @@
def partial_namespace
case status
when :success then 'successes'
when :warning then 'warnings'
when :failure then 'failures'
+ when :halted then 'failures'
else
raise ::Karafka::Errors::UnsupportedCaseError, status
end
end
@@ -34,27 +35,45 @@
def to_s
status.to_s
end
end
- # Initializes the status object and tries to connect to Kafka
- def initialize
- connect
+ # Is karafka-web enabled in the `karafka.rb`
+ # Checks if the consumer group for web-ui is injected.
+ # It does **not** check if the group is active because this may depend on the
+ # configuration details, but for the Web-UI web app to work, the routing needs to be
+ # aware of the deserializer, etc
+ def enabled
+ enabled = ::Karafka::App.routes.map(&:name).include?(
+ ::Karafka::Web.config.processing.consumer_group
+ )
+
+ Step.new(
+ enabled ? :success : :failure,
+ nil
+ )
end
# @return [Status::Step] were we able to connect to Kafka or not and how fast.
# Some people try to work with Kafka over the internet with really high latency and this
# should be highlighted in the UI as often the connection just becomes unstable
def connection
- level = if @connection_time < 1_000
- :success
- elsif @connection_time < 1_000_000
- :warning
- else
- :failure
- end
+ if enabled.success?
+ # Do not connect more than once during the status object lifetime
+ @connection_time || connect
+ level = if @connection_time < 1_000
+ :success
+ elsif @connection_time < 1_000_000
+ :warning
+ else
+ :failure
+ end
+ else
+ level = :halted
+ end
+
Step.new(
level,
{ time: @connection_time }
)
end
@@ -79,10 +98,11 @@
def partitions
if topics.success?
status = :success
status = :failure if topics_details[topics_consumers_states][:partitions] != 1
status = :failure if topics_details[topics_consumers_reports][:partitions] != 1
+ status = :failure if topics_details[topics_consumers_metrics][:partitions] != 1
details = topics_details
else
status = :halted
details = {}
end
@@ -91,14 +111,35 @@
status,
details
)
end
- # @return [Status::Step] Is the initial state present in the setup or not
- def initial_state
+ # @return [Status::Step] do we have correct replication for given env
+ def replication
if partitions.success?
- @current_state ||= Models::State.current
+ status = :success
+ # low replication is not an error but just a warning and a potential problem
+ # in case of a crash, this is why we do not fail but warn only
+ status = :warning if topics_details.values.any? { |det| det[:replication] < 2 }
+ # Allow for non-production setups to use replication 1 as it is not that relevant
+ status = :success unless Karafka.env.production?
+ details = topics_details
+ else
+ status = :halted
+ details = {}
+ end
+
+ Step.new(
+ status,
+ details
+ )
+ end
+
+ # @return [Status::Step] Is the initial consumers state present in Kafka
+ def initial_consumers_state
+ if replication.success?
+ @current_state ||= Models::ConsumersState.current
status = @current_state ? :success : :failure
else
status = :halted
end
@@ -106,14 +147,29 @@
status,
nil
)
end
+ # @return [Status::Step] Is the initial consumers metrics record present in Kafka
+ def initial_consumers_metrics
+ if initial_consumers_state.success?
+ @current_metrics ||= Models::ConsumersMetrics.current
+ status = @current_metrics ? :success : :failure
+ else
+ status = :halted
+ end
+
+ Step.new(
+ status,
+ nil
+ )
+ end
+
# @return [Status::Step] Is there at least one active karafka server reporting to the
# Web UI
def live_reporting
- if initial_state.success?
+ if initial_consumers_metrics.success?
@processes ||= Models::Processes.active(@current_state)
status = @processes.empty? ? :failure : :success
else
status = :halted
end
@@ -126,11 +182,15 @@
# @return [Status::Step] is there a subscription to our reports topic that is being
# consumed actively.
def state_calculation
if live_reporting.success?
- @subscriptions ||= Models::Health.current(@current_state).values.flat_map(&:keys)
+ @subscriptions ||= Models::Health
+ .current(@current_state)
+ .values.map { |consumer_group| consumer_group[:topics] }
+ .flat_map(&:keys)
+
status = @subscriptions.include?(topics_consumers_reports) ? :success : :failure
else
status = :halted
end
@@ -138,15 +198,30 @@
status,
nil
)
end
+ # @return [Status::Step] Are we able to actually digest the consumers reports with the
+ # consumer that is consuming them.
+ def consumers_reports_schema_state
+ status = if state_calculation.success?
+ @current_state[:schema_state] == 'compatible' ? :success : :failure
+ else
+ :halted
+ end
+
+ Step.new(
+ status,
+ nil
+ )
+ end
+
# @return [Status::Step] is Pro enabled with all of its features.
# @note It's not an error not to have it but we want to warn, that some of the features
# may not work without Pro.
def pro_subscription
- status = if state_calculation.success?
+ status = if consumers_reports_schema_state.success?
::Karafka.pro? ? :success : :warning
else
:halted
end
@@ -166,39 +241,52 @@
# @return [String] consumers reports topic name
def topics_consumers_reports
::Karafka::Web.config.topics.consumers.reports.to_s
end
+ # @return [String] consumers metrics topic name
+ def topics_consumers_metrics
+ ::Karafka::Web.config.topics.consumers.metrics.to_s
+ end
+
# @return [String] errors topic name
def topics_errors
::Karafka::Web.config.topics.errors
end
# @return [Hash] hash with topics with which we work details (even if don't exist)
def topics_details
+ base = { present: false, partitions: 0, replication: 1 }
+
topics = {
- topics_consumers_states => { present: false, partitions: 0 },
- topics_consumers_reports => { present: false, partitions: 0 },
- topics_errors => { present: false, partitions: 0 }
+ topics_consumers_states => base.dup,
+ topics_consumers_reports => base.dup,
+ topics_consumers_metrics => base.dup,
+ topics_errors => base.dup
}
@cluster_info.topics.each do |topic|
name = topic[:topic_name]
next unless topics.key?(name)
- topics[name][:present] = true
- topics[name][:partitions] = topic[:partition_count]
+ topics[name].merge!(
+ present: true,
+ partitions: topic[:partition_count],
+ replication: topic[:partitions].map { |part| part[:replica_count] }.max
+ )
end
topics
end
# Tries connecting with the cluster and saves the cluster info and the connection time
# @note If fails, `connection_time` will be 1_000_000
def connect
started = Time.now.to_f
- @cluster_info = ::Karafka::Admin.cluster_info
+ # For status we always need uncached data, otherwise status could cache outdated
+ # info
+ @cluster_info = Models::ClusterInfo.fetch(cached: false)
@connection_time = (Time.now.to_f - started) * 1_000
rescue ::Rdkafka::RdkafkaError
@connection_time = 1_000_000
end
end