Sha256: 763ef4f9bb5d2c927205657b4585d35769064232d223f688dd33550e8371057d
Contents?: true
Size: 1010 Bytes
Versions: 3
Compression:
Stored size: 1010 Bytes
Contents
# frozen_string_literal: true class Async::Bus include Async::Logger # A tiny wrapper around ac ActiveSupport::Notifications # BLOCKING unless subscribers run in tasks def publish(name, *args, **params) ActiveSupport::Notifications.instrument(name, payload: (args.first || params)) rescue StandardError => e log_error(name, e) end # NON-BLOCKING def subscribe(pattern) ActiveSupport::Notifications.subscribe(pattern) do |name, _start, _finish, _id, params| yield params[:payload], name end end # NON-BLOCKING, runs subscriber in a task def async_subscribe(pattern, parent: Async::Task.current) subscribe(pattern) do |event| parent.async do yield(event) rescue StandardError => e log_error(pattern, e) end end end def convert(from_event, to_event) = subscribe(from_event) { publish(to_event, **yield(_1)) } private def log_error(name, e) = warn("Subscriber for #{name.inspect} failed with exception.", e) end
Version data entries
3 entries across 3 versions & 1 rubygems
Version | Path |
---|---|
async-tools-0.2.10 | lib/async/bus.rb |
async-tools-0.2.9 | lib/async/bus.rb |
async-tools-0.2.8 | lib/async/bus.rb |