Sha256: 892dced4586a533878b33cd7a2264f08b7cb6a769ee319274f69feedf81227c9

Contents?: true

Size: 1.88 KB

Versions: 3

Compression:

Stored size: 1.88 KB

Contents

module Druid
  module Node
    # Client for the indexer endpoints of a Druid overlord node.
    #
    # The overlord to talk to is picked from `zk.registry`, keyed by
    # "<config.discovery_path>/druid:overlord". Through it this class can list
    # the ids of running tasks and request that tasks be shut down.
    class Overlord
      INDEXER_PATH = '/druid/indexer/v1/'.freeze
      RUNNING_TASKS_PATH = "#{INDEXER_PATH}runningTasks".freeze
      TASK_PATH = "#{INDEXER_PATH}task/".freeze

      # Maximum number of re-checks (each preceded by a sleep) performed while
      # waiting for a task to disappear from the running-task list.
      SHUTDOWN_MAX_ATTEMPTS = 10
      # Seconds slept between two consecutive re-checks.
      SHUTDOWN_POLL_INTERVAL = 1

      attr_reader :config, :zk

      # @param config [#discovery_path] supplies the registry key prefix
      # @param zk [#registry] exposes a Hash-like registry whose values are
      #   lists of `{ host:, port: }` entries (presumably ZooKeeper-backed --
      #   confirm against the caller)
      def initialize(config, zk)
        @config = config
        @zk = zk
      end

      # TODO: DRY: copy/paste
      #
      # Builds a connection to the overlord currently at the head of the
      # registry list, then rotates that list so the next call picks the
      # following node.
      #
      # @return [Druid::Connection]
      # @raise [Druid::ConnectionError] when the registry has no entry for the
      #   overlord key or the entry is empty
      def connection
        overlords = zk.registry["#{config.discovery_path}/druid:overlord"]
        # A missing registry entry is reported the same way as an empty one.
        overlord = overlords && overlords.first
        raise Druid::ConnectionError, 'no druid overlords available' if overlord.nil?

        overlords.rotate! # round-robin load balancing
        Druid::Connection.new(host: overlord[:host], port: overlord[:port])
      end

      # Lists the ids of the tasks the overlord reports as running.
      #
      # @param datasource_name [String, nil] when given, only ids that contain
      #   this string are returned
      # @return [Array<String>] task ids, empty when nothing matches
      # @raise [ConnectionError] when the overlord does not answer with 200
      def running_tasks(datasource_name = nil)
        response = connection.get(RUNNING_TASKS_PATH)
        raise ConnectionError, 'Could not retrieve running tasks' unless response.code.to_i == 200

        ids = JSON.parse(response.body).map { |task| task['id'] }
        return ids unless datasource_name

        ids.select { |id| id.include?(datasource_name) }
      end

      # Asks the overlord to shut down one task and waits until the task is no
      # longer listed as running.
      #
      # @param task [String] task id
      # @return [true]
      # @raise [ConnectionError] when the shutdown request is not answered with 200
      # @raise [ClientError] when the task is still running after the wait
      def shutdown_task(task)
        response = connection.post(TASK_PATH + task + '/shutdown')
        raise ConnectionError, 'Unable to shutdown task' unless response.code.to_i == 200

        bounded_wait_for_shutdown(task)
      end

      # Shuts down every running task, optionally restricted to the ids that
      # contain `datasource_name`. Tasks are shut down one after another.
      #
      # @param datasource_name [String, nil]
      # @return [Array<String>] the ids that were shut down
      def shutdown_tasks(datasource_name = nil)
        running_tasks(datasource_name).each { |task| shutdown_task(task) }
      end

      private

      # Polls the running-task list until `task` is gone.
      #
      # The list is checked once immediately, then re-checked after each sleep
      # of SHUTDOWN_POLL_INTERVAL seconds, at most SHUTDOWN_MAX_ATTEMPTS times.
      #
      # @param task [String] task id
      # @return [true]
      # @raise [ClientError] when the task is still listed after the last re-check
      def bounded_wait_for_shutdown(task)
        attempts = 0

        while running_tasks.include?(task)
          raise ClientError, 'Task did not shutdown.' if attempts >= SHUTDOWN_MAX_ATTEMPTS

          attempts += 1
          sleep SHUTDOWN_POLL_INTERVAL
        end

        true
      end
    end
  end
end

Version data entries

3 entries across 3 versions & 2 rubygems

Version Path
druiddb-1.0.1 lib/druid/node/overlord.rb
druiddb-1.0.0 lib/druid/node/overlord.rb
jruby-druid-2.0.0.edge.1 lib/druid/node/overlord.rb