README.md in rflow-1.0.0a2 vs README.md in rflow-1.0.0a3

- old
+ new

@@ -18,25 +18,38 @@
 supports generalized connection types and message serialization,
 however only two are in current use, namely ZeroMQ connections and
 Avro serialization.
 
 RFlow currently runs as a single-threaded, evented system on top of
-[Eventmachine](http://rubyeventmachine.com/), meaning that any code
+[EventMachine](http://rubyeventmachine.com/), meaning that any code
 should be coded in an asynchronous style so as to not block the
-Eventmachine reactor (and thus block all the other components). There
-is currently work being done to "shard" the workflow among multiple
-processes and/or threads.
+EventMachine reactor (and thus block all the other components). Use
+`EM.defer` and other such patterns, along with EventMachine plugins
+for various servers and clients, to work in this style and defer
+computation to background threads.
+RFlow component workflows may be split into `shards` to improve
+parallelism. Each shard is currently represented by a separate process,
+though threads may be supported in the future. Multiple copies of a
+shard may be instantiated, which will cooperate to round-robin
+incoming messages.
+
 Some of the long-term goals of RFlow are to allow for components and
 portions of the workflow to be defined in any language that supports
-Avro and ZeroMQ, which are numerous.
+Avro and ZeroMQ, which are numerous. For this reason, the official
+specification of an RFlow workflow is a SQLite database containing
+information on its components, connections, ports, settings, etc.
+There is a Ruby DSL that aids in populating the database but the intent
+is that multiple processes and languages could access and manipulate
+the database form.
 
 ## Developer Notes
 
 You will need ZeroMQ preinstalled. Currently, EventMachine only supports
 v3.2.4, not v4.x, so install that version. Older versions like 2.2 will not
-work.
+work. (You will probably get errors saying arcane things like
+`assertion failed, mailbox.cpp(84)`).
 
 ## Definitions
 
 * __Component__ - the basic unit of RFlow computation. Each component
   is a shared-nothing, individual computation module that
@@ -48,12 +61,12 @@
   Ports can be "keyed" or "indexed" to allow better multiplexing of
   messages out/in a single port, as well as allow a single port to be
   accessed by an array.
 
 * __Connection__ - a directed link between an output port and an input
-  port. RFlow supports generalized connection types, however only
-  ZeroMQ IPC links are currently used.
+  port. RFlow supports generalized connection types; however, only
+  ZeroMQ links are currently used.
 
 * __Message__ - a bit of serialized data that is sent out an output
   port and received on an input port. Due to the serialization,
   message types and schemas are explicitly defined. In a departure
   from "pure" FBP, RFlow supports sending multiple message types via a
@@ -61,11 +74,10 @@
 * __Workflow__ - the common name for the digraph created when the
   components (nodes) are wired together via connections to their
   respective output/input ports.
 
-
 ## Component Examples
 
 The following describes the API of an RFlow component:
 
 ```ruby
@@ -88,23 +100,23 @@
 input and output ports.
 
 * `configure!` (called with a hash configuration) is called after the
   component is instantiated but before the workflow has been wired or
   any messages have been sent. Note that this is called outside the
-  Eventmachine reactor.
+  EventMachine reactor.
 * `run!` is called after all the components have been wired together
   with connections and the entire workflow has been created. For a
   component that is a source of messages, this is where messages will
   be sent. For example, if the component is reading from a file, this
   is where the file will be opened, the contents read into a message,
   and the message sent out the output port. `run!` is called within
-  the Eventmachine reactor.
+  the EventMachine reactor.
 * `process_message` is an evented callback that is called whenever the
   component receives a message on one of its input ports.
-  `process_message` is called withing the Eventmachine reactor
+  `process_message` is called within the EventMachine reactor
 * `shutdown!` is called when the flow is being terminated, and is
   meant to allow the components to do penultimate processing and send
   any final messages. All components in a flow will be told to
   `shutdown!` before they are told to `cleanup!`.
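To illustrate the lifecycle and the asynchronous style described above, here is a minimal sketch of a pass-through component that pushes its slow work onto a background thread with `EM.defer`. The `SlowRelay` class, its port names, and the `delay_seconds` option are hypothetical and only illustrate the callbacks; they are not part of RFlow itself.

```ruby
require 'rflow'
require 'eventmachine'

# Hypothetical component: forwards each message after simulating slow work
# off the reactor thread, so other components keep running.
class SlowRelay < RFlow::Component
  input_port :in
  output_port :out

  # Called outside the EventMachine reactor, before the workflow is wired.
  def configure!(config)
    @delay = (config['delay_seconds'] || 1).to_f
  end

  # Called inside the reactor once everything is wired; a pure relay has
  # nothing to start here.
  def run!; end

  # Called inside the reactor for each incoming message. Defer the slow
  # part to a background thread; the callback runs back on the reactor.
  def process_message(input_port, input_port_key, connection, message)
    work = proc { sleep @delay; message }            # pretend this is expensive
    done = proc { |result| out.send_message result } # emit once finished
    EM.defer(work, done)
  end

  # Last chance to emit any final messages before cleanup!.
  def shutdown!; end
end
```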
@@ -184,25 +196,24 @@
 ```ruby
 class RFlow::Components::FileOutput < RFlow::Component
   input_port :in
-  attr_accessor :output_file_path, :output_file
+  attr_accessor :output_file_path
 
   def configure!(config)
     self.output_file_path = config['output_file_path']
-    self.output_file = File.new output_file_path, 'w+'
   end
 
   def process_message(input_port, input_port_key, connection, message)
-    output_file.puts message.data.data_object.inspect
-    output_file.flush
+    File.open(output_file_path, 'a') do |f|
+      f.flock(File::LOCK_EX)
+      f.puts message.data.data_object.inspect
+      f.flush
+      f.flock(File::LOCK_UN)
+    end
   end
-
-  def cleanup
-    output_file.close
-  end
 end
 ```
 
 ## RFlow Messages
@@ -312,52 +323,54 @@
 ## RFlow Workflow Configuration
 
 RFlow currently stores its configuration in a SQLite database which is
 internally accessed via ActiveRecord. Given that SQLite is a rather
 simple and standard interface, non-RFlow components could
-access it and determine their respsective ZMQ connections.
+access it and determine their respective ZMQ connections.
 
 DB schemas for the configuration database are in
 [lib/rflow/configuration/migrations](lib/rflow/configuration/migrations)
 and define the complete workflow configuration. Note that each of the
 tables uses a UUID primary key, and UUIDs are used within RFlow to
 identify specific components.
 
 * settings - general application settings, such as log levels, app
-  names, directories, etc
+  names, directories, etc.
+* shards - a list of the shards defined for the workflow, including
+  UUID, type, and number of workers for the shard
+
 * components - a list of the components including its name,
-  specification (Ruby class), and options. Note that the options are
+  specification (Ruby class), shard, and options. Note that the options are
   serialized to the database as YAML, and components should understand
   that the round-trip through the database might not be perfect
   (e.g. Ruby symbols might become strings). A component also has a
   number of input ports and output ports.
 * ports - belonging to a component (via `component_uuid` foreign key),
-  also has a `type` colum for ActiveRecord STI, which gets set to
+  also has a `type` column for ActiveRecord STI, which gets set to
   either a `RFlow::Configuration::InputPort` or
   `RFlow::Configuration::OutputPort`.
 * connections - a connection between two ports via foreign keys
   `input_port_uuid` and `output_port_uuid`. Like ports, connections
-  are typed via AR STI (`RFlow::Configuration::ZMQConnection` or
-  `RFlow::Configuration::AMQPConnection`) and have a YAML serialized
-  `options` hash. A connection also (potentially) defines the port
-  keys.
+  are typed via AR STI (`RFlow::Configuration::ZMQConnection` and
+  `RFlow::Configuration::BrokeredZMQConnection` are the only
+  supported values for now) and have a YAML serialized `options`
+  hash. A connection also (potentially) defines the port keys.
 
 RFlow also provides a RubyDSL for a configuration-like file to be used
 to load the database:
 
 ```ruby
 RFlow::Configuration::RubyDSL.configure do |config|
   # Configure the settings, which include paths for various files, log
   # levels, and component specific stuffs
-  config.setting('rflow.log_level', 'DEBUG')
-  config.setting('rflow.application_directory_path', '../tmp')
+  config.setting 'rflow.log_level', 'DEBUG'
+  config.setting 'rflow.application_directory_path', '../tmp'
+  config.setting 'rflow.application_name', 'testapp'
 
-  config.setting('rflow.application_name', 'testapp')
-
   # Instantiate components
   config.component 'generate_ints1', 'RFlow::Components::GenerateIntegerSequence', {
     'start' => 0, 'finish' => 10, 'step' => 3,
@@ -384,13 +397,85 @@
   config.connect 'filter#out' => 'output1#in'
   config.connect 'filter#filtered' => 'output2#in'
 end
 ```
 
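Because the loaded configuration is an ordinary SQLite file, a process in another language (or a plain script) could read the same information directly. Below is a rough Ruby sketch; the database filename `config.sqlite`, and the column names it queries, are assumptions inferred from the table descriptions above and should be checked against the migrations in `lib/rflow/configuration/migrations`.

```ruby
require 'sqlite3'

# Illustrative only: walk the workflow database the way a non-RFlow
# participant might, to discover connections and their ZeroMQ options.
# Column names (uuid, name, options) are assumed, not guaranteed.
db = SQLite3::Database.new('config.sqlite')

sql = <<-SQL
  SELECT connections.type, connections.options, outp.name, inp.name
  FROM connections
  JOIN ports outp ON outp.uuid = connections.output_port_uuid
  JOIN ports inp  ON inp.uuid  = connections.input_port_uuid
SQL

db.execute(sql).each do |type, options, from_port, to_port|
  # options is a YAML-serialized hash (e.g. socket addresses)
  puts "#{from_port} -> #{to_port} (#{type}): #{options}"
end
```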
+## Parallelism
+
+RFlow supports parallelizing workflows and splitting them into multiple
+`shard`s. By default, components declared in the Ruby DSL exist in the
+default shard, named `DEFAULT`. There is only one worker for the default
+shard.
+
+ZeroMQ communication between components in the same shard uses ZeroMQ's
+`inproc` socket type for maximum performance. ZeroMQ communication between
+components in different shards is accomplished with a ZeroMQ `ipc` socket.
+In the case of a many-to-many connection (many workers in a producing
+shard and many workers in a consuming shard), a ZeroMQ message broker
+process is created to route the messages appropriately. Senders round-robin
+to receivers and receivers fair-queue the messages from the senders.
+Load balancing based on receiver responsiveness is not currently implemented.
+
+To define a custom shard in the Ruby DSL, use the `shard` method. For
+example:
+
+```ruby
+RFlow::Configuration::RubyDSL.configure do |config|
+  # Configure the settings, which include paths for various files, log
+  # levels, and component specific stuffs
+  config.setting 'rflow.log_level', 'DEBUG'
+  config.setting 'rflow.application_directory_path', '../tmp'
+  config.setting 'rflow.application_name', 'testapp'
+
+  config.shard 'integers', :process => 2 do |shard|
+    shard.component 'generate_ints1', 'RFlow::Components::GenerateIntegerSequence', {
+      'start' => 0,
+      'finish' => 10,
+      'step' => 3,
+      'interval_seconds' => 1
+    }
+    shard.component 'generate_ints2', 'RFlow::Components::GenerateIntegerSequence', {
+      'start' => 20,
+      'finish' => 30
+    }
+  end
+
+  # another style of specifying type and count; count defaults to 1
+  config.shard 'filters', :type => :process, :count => 1 do |shard|
+    shard.component 'filter', 'RFlow::Components::RubyProcFilter', {
+      'filter_proc_string' => 'lambda {|message| true}'
+    }
+  end
+
+  # another way of specifying type
+  config.process 'filters', :count => 2 do |shard|
+    shard.component 'output1', 'RFlow::Components::FileOutput', {
+      'output_file_path' => '/tmp/out1'
+    }
+  end
+
+  # this component will be created in the DEFAULT shard
+  config.component 'output2', 'RFlow::Components::FileOutput', {
+    'output_file_path' => '/tmp/out2'
+  }
+
+  # Wire components together
+  config.connect 'generate_ints1#out' => 'filter#in'
+  config.connect 'generate_ints2#out' => 'filter#in'
+  config.connect 'filter#filtered' => 'replicate#in'
+  config.connect 'filter#out' => 'output1#in'
+  config.connect 'filter#filtered' => 'output2#in'
+end
+```
+
+At runtime, shards with no components defined will have no workers and
+will not be started. (So, if you put all components in a custom shard,
+no `DEFAULT` workers will be seen.)
+
 ## Command-Line Operation
 
 RFlow includes the `rflow` binary that can load a database from a Ruby
-DSL, as well as start/stop the wokflow application as a daemon.
+DSL, as well as start/stop the workflow application as a daemon.
 
 Invoking the `rflow` binary without any options will give a brief help:
 
 ```
 Usage: rflow [options] (start|stop|status|load)
     -d, --database DB                Config database (sqlite) path (GENERALLY REQUIRED)