# Tobox: Transactional outbox pattern implementation in ruby

[![Gem Version](https://badge.fury.io/rb/tobox.svg)](http://rubygems.org/gems/tobox)
[![pipeline status](https://gitlab.com/os85/tobox/badges/master/pipeline.svg)](https://gitlab.com/os85/tobox/pipelines?page=1&scope=all&ref=master)
[![coverage report](https://gitlab.com/os85/tobox/badges/master/coverage.svg?job=coverage)](https://os85.gitlab.io/tobox/#_AllFiles)

Simple, data-first events processing framework based on the [transactional outbox pattern](https://microservices.io/patterns/data/transactional-outbox.html).

- [Requirements](#requirements)
- [Installation](#installation)
- [Usage](#usage)
- [Configuration](#configuration)
- [Event](#event)
- [Features](#features)
  - [Ordered event processing](#ordered-event-processing)
  - [Inbox](#inbox)
- [Plugins](#plugins)
  - [Zeitwerk](#zeitwerk)
  - [Sentry](#sentry)
  - [Datadog](#datadog)
  - [Stats](#stats)
- [Supported Rubies](#supported-rubies)
- [Rails support](#rails-support)
- [Why?](#why)
- [Development](#development)
- [Contributing](#contributing)

## Requirements

`tobox` requires integration with an RDBMS which supports `SKIP LOCKED` functionality. As of today, that's:

* PostgreSQL 9.5+
* MySQL 8+
* Oracle
* Microsoft SQL Server

## Installation

Add this line to your application's Gemfile:

```ruby
gem "tobox"

# You'll also need to add the right database client gem for the target RDBMS
# ex, for postgresql:
#
# gem "pg"
# see more: http://sequel.jeremyevans.net/rdoc/files/doc/opening_databases_rdoc.html
```

And then execute:

    $ bundle install

Or install it yourself as:

    $ gem install tobox

## Usage

1. 
   create the `outbox` table in your application's database:

```ruby
# example migration using sequel
Sequel.migration do
  up do
    create_table(:outbox) do
      primary_key :id
      column :type, :varchar, null: false
      column :data_before, :json, null: true
      column :data_after, :json, null: true
      column :created_at, "timestamp without time zone", null: false, default: Sequel::CURRENT_TIMESTAMP
      column :attempts, :integer, null: false, default: 0
      column :run_at, "timestamp without time zone", null: true
      column :last_error, :text, null: true
      column :metadata, :json, null: true

      index Sequel.desc(:run_at)
    end
  end

  down do
    drop_table(:outbox)
  end
end
```

2. create a `tobox.rb` config file in your project directory tree:

```ruby
# tobox
database Sequel.connect("postgres://user:pass@dbhost/database")
# table :outbox
# concurrency 8
on("user_created") do |event|
  # the "after" state of the event carries the user data
  user_data_hash = event[:after]
  puts "created user #{user_data_hash["id"]}"
  DataLakeService.user_created(user_data_hash)
  BillingService.bill_user_account(user_data_hash)
end
on("user_updated") do |event|
  # ...
end
on("user_created", "user_updated") do |event|
  # ...
end
```

3. Start the `tobox` process

```bash
> bundle exec tobox -C path/to/tobox.rb -r path/to/file_requiring_application_code.rb
```

4. Emit outbox events

Currently, `tobox` only deals with the consumption of outbox events; there is no API for event production yet (still TODO), so when it comes to producing, you have to do it yourself. It's recommended that you write directly into the "outbox" table via database triggers (i.e. *insert into users table -> add user_created event*); alternatively, you can use `sequel` directly (`DB[:outbox].insert(...)`). 
There are essentially two alternatives:

4.1 Emit from application code

If you're using `sequel` as your ORM, you can use the dataset API:

```ruby
# Assuming DB points to your `Sequel::Database`, and defaults are used:
order = Order.new(
  item_id: item.id,
  price: 20_20,
  currency: "EUR"
)
DB.transaction do
  order.save
  # the column is named `type`, as in the migration above
  DB[:outbox].insert(type: "order_created", data_after: order.to_hash)
end
```

4.2 Emit from database trigger

This is how it could be done in PostgreSQL using trigger functions:

```sql
CREATE OR REPLACE FUNCTION order_created_outbox_event()
  RETURNS TRIGGER
  LANGUAGE PLPGSQL
  AS
$$
BEGIN
  INSERT INTO outbox(type, data_after)
  VALUES('order_created', row_to_json(NEW.*));
  RETURN NEW;
END;
$$;

CREATE TRIGGER order_created_outbox_event
  AFTER INSERT
  ON orders
  FOR EACH ROW
  EXECUTE PROCEDURE order_created_outbox_event();
```

## Configuration

As mentioned above, configuration can be set in a particular file. The following options are configurable:

### `environment`

Sets the application environment (either "development" or "production"). Can be set directly, or via the `APP_ENV` environment variable (defaults to "development").

### `database_uri`

Accepts a URI pointing to a database, [where the scheme identifies the database adapter to be used](https://sequel.jeremyevans.net/rdoc/files/doc/opening_databases_rdoc.html):

```ruby
database_uri "postgres://user:password@localhost/blog"
```

### `table`

The name of the database table where outbox events are stored (`:outbox` by default).

```ruby
table :outbox
```

### `max_attempts`

Maximum number of times a failed attempt to process an event will be retried (`10` by default).

```ruby
max_attempts 4
```

**Note**: the new attempt will be retried in `n ** 4`, where `n` is the number of past attempts for that event.

### `concurrency`

Number of workers processing events. 
```ruby
concurrency 4
```

**Note**: the default concurrency differs for each worker pool type, so make sure you understand how this tweak may affect you.

### `worker`

Type of the worker used to process events. Can be `:thread` (default), `:fiber`, or a class implementing the `Tobox::Pool` protocol (TBD: define what this protocol is).

### `wait_for_events_delay`

Time (in seconds) to wait before checking again for events in the outbox.

### `shutdown_timeout`

Time (in seconds) to wait for events to finish processing, before hard-killing the process.

### `on(event_type) { |event| }`

Callback executed when processing an event of the given type. By default, it yields the event, which carries the state of the data before and after the event (unless `message_to_arguments` is set).

```ruby
on("order_created") { |event| puts "order created: #{event[:after]}" }
on("order_updated") { |event| puts "order updated: was #{event[:before]}, now is #{event[:after]}" }
# ...
```

### `on_before_event { |event| }`

Callback executed right before processing an event.

```ruby
on_before_event { |event| start_trace(event[:id]) }
```

### `on_after_event { |event| }`

Callback executed right after processing an event.

```ruby
on_after_event { |event| finish_trace(event[:id]) }
```

### `on_error_event { |event, error| }`

Callback executed when an exception was raised while processing an event.

```ruby
on_error_event { |event, exception| Sentry.capture_exception(exception) }
```

### `on_error_worker { |error| }`

Callback executed when an exception was raised in the worker, before processing events.

```ruby
on_error_worker { |exception| Sentry.capture_exception(exception) }
```

### `on_database_connect { |db| }`

Callback executed right after initializing the `sequel` database object. This can be used, for example, to load database-level extensions and plugins, and set parameters (such as connection pool tweaks). 
This callback will also be used by plugins which instantiate their own separate database objects (such as the [stats](#stats) plugin). This callback won't be executed if the database object is created outside of `tobox` configuration parameters.

```ruby
on_database_connect do |db|
  db.extension(:connection_validator)
  db.pool.connection_validation_timeout = -1
end
```

### `message_to_arguments { |event| }`

If exposing raw data to the `on` handlers is not what you want, you can always override the behaviour by providing an alternative "before/after fetcher" implementation.

```ruby
# if you'd like to yield the ORM object only
message_to_arguments do |event|
  case event[:type]
  when "order_created", "order_updated"
    Order.get(event[:after]["id"])
  when "payment_created", "payment_processed", "payment_reconciled"
    Payment.get(event[:after]["id"])
  else
    super(event)
  end
end

on("order_created") { |order| puts "order created: #{order}" }
# ...
on("payment_created") { |payment| puts "payment created: #{payment}" }
# ...
```

### logger

Overrides the internal logger (an instance of `Logger`).

### log_level

Overrides the default log level ("info" when in "production" environment, "debug" otherwise).

### group_column

Defines the column to be used for event grouping, when [ordered processing of events is a requirement](#ordered-event-processing).

### inbox_table

Defines the name of the table to be used for the inbox, when [inbox usage is a requirement](#inbox).

### inbox_column

Defines the column in the outbox table which references the inbox table, when one is set.

## Event

The event is composed of the following properties:

* `:id`: unique event identifier
* `:type`: label identifying the event (i.e. 
`"order_created"`)
* `:before`: hash of the associated event data before the event is emitted (can be `nil`)
* `:after`: hash of the associated event data after the event is emitted (can be `nil`)
* `:created_at`: timestamp of when the event is emitted

(*NOTE*: The event is also composed of other properties which are only relevant for `tobox`.)

## Features

There are a few extra features you can run on top of a "vanilla" transactional outbox implementation. This is how you can accomplish them using `tobox`.

### Ordered event processing

By default, events are taken and processed from the "outbox" table concurrently by workers. This means that worker A may pick up one event and worker B the following one, and worker B may finish processing before worker A does. This may be an issue if the consumer expects events from a certain context to arrive in a certain order.

One solution is to have a single worker processing the "outbox" events. Another is to use the `group_column` configuration.

What you have to do is:

1. add a "group id" column (and an index on it) to the "outbox" table

```ruby
create_table(:outbox) do
  primary_key :id
  column :group_id, :integer # The type is irrelevant, could also be :string, :uuid...
  # ..
  index :group_id
```

2. set the "group_column" configuration

```ruby
# in your tobox.rb
group_column :group_id
```

3. 
   insert related outbox events with the same group id

```ruby
order = Order.new(
  item_id: item.id,
  price: 20_20,
  currency: "EUR"
)
DB.transaction do
  order.save
  DB[:outbox].insert(type: "order_created", group_id: order.id, data_after: order.to_hash)
  DB[:outbox].insert(type: "billing_event_started", group_id: order.id, data_after: order.to_hash)
end

# "order_created" will be processed first
# "billing_event_started" will only start processing once "order_created" finishes
```

### Inbox

`tobox` also supports the [inbox pattern](https://event-driven.io/en/outbox_inbox_patterns_and_delivery_guarantees_explained/), to ensure "exactly-once" processing of events. This is achieved by "tagging" events with a unique identifier, and registering them in the inbox before processing (and if they're already there, ignoring them altogether).

In order to do so, you'll have to:

1. add an "inbox" table in the database

```ruby
create_table(:inbox) do
  # a primary key cannot be NULL
  column :inbox_id, :varchar, null: false, primary_key: true # it can also be a uuid, you decide
  column :created_at, "timestamp without time zone", null: false, default: Sequel::CURRENT_TIMESTAMP
end
```

2. add the unique id reference in the outbox table:

```ruby
create_table(:outbox) do
  primary_key :id
  column :type, :varchar, null: false
  column :inbox_id, :varchar, null: true
  # ...
  # array form: adds the constraint to the column declared above instead of creating a new one
  foreign_key [:inbox_id], :inbox
```

3. reference them in the configuration

```ruby
# tobox.rb
inbox_table :inbox
inbox_column :inbox_id
```

4. insert related outbox events with an inbox id

```ruby
order = Order.new(
  item_id: item.id,
  price: 20_20,
  currency: "EUR"
)
DB.transaction do
  order.save
  DB[:outbox].insert(type: "order_created", inbox_id: "ord_crt_#{order.id}", data_after: order.to_hash)
  DB[:outbox].insert(type: "billing_event_started", inbox_id: "bil_evt_std_#{order.id}", data_after: order.to_hash)
end

# assuming the bit above runs two times in two separate workers, each event will be processed by tobox only once.
```

**NOTE**: make sure you periodically clean older messages out of the inbox, once there's no more danger of receiving them again.

## Plugins

`tobox` ships with a very simple plugin system. (TODO: add docs).

Plugins can be loaded in the config via `plugin`:

```ruby
# tobox.rb
plugin(:plugin_name)
```

It ships with the following integrations.

### Zeitwerk

(requires the `zeitwerk` gem.)

Plugin for the [zeitwerk](https://github.com/fxn/zeitwerk) auto-loader. It allows you to set the autoload dirs, and seamlessly integrates code reloading in "development" and eager loading in "production":

```ruby
# tobox.rb
plugin(:zeitwerk)
zeitwerk_loader do |loader|
  loader.push_dir("path/to/handlers")
end
```

### Sentry

(requires the `sentry-ruby` gem.)

Plugin for the [sentry](https://github.com/getsentry/sentry-ruby) ruby SDK for error tracking. It'll send all errors happening while processing events to Sentry.

```ruby
# tobox.rb
plugin(:sentry)

on_sentry_init do |sentry_cfg|
  sentry_cfg.dsn = ENV["SENTRY_DSN"]
end
```

### Datadog

(requires the `ddtrace` gem.)

Plugin for the [datadog](https://github.com/DataDog/dd-trace-rb) ruby SDK. It'll generate traces for event handling.

```ruby
# you can init the datadog config in another file to load:
Datadog.configure do |c|
  c.tracing.instrument :tobox
end

# tobox.rb
plugin(:datadog)
```

### Stats

The `stats` plugin periodically collects statistics about the outbox table, and exposes them to app code (which can then relay them to a statsD collector, or similar tool). 
```ruby
plugin(:stats)

on_stats(5) do |stats_collector| # every 5 seconds
  stats = stats_collector.collect
  #
  # stats => {
  #   pending_count: number of new events in the outbox table
  #   failing_count: number of events which have failed processing but haven't reached the threshold
  #   failed_count: number of events which have failed the max number of tries
  #   inbox_count: (if used) number of events marked as received in the inbox table
  # }
  #
  # now you can send them to your statsd collector
  #
  StatsD.gauge('outbox_pending_backlog', stats[:pending_count])
end
```

#### Bring your own leader election

The stats collection runs on every `tobox` process started. If you're launching it in multiple servers / containers / pods, this means you'll be collecting statistics about the same database on all of these instances. This may not be desirable, and you may want to do this collection in a single instance. This is not a problem that `tobox` can solve by itself, so you'll have to take care of that yourself. Still, here are some cheap recommendations.

##### Postgres advisory locks

If your database is PostgreSQL, you can leverage session-level [advisory locks](https://www.postgresql.org/docs/current/explicit-locking.html#ADVISORY-LOCKS) to ensure single-instance access to this functionality. `tobox` also exposes the database instance to the `on_stats` callback:

```ruby
on_stats(5) do |stats_collector, db|
  if db.get(Sequel.function(:pg_try_advisory_lock, 1))
    stats = stats_collector.collect
    StatsD.gauge('outbox_pending_backlog', stats[:pending_count])
  end
end
```

If a server goes down, one of the remaining ones will acquire the lock and ensure stats processing.
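
The lock key `1` in the callback above is arbitrary. If other parts of your application also take advisory locks, a key derived from a name makes accidental collisions less likely. The following is a minimal sketch, not part of `tobox`: `advisory_lock_key` is a hypothetical helper, and the lock name `"tobox:stats"` is an assumption chosen for illustration.

```ruby
require "zlib"

# Hypothetical helper (not provided by tobox): maps a lock name to a stable
# non-negative integer, which fits in the bigint argument accepted by
# pg_try_advisory_lock.
def advisory_lock_key(name)
  Zlib.crc32(name)
end

# The same name always yields the same key, on every server.
stats_lock_key = advisory_lock_key("tobox:stats")

# The key would then replace the literal `1` in the callback, e.g.:
#
#   db.get(Sequel.function(:pg_try_advisory_lock, stats_lock_key))
```

Since CRC32 is only 32 bits wide, two different names can still collide; the helper reduces the odds of that happening, it does not rule it out.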
##### Redis distributed locks

If you're already using [redis](https://redis.io/), you can use its distributed lock feature to achieve the goal:

```ruby
# using redlock (assumes `lock_manager` is an already configured Redlock::Client)
lock_info = nil

on_stats(5) do |stats_collector, db|
  # acquire the lock, or extend it if this instance already holds it;
  # `lock` returns false when some other server has the lock
  lock_info = lock_manager.lock("outbox", 5000, extend: lock_info)

  if lock_info
    stats = stats_collector.collect
    StatsD.gauge('outbox_pending_backlog', stats[:pending_count])
  end
  # otherwise, try again on the next loop
end
```

## Supported Rubies

All Rubies greater than or equal to 2.6, and always the latest JRuby and TruffleRuby.

## Rails support

Rails is supported out of the box by adding the [sequel-activerecord_connection](https://github.com/janko/sequel-activerecord_connection) gem into your Gemfile, and requiring the rails application in the `tobox` cli call:

```bash
> bundle exec tobox -C path/to/tobox.rb -r path/to/rails_app/config/environment.rb
```

In the `tobox` config, you can set the environment:

```ruby
environment Rails.env
```

## Why?

### Simple and lightweight, framework (and programming language) agnostic

`tobox` event callbacks yield the data in ruby primitive types, rather than heavy ORM instances. This is by design, as callbacks may not rely on application code being loaded.

This allows `tobox` to process events dispatched from an application written in another programming language, as an example.

### No second storage system

While `tobox` does not advertise itself as a background job framework, it can be used as such.

Most tiered applications already have an RDBMS. Popular background job solutions, like `"sidekiq"` and `"shoryuken"`, usually require integrating with a separate message broker (Redis, SQS, RabbitMQ...). This increases the overhead in deployment and operations, as these brokers need to be provisioned, monitored, scaled separately, and billed differently. 
`tobox` only requires the database you usually need to account for anyway, allowing you to delay buying into more complicated setups until you have to and have the budget for them.

However, it can work well in tandem with such solutions:

```ruby
# process event by scheduling an active job
on("order_created") { |event| SendOrderMailJob.perform_later(event[:after]["id"]) }
```

### Atomic processing via database transactions

When scheduling work, one needs to ensure that data is committed to the database before scheduling. This is a very frequent bug when using non-RDBMS background job frameworks, such as [Sidekiq, which has a FAQ entry for this](https://github.com/mperham/sidekiq/wiki/FAQ#why-am-i-seeing-a-lot-of-cant-find-modelname-with-id12345-errors-with-sidekiq).

But even if you do that, the system can go down **after** the data is committed in the database and **before** the job is enqueued to the broker. Failing to address this behaviour makes the [job delivery guarantee "at most once"](https://brandur.org/job-drain). This may or may not be a problem depending on what your job does (if it bills a customer, it probably is).

By using the database as the message broker, `tobox` can rely on good old transactions to ensure that data committed to the database has a corresponding event. This makes the delivery guarantee "exactly once".

(The actual processing may change this to "at least once", as issues may happen before the event is successfully deleted from the outbox. Still, "at least once" is acceptable and solvable using idempotency mechanisms).

## Development

After checking out the repo, run `bin/setup` to install dependencies. Then, run `rake test` to run the tests. You can also run `bin/console` for an interactive prompt that will allow you to experiment.

## Contributing

Bug reports and pull requests are welcome on GitLab at https://gitlab.com/os85/tobox.