Sha256: eca7d32766f191970de7d95ece67200c3da67d45334faf4fae5538c74f99c125

Contents?: true

Size: 1.96 KB

Versions: 1

Compression:

Stored size: 1.96 KB

Contents

# frozen_string_literal: true

# :reek:TooManyInstanceVariables

module RubyRabbitmqJanus
  module Janus
    module Concurrencies
      # Object thread
      class KeepaliveThread < Thread
        attr_reader :timer, :instance, :session

        def initialize(instance, rabbit, &block)
          @publisher = @session = nil
          @rabbit = rabbit
          @timer = KeepaliveTimer.new
          @message = KeepaliveMessage.new(instance)
          super(&block)
        end

        # Initialize a transaction with Janus Instance.
        # Create a session and save response
        def initialize_janus_session
          @publisher = publisher
          @session = response_session
        end

        # Restart session
        def restart_session
          @session = response_session
          response_keepalive
        end

        # Start a timer for TTL
        def start
          @timer.loop_keepalive { response_keepalive }
        end

        # Kill session and disable instance
        def kill
          response_destroy
          super
        end

        def instance_is_down
          janus = Models::JanusInstance.find_by_session(@session)
          janus.set(enable: false)

          Tools::Log.instance.fatal \
            "Janus Instance [#{janus.instance}] is down, kill thread."
          Thread.instance_method(:kill).bind(self).call
        end

        private

        def publisher
          Rabbit::Publisher::PublishKeepalive.new(@rabbit.channel)
        end

        def response_session
          @message.response_session(publish(@message.session))
        end

        def response_keepalive
          keepalive = @message.keepalive(@session)
          @message.response_acknowledgement(publish(keepalive))
        end

        def response_destroy
          destroy = @message.destroy(@session)
          @message.response_destroy(publish(destroy))
        end

        def publish(message)
          @publisher.publish(message)
        end
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
ruby_rabbitmq_janus-2.2.0.pre.166 lib/rrj/janus/processus/keepalive/keepalive_thread.rb