require "uri" require "json" require "net/http" require "erb" require "fileutils" module Twirl class Server ConfigTemplate = <<_EOC import com.twitter.conversions.storage._ import com.twitter.conversions.time._ import com.twitter.logging.config._ import com.twitter.ostrich.admin.config._ import net.lag.kestrel.config._ new KestrelConfig { listenAddress = "0.0.0.0" memcacheListenPort = <%= @memcache_port %> textListenPort = <%= @text_port %> thriftListenPort = <%= @thrift_port %> queuePath = "<%= @queue_path %>" clientTimeout = 30.seconds expirationTimerFrequency = 1.second maxOpenTransactions = 100 default.defaultJournalSize = 16.megabytes default.maxMemorySize = 128.megabytes default.maxJournalSize = 1.gigabyte admin.httpPort = <%= @admin_port %> admin.statsNodes = new StatsConfig { reporters = new TimeSeriesCollectorConfig } loggers = new LoggerConfig { level = Level.DEBUG handlers = new FileHandlerConfig { filename = "<%= @log_file %>" roll = Policy.Never } } } _EOC # Public: The version kestrel will run. attr_reader :version # Public: The memcache_port kestrel will run on. attr_reader :memcache_port # Public: The thrift_port kestrel will run on. attr_reader :thrift_port # Public: The text_port kestrel will run on. attr_reader :text_port # Public: The admin_port kestrel will run on. attr_reader :admin_port def initialize(dir, options = {}) @dir = dir @version = options.fetch(:version) { "2.4.1" } @memcache_port = options.fetch(:memcache_port) { 22133 } @thrift_port = options.fetch(:thrift_port) { 2229 } @text_port = options.fetch(:text_port) { 2222 } @admin_port = options.fetch(:admin_port) { 2223 } @download_dir = options.fetch(:download_dir) { "/tmp" } @stage = options.fetch(:stage) { "twirl" } @remote_zip = "http://robey.github.io/kestrel/download/kestrel-#{@version}.zip" @zip_file = File.join(@download_dir, "kestrel-#{@version}.zip") @unzipped_file = File.join(@download_dir, "kestrel-#{@version}") @jar_file = File.join(@unzipped_file, "kestrel_2.9.2-#{@version}.jar") @queue_path = File.join(@dir, "data") @log_path = File.join(@dir, "logs") @config_path = File.join(@dir, "config") @log_file = File.join(@log_path, "kestrel.log") @config_file = File.join(@config_path, "#{@stage}.scala") end # Public: Downloads, unzips and starts the server. def start ensure_downloaded ensure_unzipped ensure_configured start_server end # Public: Stops the server. def stop stop_server end # Private: Downloads the file if it has not been downloaded. def ensure_downloaded download unless downloaded? end # Private: Returns true or false depending on whether the file has # been downloaded. def downloaded? File.exists?(@zip_file) end # Private: Downloads the file. def download uri = URI(@remote_zip) Net::HTTP.start(uri.host, uri.port) do |http| request = Net::HTTP::Get.new uri.path http.request request do |response| if response.code.to_i == 200 downloaded = 0 last_percent = 0 total = response["content-length"].to_i puts "Downloading #{total} bytes to #{@zip_file}" File.open @zip_file, "w" do |io| response.read_body do |chunk| io.write chunk downloaded += chunk.size percent_complete = ((downloaded.to_f / total) * 100).round show_status = percent_complete % 5 == 0 && last_percent != percent_complete if show_status last_percent = percent_complete puts "#{downloaded}/#{total}\t#{percent_complete}%" end end end else abort "Could not downloaded kestrel from #{@remote_zip} #{response.inspect}" end end end end # Private: Ensures that file is unzipped if downloaded. def ensure_unzipped if downloaded? && !unzipped? unzip end end # Private: Unzips the file. def unzip Dir.chdir(File.dirname(@zip_file)) do system "unzip", "-o", @zip_file end end # Private: Returns true or false depending on whether the file has # been unzipped. def unzipped? File.exists?(@unzipped_file) end # Private: Ensure directories and configuration files are ready to go. def ensure_configured [ @queue_path, @log_path, @config_path, ].each do |path| FileUtils.mkdir_p path end config_contents = ERB.new(ConfigTemplate).result(binding) File.write @config_file, config_contents end # Private: Starts the server. Assumes downloaded and unzipped def start_server puts "Starting server." Dir.chdir(@dir) do system "java -jar #{@jar_file} -f #{@config_file} &" loop do break if running? end end puts "Started server." end # Private: Stops the server. def stop_server puts "Stopping server." shutdown loop do break if stopped? end puts "Stopped server." end # Private: Returns true if server is running else false. def running? return "pong" == ping rescue => exception $stderr.puts exception.inspect false end # Private: Returns true if server is stopped else false. def stopped? return !running? end # Private: Prints out the status of the server. def status if running? :running else :stopped end rescue => exception :unknown end # Private: Pings the kestrel server. def ping get_response("ping")["response"] end # Private: Shutsdown the kestrel server. def shutdown h = get_response("shutdown") return h["response"] == "ok" rescue => exception puts "Failed to shutdown: #{exception.inspect}" false end # Private: Allows requesting things from the kestrel admin. def get_response(path) uri = URI.parse("http://localhost:#{@admin_port}/#{path}") response = Net::HTTP.get_response(uri) JSON.parse(response.body) rescue => exception {} end end end