lib/cloud_crowd/node.rb in documentcloud-cloud-crowd-0.2.0 vs lib/cloud_crowd/node.rb in documentcloud-cloud-crowd-0.2.1
- old
+ new
@@ -8,13 +8,28 @@
# [post /work] The central server hits <tt>/work</tt> to dispatch a WorkUnit to this Node.
class Node < Sinatra::Default
# A Node's default port. You only run a single node per machine, so they
# can all use the same port without any problems.
- DEFAULT_PORT = 9063
+ DEFAULT_PORT = 9063
- attr_reader :server, :asset_store
+ # A list of regex scrapers, which let us extract the one-minute load
+ # average and the amount of free memory on different flavors of UNIX.
+
+ SCRAPE_UPTIME = /\d+\.\d+/
+ SCRAPE_LINUX_MEMORY = /MemFree:\s+(\d+) kB/
+ SCRAPE_MAC_MEMORY = /Pages free:\s+(\d+)./
+ SCRAPE_MAC_PAGE = /page size of (\d+) bytes/
+
+ # The interval at which the node monitors the machine's load and memory use
+ # (if configured to do so in config.yml).
+ MONITOR_INTERVAL = 3
+
+ # The response sent back when this node is overloaded.
+ OVERLOADED_MESSAGE = 'Node Overloaded'
+
+ attr_reader :asset_store, :enabled_actions, :host, :port, :server
set :root, ROOT
set :authorization_realm, "CloudCrowd"
helpers Helpers
@@ -33,71 +48,125 @@
get '/heartbeat' do
"buh-bump"
end
# Posts a WorkUnit to this Node. Forks a Worker and returns the process id.
+ # Returns a 503 if this Node is overloaded.
post '/work' do
- pid = fork { Worker.new(self, JSON.parse(params[:work_unit])) }
+ throw :halt, [503, OVERLOADED_MESSAGE] if @overloaded
+ pid = fork { Worker.new(self, JSON.parse(params[:work_unit])).run }
Process.detach(pid)
json :pid => pid
end
- # Creating a Node registers with the central server and starts listening for
- # incoming WorkUnits.
+ # When creating a node, specify the port it should run on.
def initialize(port=DEFAULT_PORT)
require 'json'
@server = CloudCrowd.central_server
@host = Socket.gethostname
@enabled_actions = CloudCrowd.actions.keys
@asset_store = AssetStore.new
@port = port || DEFAULT_PORT
-
+ @overloaded = false
+ @max_load = CloudCrowd.config[:max_load]
+ @min_memory = CloudCrowd.config[:min_free_memory]
+ start unless test?
+ end
+
+ # Starting up a Node registers with the central server and begins to listen
+ # for incoming WorkUnits.
+ def start
trap_signals
start_server
- check_in
+ monitor_system if @max_load || @min_memory
+ check_in(true)
@server_thread.join
end
# Checking in with the central server informs it of the location and
# configuration of this Node. If it can't check-in, there's no point in
# starting.
- def check_in
+ def check_in(critical=false)
@server["/node/#{@host}"].put(
:port => @port,
+ :busy => @overloaded,
:max_workers => CloudCrowd.config[:max_workers],
:enabled_actions => @enabled_actions.join(',')
)
rescue Errno::ECONNREFUSED
- puts "Failed to connect to the central server (#{@server.to_s}), exiting..."
- raise SystemExit
+ puts "Failed to connect to the central server (#{@server.to_s})."
+ raise SystemExit if critical
end
# Before exiting, the Node checks out with the central server, releasing all
# of its WorkUnits for other Nodes to handle
def check_out
@server["/node/#{@host}"].delete
end
+ # Is the node overloaded? If configured, checks if the load average is
+ # greater than 'max_load', or if the available RAM is less than
+ # 'min_free_memory'.
+ def overloaded?
+ (@max_load && load_average > @max_load) ||
+ (@min_memory && free_memory < @min_memory)
+ end
+ # The current one-minute load average.
+ def load_average
+ `uptime`.match(SCRAPE_UPTIME).to_s.to_f
+ end
+
+ # The current amount of free memory in megabytes.
+ def free_memory
+ case RUBY_PLATFORM
+ when /darwin/
+ stats = `vm_stat`
+ @mac_page_size ||= stats.match(SCRAPE_MAC_PAGE)[1].to_f / 1048576.0
+ stats.match(SCRAPE_MAC_MEMORY)[1].to_f * @mac_page_size
+ when /linux/
+ `cat /proc/meminfo`.match(SCRAPE_LINUX_MEMORY)[1].to_f / 1024.0
+ else
+ raise NotImplementedError, "'min_free_memory' is not yet implemented on your platform"
+ end
+ end
+
+
private
# Launch the Node's Thin server in a separate thread because it blocks.
def start_server
@server_thread = Thread.new do
Thin::Server.start('0.0.0.0', @port, self, :signals => false)
end
end
+ # Launch a monitoring thread that periodically checks the node's load
+ # average and the amount of free memory remaining. If we transition out of
+ # the overloaded state, let central know.
+ def monitor_system
+ @monitor_thread = Thread.new do
+ loop do
+ was_overloaded = @overloaded
+ @overloaded = overloaded?
+ check_in if was_overloaded && !@overloaded
+ sleep MONITOR_INTERVAL
+ end
+ end
+ end
+
# Trap exit signals in order to shut down cleanly.
def trap_signals
Signal.trap('INT') { shut_down }
Signal.trap('KILL') { shut_down }
Signal.trap('TERM') { shut_down }
end
# At shut down, de-register with the central server before exiting.
def shut_down
+ @monitor_thread.kill if @monitor_thread
check_out
+ @server_thread.kill
Process.exit
end
end
\ No newline at end of file