# frozen_string_literal: true module Karafka module Web module Management # @note This runs on each process start that has `karafka.rb`. It needs to be executed # also in the context of other processes types and not only karafka server, because it # installs producers instrumentation and routing as well. class Enable < Base # Enables routing consumer group and subscribes Web-UI listeners def call extend_routing subscribe_to_monitor subscribe_to_close_web_producer end private # Enables all the needed routes def extend_routing ::Karafka::App.routes.draw do web_deserializer = ::Karafka::Web::Deserializer.new consumer_group ::Karafka::Web.config.processing.consumer_group do # Topic we listen on to materialize the states topic ::Karafka::Web.config.topics.consumers.reports do config(active: false) active ::Karafka::Web.config.processing.active # Since we materialize state in intervals, we can poll for half of this time without # impacting the reporting responsiveness max_wait_time ::Karafka::Web.config.processing.interval / 2 max_messages 1_000 consumer ::Karafka::Web::Processing::Consumer # This needs to be true in order not to reload the consumer in dev. This consumer # should not be affected by the end user development process consumer_persistence true deserializer web_deserializer manual_offset_management true # Start from the most recent data, do not materialize historical states # This prevents us from dealing with cases, where client id would be changed and # consumer group name would be renamed and we would start consuming all historical initial_offset 'latest' end # We define those three here without consumption, so Web understands how to deserialize # them when used / viewed topic ::Karafka::Web.config.topics.consumers.states do config(active: false) active false deserializer web_deserializer end topic ::Karafka::Web.config.topics.consumers.metrics do config(active: false) active false deserializer web_deserializer end topic ::Karafka::Web.config.topics.errors do config(active: false) active false deserializer web_deserializer end end end end # Subscribes with all needed listeners def subscribe_to_monitor # Installs all the consumer related listeners ::Karafka::Web.config.tracking.consumers.listeners.each do |listener| ::Karafka.monitor.subscribe(listener) end # Installs all the producer related listeners into Karafka default listener and # into Karafka::Web listener in case it would be different than the Karafka one ::Karafka::Web.config.tracking.producers.listeners.each do |listener| ::Karafka.producer.monitor.subscribe(listener) # Do not instrument twice in case only one default producer is used next if ::Karafka.producer == ::Karafka::Web.producer ::Karafka::Web.producer.monitor.subscribe(listener) end end # In most cases we want to close the producer if possible. # While we cannot do it easily in user processes and we should rely on WaterDrop # finalization logic, we can do it in `karafka server` on terminate # # In other places, this producer anyhow should not be used. def subscribe_to_close_web_producer ::Karafka::App.monitor.subscribe('app.terminated') do # If Web producer is the same as `Karafka.producer` it will do nothing as you can # call `#close` multiple times without side effects ::Karafka::Web.producer.close end end end end end end