Sha256: 469a53b1a28c813d4f98c3e8f2c3fd55b6060390dd388a886c6c74a759edeed6

Contents?: true

Size: 1001 Bytes

Versions: 3

Compression:

Stored size: 1001 Bytes

Contents

# frozen_string_literal: true

# A tiny wrapper around ActiveSupport::Notifications.
class Async::Bus
  include Async::Logger

  # Publishes an event named +name+ through ActiveSupport::Notifications.
  #
  # BLOCKING unless subscribers run in tasks.
  #
  # The payload is the first positional argument when it is non-nil (an
  # explicit +false+ is delivered as-is); otherwise the keyword arguments are
  # delivered as a Hash. Positional arguments after the first are ignored.
  # A StandardError raised while instrumenting is logged, not propagated.
  def publish(name, *args, **params)
    payload = args.first
    # Only fall back to the keyword hash when no positional payload was given;
    # `args.first || params` would also discard a legitimate `false` payload.
    payload = params if payload.nil?
    ActiveSupport::Notifications.instrument(name, payload: payload)
  rescue StandardError => e
    log_error(name, e)
  end

  # Subscribes the given block to events matching +pattern+.
  #
  # NON-BLOCKING
  #
  # The block is yielded the published payload followed by the event name.
  # Returns the result of ActiveSupport::Notifications.subscribe.
  def subscribe(pattern)
    ActiveSupport::Notifications.subscribe(pattern) do |name, _start, _finish, _id, params|
      yield params[:payload], name
    end
  end

  # Subscribes the given block to events matching +name+, running the block
  # through +parent.async+ for every event.
  #
  # NON-BLOCKING, runs subscriber in a task
  #
  # +parent+ defaults to Async::Task.current, evaluated when this method is
  # called. Only the payload is yielded to the block. A StandardError raised
  # by the block is logged against +name+ instead of escaping the task.
  def async_subscribe(name, parent: Async::Task.current)
    subscribe(name) do |event|
      parent.async do
        yield(event)
      rescue StandardError => e
        log_error(name, e)
      end
    end
  end

  # Re-publishes every +from_event+ as +to_event+. The block receives the
  # incoming payload and its result is splatted into #publish as keyword
  # arguments, so it is expected to return a Hash.
  def convert(from_event, to_event) = subscribe(from_event) { publish(to_event, **yield(_1)) }

  private

  # Reports a failed subscriber for +name+ together with the exception +e+.
  # NOTE(review): `warn` is presumably supplied by Async::Logger — confirm.
  def log_error(name, e) = warn("Subscriber for #{name.inspect} failed with exception.", e)
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
async-tools-0.2.7 lib/async/bus.rb
async-tools-0.2.6 lib/async/bus.rb
async-tools-0.2.5 lib/async/bus.rb