lib/karafka/connection/status.rb in karafka-2.3.4 vs lib/karafka/connection/status.rb in karafka-2.4.0.beta1

- old
+ new

@@ -3,10 +3,15 @@ module Karafka # Namespace for Kafka connection related logic module Connection # Listener connection status representation class Status + include Helpers::ConfigImporter.new( + monitor: %i[monitor], + conductor: %i[internal connection conductor] + ) + # Available states and their transitions. STATES = { pending: :pending!, starting: :start!, running: :running!, @@ -24,11 +29,12 @@ # Do not allow reverse state transitions (we always go one way) or transition to the # same state as currently return if @status && STATES.keys.index(:#{state}) <= STATES.keys.index(@status) @status = :#{state} - @conductor.signal + conductor.signal + monitor.instrument("connection.listener.#{state}", caller: self) end end # @return [Boolean] are we in a given state def #{state}? @@ -37,24 +43,30 @@ RUBY end def initialize @mutex = Mutex.new - @conductor = Karafka::App.config.internal.connection.conductor pending! end # If this listener was not even running, will just move it through states until final. # If it was running, will start the stopping procedures. # Will do nothing if it was already stopped def stop! if pending? @status = :stopping + conductor.signal + monitor.instrument('connection.listener.stopping', caller: self) + stopped! elsif stopped? nil + elsif stopping? + nil else @status = :stopping + conductor.signal + monitor.instrument('connection.listener.stopping', caller: self) end end # Moves status back from stopped to pending (and only that). We should not be able to reset # listeners that are not stopped