lib/cloud_crowd/node.rb in documentcloud-cloud-crowd-0.2.1 vs lib/cloud_crowd/node.rb in documentcloud-cloud-crowd-0.2.2

- old
+ new

@@ -25,11 +25,11 @@ MONITOR_INTERVAL = 3 # The response sent back when this node is overloaded. OVERLOADED_MESSAGE = 'Node Overloaded' - attr_reader :asset_store, :enabled_actions, :host, :port, :server + attr_reader :enabled_actions, :host, :port, :central set :root, ROOT set :authorization_realm, "CloudCrowd" helpers Helpers @@ -51,60 +51,73 @@ # Posts a WorkUnit to this Node. Forks a Worker and returns the process id. # Returns a 503 if this Node is overloaded. post '/work' do throw :halt, [503, OVERLOADED_MESSAGE] if @overloaded - pid = fork { Worker.new(self, JSON.parse(params[:work_unit])).run } + unit = JSON.parse(params[:work_unit]) + pid = fork { Worker.new(self, unit).run } Process.detach(pid) json :pid => pid end # When creating a node, specify the port it should run on. - def initialize(port=DEFAULT_PORT) + def initialize(port=nil, daemon=false) require 'json' - @server = CloudCrowd.central_server + @central = CloudCrowd.central_server @host = Socket.gethostname @enabled_actions = CloudCrowd.actions.keys - @asset_store = AssetStore.new @port = port || DEFAULT_PORT + @daemon = daemon @overloaded = false @max_load = CloudCrowd.config[:max_load] @min_memory = CloudCrowd.config[:min_free_memory] start unless test? end # Starting up a Node registers with the central server and begins to listen # for incoming WorkUnits. def start + FileUtils.mkdir_p(CloudCrowd.log_path) if @daemon && !File.exists?(CloudCrowd.log_path) + @server = Thin::Server.new('0.0.0.0', @port, self, :signals => false) + @server.tag = 'cloud-crowd-node' + @server.pid_file = CloudCrowd.pid_path('node.pid') + @server.log_file = CloudCrowd.log_path('node.log') + @server.daemonize if @daemon trap_signals - start_server - monitor_system if @max_load || @min_memory + asset_store + @server_thread = Thread.new { @server.start } check_in(true) + monitor_system if @max_load || @min_memory @server_thread.join end # Checking in with the central server informs it of the location and # configuration of this Node. If it can't check-in, there's no point in # starting. def check_in(critical=false) - @server["/node/#{@host}"].put( + @central["/node/#{@host}"].put( :port => @port, :busy => @overloaded, :max_workers => CloudCrowd.config[:max_workers], :enabled_actions => @enabled_actions.join(',') ) rescue Errno::ECONNREFUSED - puts "Failed to connect to the central server (#{@server.to_s})." + puts "Failed to connect to the central server (#{@central.to_s})." raise SystemExit if critical end # Before exiting, the Node checks out with the central server, releasing all # of its WorkUnits for other Nodes to handle def check_out - @server["/node/#{@host}"].delete + @central["/node/#{@host}"].delete end + # Lazy-initialize the asset_store, preferably after the Node has launched. + def asset_store + @asset_store ||= AssetStore.new + end + # Is the node overloaded? If configured, checks if the load average is # greater than 'max_load', or if the available RAM is less than # 'min_free_memory'. def overloaded? (@max_load && load_average > @max_load) || @@ -131,17 +144,10 @@ end private - # Launch the Node's Thin server in a separate thread because it blocks. - def start_server - @server_thread = Thread.new do - Thin::Server.start('0.0.0.0', @port, self, :signals => false) - end - end - # Launch a monitoring thread that periodically checks the node's load # average and the amount of free memory remaining. If we transition out of # the overloaded state, let central know. def monitor_system @monitor_thread = Thread.new do @@ -154,19 +160,20 @@ end end # Trap exit signals in order to shut down cleanly. def trap_signals + Signal.trap('QUIT') { shut_down } Signal.trap('INT') { shut_down } Signal.trap('KILL') { shut_down } Signal.trap('TERM') { shut_down } end # At shut down, de-register with the central server before exiting. def shut_down @monitor_thread.kill if @monitor_thread check_out - @server_thread.kill + @server_thread.kill if @server_thread Process.exit end end \ No newline at end of file