Sha256: de6a3239e54ec2e0f6aae30bda751f79817b4d9666262f37cebee7010d616a28

Contents?: true

Size: 1.53 KB

Versions: 2

Compression:

Stored size: 1.53 KB

Contents

module Asynk
  class Server
    include Singleton

    def initialize
    end

    def run
      require 'asynk/worker'
      prepare_consumers
      register_signal_handlers
      Asynk.logger.info "All consumers are prepared"
      handle_signals
      # handle_signals
    end

    def shutdown
      futures = workers.map { |w| w.future.shutdown }
      futures.map(&:value)
      Asynk.broker.amqp_connection.close
      Asynk.logger.info "Server shutdown!"
    end

    private
      def handle_signals
        loop do
          signal = Thread.main[:signal_queue].shift
          if signal
            Asynk.logger.info "Caught sig#{signal.downcase}, stopping asynk server..."
            shutdown
            break
          end
          sleep(0.1)
        end
      end

      def workers
        @workers ||= []
      end

      def register_signal_handlers
        Thread.main[:signal_queue] = []
        %w(QUIT TERM INT).keep_if { |s| Signal.list.keys.include? s }.map(&:to_sym).each do |sig|
          # This needs to be reentrant, so we queue up signals to be handled
          # in the run loop, rather than acting on signals here
          trap(sig) do
            Thread.main[:signal_queue] << sig
          end
        end
      end

      def prepare_consumers
        Asynk.consumers.each{ |consumer| prepare_consumer(consumer) }
      end

      def prepare_consumer(consumer)
        consumer.concurrency.times do |index|
          workers << Asynk::Worker.new(Asynk.broker.amqp_connection, consumer, index)
        end
      end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
asynk-0.0.2 lib/asynk/server.rb
asynk-0.0.1 lib/asynk/server.rb