lib/karafka/connection/status.rb in karafka-2.3.4 vs lib/karafka/connection/status.rb in karafka-2.4.0.beta1
- old
+ new
@@ -3,10 +3,15 @@
module Karafka
# Namespace for Kafka connection related logic
module Connection
# Listener connection status representation
class Status
+ include Helpers::ConfigImporter.new(
+ monitor: %i[monitor],
+ conductor: %i[internal connection conductor]
+ )
+
# Available states and their transitions.
STATES = {
pending: :pending!,
starting: :start!,
running: :running!,
@@ -24,11 +29,12 @@
# Do not allow reverse state transitions (we always go one way) or transition to the
# same state as currently
return if @status && STATES.keys.index(:#{state}) <= STATES.keys.index(@status)
@status = :#{state}
- @conductor.signal
+ conductor.signal
+ monitor.instrument("connection.listener.#{state}", caller: self)
end
end
# @return [Boolean] are we in a given state
def #{state}?
@@ -37,24 +43,30 @@
RUBY
end
def initialize
@mutex = Mutex.new
- @conductor = Karafka::App.config.internal.connection.conductor
pending!
end
# If this listener was not even running, will just move it through states until final.
# If it was running, will start the stopping procedures.
# Will do nothing if it was already stopped
def stop!
if pending?
@status = :stopping
+ conductor.signal
+ monitor.instrument('connection.listener.stopping', caller: self)
+
stopped!
elsif stopped?
nil
+ elsif stopping?
+ nil
else
@status = :stopping
+ conductor.signal
+ monitor.instrument('connection.listener.stopping', caller: self)
end
end
# Moves status back from stopped to pending (and only that). We should not be able to reset
# listeners that are not stopped