lib/cloud_crowd/node.rb in documentcloud-cloud-crowd-0.2.1 vs lib/cloud_crowd/node.rb in documentcloud-cloud-crowd-0.2.2
- old
+ new
@@ -25,11 +25,11 @@
MONITOR_INTERVAL = 3
# The response sent back when this node is overloaded.
OVERLOADED_MESSAGE = 'Node Overloaded'
- attr_reader :asset_store, :enabled_actions, :host, :port, :server
+ attr_reader :enabled_actions, :host, :port, :central
set :root, ROOT
set :authorization_realm, "CloudCrowd"
helpers Helpers
@@ -51,60 +51,73 @@
# Posts a WorkUnit to this Node. Forks a Worker and returns the process id.
# Returns a 503 if this Node is overloaded.
post '/work' do
throw :halt, [503, OVERLOADED_MESSAGE] if @overloaded
- pid = fork { Worker.new(self, JSON.parse(params[:work_unit])).run }
+ unit = JSON.parse(params[:work_unit])
+ pid = fork { Worker.new(self, unit).run }
Process.detach(pid)
json :pid => pid
end
# When creating a node, specify the port it should run on.
- def initialize(port=DEFAULT_PORT)
+ def initialize(port=nil, daemon=false)
require 'json'
- @server = CloudCrowd.central_server
+ @central = CloudCrowd.central_server
@host = Socket.gethostname
@enabled_actions = CloudCrowd.actions.keys
- @asset_store = AssetStore.new
@port = port || DEFAULT_PORT
+ @daemon = daemon
@overloaded = false
@max_load = CloudCrowd.config[:max_load]
@min_memory = CloudCrowd.config[:min_free_memory]
start unless test?
end
# Starting up a Node registers with the central server and begins to listen
# for incoming WorkUnits.
def start
+ FileUtils.mkdir_p(CloudCrowd.log_path) if @daemon && !File.exists?(CloudCrowd.log_path)
+ @server = Thin::Server.new('0.0.0.0', @port, self, :signals => false)
+ @server.tag = 'cloud-crowd-node'
+ @server.pid_file = CloudCrowd.pid_path('node.pid')
+ @server.log_file = CloudCrowd.log_path('node.log')
+ @server.daemonize if @daemon
trap_signals
- start_server
- monitor_system if @max_load || @min_memory
+ asset_store
+ @server_thread = Thread.new { @server.start }
check_in(true)
+ monitor_system if @max_load || @min_memory
@server_thread.join
end
# Checking in with the central server informs it of the location and
# configuration of this Node. If it can't check-in, there's no point in
# starting.
def check_in(critical=false)
- @server["/node/#{@host}"].put(
+ @central["/node/#{@host}"].put(
:port => @port,
:busy => @overloaded,
:max_workers => CloudCrowd.config[:max_workers],
:enabled_actions => @enabled_actions.join(',')
)
rescue Errno::ECONNREFUSED
- puts "Failed to connect to the central server (#{@server.to_s})."
+ puts "Failed to connect to the central server (#{@central.to_s})."
raise SystemExit if critical
end
# Before exiting, the Node checks out with the central server, releasing all
# of its WorkUnits for other Nodes to handle
def check_out
- @server["/node/#{@host}"].delete
+ @central["/node/#{@host}"].delete
end
+ # Lazy-initialize the asset_store, preferably after the Node has launched.
+ def asset_store
+ @asset_store ||= AssetStore.new
+ end
+
# Is the node overloaded? If configured, checks if the load average is
# greater than 'max_load', or if the available RAM is less than
# 'min_free_memory'.
def overloaded?
(@max_load && load_average > @max_load) ||
@@ -131,17 +144,10 @@
end
private
- # Launch the Node's Thin server in a separate thread because it blocks.
- def start_server
- @server_thread = Thread.new do
- Thin::Server.start('0.0.0.0', @port, self, :signals => false)
- end
- end
-
# Launch a monitoring thread that periodically checks the node's load
# average and the amount of free memory remaining. If we transition out of
# the overloaded state, let central know.
def monitor_system
@monitor_thread = Thread.new do
@@ -154,19 +160,20 @@
end
end
# Trap exit signals in order to shut down cleanly.
def trap_signals
+ Signal.trap('QUIT') { shut_down }
Signal.trap('INT') { shut_down }
Signal.trap('KILL') { shut_down }
Signal.trap('TERM') { shut_down }
end
# At shut down, de-register with the central server before exiting.
def shut_down
@monitor_thread.kill if @monitor_thread
check_out
- @server_thread.kill
+ @server_thread.kill if @server_thread
Process.exit
end
end
\ No newline at end of file