Sha256: ffd4854a503fcf0e466734888bf894071f9a55ece77d907ae7b4cf6fbce1bd32

Contents?: true

Size: 1.5 KB

Versions: 3

Compression:

Stored size: 1.5 KB

Contents

module Druid
  module Node
    class Overlord
      INDEXER_PATH = '/druid/indexer/v1/'.freeze
      RUNNING_TASKS_PATH = (INDEXER_PATH + 'runningTasks').freeze
      TASK_PATH = INDEXER_PATH + 'task/'

      attr_reader :connection
      def initialize(config)
        @connection = Druid::Connection.new(config.overlord_uri)
      end

      def running_tasks(datasource_name = nil)
        response = connection.get(RUNNING_TASKS_PATH)
        raise ConnectionError, 'Could not retrieve running tasks' unless response.code.to_i == 200
        tasks = JSON.parse(response.body).map{|task| task['id']}
        tasks.select!{ |task| task.include? datasource_name } if datasource_name
        tasks ? tasks : []
      end

      def shutdown_task(task)
        response = connection.post(TASK_PATH + task + '/shutdown')
        raise ConnectionError, 'Unable to shutdown task' unless response.code.to_i == 200
        bounded_wait_for_shutdown(task)
      end

      def shutdown_tasks(datasource_name = nil)
        tasks = running_tasks(datasource_name)
        tasks.each{|task| shutdown_task(task)}
      end

      private

      def bounded_wait_for_shutdown(task)
        condition = !(running_tasks.include? task)
        attempts = 0
        max = 10

        until(condition) do
          attempts += 1
          sleep 1
          condition = !(running_tasks.include? task)
          break if attempts >= max
        end

        raise ClientError, 'Task did not shutdown.' unless condition
        true
      end
    end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
jruby-druid-1.0.0.pre.rc4 lib/druid/node/overlord.rb
jruby-druid-1.0.0.pre.rc3 lib/druid/node/overlord.rb
jruby-druid-1.0.0.pre.rc2 lib/druid/node/overlord.rb