Sha256: b77ad99e7b255411a2db3446f5b261b8696518b344db9bb95b718984425f9f2b
Contents?: true
Size: 1.83 KB
Versions: 6
Compression:
Stored size: 1.83 KB
Contents
# frozen_string_literal: true

module Karafka
  module Web
    module Ui
      module Models
        # Model wrapping the reported data of one consumer process
        class Process < Lib::HashProxy
          class << self
            # Locates a process among the active ones by its id
            #
            # @param state [State] system state used as the lookup source
            # @param process_id [String] id of the process to locate
            # @return [Process] the matching process
            # @raise [::Karafka::Web::Errors::Ui::NotFoundError] when none of the active
            #   processes has the requested id
            def find(state, process_id)
              match = Processes.active(state).find { |candidate| candidate.id == process_id }
              return match if match

              raise(::Karafka::Web::Errors::Ui::NotFoundError, process_id)
            end
          end

          # @return [String] last colon-separated segment of the process name (the id without
          #   the name and ip parts)
          def id
            @id ||= name.split(':')[-1]
          end

          # @return [Array<ConsumerGroup>] consumer groups of this process, ordered by their id
          def consumer_groups
            groups = super.values.map { |details| ConsumerGroup.new(details) }
            groups.sort_by(&:id)
          end

          # Ordered by start time ascending, so the longest running job comes first
          #
          # @return [Array<Job>] jobs currently reported for this process
          def jobs
            running = super.map { |details| Job.new(details) }
            running.sort_by(&:started_at)
          end

          # Sums the stored lag of every partition of every topic in all the consumer groups
          #
          # @return [Integer] total stored lag of this process
          # @note Negative lags are left out of the sum (presumably they mark a lag that is not
          #   known yet - confirm against the reporting side)
          def lag_stored
            partitions = consumer_groups.flat_map(&:topics).flat_map(&:partitions)
            lags = partitions.map(&:lag_stored)
            lags.reject(&:negative?).sum
          end
        end
      end
    end
  end
end
Version data entries
6 entries across 6 versions & 1 rubygems