Sha256: 469a53b1a28c813d4f98c3e8f2c3fd55b6060390dd388a886c6c74a759edeed6
Contents?: true
Size: 1001 Bytes
Versions: 3
Compression:
Stored size: 1001 Bytes
Contents
# frozen_string_literal: true class Async::Bus include Async::Logger # A tiny wrapper around ac ActiveSupport::Notifications # BLOCKING unless subscribers run in tasks def publish(name, *args, **params) ActiveSupport::Notifications.instrument(name, payload: (args.first || params)) rescue StandardError => e log_error(name, e) end # NON-BLOCKING def subscribe(pattern) ActiveSupport::Notifications.subscribe(pattern) do |name, _start, _finish, _id, params| yield params[:payload], name end end # NON-BLOCKING, runs subscriber in a task def async_subscribe(name, parent: Async::Task.current) subscribe(name) do |event| parent.async do yield(event) rescue StandardError => e log_error(name, e) end end end def convert(from_event, to_event) = subscribe(from_event) { publish(to_event, **yield(_1)) } private def log_error(name, e) = warn("Subscriber for #{name.inspect} failed with exception.", e) end
Version data entries
3 entries across 3 versions & 1 rubygems
Version | Path |
---|---|
async-tools-0.2.7 | lib/async/bus.rb |
async-tools-0.2.6 | lib/async/bus.rb |
async-tools-0.2.5 | lib/async/bus.rb |