# The Grand Central of code loading...

$LOAD_PATH.unshift File.expand_path(File.dirname(__FILE__))

# Common Gems:
require 'rubygems'
gem 'activerecord'
gem 'json'
gem 'sinatra'
gem 'thin'

# Autoloading for all the pieces which may or may not be needed:
autoload :ActiveRecord, 'active_record'
autoload :Benchmark,    'benchmark'
autoload :Digest,       'digest'
autoload :ERB,          'erb'
autoload :FileUtils,    'fileutils'
autoload :JSON,         'json'
# Pathname is used by CloudCrowd.configure and CloudCrowd.actions; without
# this it was only available if some other library happened to load it first.
autoload :Pathname,     'pathname'
autoload :RightAws,     'right_aws'
autoload :CloudFiles,   'cloudfiles'
autoload :Sinatra,      'sinatra'
autoload :Thin,         'thin'
autoload :YAML,         'yaml'

# Common code which should really be required in every circumstance.
require 'socket'
require 'net/http'
require 'cloud_crowd/exceptions'
require 'rest_client'
require 'active_model_serializers'

ActiveModel::Serializer.root = false

module CloudCrowd

  # Autoload all the CloudCrowd internals.
  autoload :Action,     'cloud_crowd/action'
  autoload :AssetStore, 'cloud_crowd/asset_store'
  autoload :CommandLine, 'cloud_crowd/command_line'
  autoload :Helpers,    'cloud_crowd/helpers'
  autoload :Inflector,  'cloud_crowd/inflector'
  autoload :Job,        'cloud_crowd/models'
  autoload :Node,       'cloud_crowd/node'
  autoload :NodeRecord, 'cloud_crowd/models'
  autoload :Server,     'cloud_crowd/server'
  autoload :Worker,     'cloud_crowd/worker'
  autoload :WorkUnit,   'cloud_crowd/models'

  # Keep this version in sync with the gemspec.
  VERSION = '0.7.2'

  # Increment the schema version when there's a backwards incompatible change.
  SCHEMA_VERSION = 4

  # Root directory of the CloudCrowd gem.
  ROOT = File.expand_path(File.dirname(__FILE__) + '/..')

  # Default folder to log daemonized servers and nodes into.
  LOG_PATH = 'log'

  # Default folder to contain the pids of daemonized servers and nodes.
  PID_PATH = 'tmp/pids'

  # Minimum number of attempts per work unit.
  MIN_RETRIES = 1

  # A Job is processing if its WorkUnits are in the queue to be handled by nodes.
  PROCESSING = 1

  # A Job has succeeded if all of its WorkUnits have finished successfully.
  SUCCEEDED = 2

  # A Job has failed if even a single one of its WorkUnits has failed (they may
  # be attempted multiple times on failure, however).
  FAILED = 3

  # A Job is splitting if it's in the process of dividing its inputs up into
  # multiple WorkUnits.
  SPLITTING = 4

  # A Job is merging if it's busy collecting all of its successful WorkUnits
  # back together into the final result.
  MERGING = 5

  # A Job is considered to be complete if it succeeded or if it failed.
  COMPLETE = [SUCCEEDED, FAILED].freeze

  # A Job is considered incomplete if it's being processed, split up or merged.
  INCOMPLETE = [PROCESSING, SPLITTING, MERGING].freeze

  # Mapping of statuses to their display strings, indexed by status number.
  DISPLAY_STATUS_MAP = ['unknown', 'processing', 'succeeded', 'failed', 'splitting', 'merging'].freeze

  class << self
    attr_reader :config
    attr_accessor :identity

    # Configure CloudCrowd by passing in the path to config.yml. The file is
    # rendered through ERB before being parsed as YAML. If an :actions_path is
    # configured, its real (symlink-resolved) path is put at the front of
    # $LOAD_PATH so custom actions can later be required by file name.
    def configure(config_path)
      @config_path = File.expand_path(File.dirname(config_path))
      # NOTE(review): on Psych 4+ (Ruby 3.1+) YAML.load behaves like safe_load
      # and rejects Symbol keys; this assumes the older, permissive behaviour.
      @config = YAML.load(ERB.new(File.read(config_path)).result)
      @config[:work_unit_retries] ||= MIN_RETRIES
      if @config[:actions_path]
        # Use a String: Pathname#== is never true against a String, so the
        # duplicate check below could not match string entries otherwise.
        path = Pathname.new(@config[:actions_path]).realpath.to_s
        $LOAD_PATH.unshift(path) unless $LOAD_PATH.include?(path)
      end
    end

    # Configure the CloudCrowd central database (and connect to it), by passing
    # in a path to database.yml. The file should use the standard
    # ActiveRecord connection format.
    #
    # When +validate_schema+ is true, compares the highest version in
    # schema_migrations to SCHEMA_VERSION. Returns true on a match; otherwise
    # prints instructions and exits the process with a failure status.
    def configure_database(config_path, validate_schema=true)
      configuration = YAML.load(ERB.new(File.read(config_path)).result)
      ActiveRecord::Base.establish_connection(configuration)
      if validate_schema
        version = ActiveRecord::Base.connection.select_values('select max(version) from schema_migrations').first.to_i
        return true if version == SCHEMA_VERSION
        puts "Your database schema is out of date. Please use `crowd load_schema` to update it. This will wipe all the tables, so make sure that your jobs have a chance to finish first.\nexiting..."
        # Non-zero status so shells and process supervisors can tell that
        # startup was aborted (a bare `exit` reports success).
        exit 1
      end
    end

    # Starts a new thread that checks out an ActiveRecord connection from the
    # pool for the duration of the given block, then yields to perform work
    # inside it. Returns the Thread.
    def defer
      Thread.new do
        ActiveRecord::Base.connection_pool.with_connection do
          yield
        end
      end
    end

    # Get a reference to the central server, including authentication if
    # configured. Memoized after the first call.
    def central_server
      @central_server ||= RestClient::Resource.new(CloudCrowd.config[:central_server], CloudCrowd.client_options)
    end

    # The path that daemonized servers and nodes will log to. With a
    # +log_file+ name, returns that file joined onto the log directory.
    def log_path(log_file=nil)
      @log_path ||= config[:log_path] || LOG_PATH
      log_file ? File.join(@log_path, log_file) : @log_path
    end

    # The path in which daemonized servers and nodes will store their pids.
    # With a +pid_file+ name, returns that file joined onto the pid directory.
    def pid_path(pid_file=nil)
      @pid_path ||= config[:pid_path] || PID_PATH
      pid_file ? File.join(@pid_path, pid_file) : @pid_path
    end

    # The standard RestClient options for the central server talking to nodes,
    # as well as the other way around. By default there's a timeout of 5
    # seconds to open a connection (:open_timeout) and 30 seconds to finish
    # reading it (:node_timeout on a server, :server_timeout elsewhere).
    # Login credentials are included when :http_authentication is set.
    def client_options
      return @client_options if @client_options
      @client_options = {
        :timeout      => (self.server? ? config[:node_timeout] : config[:server_timeout]) || 30,
        :open_timeout => config[:open_timeout] || 5
      }
      if CloudCrowd.config[:http_authentication]
        @client_options[:user]     = CloudCrowd.config[:login]
        @client_options[:password] = CloudCrowd.config[:password]
      end
      @client_options
    end

    # Return the displayable status name of an internal CloudCrowd status number.
    # (See the above constants). Anything that is not a known, non-negative
    # status number -- including nil -- is reported as 'unknown'.
    def display_status(status)
      # Guard against nil (Array#[] raises TypeError) and negative numbers
      # (Array#[] would wrap around and return e.g. 'merging' for -1).
      return 'unknown' unless status.respond_to?(:to_int) && status.to_int >= 0
      DISPLAY_STATUS_MAP[status.to_int] || 'unknown'
    end

    # CloudCrowd::Actions are requested dynamically by name. Access them through
    # this actions property, which behaves like a hash. At load time, we
    # load all installed Actions and CloudCrowd's default Actions into it.
    # If you wish to have certain nodes be specialized to only handle certain
    # Actions, then install only those into the actions directory.
    #
    # Raises NameError, with a hint about file/class naming, when an action's
    # class cannot be found under the name derived from its file.
    def actions
      return @actions if @actions
      @actions = action_paths.inject({}) do |memo, path|
        path = Pathname.new(path)
        # Relative paths are required by basename, relying on $LOAD_PATH
        # (see the :actions_path handling in configure).
        require((path.relative? ? path.basename : path).to_s)
        name = path.basename('.*').to_s
        memo[name] = Module.const_get(Inflector.camelize(name))
        memo
      end
    rescue NameError => e
      adjusted_message = "One of your actions failed to load. Please ensure that the name of your action class can be deduced from the name of the file. ex: 'word_count.rb' => 'WordCount'\n#{e.message}"
      raise NameError.new(adjusted_message, e.name)
    end

    # Retrieve the list of every installed Action for this node or server:
    # the gem's defaults (unless :disable_default_actions), those next to
    # config.yml, and those under :actions_path.
    def action_paths
      default_actions   = config[:disable_default_actions] ? [] : Dir["#{ROOT}/actions/*.rb"]
      installed_actions = Dir["#{@config_path}/actions/*.rb"]
      custom_actions    = CloudCrowd.config[:actions_path] ? Dir["#{CloudCrowd.config[:actions_path]}/*.rb"] : []
      default_actions + installed_actions + custom_actions
    end

    # Is this CloudCrowd instance a server? Useful for avoiding loading unneeded
    # code from actions.
    def server?
      @identity == :server
    end

    # Or is it a node?
    def node?
      @identity == :node
    end

  end

end