README.md in kraps-0.5.0 vs README.md in kraps-0.6.0
- old
+ new
@@ -1,15 +1,16 @@
# Kraps
**Easily process big data in ruby**
Kraps allows you to process and perform calculations on very large datasets in
-parallel using a map/reduce framework and runs on a background job framework
-you already have. You just need some space on your filesystem, S3 as a storage
-layer with temporary lifecycle policy enabled, the already mentioned background
-job framework (like sidekiq, shoryuken, etc) and redis to keep track of the
-progress. Most things you most likely already have in place anyways.
+parallel using a map/reduce framework similar to [spark](https://spark.apache.org/),
+but runs on a background job framework you already have. You just need some
+space on your filesystem, S3 as a storage layer with a temporary lifecycle
+policy enabled, the already mentioned background job framework (like sidekiq,
+shoryuken, etc.) and redis to keep track of the progress. Most of these you
+most likely already have in place anyway.
## Installation
Install the gem and add it to the application's Gemfile by executing:
@@ -113,26 +114,25 @@
memory and your background framework spawns 5 threads. Theoretically, you might
be able to give 300-400 megabytes to Kraps then, but now divide this by 10 and
specify a `memory_limit` of around `30.megabytes`, preferably less. The
`memory_limit` affects how many chunks will be written to disk depending on the
data size you are processing and how big these chunks are. The smaller the
-value, the more chunks and the more chunks, the more runs Kraps need to merge
-the chunks. It can affect the performance The `chunk_limit` ensures that only
-the specified amount of chunks are processed in a single run. A run basically
-means: it takes up to `chunk_limit` chunks, reduces them and pushes the result
-as a new chunk to the list of chunks to process. Thus, if your number of file
-descriptors is unlimited, you want to set it to a higher number to avoid the
-overhead of multiple runs. `concurrency` tells Kraps how much threads to use to
+value, the more chunks. The more chunks, the more runs Kraps needs to merge
+the chunks. The `chunk_limit` ensures that only the specified number of chunks
+is processed in a single run. A run basically means: it takes up to
+`chunk_limit` chunks, reduces them and pushes the result as a new chunk to the
+list of chunks to process. Thus, if your number of file descriptors is
+unlimited, you want to set it to a higher number to avoid the overhead of
+multiple runs. `concurrency` tells Kraps how many threads to use to
concurrently upload/download files from the storage layer. Finally, `retries`
specifies how often Kraps should retry the job step in case of errors. Kraps
will sleep for 5 seconds between those retries. Please note that it's not yet
possible to use the retry mechanism of your background job framework with
Kraps. Also note that `parallelize` is not covered by `retries` yet, as the
block passed to `parallelize` is executed by the runner, not the workers.
-
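For reference, a minimal sketch of how these options might be wired into the
background job worker (assuming a Sidekiq-based `MyKrapsWorker` as used in the
examples; the numbers are placeholders you should tune for your environment):
```ruby
class MyKrapsWorker
  include Sidekiq::Worker

  def perform(json)
    # memory_limit, chunk_limit and concurrency as discussed above;
    # retries is passed when actually running the job step
    Kraps::Worker.new(json, memory_limit: 30.megabytes, chunk_limit: 64, concurrency: 8).call(retries: 3)
  end
end
```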
Now, executing your job is super easy:
```ruby
Kraps::Runner.new(SearchLogCounter).call(start_date: '2018-01-01', end_date: '2022-01-01')
```
@@ -180,15 +180,15 @@
https://github.com/mrkamel/map-reduce-ruby/#limitations-for-keys
## Storage
Kraps stores temporary results of steps in a storage layer. Currently, only S3
-is supported besides a in memory driver used for testing purposes. Please be
+is supported besides an in-memory driver used for testing purposes. Please be
aware that Kraps does not clean up any files from the storage layer, as it
-would be a safe thing to do in case of errors anyways. Instead, Kraps relies on
-lifecycle features of modern object storage systems. Therefore, it is recommend
-to e.g. configure a lifecycle policy to delete any files after e.g. 7 days
+would not be a safe thing to do in case of errors anyway. Instead, Kraps
+relies on lifecycle features of modern object storage systems. Therefore, it is
+required to configure a lifecycle policy to delete any files after e.g. 7 days,
either for a whole bucket or for a certain prefix like `temp/`, and tell
Kraps about the prefix to use (e.g. `temp/kraps/`).
```ruby
Kraps::Drivers::S3Driver.new(s3_client: Aws::S3::Client.new("..."), bucket: "some-bucket", prefix: "temp/kraps/"),
@@ -227,10 +227,23 @@
The block gets each key-value pair passed and the `collector` block can be
called as often as necessary. This is also the reason why `map` cannot simply
return the new key-value pair, but the `collector` must be used instead.
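For instance, a single input pair may produce any number of output pairs (a
hypothetical word-count style mapping, assuming the `map` options shown above):
```ruby
job.map(partitions: 128, partitioner: Kraps::HashPartitioner.new, worker: MyKrapsWorker) do |key, value, collector|
  value.to_s.split.each do |word|
    collector.call(word, 1) # emit one pair per word
  end
end
```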
+* `map_partitions`: Maps the key-value pairs to other key-value pairs, but the
+  block receives all data of each partition as an enumerable, sorted by key.
+  Please be aware that you should not call `to_a` or similar on the enumerable,
+  as this would load the whole partition into memory. Prefer `map` over
+  `map_partitions` when possible.
+
+```ruby
+job.map_partitions(partitions: 128, partitioner: partitioner, worker: MyKrapsWorker) do |pairs, collector|
+ pairs.each do |key, value|
+ collector.call("changed #{key}", "changed #{value}")
+ end
+end
+```
+
* `reduce`: Reduces the values of pairs having the same key
```ruby
job.reduce(worker: MyKrapsWorker) do |key, value1, value2|
value1 + value2
@@ -243,30 +256,65 @@
The `key` itself is also passed to the block in case you need to customize the
reduce calculation according to the value of the key. However, most of the
time, this is not necessary and the key can simply be ignored.
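A hypothetical example of a key-dependent reduction (the "max:" key prefix is
purely illustrative):
```ruby
job.reduce(worker: MyKrapsWorker) do |key, value1, value2|
  if key.start_with?("max:")
    [value1, value2].max # keep the maximum for these keys
  else
    value1 + value2      # sum everything else
  end
end
```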
+* `combine`: Combines the results of 2 jobs by combining every key available
+ in the current job result with the corresponding key from the passed job
+ result. When the passed job result does not have the corresponding key,
+ `nil` will be passed to the block. Keys which are only available in the
+ passed job result are completely omitted.
+
+```ruby
+ job.combine(other_job, worker: MyKrapsWorker) do |key, value1, value2|
+ (value1 || {}).merge(value2 || {})
+ end
+```
+
+Please note that the keys, partitioners and the number of partitions must match
+for the jobs to be combined. Further note that the results of `other_job` must
+be reduced, meaning that every key must be unique. Finally, `other_job` does
+not necessarily have to be listed in the array of jobs returned by the `call`
+method, since Kraps detects the dependency on its own.
+
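+A hypothetical sketch of these constraints, assuming numeric values:
+
+```ruby
+partitioner = Kraps::HashPartitioner.new
+
+# both jobs need matching keys, the same partitioner and number of partitions
+job = job.repartition(partitions: 128, partitioner: partitioner, worker: MyKrapsWorker)
+other_job = other_job.repartition(partitions: 128, partitioner: partitioner, worker: MyKrapsWorker)
+
+# other_job must be reduced, such that every key is unique
+other_job = other_job.reduce(worker: MyKrapsWorker) { |key, value1, value2| value1 + value2 }
+
+job = job.combine(other_job, worker: MyKrapsWorker) do |key, value1, value2|
+  (value1 || 0) + (value2 || 0)
+end
+```
+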
* `repartition`: Used to change the partitioning
```ruby
job.repartition(partitions: 128, partitioner: partitioner, worker: MyKrapsWorker)
```
Repartitions all data into the specified number of partitions using the
specified partitioner.
* `each_partition`: Passes the partition number and all data of each partition
- as a lazy enumerable
+  as an enumerable, sorted by key. Please be aware that you should not call
+  `to_a` or similar on the enumerable, as this would load the whole partition
+  into memory.
```ruby
job.each_partition do |partition, pairs|
pairs.each do |key, value|
# ...
end
end
```
+* `dump`: Stores all current data per partition under the specified prefix
+
+```ruby
+job.dump(prefix: "path/to/dump", worker: MyKrapsWorker)
+```
+
+It creates a folder for every partition and stores one or more chunks in there.
+
+* `load`: Loads the previously dumped data
+
+```ruby
+job.load(prefix: "path/to/dump", partitions: 32, partitioner: Kraps::HashPartitioner.new, worker: MyKrapsWorker)
+```
+
+The number of partitions and the partitioner must be specified.
+
Please note that every API method accepts a `before` callable:
```ruby
before_block = proc do
# runs once before the map action in every worker, which can be useful to
@@ -323,9 +371,55 @@
```
When you execute the job, Kraps will execute the jobs one after another and, as
the jobs build on each other, Kraps will execute the steps shared by both
jobs only once.
+
+## Testing
+
+Kraps ships with an in-memory fake driver for storage, which you can use for
+testing purposes instead of the S3 driver:
+
+```ruby
+Kraps.configure(
+  driver: Kraps::Drivers::FakeDriver.new(bucket: "kraps"),
+  # ...
+)
+```
+
+This is of course much faster than using S3 or some S3-compatible service.
+Moreover, when testing large Kraps jobs you may want to test intermediate
+steps. You can use `#dump` for this purpose and test that the data dumped is
+correct.
+
+```ruby
+job = job.dump(prefix: "path/to/dump")
+```
+
+and in your tests do
+
+```ruby
+Kraps.driver.value("path/to/dump/0/chunk.json") # => data of partition 0
+Kraps.driver.value("path/to/dump/1/chunk.json") # => data of partition 1
+# ...
+```
+
+The data is stored line by line; each line is a JSON-encoded array of key and
+value.
+
+```ruby
+data = Kraps.driver.value("path/to/dump/0/chunk.json").lines.map do |line|
+ JSON.parse(line) # => [key, value]
+end
+```
+
+The API of the driver is:
+
+* `store(name, data_or_io, options = {})`: Stores `data_or_io` as `name`
+* `list(prefix: nil)`: Lists all objects or all objects matching the `prefix`
+* `value(name)`: Returns the object content of `name`
+* `download(name, path)`: Downloads the object `name` to `path` in your
+ filesystem
+* `exists?(name)`: Returns `true`/`false`
+* `flush`: Removes all objects from the fake storage
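+
+For illustration, a quick sketch of exercising the configured fake driver in a
+test (the object name and payload are made up for this example):
+
+```ruby
+driver = Kraps.driver
+
+driver.store("temp/kraps/example.json", JSON.generate(["some key", 42]))
+driver.exists?("temp/kraps/example.json") # => true
+driver.value("temp/kraps/example.json")   # => "[\"some key\",42]"
+driver.list(prefix: "temp/kraps/")        # all object names starting with the prefix
+driver.flush                              # remove everything between tests
+```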
## Dependencies
Kraps is built on top of
[map-reduce-ruby](https://github.com/mrkamel/map-reduce-ruby) for the