Sha256: 6f6a408259299999bdac2695f20b55d4c49e5a15a084ea6ebd517d070e464b73

Contents?: true

Size: 1.94 KB

Versions: 1

Compression:

Stored size: 1.94 KB

Contents

module Eventboss
  # Process-level entry points: each one wires up the workers, starts them and
  # then blocks the calling thread until the process is asked to stop.
  class Runner
    extend Logging

    class << self
      # Starts a Launcher over the queues returned by QueueListener.list and
      # blocks until SIGTERM (or an Interrupt) arrives. The launcher is then
      # stopped and the process exits with status 0.
      def launch
        queues = Eventboss::QueueListener.list
        config = Eventboss.configuration
        client = config.sqs_client

        Eventboss::Instrumentation.add(queues)

        launcher = Launcher.new(queues, client, worker_count: config.concurrency)

        self_read = setup_signals([:SIGTERM])

        begin
          launcher.start
          handle_signals(self_read) { launcher.stop }
        rescue Interrupt
          launcher.stop
          exit 0
        end
      end

      # Starts a Manager running on the global concurrent-ruby IO executor and
      # blocks until SIGTERM (or an Interrupt) arrives. The executor is then
      # shut down, awaited, and the process exits with status 0.
      def start
        configuration = Eventboss.configuration

        queue_listeners = Eventboss::QueueListener.list
        Eventboss::Instrumentation.add(queue_listeners)
        polling_strategy = configuration.polling_strategy.call(queue_listeners.keys)

        fetcher = Eventboss::Fetcher.new(configuration)
        executor = Concurrent.global_io_executor

        manager = Eventboss::Manager.new(
          fetcher,
          polling_strategy,
          executor,
          queue_listeners,
          configuration.concurrency,
          configuration.error_handlers
        )

        manager.start

        self_read = setup_signals([:SIGTERM])

        begin
          # There is no launcher on this code path, so the shutdown action is
          # the executor teardown (same as the Interrupt branch below).
          handle_signals(self_read) { shutdown_executor(executor) }
        rescue Interrupt
          shutdown_executor(executor)
          exit 0
        end
      end

      private

      # Installs a trap for each of the given signals that writes the signal
      # name to a pipe, so the signal can be handled outside of trap context.
      #
      # @param signals [Array<Symbol>] signal names to trap
      # @return [IO] the read end of the pipe
      def setup_signals(signals)
        self_read, self_write = IO.pipe

        signals.each do |signal|
          trap signal do
            self_write.puts signal
          end
        end

        self_read
      end

      # Blocks until a signal name can be read from the pipe, logs it, runs
      # the given shutdown block and exits the process with status 0.
      #
      # @param self_read [IO] read end of the pipe created by setup_signals
      # @yield performs the caller-specific graceful shutdown
      def handle_signals(self_read)
        while (readable_io = IO.select([self_read]))
          signal = readable_io.first[0].gets.strip
          logger.info("Received #{ signal } signal, gracefully shutdowning...", 'runner')

          yield
          exit 0
        end
      end

      # Stops the executor from accepting new work and waits for it to
      # terminate.
      def shutdown_executor(executor)
        executor.shutdown
        executor.wait_for_termination
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
eventboss-1.1.0 lib/eventboss/runner.rb