The ProjectBroker is the central object of the TaskJuggler daemon. It can manage multiple scheduled projects that it keeps in separate sub processes. Requests to a specific project will be redirected to the specific ProjectServer process. Projects can be added or removed. Adding an already existing one (identified by project ID) will replace the old one as soon as the new one has been scheduled successfully.
The daemon uses DRb to communicate with the client and it’s sub processes. The communication is restricted to localhost. All remote commands require an authentication key.
Currently only tj3client can be used to communicate with the TaskJuggler daemon.
# File lib/taskjuggler/daemon/ProjectBroker.rb, line 43 43: def initialize 44: super 45: # We don't have a default key. The user must provice a key in the config 46: # file. Otherwise the daemon will not start. 47: @authKey = nil 48: # The default TCP/IP port. ASCII code decimals for 'T' and 'J'. 49: @port = 8474 50: # The name of the URI file. 51: @uriFile = nil 52: # A list of loaded projects as Array of ProjectRecord objects. 53: @projects = [] 54: # We operate with multiple threads so we need a Monitor to synchronize 55: # the access to the list. 56: @projects.extend(MonitorMixin) 57: 58: # A list of the initial projects. Array with Array of files names. 59: @projectFiles = [] 60: 61: # This Queue is used to load new projects. The DRb thread pushes load 62: # requests that the housekeeping thread will then perform. 63: @projectsToLoad = Queue.new 64: 65: # Set this flag to true to have standard IO logged into files. There 66: # will be seperate set of files for each process. 67: @logStdIO = !@daemonize 68: 69: # Reference to WEBrick object. 70: @webServer = nil 71: 72: # Port used by the web server 73: @webServerPort = 8080 74: 75: # True if web server should be activated 76: @enableWebServer = false 77: 78: # This flag will be set to true to terminate the daemon. 79: @terminate = false 80: end
Adding a new project or replacing an existing one. The command waits until the project has been loaded or the load has failed.
# File lib/taskjuggler/daemon/ProjectBroker.rb, line 214 214: def addProject(cwd, args, stdOut, stdErr, stdIn, silent) 215: # We need some tag to identify the ProjectRecord that this project was 216: # associated to. Just use a large enough random number. 217: tag = rand(9999999999999) 218: 219: @log.debug("Pushing #{tag} to load Queue") 220: @projectsToLoad.push(tag) 221: 222: # Now we have to wait until the project shows up in the @projects 223: # list. We use our tag to identify the right entry. 224: pr = nil 225: while pr.nil? 226: @projects.synchronize do 227: @projects.each do |p| 228: if p.tag == tag 229: pr = p 230: break 231: end 232: end 233: end 234: # The wait in this loop should be pretty short and we don't want to 235: # miss IO from the ProjectServer process. 236: sleep 0.1 unless pr 237: end 238: 239: @log.debug("Found tag #{tag} in list of loaded projects with URI " + 240: "#{pr.uri}") 241: # Return the URI and the authentication key of the new ProjectServer. 242: [ pr.uri, pr.authKey ] 243: 244: # Open a DRb connection to the ProjectServer process 245: begin 246: projectServer = DRbObject.new(nil, pr.uri) 247: rescue 248: stdErr.puts "Can't get ProjectServer object: #{$!}" 249: return false 250: end 251: begin 252: # Hook up IO from requestor to the ProjectServer process. 253: projectServer.connect(pr.authKey, stdOut, stdErr, stdIn, silent) 254: rescue 255: stdErr.puts "Can't connect IO: #{$!}" 256: return false 257: end 258: 259: # Ask the ProjectServer to load the files in _args_ into the 260: # ProjectServer. 261: begin 262: res = projectServer.loadProject(pr.authKey, [ cwd, *args ]) 263: rescue 264: stdErr.puts "Loading of project failed: #{$!}" 265: return false 266: end 267: 268: # Disconnect the IO from the ProjectServer and close the DRb connection. 269: begin 270: projectServer.disconnect(pr.authKey) 271: rescue 272: stdErr.puts "Can't disconnect IO: #{$!}" 273: return false 274: end 275: 276: res 277: end
All remote commands must provide the proper authentication key. Usually the client and the server get this secret key from the same configuration file.
# File lib/taskjuggler/daemon/ProjectBroker.rb, line 162 162: def checkKey(authKey, command) 163: if authKey == @authKey 164: @log.debug("Accepted authentication key for command '#{command}'") 165: else 166: @log.warning("Rejected wrong authentication key '#{authKey}' " + 167: "for command '#{command}'") 168: return false 169: end 170: true 171: end
Return the ProjectServer URI and authKey for the project with project ID projectId.
# File lib/taskjuggler/daemon/ProjectBroker.rb, line 308 308: def getProject(projectId) 309: # Find the project with the ID args[0]. 310: project = nil 311: @projects.synchronize do 312: @projects.each do |p| 313: project = p if p.id == projectId && p.state == :ready 314: end 315: end 316: 317: if project.nil? 318: @log.debug("No project with ID #{projectId} found") 319: return [ nil, nil ] 320: end 321: [ project.uri, project.authKey ] 322: end
Return a list of IDs of projects that are in state :ready.
# File lib/taskjuggler/daemon/ProjectBroker.rb, line 338 338: def getProjectList 339: list = [] 340: @projects.synchronize do 341: @projects.each do |project| 342: list << project.id if project.state == :ready 343: end 344: end 345: list 346: end
# File lib/taskjuggler/daemon/ProjectBroker.rb, line 279 279: def removeProject(indexOrId) 280: @projects.synchronize do 281: # Find all projects with the IDs in indexOrId and mark them as :obsolete. 282: if /^[0-9]$/.match(indexOrId) 283: index = indexOrId.to_i - 1 284: if index >= 0 && index < @projects.length 285: # If we have marked the project as obsolete, we return false to 286: # indicate the double remove. 287: return false if p.state == :obsolete 288: @projects[index].state = :obsolete 289: return true 290: end 291: else 292: @projects.each do |p| 293: if indexOrId == p.id 294: # If we have marked the project as obsolete, we return false to 295: # indicate the double remove. 296: return false if p.state == :obsolete 297: p.state = :obsolete 298: return true 299: end 300: end 301: end 302: end 303: false 304: end
# File lib/taskjuggler/daemon/ProjectBroker.rb, line 348 348: def report(projectId, reportId) 349: uri, key = getProject(projectId) 350: end
# File lib/taskjuggler/daemon/ProjectBroker.rb, line 82 82: def start 83: # To ensure a certain level of security, the user must provide an 84: # authentication key to authenticate the client to this server. 85: unless @authKey 86: @log.fatal(You must set an authentication key in the configuration file. Create a filenamed .taskjugglerrc or taskjuggler.rc that contains at least the followinglines. Replace 'your_secret_key' with some random character sequence._global: authKey: your_secret_key 87: ) 88: end 89: 90: # In daemon mode, we fork twice and only the 2nd child continues here. 91: super() 92: @log.debug("Starting project broker") 93: 94: if @enableWebServer 95: begin 96: # The web server must be started before we turn SAFE mode on. 97: @webServer = WebServer.new(self, @webServerPort) 98: @log.info("TaskJuggler web server is listening on port " + 99: "#{@webServerPort}") 100: rescue 101: @log.fatal("Cannot start web server: #{$!}") 102: end 103: end 104: 105: # Setup a DRb server to handle the incomming requests from the clients. 106: brokerIface = ProjectBrokerIface.new(self) 107: begin 108: $SAFE = 1 109: DRb.install_acl(ACL.new(] deny all 110: allow 127.0.0.1 ])) 111: @uri = DRb.start_service("druby://127.0.0.1:#{@port}", brokerIface).uri 112: @log.info("TaskJuggler daemon is listening on #{@uri}") 113: rescue 114: @log.fatal("Cannot listen on port #{@port}: #{$!}") 115: end 116: 117: if @port == 0 && @uriFile 118: # If the port is set to 0 (any port) we save the ProjectBroker URI in 119: # the file .tj3d.uri. tj3client will look for it. 120: begin 121: File.open(@uriFile, 'w') { |f| f.write @uri } 122: rescue 123: @log.fatal("Cannot write URI file #{@uriFile}: #{$!}") 124: end 125: end 126: 127: # If project files were specified on the command line, we add them here. 128: i = 0 129: @projectFiles.each do |project| 130: @projectsToLoad.push(project) 131: end 132: 133: # Start a Thread that waits for the @terminate flag to be set and does 134: # some other work asynchronously. 135: startHousekeeping 136: 137: # Cleanup the DRb threads 138: DRb.thread.join 139: 140: # If we have created a URI file, we need to delete it again. 141: if @port == 0 && @uriFile 142: begin 143: File.delete(@uriFile) 144: rescue 145: @log.fatal("Cannot delete URI file .tj3d.uri: #{$!}") 146: end 147: end 148: 149: @log.info('TaskJuggler daemon terminated') 150: end
Generate a table with information about the loaded projects.
# File lib/taskjuggler/daemon/ProjectBroker.rb, line 194 194: def status 195: if @projects.empty? 196: "No projects registered\n" 197: else 198: format = " %3s | %-25s | %-14s | %s | %-20s\n" 199: out = sprintf(format, 'No.', 'Project ID', 'Status', 'M', 200: 'Loaded since') 201: out += " #{'-' * 4}+#{'-' * 27}+#{'-' * 16}+#{'-' * 3}+#{'-' * 20}\n" 202: @projects.synchronize do 203: i = 0 204: @projects.each do |project| 205: out += project.to_s(format, i += 1) 206: end 207: end 208: out 209: end 210: end
This command will initiate the termination of the daemon.
# File lib/taskjuggler/daemon/ProjectBroker.rb, line 174 174: def stop 175: @log.debug('Terminating on client request') 176: 177: # Shut down the web server if we've started one. 178: if @webServer 179: @webServer.stop 180: end 181: 182: # Send termination signal to all ProjectServer instances 183: @projects.synchronize do 184: @projects.each { |p| p.terminateServer } 185: end 186: 187: # Setting the @terminate flag to true will case the terminator Thread to 188: # call DRb.stop_service 189: @terminate = true 190: super 191: end
Reload all projects that have modified files and are not already being reloaded.
# File lib/taskjuggler/daemon/ProjectBroker.rb, line 326 326: def update 327: @projects.synchronize do 328: @projects.each do |project| 329: if project.modified && !project.reloading 330: project.reloading = true 331: @projectsToLoad.push(project.files) 332: end 333: end 334: end 335: end
This is a callback from the ProjectServer process. It’s used to update the current state of the ProjectServer in the ProjectRecord list. projectKey is the authentication key for that project. It is used to idenfity the entry in the ProjectRecord list to be updated.
# File lib/taskjuggler/daemon/ProjectBroker.rb, line 356 356: def updateState(projectKey, filesOrId, state, modified) 357: result = false 358: if filesOrId.is_a?(Array) 359: files = filesOrId 360: # Use the name of the master files for now. 361: id = files[1] 362: elsif filesOrId.is_a?(String) 363: id = filesOrId 364: files = nil 365: else 366: id = files = nil 367: end 368: 369: @projects.synchronize do 370: @projects.each do |project| 371: # Don't accept updates for already obsolete entries. 372: next if project.state == :obsolete 373: 374: @log.debug("Updating state for #{id} to #{state}") 375: # Only update the record that has the matching key 376: if project.authKey == projectKey 377: project.id = id if id 378: # An Array of [ workingDir, tjpFile, ... other tji files ] 379: project.files = files if files 380: 381: # If the state is being changed from something to :ready, this is 382: # now the current project for the project ID. 383: if state == :ready && project.state != :ready 384: # Mark other project records with same project ID as obsolete 385: @projects.each do |p| 386: if p != project && p.id == id 387: p.state = :obsolete 388: @log.debug("Marking entry with ID #{id} as obsolete") 389: end 390: end 391: project.readySince = TjTime.new 392: end 393: 394: # Failed ProjectServers are terminated automatically. We can't 395: # reach them any more. 396: project.uri = nil if state == :failed 397: 398: project.state = state 399: project.modified = modified 400: result = true 401: break 402: end 403: end 404: end 405: 406: result 407: end
# File lib/taskjuggler/daemon/ProjectBroker.rb, line 473 473: def loadProject(tagOrProject) 474: if tagOrProject.is_a?(Array) 475: tag = rand(9999999999999) 476: project = tagOrProject 477: # The 2nd element of the Array is the *.tjp file name. 478: @log.debug("Loading project #{tagOrProject[1]} with tag #{tag}") 479: else 480: tag = tagOrProject 481: project = nil 482: @log.debug("Loading project for tag #{tag}") 483: end 484: pr = ProjectRecord.new(tag) 485: ps = ProjectServer.new(@authKey, project, @logStdIO) 486: # The ProjectServer can be reached via this DRb URI 487: pr.uri = ps.uri 488: # Method calls must be authenticated with this key 489: pr.authKey = ps.authKey 490: 491: # Add the ProjectRecord to the @projects list 492: @projects.synchronize do 493: @projects << pr 494: end 495: end
# File lib/taskjuggler/daemon/ProjectBroker.rb, line 411 411: def startHousekeeping 412: Thread.new do 413: begin 414: cntr = 0 415: loop do 416: if @terminate 417: # Give the caller a chance to properly terminate the connection. 418: sleep 0.5 419: @log.debug('Shutting down DRb server') 420: DRb.stop_service 421: break 422: elsif !@projectsToLoad.empty? 423: loadProject(@projectsToLoad.pop) 424: else 425: # Send termination command to all obsolute ProjectServer 426: # objects. To minimize the locking of @projects we collect the 427: # obsolete items first. 428: termList = [] 429: @projects.synchronize do 430: @projects.each do |p| 431: if p.state == :obsolete 432: termList << p 433: elsif p.state == :failed 434: # Start removal of entries that didn't parse. 435: p.state = :obsolete 436: end 437: end 438: end 439: # And then send them a termination command. 440: termList.each { |p| p.terminateServer } 441: 442: # Check every 10 seconds that the ProjectServer processes are 443: # still alive. If not, remove them from the list. 444: if (cntr += 1) > 10 445: @projects.synchronize do 446: @projects.each do |p| 447: unless p.ping 448: termList << p unless termList.include?(p) 449: end 450: end 451: end 452: cntr = 0 453: end 454: 455: # The housekeeping thread rarely needs to so something. Make 456: # sure it's sleeping most of the time. 457: sleep 1 458: 459: # Remove the obsolete records from the @projects list. 460: @projects.synchronize do 461: @projects.delete_if { |p| termList.include?(p) } 462: end 463: end 464: end 465: rescue 466: $stderr.print $!.to_s 467: $stderr.print $!.backtrace.join("\n") 468: @log.fatal("ProjectBroker housekeeping error: #{$!}") 469: end 470: end 471: end
Disabled; run with --debug to generate this.
Generated with the Darkfish Rdoc Generator 1.1.6.