README.md in phobos-1.8.1 vs README.md in phobos-1.8.2.pre.beta1

- old
+ new

@@ -1,12 +1,10 @@ ![Phobos](https://raw.githubusercontent.com/klarna/phobos/master/logo.png) -[![Build Status](https://travis-ci.org/klarna/phobos.svg?branch=master)](https://travis-ci.org/klarna/phobos) -[![Maintainability](https://api.codeclimate.com/v1/badges/2d00845fc6e7e83df6e7/maintainability)](https://codeclimate.com/github/klarna/phobos/maintainability) -[![Test Coverage](https://api.codeclimate.com/v1/badges/2d00845fc6e7e83df6e7/test_coverage)](https://codeclimate.com/github/klarna/phobos/test_coverage) -[![Depfu](https://badges.depfu.com/badges/57da3d5ff1da449cf8739cfe30b8d2f8/count.svg)](https://depfu.com/github/klarna/phobos?project=Bundler) -[![Chat with us on Discord](https://discordapp.com/api/guilds/379938130326847488/widget.png)](https://discord.gg/rfMUBVD) +[![Build Status](https://travis-ci.com/phobos/phobos.svg?branch=master)](https://travis-ci.com/phobos/phobos) +[![Maintainability](https://api.codeclimate.com/v1/badges/e3814d747c91247b24c6/maintainability)](https://codeclimate.com/github/phobos/phobos/maintainability) +[![Test Coverage](https://api.codeclimate.com/v1/badges/e3814d747c91247b24c6/test_coverage)](https://codeclimate.com/github/phobos/phobos/test_coverage) # Phobos Simplifying Kafka for Ruby apps! @@ -212,11 +210,54 @@ `.start` -> `#consume` -> `.stop` or optionally, `.start` -> `#before_consume` -> `#around_consume` [ `#consume` ] -> `.stop` + +#### Batch Consumption +In addition to the regular handler, Phobos provides a `BatchHandler`. The +basic ideas are identical, except that instead of being passed a single message +at a time, the `BatchHandler` is passed a batch of messages. All methods +follow the same pattern as the regular handler except that they each +end in `_batch` and are passed an array of `Phobos::BatchMessage`s instead +of a single payload. + +To enable handling of batches on the consumer side, you must specify +a delivery method of `inline_batch` in [phobos.yml](config/phobos.yml.example), +and your handler must include `BatchHandler`. Using a delivery method of `batch` +assumes that you are still processing the messages one at a time and should +use `Handler`. + +```ruby +class MyBatchHandler + includes Phobos::BatchHandler + + def before_consume_batch(payloads, metadata) + payloads.each do |p| + p.payload[:timestamp] = Time.zone.now + end + end + + def around_consume_batch(payloads, metadata) + yield + end + + def consume_batch(payloads, metadata) + payloads.each do |p| + logger.info("Got payload #{p.payload}, #{p.partition}, #{p.offset}, #{p.key}, #{p.payload[:timestamp]}") + end + end + +end +``` + +Note that retry logic will happen on the *batch* level in this case. If you are +processing messages individually and an error happens in the middle, Phobos's +retry logic will retry the entire batch. If this is not the behavior you want, +consider using `batch` instead of `inline_batch`. + ### <a name="usage-producing-messages-to-kafka"></a> Producing messages to Kafka `ruby-kafka` provides several options for publishing messages, Phobos offers them through the module `Phobos::Producer`. It is possible to turn any ruby class into a producer (including your handlers), just include the producer module, example: ```ruby @@ -434,47 +475,69 @@ * handler * batch_size * partition * offset_lag * highwater_mark_offset - * `listener.process_message` is sent after process a message. It includes the following payload: + * `listener.process_message` is sent after processing a message. It includes the following payload: * listener_id * group_id * topic * handler * key * partition * offset * retry_count - * `listener.retry_handler_error` is sent after waited for `handler#consume` retry. It includes the following payload: + * `listener.process_batch_inline` is sent after processing a batch with `batch_inline` mode. It includes the following payload: * listener_id * group_id * topic * handler + * batch_size + * partition + * offset_lag + * retry_count + * `listener.retry_handler_error` is sent after waiting for `handler#consume` retry. It includes the following payload: + * listener_id + * group_id + * topic + * handler * key * partition * offset * retry_count * waiting_time * exception_class * exception_message * backtrace + * `listener.retry_handler_error_batch` is sent after waiting for `handler#consume_batch` retry. It includes the following payload: + * listener_id + * group_id + * topic + * handler + * batch_size + * partition + * offset_lag + * retry_count + * waiting_time + * exception_class + * exception_message + * backtrace * `listener.retry_aborted` is sent after waiting for a retry but the listener was stopped before the retry happened. It includes the following payload: * listener_id * group_id * topic * handler - * `listener.stopping` is sent when the listener receives signal to stop + * `listener.stopping` is sent when the listener receives signal to stop. * listener_id * group_id * topic * handler - * `listener.stop_handler` is sent after stopping the handler + * `listener.stop_handler` is sent after stopping the handler. * listener_id * group_id * topic * handler - * `listener.stop` is send after stopping the listener + * `listener.stop` is send after stopping the listener. * listener_id * group_id * topic * handler