# Tobox: Transactional outbox pattern implementation in ruby
[![Gem Version](https://badge.fury.io/rb/tobox.svg)](http://rubygems.org/gems/tobox)
[![pipeline status](https://gitlab.com/os85/tobox/badges/master/pipeline.svg)](https://gitlab.com/os85/tobox/pipelines?page=1&scope=all&ref=master)
[![coverage report](https://gitlab.com/os85/tobox/badges/master/coverage.svg?job=coverage)](https://os85.gitlab.io/tobox/#_AllFiles)
Simple, data-first events processing framework based on the [transactional outbox pattern](https://microservices.io/patterns/data/transactional-outbox.html).
- [Requirements](#requirements)
- [Installation](#installation)
- [Usage](#usage)
- [Configuration](#configuration)
- [Event](#event)
- [Plugins](#plugins)
  - [Progress](#progress)
  - [Event Grouping](#event-grouping)
  - [Inbox](#inbox)
  - [Zeitwerk](#zeitwerk)
  - [Sentry](#sentry)
  - [Datadog](#datadog)
  - [Stats](#stats)
  - [PG Notify](#pg-notify)
- [Advanced](#advanced)
  - [Batch Events Handling](#batch-events-handling)
- [Recommendations](#recommendations)
- [Supported Rubies](#supported-rubies)
- [Rails support](#rails-support)
- [Why?](#why)
- [Development](#development)
- [Contributing](#contributing)
## Requirements
`tobox` requires integration with an RDBMS that supports `SKIP LOCKED` functionality. As of today, that's:
* PostgreSQL 9.5+
* MySQL 8+
* Oracle
* Microsoft SQL Server
## Installation
Add this line to your application's Gemfile:
```ruby
gem "tobox"
# You'll also need to add the right database client gem for the target RDBMS
# ex, for postgresql:
#
# gem "pg"
# see more: http://sequel.jeremyevans.net/rdoc/files/doc/opening_databases_rdoc.html
```
And then execute:
$ bundle install
Or install it yourself as:
$ gem install tobox
## Usage
1. create the `outbox` table in your application's database:
```ruby
# example migration using sequel
Sequel.migration do
  up do
    create_table(:outbox) do
      primary_key :id
      column :type, :varchar, null: false
      column :data_before, :json, null: true
      column :data_after, :json, null: true
      column :created_at, "timestamp without time zone", null: false, default: Sequel::CURRENT_TIMESTAMP
      column :attempts, :integer, null: false, default: 0
      column :run_at, "timestamp without time zone", null: true
      column :last_error, :text, null: true
      column :metadata, :json, null: true
      index Sequel.desc(:run_at)
    end
  end
  down do
    drop_table(:outbox)
  end
end
```
2. create a `tobox.rb` config file in your project directory tree:
```ruby
# tobox
database Sequel.connect("postgres://user:pass@dbhost/database")
# table :outbox
# concurrency 8
on("user_created") do |event|
  puts "created user #{event[:after]["id"]}"
  DataLakeService.user_created(event)
  BillingService.bill_user_account(event)
end
on("user_updated") do |event|
  # ...
end
on("user_created", "user_updated") do |event|
  # ...
end
```
3. Start the `tobox` process
```bash
> bundle exec tobox -C path/to/tobox.rb -r path/to/file_requiring_application_code.rb
```
There is no API for event production yet (still TODO). It's recommended you write directly into the "outbox" table via database triggers (i.e. insert into the users table -> add a "user_created" event). Alternatively, you can use `sequel` directly (`DB[:outbox].insert(...)`).
4. Emit outbox events
Currently, `tobox` only deals with outbox event consumption. When it comes to producing, you can do it yourself. There are essentially two alternatives:
4.1 Emit from application code
If you're using `sequel` as your ORM, you can use the dataset API:
```ruby
# Assuming DB points to your `Sequel::Database`, and defaults are used:
order = Order.new(
  item_id: item.id,
  price: 20_20,
  currency: "EUR"
)
DB.transaction do
  order.save
  DB[:outbox].insert(type: "order_created", data_after: order.to_hash)
end
```
4.2 Emit from database trigger
This is how it could be done in PostgreSQL using trigger functions:
```sql
CREATE OR REPLACE FUNCTION order_created_outbox_event()
RETURNS TRIGGER
LANGUAGE PLPGSQL
AS
$$
BEGIN
  INSERT INTO outbox(type, data_after)
  VALUES('order_created', row_to_json(NEW.*));
  RETURN NEW;
END;
$$;

CREATE TRIGGER order_created_outbox_event
  AFTER INSERT
  ON orders
  FOR EACH ROW
  EXECUTE PROCEDURE order_created_outbox_event();
```
## Configuration
As mentioned above, configuration is set in the config file passed to the `tobox` CLI (via the `-C` flag). The following options are configurable:
### `environment`
Sets the application environment (either "development" or "production"). Can be set directly, or via `APP_ENV` environment variable (defaults to "development").
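For example:
```ruby
environment "production"
```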
### `database_uri`
Accepts a URI pointing to a database, [where scheme identifies the database adapter to be used](https://sequel.jeremyevans.net/rdoc/files/doc/opening_databases_rdoc.html):
```ruby
database_uri "postgres://user:password@localhost/blog"
```
### `database_options`
Accepts a hash of options, [which are directly passed to Sequel.connect](https://sequel.jeremyevans.net/rdoc/files/doc/opening_databases_rdoc.html):
```ruby
database_options after_connect: -> (conn) { puts conn }
```
### `table`
the name of the database table where outbox events are stored (`:outbox` by default).
```ruby
table :outbox
```
### `visibility_column`
the name of the database column used to mark an event as invisible while being handled (`:run_at` by default).
The column type MUST be either a datetime (or timestamp, depending on your database) or a boolean (if your database supports it; MySQL doesn't, for example).
If it's a datetime/timestamp column, this value is used, along with the `visibility_timeout` option, to mark the event as invisible for the given duration; this ensures the event will eventually be picked up again in case of a crash, when event handling is non-transactional (via the `:progress` plugin). If it's a boolean column, the event is marked as invisible indefinitely, so in case of a crash, you'll need to recover it manually.
```ruby
visibility_column :run_at
```
### `attempts_column`
the name of the database column storing the number of times an event was handled and failed (`:attempts` by default). If `nil`, events will be retried indefinitely.
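For example:
```ruby
attempts_column :attempts
# or, to retry events indefinitely:
# attempts_column nil
```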
### `created_at_column`
the name of the database column where the event creation timestamp is stored (`:created_at` by default).
When creating the outbox table, you're **recommended** to set this column default to `CURRENT_TIMESTAMP` (or the equivalent in your database), instead of passing it manually in the corresponding `INSERT` statements.
### `max_attempts`
Maximum number of times a failed attempt to process an event will be retried (`10` by default).
```ruby
max_attempts 4
```
**Note**: the delay before the next attempt grows exponentially with the number of past attempts for that event (see `exponential_retry_factor` below).
### `exponential_retry_factor`
Base factor for the exponential backoff determining how many seconds to wait until a failed event can be retried, i.e. 2 seconds after the first attempt, then 4, then 8, then 16 (`2` by default).
```ruby
exponential_retry_factor 2
```
### `batch_size`
Number of events fetched in each outbox loop.
```ruby
batch_size 10
```
**Note**: event handlers will receive all fetched events matching the given callback as arguments:
```ruby
on(:event1, :event2) do |*events| # at most 10 events, may contain events of both types
  # ...
end
```
### `concurrency`
Number of workers processing events.
```ruby
concurrency 4
```
**Note**: the default concurrency differs per worker pool type, so make sure you understand how tweaking it may affect you.
### `max_connections`
Number of database connections the workers will share to do their work.
In the (default) threaded worker mode, it'll default to the number of workers (set by the `concurrency` configuration).
```ruby
# 10 workers sharing 4 database connections
max_connections 4
concurrency 10
```
This can be useful when the database presents overhead in managing write intensive workload (such as the one an outbox generates), and you want to be able to scale without putting more work on the database.
### `worker`
Type of the worker used to process events. Can be `:thread` (default), `:fiber`, or a class implementing the `Tobox::Pool` protocol (TBD: define what this protocol is).
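For example:
```ruby
worker :fiber # or :thread (the default)
```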
### `wait_for_events_delay`
Time (in seconds) to wait before checking again for events in the outbox.
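For example (the value shown is illustrative):
```ruby
wait_for_events_delay 2 # poll every 2 seconds when no events were found
```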
### `shutdown_timeout`
Time (in seconds) to wait for in-flight events to finish processing, before hard-killing the process.
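For example (the value shown is illustrative):
```ruby
shutdown_timeout 15 # wait up to 15 seconds for in-flight events before hard-killing
```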
### `grace_shutdown_timeout`
Grace period (in seconds) to wait after hard-killing the work in progress and before exiting the process.
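For example:
```ruby
grace_shutdown_timeout 5 # wait 5 more seconds before exiting the process
```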
### `on(event_type) { |before, after| }`
callback executed when processing an event of the given type. By default, it'll yield the state of data before and after the event (unless `message_to_arguments` is set).
```ruby
on("order_created") { |event| puts "order created: #{event[:after]}" }
on("order_updated") { |event| puts "order created: was #{event[:before]}, now is #{event[:after]}" }
# ...
```
### `on_before_event { |event| }`
callback executed right before processing an event.
```ruby
on_before_event { |event| start_trace(event[:id]) }
```
### `on_after_event { |event| }`
callback executed right after processing an event.
```ruby
on_after_event { |event| finish_trace(event[:id]) }
```
### `on_error_event { |event, error| }`
callback executed when an exception was raised while processing an event.
```ruby
on_error_event { |event, exception| Sentry.capture_exception(exception) }
```
### `on_error_worker { |error| }`
callback executed when an exception was raised in the worker, before processing events.
```ruby
on_error_worker { |exception| Sentry.capture_exception(exception) }
```
### `on_database_connect { |db| }`
Callback executed right after initializing the `sequel` database object. This can be used, for example, to load database-level extensions and plugins, and set parameters (such as connection pool tweaks). This callback will also be used by plugins which instantiate their own separate database objects (such as the [stats](#stats) plugin).
This callback won't be executed if the database object is created outside of `tobox` configuration parameters.
```ruby
on_database_connect do |db|
db.extension(:connection_validator)
db.pool.connection_validation_timeout = -1
end
```
### `message_to_arguments { |event| }`
If exposing raw event data to the `on` handlers is not what you want, you can override this behaviour by providing an alternative "before/after fetcher" implementation.
```ruby
# if you'd like to yield the ORM object only
message_to_arguments do |event|
  case event[:type]
  when "order_created", "order_updated"
    Order.get(event[:after][:id])
  when "payment_created", "payment_processed", "payment_reconciled"
    Payment.get(event[:after][:id])
  else
    super(event)
  end
end

on("order_created") { |order| puts "order created: #{order}" }
# ...
on("payment_created") { |payment| puts "payment created: #{payment}" }
# ...
```
### `logger`
Overrides the internal logger (an instance of `Logger`).
### `log_level`
Overrides the default log level ("info" when in "production" environment, "debug" otherwise).
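For example (any `Logger`-compatible instance should do):
```ruby
require "logger"

logger Logger.new($stdout)
log_level "debug"
```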
## Event
The event is composed of the following properties:
* `:id`: unique event identifier
* `:type`: label identifying the event (i.e. `"order_created"`)
* `:before`: hash of the associated event data before event is emitted (can be `nil`)
* `:after`: hash of the associated event data after event is emitted (can be `nil`)
* `:created_at`: timestamp of when the event is emitted
(*NOTE*: The event is also composed of other properties which are only relevant for `tobox`.)
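As an illustration, a handled event looks roughly like this (values are made up, and the `tobox`-internal properties mentioned above are omitted):
```ruby
{
  id: 42,
  type: "order_created",
  before: nil,
  after: { "id" => 42, "price" => 20_20, "currency" => "EUR" },
  created_at: Time.now
}
```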
## Plugins
`tobox` ships with a very simple plugin system. (TODO: add docs).
Plugins can be loaded in the config via `plugin`:
```ruby
# tobox.rb
plugin(:plugin_name)
```
### Progress
By default, the database transaction used to consume the event is kept open while the event is handled. While this ensures atomic event consumption, it may also cause overhead related to transaction management given enough load, particularly in cases where event handling time varies (i.e. throttled HTTP requests).
The `:progress` plugin fixes this by releasing the database transaction after fetching the event. It does so by making the fetched event "invisible" for a certain period, during which the event must be successfully handled.
Here's how to use it:
```ruby
# in your tobox.rb
plugin :progress
visibility_timeout 90 # default: 30
```
#### Configuration
##### `visibility_timeout`
Timeout (in seconds) after which a previously marked-for-consumption event can be retried (default: `30`)
### Event grouping
By default, events are fetched and processed from the "outbox" table concurrently by workers, which means that, while worker A picks up one event and worker B picks up the one right after, worker B may finish processing its event before worker A does. This may be an issue if the consumer expects events from a certain context to be handled in order.
One solution is to have a single worker processing the "outbox" events. Another is to use the `:event_grouping` plugin.
All you have to do is:
1. add a "group id" column to the "outbox" table
```ruby
create_table(:outbox) do
  primary_key :id
  column :group_id, :integer # The type is irrelevant, could also be :string, :uuid...
  # ..
  index :group_id
end
```
2. Enable the plugin
```ruby
# in your tobox.rb
plugin :event_grouping
group_column :group_id # by default already `:group_id`
```
3. insert related outbox events with the same group id
```ruby
order = Order.new(
  item_id: item.id,
  price: 20_20,
  currency: "EUR"
)
DB.transaction do
  order.save
  DB[:outbox].insert(type: "order_created", group_id: order.id, data_after: order.to_hash)
  DB[:outbox].insert(type: "billing_event_started", group_id: order.id, data_after: order.to_hash)
end
# "order_created" will be processed first
# "billing_event_started" will only start processing once "order_created" finishes
```
#### Configuration
##### `group_column`
Defines the database column to be used for event grouping (`:group_id` by default).
### Inbox
Via the `:inbox` plugin, `tobox` also supports the [inbox pattern](https://event-driven.io/en/outbox_inbox_patterns_and_delivery_guarantees_explained/), to ensure "exactly-once" processing of events. This is achieved by "tagging" events with a unique identifier and registering them in the inbox before processing (events whose identifier is already registered there are ignored altogether).
In order to do so, you'll have to:
1. add an "inbox" table in the database and the unique id reference in the outbox table:
```ruby
create_table(:inbox) do
  column :inbox_id, :varchar, null: false, primary_key: true # it can also be a uuid, you decide
  column :created_at, "timestamp without time zone", null: false, default: Sequel::CURRENT_TIMESTAMP
end

create_table(:outbox) do
  primary_key :id
  column :type, :varchar, null: false
  foreign_key :inbox_id, :inbox, type: :varchar, null: true
  # ...
end
```
2. Load the plugin and reference them in the configuration
```ruby
# tobox.rb
plugin :inbox
inbox_table :inbox # :inbox by default already
inbox_column :inbox_id # :inbox_id by default already
```
3. insert related outbox events with an inbox id
```ruby
order = Order.new(
  item_id: item.id,
  price: 20_20,
  currency: "EUR"
)
DB.transaction do
  order.save
  DB[:outbox].insert(type: "order_created", inbox_id: "ord_crt_#{order.id}", data_after: order.to_hash)
  DB[:outbox].insert(type: "billing_event_started", inbox_id: "bil_evt_std_#{order.id}", data_after: order.to_hash)
end
# assuming the above runs twice in two separate workers, each event will be processed by tobox only once.
```
#### Configuration
##### `inbox_table`
Defines the name of the table to be used for inbox (`:inbox` by default).
##### `inbox_column`
Defines the column in the outbox table which references the inbox table (`:inbox_id` by default).
**NOTE**: make sure you periodically clean older entries from the inbox table, once there's no more danger of receiving them again.
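A minimal cleanup sketch (the retention period is illustrative), to be run periodically (cron job, scheduled task...):
```ruby
# delete inbox entries older than 2 weeks
DB[:inbox].where { created_at < Time.now - (14 * 24 * 60 * 60) }.delete
```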
`tobox` also ships with the following integrations.
### Zeitwerk
(requires the `zeitwerk` gem.)
Plugin for the [zeitwerk](https://github.com/fxn/zeitwerk) auto-loader. It allows setting the autoload dirs, and seamlessly integrates code reloading in "development" and eager loading in "production":
```ruby
# tobox.rb
plugin(:zeitwerk)
zeitwerk_loader do |loader|
loader.push_dir("path/to/handlers")
end
```
### Sentry
(requires the `sentry-ruby` gem.)
Plugin for the [sentry](https://github.com/getsentry/sentry-ruby) ruby SDK for error tracking. It'll send all errors happening while processing events to Sentry.
```ruby
# tobox.rb
plugin(:sentry)
on_sentry_init do |sentry_cfg|
  sentry_cfg.dsn = ENV["SENTRY_DSN"]
end
```
### Datadog
(requires the `ddtrace` gem.)
Plugin for [datadog](https://github.com/DataDog/dd-trace-rb) ruby SDK. It'll generate traces for event handling.
```ruby
# tobox.rb
plugin(:datadog)
# or, if you want to pass options to tracing call:
plugin(:datadog, enabled: false)
# or, if you want to access the datadog configuration:
plugin(:datadog) do |c|
c.tracing.instrument :smth_else
end
```
`datadog` tracing functionality can also be enabled/disabled via environment variables, namely the following:
* `DD_TOBOX_ENABLED`: enables/disables tobox tracing (defaults to `true`)
* `DD_TOBOX_ANALYTICS_ENABLED`: enables/disables tobox analytics (defaults to `true`)
* `DD_TRACE_TOBOX_ANALYTICS_SAMPLE_RATE`: sets tobox tracing sample rate (defaults to `1.0`)
### Stats
The `stats` plugin periodically collects statistics related to the outbox table, and exposes them to app code (which can then relay them to a statsD collector, or similar tool).
```ruby
plugin(:stats)
on_stats(5) do |stats_collector| # every 5 seconds
  stats = stats_collector.collect
  #
  # stats => {
  #   pending_count: number of new events in the outbox table
  #   failing_count: number of events which have failed processing but haven't reached the threshold
  #   failed_count: number of events which have failed the max number of tries
  #   inbox_count: (if used) number of events marked as received in the inbox table
  # }
  #
  # now you can send them to your statsd collector
  #
  StatsD.gauge('outbox_pending_backlog', stats[:pending_count])
  StatsD.gauge('outbox_oldest_message_age', stats[:oldest_event_age_in_seconds])
end
```
#### Bring your own leader election
Stats collection runs on every `tobox` process you start. If you're launching it on multiple servers / containers / pods, this means you'll be collecting statistics about the same database on all of these instances. This may not be desirable, and you may want to do this collection in a single instance. This is not a problem that `tobox` can solve by itself, so you'll have to take care of that yourself. Still, here are some cheap recommendations.
##### Postgres advisory locks
If your database is PostgreSQL, you can leverage session-level [advisory locks](https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS) to ensure single-instance access to this functionality. `tobox` also exposes the database instance to the `on_stats` callback:
```ruby
on_stats(5) do |stats_collector, db|
  if db.get(Sequel.function(:pg_try_advisory_lock, 1))
    stats = stats_collector.collect
    StatsD.gauge('outbox_pending_backlog', stats[:pending_count])
  end
end
```
If a server goes down, one of the remaining ones will acquire the lock and ensure stats processing.
##### Redis distributed locks
If you're already using [redis](https://redis.io/), you can use its distributed lock feature to achieve the goal:
```ruby
# using redlock
on_stats(5) do |stats_collector, db|
  begin
    lock_info = lock_manager.lock("outbox", 5000)
    stats = stats_collector.collect
    StatsD.gauge('outbox_pending_backlog', stats[:pending_count])
    # extend to hold the lock for the next loop
    lock_info = lock_manager.lock("outbox", 5000, extend: lock_info)
  rescue Redlock::LockError
    # some other server already has the lock, try later
  end
end
```
### PG Notify
The `pg_notify` plugin is a **PostgreSQL-only** plugin, which uses the [LISTEN](https://www.postgresql.org/docs/current/sql-listen.html) statement to pause the workers when no work is available in the outbox table, until the producer signals otherwise by using the [NOTIFY](https://www.postgresql.org/docs/current/sql-notify.html) statement on the channel the workers are listening to.
It reduces the `SELECT ... FOR UPDATE SKIP LOCKED` statements to the bare minimum required. Without this plugin, and given enough load, these may become a cause of overhead in the master replica, considering that they're handled as "write statements": resources must be allocated for them, and their high frequency affects applying changes to (and using) indexes on the outbox table, which may make subsequent queries fall back to table scans, which holds dead tuples from used transaction xids for longer, which delays vacuuming, which increases replication lag, which... you get the gist.
```ruby
plugin(:pg_notify)
notifier_channel :outbox_notifications # default
# that's it
```
**NOTE**: this plugin can't be used with `jruby`.
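Since it's the producer which issues the `NOTIFY`, an outbox producer using `sequel` with the `postgres` adapter could notify the channel in the same transaction which inserts the event; a sketch, assuming the default channel name:
```ruby
DB.transaction do
  DB[:outbox].insert(type: "order_created", data_after: order.to_hash)
  # sequel postgres adapter helper for the NOTIFY statement;
  # the notification is only delivered when the transaction commits
  DB.notify("outbox_notifications")
end
```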
#### Configuration
##### `notifier_channel`
Identifies the name of the channel the `LISTEN` and `NOTIFY` SQL statements will refer to (`:outbox_notifications` by default).
## Advanced
### Batch Events Handling
You may start hitting a scale where the workload generated by `tobox` puts the master replica under water. Particularly with PostgreSQL, which isn't optimized for writes, this manifests in CPU usage spiking due to index bypasses, or locks on accessing shared buffers.
A way to alleviate this is by handling events in batches. By handling N events at a time, the database can drain events more efficiently, while you can either still handle them one by one, or batch them, if possible. For instance, the AWS SDK contains batch variants of several APIs, including the SNS publish API.
You can do so by setting a batch size in your configuration, and spread the arguments in the event handler:
```ruby
# tobox.rb
batch_size 10 # fetching 10 events at a time
on("user_created", "user_updated") do |*events| # 10 events at most
if events.size == 1
DataLakeService.user_created(events.first)
else
DataLakeService.batch_users_created(events)
end
end
```
In case you're using a batch API which may fail for only a subset of events, you can communicate which events from the batch failed by using the `Tobox.raise_batch_errors` API:
```ruby
on("user_created", "user_updated") do |*events| # 10 events at most
if events.size == 1
DataLakeService.user_created(events.first)
else
success, failed_events_with_errors = DataLakeService.batch_users_created(events)
# handle success first
batch_errors = failed_events_with_errors.to_h do |event, exception|
[
events.index(event),
exception
]
end
# events identified by the batch index will be retried.
Tobox.raise_batch_errors(batch_errors)
end
end
```
## Recommendations
There is no free lunch. Having a transactional outbox has a cost. Throughput is sacrificed in order to guarantee the processing of the event. The cost has to be reasonable, however.
### PostgreSQL
PostgreSQL is the most popular database around, and for good reason: extensible, feature-rich, and quite performant for most workloads. It does have some known drawbacks though: its implementation of MVCC, with creation of tuples for UPDATEs and DELETEs, along with the requirement for indexes to point to the address of the most recent tuple, and WAL logs having to bookkeep all of that (which impacts, among other things, disk usage and replication), highly impacts the performance of transaction management. This phenomenon is known as "write amplification".
Considering the additional overhead that a transactional outbox introduces to the same database your main application uses, certain issues may escalate badly, and it'll be up to you to apply strategies to mitigate them. Here are some recommendations.
### Tweak `max_connections`
By default, a `tobox` consumer process will have as many database connections as there are workers (each worker polls the outbox table individually). As the system scales out to cope with more traffic, you may see that, as more workers are added, query latency (and database CPU usage) increases as well.
One way to address that is to limit the number of database connections the workers of a `tobox` consumer process can use, by setting the `max_connections` configuration option to a number lower than `concurrency` (e.g. 1/3 or 1/4 of it). As a result, when no connection is free, workers will wait for one to become available before fetching work.
#### Caveats
This does not remove the main source of query latency overhead, and you may start seeing "pool timeout" errors as a result, so do monitor performance and apply other mitigations accordingly.
### Handling events in batches
By default, each worker will fetch-and-handle-then-delete events one by one. As surges happen and volume increases, the database will spend way more time and resources managing the transaction than doing the actual work you need, thereby affecting overall turnaround time. In the case of PostgreSQL, the constant DELETEs and UPDATEs may result in the query planner deciding not to use indexes to find an event, and instead fall back to a table scan, if an index is assumed to be "behind" due to a large queue of pending updates from valid transactions.
A way to mitigate this is to [handle more events at once](#batch-events-handling). It's a strategy that makes sense if the event handler APIs support batching. For instance, if all your event handler is doing is relaying to AWS SNS, you can use the [PublishBatch](https://docs.aws.amazon.com/sns/latest/api/API_PublishBatch.html) API (and adjust the batching window to the max threshold you're able to handle at once).
#### Caveats
As per above, it makes sense to use this if events can be handled as a batch; if that's not the case, and the handling block iterates across the batch one by one, this will cause significant variance in single-event turnaround time (TaT) metrics, as a "slow to handle" event will delay subsequent events in the batch. Delays can also cause visibility timeouts to expire, making events visible to other handlers earlier than expected.
Recovering from errors in a batch is also more convoluted (see `Tobox.raise_batch_errors`).
### Disable retries and ordering
The default `tobox` configuration expects the `visibility_column` to be a datetime column (the default is `:run_at`), which is used as a "visibility timeout" and, along with the `attempts` column, to retry failed events gracefully with an exponential backoff interval.
As a consequence, and in order to ensure reliable performance of the worker polling query, a sorted index is recommended; in PostgreSQL, that's `CREATE INDEX ... (id, run_at DESC NULLS FIRST)`, which ensures that new events get handled before retries; a ` WHERE attempts < 10` clause can be appended to the index statement, in order to rule out events which have exhausted their attempts.
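As a sketch, such an index could be created in a `sequel` migration like this (the index name and attempts threshold are illustrative):
```ruby
Sequel.migration do
  change do
    # sorted index for the polling query; the partial WHERE clause rules out exhausted events
    add_index :outbox, [:id, Sequel.desc(:run_at, nulls: :first)],
              name: :outbox_polling_index,
              where: Sequel.lit("attempts < 10")
  end
end
```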
This comes at the cost of increased overhead per event: when producing it via `INSERT` statement, the sorted index will have to be rebalanced. When picking it up, setting the "visibility timeout" before handling it will rebalance it again; and after handling it, whether successfully or not, it'll rebalance it again. This will increase the backlog associated with index management, which may have other consequences (described somewhere else in this section).
You may observe in your systems that your handler either never fails, or when it does, it's the type of transient error which can be retried immediately after, and at a marginal cost. In such situations, the default "planning for failure" exponential backoff strategy described above imposes too much weight for little gain.
You can improve this by setting `visibility_column` to a boolean column, with default set to `false`:
```ruby
# in migration
column :in_progress, :boolean, default: false
# tobox
visibility_column :in_progress
# and, if you require unbounded retries
attempts_column nil
```
This should improve the performance of the main polling query, by **not requiring a sorted index on the visibility column** (i.e. the primary key index is all you need), and by relying on boolean conditions (instead of the more expensive datetime comparisons).
#### Caveats
While using a boolean column as the `visibility_column` may improve the performance of most queries and reduce the overhead of writes, event handling will not be protected against database crashes, so you'll have to monitor idle events and recover them manually (by resetting the `visibility_column` to `false`).
### Do not poll, get notified
The database must allocate resources and bookkeep some data on each transaction. In some cases (e.g. PostgreSQL), some of that bookkeeping does not happen **until** the first write statement is processed. However, due to the usage of locks via `SELECT ... FOR UPDATE`, most databases will consider the polling statement a write statement, which means that, in a `tobox` process, transaction overhead is ever present. In a high availability configuration scenario, transactional resources will need to be maintained and replicated to read replica nodes, which, given enough replication lag and inability to vacuum data, may snowball resource usage in the master replica, which may trigger autoscaling, causing more workers to poll the database for more work, and eventually bringing the whole system down.
This can be mitigated by either adjusting polling intervals (via `wait_for_events_delay` option), or replacing polling by asynchronously notifying workers of when there's work to do. For PostgreSQL, you can use the [pg_notify](#pg-notify) plugin, which will use the PostgreSQL-only `LISTEN`/`NOTIFY` statements for that effect.
#### Caveats
Using `LISTEN` requires maintaining a long-lived, mostly-idle, separate database connection; this approach may not be compatible with your setup, such as if you're using a connection pooler with a particular configuration. For instance, if you're using the popular [pgbouncer](https://www.pgbouncer.org/features.html), this plugin will be incompatible with transaction pooling.
There will be a slight race condition between the moment a worker fails to fetch an event and the moment it starts listening to the notification channel; if an event arrives in the meantime, and the notification is broadcast before the worker starts listening, the worker won't pick up this work immediately. Given enough entropy and workers, this should rarely happen in practice, but it remains a theoretical possibility.
### Unlogged tables
By design (the event is stored in the same transaction where the associated changes happen), a transactional outbox consumer requires that the outbox table lives in the same database the application uses, and accesses it via the master replica. As already mentioned, this means associated bookkeeping overhead in the master replica, including WAL logs and replication lag, which, under extreme load, leads to all kinds of issues to guarantee data consistency, despite the outbox table being unused and irrelevant in read nodes.
In such cases, you may want to set the outbox table as [unlogged](https://www.postgresql.org/docs/current/sql-createtable.html#SQL-CREATETABLE-UNLOGGED), which ensures that associated write statements aren't part of WAL logs, and aren't replicated either. This will massively improve throughput of associated traffic, while preserving most of the desired transactional properties of using a transactional outbox solution, i.e. writing events along with associated data, making it visible to consumers only after transaction commits, and **in case of a clean shutdown**, ensure that data is flushed to disk.
#### Caveats
The last statement leads to the biggest shortcoming of this recommendation: by choosing to unlog the outbox table, your database cannot ensure 100% consistency for its data in case of a database crash or unclean shutdown, which means you may lose events in such a situation. And while outbox data should not be business critical, having less than 100% event handling may be unacceptable to you.
You may decide to do it temporarily though, whenever you expect the level of traffic that justifies foregoing 100% consistency, but be aware that an `ALTER TABLE ... SET UNLOGGED` statement **rewrites the table**, so bear that in mind if you try to do this during an ongoing traffic surge / incident; the recommendation is to do this **before** the surge happens, such as the Thursday before Black Friday.
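As a concrete sketch (PostgreSQL only; as mentioned, both statements rewrite the table):
```ruby
# before the expected surge: stop WAL-logging the outbox table
DB.run("ALTER TABLE outbox SET UNLOGGED")

# once the surge is over: restore full durability
DB.run("ALTER TABLE outbox SET LOGGED")
```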
### WAL outbox consumer (Debezium/Kafka)
It takes a lot of write statements to both produce and consume from the outbox table, in the manner in which it is implemented in `tobox`. In PostgreSQL, considering that each write statement on a given row generates a new tuple, that amounts to at least 3 tuples per event. In the long run, and given enough volume, the health of the whole database will be limited by how quickly dead tuples are vacuumed from the outbox table.
An alternative way to consume outbox events which does not require consuming events via SQL is by using a broker which is able to relay outbox events directly from the WAL logs. One such alternative is [Debezium](https://debezium.io/documentation/reference/stable/integrations/outbox.html), which relays them into Kafka streams.
This solution means not using `tobox` anymore.
#### Caveats
This solution is, at least at the time of writing, limited to Kafka streams; if events are to be relayed to other alternatives (AWS SNS, RabbitMQ...), or there's more to your event handler than relaying, this solution will not work for you either.
There are also several shortcomings to consider when using Kafka streams; for one, events are consumed one at a time, which will affect event handling turnaround time.
## Supported Rubies
All Rubies greater than or equal to 2.7, and always the latest JRuby and TruffleRuby.
## Rails support
Rails is supported out of the box by adding the [sequel-activerecord_connection](https://github.com/janko/sequel-activerecord_connection) gem into your Gemfile, and requiring the rails application in the `tobox` cli call:
```bash
> bundle exec tobox -C path/to/tobox.rb -r path/to/rails_app/config/environment.rb
```
In the `tobox` config, you can set the environment:
```ruby
environment Rails.env
```
## Why?
### Simple and lightweight, framework (and programming language) agnostic
`tobox` event callbacks yield the data in ruby primitive types, rather than heavy ORM instances. This is by design, as callbacks may not rely on application code being loaded.
This allows `tobox` to process events dispatched by an application written in another programming language, for example.
### No second storage system
While `tobox` does not advertise itself as a background job framework, it can be used as such.
Most tiered applications already have an RDBMS. Popular background job solutions, like `"sidekiq"` and `"shoryuken"`, usually require integrating with a separate message broker (Redis, SQS, RabbitMQ...). This increases the overhead in deployment and operations, as these brokers need to be provisioned, monitored, scaled separately, and billed differently.
`tobox` only requires the database you usually already need to account for, allowing you to delay buying into more complicated setups until you have to, and have the budget for it.
However, it can work well in tandem with such solutions:
```ruby
# process event by scheduling an active job
on("order_created") { |event| SendOrderMailJob.perform_later(event[:after]["id"]) }
```
### Atomic processing via database transactions
When scheduling work, one needs to ensure that data is committed to the database before scheduling. This is a very frequent bug when using non-RDBMS background job frameworks, such as [Sidekiq, which has a FAQ entry for this](https://github.com/mperham/sidekiq/wiki/FAQ#why-am-i-seeing-a-lot-of-cant-find-modelname-with-id12345-errors-with-sidekiq).
But even if you do that, the system can go down **after** the data is committed in the database and **before** the job is enqueued to the broker. Failing to address this behaviour makes the [job delivery guarantee "at most once"](https://brandur.org/job-drain). This may or may not be a problem depending on what your job does (if it bills a customer, it probably is).
By using the database as the message broker, `tobox` can rely on good old transactions to ensure that data committed to the database has a corresponding event. This makes the delivery guarantee "exactly once".
(The actual processing may change this to "at least once", as issues may happen before the event is successfully deleted from the outbox. Still, "at least once" is acceptable and solvable using idempotency mechanisms).
## Development
After checking out the repo, run `bin/setup` to install dependencies. Then, run `rake test` to run the tests. You can also run `bin/console` for an interactive prompt that will allow you to experiment.
## Contributing
Bug reports and pull requests are welcome on GitLab at https://gitlab.com/os85/tobox.