lib/cloud_crowd/node.rb in documentcloud-cloud-crowd-0.2.0 vs lib/cloud_crowd/node.rb in documentcloud-cloud-crowd-0.2.1

- old
+ new

@@ -8,13 +8,28 @@ # [post /work] The central server hits <tt>/work</tt> to dispatch a WorkUnit to this Node. class Node < Sinatra::Default # A Node's default port. You only run a single node per machine, so they # can all use the same port without any problems. - DEFAULT_PORT = 9063 + DEFAULT_PORT = 9063 - attr_reader :server, :asset_store + # A list of regex scrapers, which let us extract the one-minute load + # average and the amount of free memory on different flavors of UNIX. + + SCRAPE_UPTIME = /\d+\.\d+/ + SCRAPE_LINUX_MEMORY = /MemFree:\s+(\d+) kB/ + SCRAPE_MAC_MEMORY = /Pages free:\s+(\d+)./ + SCRAPE_MAC_PAGE = /page size of (\d+) bytes/ + + # The interval at which the node monitors the machine's load and memory use + # (if configured to do so in config.yml). + MONITOR_INTERVAL = 3 + + # The response sent back when this node is overloaded. + OVERLOADED_MESSAGE = 'Node Overloaded' + + attr_reader :asset_store, :enabled_actions, :host, :port, :server set :root, ROOT set :authorization_realm, "CloudCrowd" helpers Helpers @@ -33,71 +48,125 @@ get '/heartbeat' do "buh-bump" end # Posts a WorkUnit to this Node. Forks a Worker and returns the process id. + # Returns a 503 if this Node is overloaded. post '/work' do - pid = fork { Worker.new(self, JSON.parse(params[:work_unit])) } + throw :halt, [503, OVERLOADED_MESSAGE] if @overloaded + pid = fork { Worker.new(self, JSON.parse(params[:work_unit])).run } Process.detach(pid) json :pid => pid end - # Creating a Node registers with the central server and starts listening for - # incoming WorkUnits. + # When creating a node, specify the port it should run on. def initialize(port=DEFAULT_PORT) require 'json' @server = CloudCrowd.central_server @host = Socket.gethostname @enabled_actions = CloudCrowd.actions.keys @asset_store = AssetStore.new @port = port || DEFAULT_PORT - + @overloaded = false + @max_load = CloudCrowd.config[:max_load] + @min_memory = CloudCrowd.config[:min_free_memory] + start unless test? + end + + # Starting up a Node registers with the central server and begins to listen + # for incoming WorkUnits. + def start trap_signals start_server - check_in + monitor_system if @max_load || @min_memory + check_in(true) @server_thread.join end # Checking in with the central server informs it of the location and # configuration of this Node. If it can't check-in, there's no point in # starting. - def check_in + def check_in(critical=false) @server["/node/#{@host}"].put( :port => @port, + :busy => @overloaded, :max_workers => CloudCrowd.config[:max_workers], :enabled_actions => @enabled_actions.join(',') ) rescue Errno::ECONNREFUSED - puts "Failed to connect to the central server (#{@server.to_s}), exiting..." - raise SystemExit + puts "Failed to connect to the central server (#{@server.to_s})." + raise SystemExit if critical end # Before exiting, the Node checks out with the central server, releasing all # of its WorkUnits for other Nodes to handle def check_out @server["/node/#{@host}"].delete end + # Is the node overloaded? If configured, checks if the load average is + # greater than 'max_load', or if the available RAM is less than + # 'min_free_memory'. + def overloaded? + (@max_load && load_average > @max_load) || + (@min_memory && free_memory < @min_memory) + end + # The current one-minute load average. + def load_average + `uptime`.match(SCRAPE_UPTIME).to_s.to_f + end + + # The current amount of free memory in megabytes. + def free_memory + case RUBY_PLATFORM + when /darwin/ + stats = `vm_stat` + @mac_page_size ||= stats.match(SCRAPE_MAC_PAGE)[1].to_f / 1048576.0 + stats.match(SCRAPE_MAC_MEMORY)[1].to_f * @mac_page_size + when /linux/ + `cat /proc/meminfo`.match(SCRAPE_LINUX_MEMORY)[1].to_f / 1024.0 + else + raise NotImplementedError, "'min_free_memory' is not yet implemented on your platform" + end + end + + private # Launch the Node's Thin server in a separate thread because it blocks. def start_server @server_thread = Thread.new do Thin::Server.start('0.0.0.0', @port, self, :signals => false) end end + # Launch a monitoring thread that periodically checks the node's load + # average and the amount of free memory remaining. If we transition out of + # the overloaded state, let central know. + def monitor_system + @monitor_thread = Thread.new do + loop do + was_overloaded = @overloaded + @overloaded = overloaded? + check_in if was_overloaded && !@overloaded + sleep MONITOR_INTERVAL + end + end + end + # Trap exit signals in order to shut down cleanly. def trap_signals Signal.trap('INT') { shut_down } Signal.trap('KILL') { shut_down } Signal.trap('TERM') { shut_down } end # At shut down, de-register with the central server before exiting. def shut_down + @monitor_thread.kill if @monitor_thread check_out + @server_thread.kill Process.exit end end \ No newline at end of file