RFlow starts
  read in DB
  create new shards
    - Create a set of workers with the shard configuration
    - each worker creates a set of components
    - create components

RFlow Manager
  Components
    Input Ports
    Output Ports
  Connections
    Input Ports
    Output Ports

rflow
  figure out a work directory
  make sure that it has the right subdirectories (can be overridden)
    run
    tmp
    logs
    schemas
    components
  Set up logging to logs/rflow.log
  Load all schemas
  Verify all component installation
  Initialize components
  Start components running and make sure that they "daemonize" correctly
    - place pid files in deployment's run directory
  Configure components via zmq
  Daemonize self

    class Component
      # Class-level declarations used by subclasses to name their ports
      def self.input_port(name)
      end

      def self.output_port(name)
      end

      attr_accessor :state

      def initialize(config, run_directory)
      end

      def run
      end

      def configure
      end
    end

    class PassThrough < Component
      input_port [:in]
      input_port :another_in
      output_port :out
      output_port :another_out

      def initialize(config, run_directory)
        # This will initialize the ports
        super
        # Do stuff to initialize component
      end
    end

Computation Requirements:
  Initial startup with:
    - management bus connection information
    - group and instance UUID
    - beacon interval
    - run directory, containing
      - PID files
      - log dir + logs
      - computation-specific configuration (conf dir)

  Needs to process the following messages from mgmt bus:
    - CONFIGURE (ports)
    - RUN
    - SHUTDOWN

  Needs to send the following messages to mgmt bus:
    - LOG
    - BEACON (state machine of the below submessages)
      - STARTED
      - CONFIGURED
      - RUNNING
      - STOPPING
      - STOPPED
      - ERROR

  On startup:
    - listen to mgmt bus
    - publish BEACON + state to mgmt bus every (beacon interval) seconds (default to 1 sec)

External Computations:
  - Given (out-of-band) startup info (mgmt bus, UUIDs, beacon interval)

- RFlow
  - Will need a DB for config
  - Initial startup will need to resolve all remaining outstanding items (ports, UUIDs, etc) and store in config DB
  - MVC, Mongrel2-like?
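
The mgmt bus messages (CONFIGURE, RUN, SHUTDOWN) and the BEACON submessages listed under Computation Requirements could be modelled as a small state machine. The sketch below is only one possible reading: the class name ComputationLifecycle, the transition table, and the stopped! helper are assumptions for illustration and are not defined anywhere in these notes. Only the message names, the state names, and the 1 second default beacon interval come from the notes.

```ruby
# Hypothetical lifecycle for a computation driven by mgmt bus messages.
class ComputationLifecycle
  # Assumed transition table: message => [states it is accepted in, resulting state].
  TRANSITIONS = {
    'CONFIGURE' => [%i[started configured], :configured],
    'RUN'       => [%i[configured], :running],
    'SHUTDOWN'  => [%i[started configured running], :stopping]
  }.freeze

  attr_reader :state, :beacon_interval

  # The notes give 1 second as the default beacon interval.
  def initialize(beacon_interval: 1)
    @state = :started
    @beacon_interval = beacon_interval
  end

  # Apply one mgmt bus message. Unknown messages, or messages that arrive
  # in a state that does not accept them, move the computation to :error.
  def handle(message)
    allowed_from, next_state = TRANSITIONS[message]
    @state = allowed_from&.include?(@state) ? next_state : :error
  end

  # Called by the computation itself once shutdown work has finished.
  def stopped!
    @state = :stopped if @state == :stopping
    @state
  end

  # Text of the beacon that would be published every beacon_interval seconds.
  def beacon
    "BEACON #{@state.to_s.upcase}"
  end
end

lifecycle = ComputationLifecycle.new
%w[CONFIGURE RUN SHUTDOWN].each do |message|
  lifecycle.handle(message)
  puts lifecycle.beacon
end
lifecycle.stopped!
puts lifecycle.beacon
```

Keeping the transitions in a table makes it cheap to revisit which messages are legal in which state, which these notes leave open.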

Translate
  - Need to add where name attr can be used in later XML templates

----------------

Plugins:
  an externally defined plugin needs access to all current data types,
  as well as being able to define its own and tell the system about that.
  - necessary to tell system?
  - need a protocol for defining schema transfer
    - each message has attached schema

lib/rflow/message.rb

RFlow::Config

RFlow::Management
  - Somewhere for external people to register new computations with running system
  - computation says that it's running and asks for Connection configuration
  - how will it specify where in the workflow it wants to run????

RFlow::Message (complete on-the-wire Avro message format)
  data_type, provenance, external_ids, empty, data (see below)

RFlow::Data::(various message data blocks)

RFlow::Computation
  uuid, name, class, input_ports, output_ports

RFlow::Connection
  encapsulates link knowledge and provides an API for sending and receiving
  each computation will have one for each port
  each computation will call into the connection to send (possibly via a Port object) and receive

RFlow::Connection::AMQP
  will manage connections to an AMQP server

RFlow::Connection::ZMQ

computation_a.output_port -> (connection.incoming -> connection.outgoing) -> computation_b.input_port

AMQP::Topic - responsible for setting up a topic -> queue binding
  r.incoming = amqp connection, channel, vhost, login, password, topic
  r.outgoing = amqp connection, channel, vhost, login, password, queue name
  behavior -> n x m, "round-robin" among the connected outgoing
  incoming behavior will need to set topic/key, uses the data type in the RFlow::Message

ZMQ::PubSub - device-less, responsible for assigning ip/port and assigning one client to bind the port
  r.incoming = zmq connection string (tcp://ip:port), type pub
  r.outgoing = zmq connection string (tcp://ip:port), type sub
  behavior -> n x m, broadcast sending

ZMQ::PushPull - device-less, responsible for assigning ip/port and assigning one client to bind the port
  r.incoming = zmq connection string (tcp://ip:port), type push
  r.outgoing = zmq connection string (tcp://ip:port), type pull

Startup

RFlow.run is the management process for the workflow

    computations = config.computations.map do |c|
      instantiate_computation(c)
      # Check for errors here, which would be evident if a computation couldn't be found/created
      # Just creating single process ruby objects here to check for errors
    end

    computations.each do |c|
      c.configure # with what????
      # Still single ruby process to set and deconflict all the configuration parameters
    end

    computations.each do |c|
      c.run
    end

    listen_for_management_events_from_old_computations
    listen_for_new_computation_registration
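
The three startup phases above (instantiate, configure, run) can be tried out in a single Ruby process. The following is a minimal sketch under stated assumptions: EchoComputation, REGISTRY, start_workflow, and the shape of the spec hashes (:name, :class, :ports) are all hypothetical. In particular, passing a port map to configure is only one possible answer to the "with what????" question, which the notes leave open.

```ruby
# Hypothetical stand-in for a computation; only the instantiate/configure/run
# ordering is taken from the notes.
class EchoComputation
  attr_reader :name, :state, :ports

  def initialize(name)
    @name = name
    @state = :started
  end

  # Assumption: configuration is a hash of port name => connection string.
  def configure(ports)
    @ports = ports
    @state = :configured
  end

  def run
    @state = :running
  end
end

# Assumed registry mapping configured class names to Ruby classes.
REGISTRY = { 'EchoComputation' => EchoComputation }.freeze

# Raises ArgumentError if the configured class cannot be found.
def instantiate_computation(spec)
  klass = REGISTRY.fetch(spec[:class]) do
    raise ArgumentError, "unknown computation class: #{spec[:class]}"
  end
  klass.new(spec[:name])
end

def start_workflow(specs)
  # Phase 1: build every object first, so a missing class fails before anything runs.
  pairs = specs.map { |spec| [instantiate_computation(spec), spec] }
  # Phase 2: configure everything while still in the single management process.
  pairs.each { |computation, spec| computation.configure(spec.fetch(:ports, {})) }
  # Phase 3: only now start the computations.
  pairs.each { |computation, _spec| computation.run }
  pairs.map(&:first)
end

specs = [
  { name: 'a', class: 'EchoComputation', ports: { out: 'tcp://127.0.0.1:5555' } },
  { name: 'b', class: 'EchoComputation', ports: { in: 'tcp://127.0.0.1:5555' } }
]
running = start_workflow(specs)
puts running.map { |c| "#{c.name}: #{c.state}" }
```

Splitting the work into three passes means a bad class name or a port conflict is caught before any computation has been started, which appears to be the intent of the comments in the pseudocode.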