# Wukong-Storm

The Storm plugin for Wukong lets you run <a
href="http://github.com/infochimps-labs/wukong/tree/3.0.0">Wukong</a>
processors and dataflows as <a
href="https://github.com/nathanmarz/storm">Storm</a> topologies that read from and write to <a href="http://kafka.apache.org/">Kafka</a>.

Before you use Wukong-Storm to develop, test, and run your Storm
topologies, you might want to read about <a
href="http://github.com/infochimps-labs/wukong/tree/3.0.0">Wukong</a>,
write some <a
href="http://github.com/infochimps-labs/wukong/tree/3.0.0#writing-simple-processors">simple
processors</a>, and read about some of Storm's <a
href="https://github.com/nathanmarz/storm/wiki/Concepts">core
concepts</a>.

You might also want to check out some other projects which enrich the
Wukong and Storm experience:

* <a href="http://github.com/infochimps-labs/wukong-hadoop">wukong-hadoop</a>: Run Wukong processors and dataflows as mappers and/or reducers within the Hadoop framework.  Model jobs locally before you run them.
* <a href="http://github.com/infochimps-labs/wukong-load">wukong-load</a>: Load the output data from your local Wukong jobs and flows into a variety of different data stores.
* <a href="http://github.com/infochimps-labs/wukong-deploy">wukong-deploy</a>: Orchestrate Wukong and other wu-tools together to support an application running on the Infochimps Platform.

<a name="installation"></a>
## Installation & Setup

Wukong-Storm can be installed as a RubyGem:

```
$ sudo gem install wukong-storm
```

If you actually want to run your dataflows as functioning Storm
topologies reading from and writing to Kafka, you'll of course need
access to running Storm and Kafka installations.  <a
href="http://github.com/infochimps-labs/ironfan">Ironfan</a> is a
great tool for building and managing Storm clusters and other
distributed infrastructure quickly and easily.

To run Storm topologies through Wukong-Storm, you'll need to copy your
Wukong code to each worker node of the Storm cluster, install
Wukong-Storm on each, and log in and launch your job from one of them.
Ironfan again helps with configuring this.

<a name="anatomy"></a>
## Anatomy of a running topology

Storm defines the concept of a **topology**.  A topology contains
spouts and bolts.  A **spout** is a source of data.  A **bolt**
processes data.  Bolts can be connected to each other and to spouts in
arbitrary ways.

Topologies are submitted to Storm's Nimbus but run within Storm
supervisors.  Each supervisor can dedicate a certain number of
**workers** to a topology. Within each worker, **parallelism**
controls the number of threads the worker assigns to the topology.

Wukong-Storm runs each Wukong dataflow as a single bolt within a
single topology.  Data is passed to this bolt over STDIN and collected
over STDOUT, similar to the way <a
href="http://hadoop.apache.org/docs/r0.15.2/streaming.html">Hadoop
streaming</a> operates.

This topology is hooked up to a
`storm.kafka.trident.OpaqueTridentKafkaSpout` (part of
[storm-contrib](https://github.com/nathanmarz/storm-contrib)) which
reads from a single input topic within Kafka.

Output records are written to a default Kafka topic, but this can be
overridden on a per-record basis.

<a name="protocol"></a>
### Communication protocol

A Wukong dataflow launched within Storm runs as a single bolt (see
[`com.infochimps.wukong.storm.SubprocessFunction`](https://github.com/infochimps-labs/wukong-storm/blob/master/src/main/java/com/infochimps/wukong/storm/SubprocessFunction.java)).
This bolt works by launching an arbitrary command line, sending it
records over STDIN, and reading its output over STDOUT.  The
`SubprocessFunction` class expects whatever command it launches to
obey a protocol: after **each** input record, the command writes each
of its output records followed by a newline, then a batch terminator
(default: `---`) followed by another newline.
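
For illustration, here is a minimal sketch of a protocol-compliant command written in plain Ruby, with no Wukong dependency; the filename `sketch_bolt.rb` and its upcasing behavior are assumptions made for this example:

```ruby
#!/usr/bin/env ruby
# sketch_bolt.rb -- a hypothetical command obeying the batch protocol above:
# after each input record, write zero or more output records, each followed
# by a newline, then the batch terminator on its own line.

BATCH_TERMINATOR = '---'

$stdout.sync = true     # flush eagerly so the caller is never left waiting
$stdin.each_line do |line|
  record = line.chomp
  $stdout.puts(record.upcase) unless record.empty?  # "process" the record
  $stdout.puts(BATCH_TERMINATOR)                    # end of this input's batch
end
```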

Wukong-Storm comes with a command `wu-bolt` which works very similarly
to `wu-local` but implements this protocol.  Here's an example of
using `wu-bolt` directly with a processor:

```
$ echo 2 | wu-bolt prime_factorizer.rb
2
---
$ echo 12 | wu-bolt prime_factorizer.rb
2
2
3
---
$ echo 19 | wu-bolt prime_factorizer.rb
---
```

Notice that in the last example, the presence of the batch terminator
after each input record makes it easy to tell the difference between
"no output records" and "no output records yet", a distinction that is
otherwise rather hard to make over STDIN/STDOUT.

## Running a dataflow

### A simple processor

Assuming you have correctly installed Wukong-Storm, Storm, Kafka,
Zookeeper, &c., and you have defined a simple dataflow (or in this
case, just a single processor) like this:

```ruby
# in upcaser.rb
Wukong.processor(:upcaser) do
  def process line
    yield line.upcase
  end
end
```
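
Before submitting anything to Storm, you can sanity-check the processor locally with `wu-bolt`; the batch terminator in the output comes from the protocol described above:

```
$ echo 'hello, storm' | wu-bolt upcaser.rb
HELLO, STORM
---
```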

Then you can launch it directly into Storm:

```
$ wu-storm upcaser.rb --input=some_input_topic --output=some_output_topic
```

If a topology named `upcaser` already exists, you'll get an error.
Add the `--rm` flag to first kill the running topology before
launching the new one:

```
$ wu-storm upcaser.rb --input=some_input_topic --output=some_output_topic --rm
```

The default amount of time to wait for the topology to die is 300
seconds (5 minutes), just like the `storm kill` command (which is used
under the hood).  When debugging a topology in development, it's
helpful to add `--wait=1` to immediately kill the topology.
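
For example, a quick kill-and-relaunch during development might look like this (same hypothetical topics as above):

```
$ wu-storm upcaser.rb --input=some_input_topic --output=some_output_topic --rm --wait=1
```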

See exactly what happens behind the scenes by adding the `--dry_run`
flag, which prints the commands that would be run without executing
them:

```
$ wu-storm upcaser.rb --input=some_input_topic --output=some_output_topic --rm --dry_run
```

### A more complicated example

Say you have a dataflow:

```ruby
# in my_flow.rb
Wukong.dataflow(:my_flow) do
  my_parser | does_something | then_something_else | to_json
end
```

You can launch it using a different topology name as well as target
arbitrary locations for your Zookeeper, Kafka, and Storm servers:

```
$ wu-storm my_flow.rb --name=my_flow_attempt_3 --zookeeper_hosts=10.121.121.121,10.122.122.122 --kafka_hosts=10.123.123.123 --nimbus_host=10.124.124.124 --input=some_input_topic --output=some_output_topic
```

### Running non-Wukong or non-Ruby code

You can also use Wukong-Storm as a harness to run non-Wukong or
non-Ruby code.  As long as you can specify a command line to run
which supports the [communication protocol](#protocol), then you can
run it with `wu-storm`:

```
$ wu-storm --bolt_command='my_cmd --some-option=value -af -q 3' --input=some_input_topic --output=some_output_topic
```

### Scaling options

Storm provides several options for scaling a topology up or down.
Wukong-Storm makes them accessible at launch time via the following
options, combined in the example after this list:

* `--workers`: specifies the number of workers (a.k.a. "slots") dedicated to the topology.  Defaults to 1.
* `--input_parallelism`: specifies the number of threads per worker in the spout reading from Kafka.  Defaults to 1.
* `--parallelism`: specifies the number of threads per worker in the bolt running Wukong code.  Defaults to 1.
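
As an illustration, reusing the hypothetical `upcaser.rb` flow and topics from above, a scaled-up launch combining these options might look like this:

```
$ wu-storm upcaser.rb --input=some_input_topic --output=some_output_topic --workers=4 --input_parallelism=2 --parallelism=4
```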