lib/karafka/web/ui/models/status.rb in karafka-web-0.6.3 vs lib/karafka/web/ui/models/status.rb in karafka-web-0.7.0

- old
+ new

@@ -23,10 +23,11 @@ def partial_namespace
               case status
               when :success then 'successes'
               when :warning then 'warnings'
               when :failure then 'failures'
+              when :halted then 'failures'
               else
                 raise ::Karafka::Errors::UnsupportedCaseError, status
               end
             end
@@ -34,27 +35,45 @@ def to_s
               status.to_s
             end
           end

-          # Initializes the status object and tries to connect to Kafka
-          def initialize
-            connect
+          # Is karafka-web enabled in the `karafka.rb`
+          # Checks if the consumer group for web-ui is injected.
+          # It does **not** check if the group is active because this may depend on the
+          # configuration details, but for the Web-UI web app to work, the routing needs to be
+          # aware of the deserializer, etc
+          def enabled
+            enabled = ::Karafka::App.routes.map(&:name).include?(
+              ::Karafka::Web.config.processing.consumer_group
+            )
+
+            Step.new(
+              enabled ? :success : :failure,
+              nil
+            )
           end

           # @return [Status::Step] were we able to connect to Kafka or not and how fast.
           # Some people try to work with Kafka over the internet with really high latency and this
           # should be highlighted in the UI as often the connection just becomes unstable
           def connection
-            level = if @connection_time < 1_000
-                      :success
-                    elsif @connection_time < 1_000_000
-                      :warning
-                    else
-                      :failure
-                    end
+            if enabled.success?
+              # Do not connect more than once during the status object lifetime
+              @connection_time || connect
+              level = if @connection_time < 1_000
+                        :success
+                      elsif @connection_time < 1_000_000
+                        :warning
+                      else
+                        :failure
+                      end
+            else
+              level = :halted
+            end
+
             Step.new(
               level,
               { time: @connection_time }
             )
           end
@@ -79,10 +98,11 @@ def partitions
             if topics.success?
               status = :success
               status = :failure if topics_details[topics_consumers_states][:partitions] != 1
               status = :failure if topics_details[topics_consumers_reports][:partitions] != 1
+              status = :failure if topics_details[topics_consumers_metrics][:partitions] != 1
               details = topics_details
             else
               status = :halted
               details = {}
             end
@@ -91,14 +111,35 @@
               status,
               details
             )
           end

-          # @return [Status::Step] Is the initial state present in the setup or not
-          def initial_state
+          # @return [Status::Step] do we have correct replication for given env
+          def replication
             if partitions.success?
-              @current_state ||= Models::State.current
+              status = :success
+              # low replication is not an error but just a warning and a potential problem
+              # in case of a crash, this is why we do not fail but warn only
+              status = :warning if topics_details.values.any? { |det| det[:replication] < 2 }
+              # Allow for non-production setups to use replication 1 as it is not that relevant
+              status = :success unless Karafka.env.production?
+              details = topics_details
+            else
+              status = :halted
+              details = {}
+            end
+
+            Step.new(
+              status,
+              details
+            )
+          end
+
+          # @return [Status::Step] Is the initial consumers state present in Kafka
+          def initial_consumers_state
+            if replication.success?
+              @current_state ||= Models::ConsumersState.current
               status = @current_state ? :success : :failure
             else
               status = :halted
             end
@@ -106,14 +147,29 @@
               status,
               nil
             )
           end

+          # @return [Status::Step] Is the initial consumers metrics record present in Kafka
+          def initial_consumers_metrics
+            if initial_consumers_state.success?
+              @current_metrics ||= Models::ConsumersMetrics.current
+              status = @current_metrics ? :success : :failure
+            else
+              status = :halted
+            end
+
+            Step.new(
+              status,
+              nil
+            )
+          end
+
           # @return [Status::Step] Is there at least one active karafka server reporting to the
           # Web UI
           def live_reporting
-            if initial_state.success?
+            if initial_consumers_metrics.success?
               @processes ||= Models::Processes.active(@current_state)
               status = @processes.empty? ? :failure : :success
             else
               status = :halted
             end
@@ -126,11 +182,15 @@
           # @return [Status::Step] is there a subscription to our reports topic that is being
           # consumed actively.
           def state_calculation
             if live_reporting.success?
-              @subscriptions ||= Models::Health.current(@current_state).values.flat_map(&:keys)
+              @subscriptions ||= Models::Health
+                                 .current(@current_state)
+                                 .values.map { |consumer_group| consumer_group[:topics] }
+                                 .flat_map(&:keys)
+
               status = @subscriptions.include?(topics_consumers_reports) ? :success : :failure
             else
               status = :halted
             end
@@ -138,15 +198,30 @@
               status,
               nil
             )
           end

+          # @return [Status::Step] Are we able to actually digest the consumers reports with the
+          # consumer that is consuming them.
+          def consumers_reports_schema_state
+            status = if state_calculation.success?
+                       @current_state[:schema_state] == 'compatible' ? :success : :failure
+                     else
+                       :halted
+                     end
+
+            Step.new(
+              status,
+              nil
+            )
+          end
+
           # @return [Status::Step] is Pro enabled with all of its features.
           # @note It's not an error not to have it but we want to warn, that some of the features
           # may not work without Pro.
           def pro_subscription
-            status = if state_calculation.success?
+            status = if consumers_reports_schema_state.success?
                        ::Karafka.pro? ? :success : :warning
                      else
                        :halted
                      end
@@ -166,39 +241,52 @@
           # @return [String] consumers reports topic name
           def topics_consumers_reports
             ::Karafka::Web.config.topics.consumers.reports.to_s
           end

+          # @return [String] consumers metrics topic name
+          def topics_consumers_metrics
+            ::Karafka::Web.config.topics.consumers.metrics.to_s
+          end
+
           # @return [String] errors topic name
           def topics_errors
             ::Karafka::Web.config.topics.errors
           end

           # @return [Hash] hash with topics with which we work details (even if don't exist)
           def topics_details
+            base = { present: false, partitions: 0, replication: 1 }
+
             topics = {
-              topics_consumers_states => { present: false, partitions: 0 },
-              topics_consumers_reports => { present: false, partitions: 0 },
-              topics_errors => { present: false, partitions: 0 }
+              topics_consumers_states => base.dup,
+              topics_consumers_reports => base.dup,
+              topics_consumers_metrics => base.dup,
+              topics_errors => base.dup
             }

             @cluster_info.topics.each do |topic|
               name = topic[:topic_name]

               next unless topics.key?(name)

-              topics[name][:present] = true
-              topics[name][:partitions] = topic[:partition_count]
+              topics[name].merge!(
+                present: true,
+                partitions: topic[:partition_count],
+                replication: topic[:partitions].map { |part| part[:replica_count] }.max
+              )
             end

             topics
           end

           # Tries connecting with the cluster and saves the cluster info and the connection time
           # @note If fails, `connection_time` will be 1_000_000
           def connect
             started = Time.now.to_f
-            @cluster_info = ::Karafka::Admin.cluster_info
+            # For status we always need uncached data, otherwise status could cache outdated
+            # info
+            @cluster_info = Models::ClusterInfo.fetch(cached: false)
             @connection_time = (Time.now.to_f - started) * 1_000
           rescue ::Rdkafka::RdkafkaError
             @connection_time = 1_000_000
           end
         end
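
How the pieces fit together: in 0.7.0 every check is a link in a dependency chain (enabled -> connection -> topics -> partitions -> replication -> initial_consumers_state -> initial_consumers_metrics -> live_reporting -> state_calculation -> consumers_reports_schema_state -> pro_subscription). A step runs its own probe only when the previous step succeeded; otherwise it reports :halted, which partial_namespace renders with the failure partials. The sketch below is a minimal, self-contained illustration of that success-or-halted pattern; the Step struct mirrors the one in the diff, while the two checks are simplified stand-ins rather than the library's real implementations.

# Simplified illustration of the step chaining used in Status above.
# Step mirrors the struct from the diff; both checks are stand-ins.
Step = Struct.new(:status, :details) do
  def success?
    status == :success
  end

  # :halted reuses the failure partials in the Web UI views
  def partial_namespace
    case status
    when :success then 'successes'
    when :warning then 'warnings'
    when :failure then 'failures'
    when :halted then 'failures'
    else raise ArgumentError, status.to_s
    end
  end
end

# Hypothetical first check: pretend the Web UI consumer group is not routed
def enabled
  Step.new(:failure, nil)
end

# A dependent check never runs its own probe; it only reports :halted
def connection
  return Step.new(:halted, { time: nil }) unless enabled.success?

  Step.new(:success, { time: 12.3 })
end

puts connection.status            # => halted
puts connection.partial_namespace # => failures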
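
The replication check added in 0.7.0 leans on the reworked topics_details, which now derives a per-topic replication factor as the highest replica_count across that topic's partitions. The standalone sketch below replays that derivation against a hand-written metadata hash; the data shape and the default-looking topic names are assumptions made for illustration, not values taken from the Admin API.

# Assumed cluster metadata shape, mimicking what topics_details iterates over
cluster_topics = [
  {
    topic_name: 'karafka_consumers_states',
    partition_count: 1,
    partitions: [{ replica_count: 3 }]
  },
  {
    topic_name: 'karafka_errors',
    partition_count: 2,
    partitions: [{ replica_count: 1 }, { replica_count: 1 }]
  }
]

# Same defaults as in the diff: topics start as absent with replication 1
base = { present: false, partitions: 0, replication: 1 }
details = {
  'karafka_consumers_states' => base.dup,
  'karafka_errors' => base.dup
}

cluster_topics.each do |topic|
  name = topic[:topic_name]
  next unless details.key?(name)

  details[name].merge!(
    present: true,
    partitions: topic[:partition_count],
    replication: topic[:partitions].map { |part| part[:replica_count] }.max
  )
end

# The replication step downgrades to :warning when any tracked topic is
# replicated to fewer than 2 brokers, and only in the production env
low_replication = details.values.any? { |det| det[:replication] < 2 }
puts low_replication # => true, karafka_errors runs with a replication factor of 1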