Sha256: 9937ef06410aa5ad772a50e7332f24a96156119839cd7d7b6599526dfa4fb89e
Contents?: true
Size: 1.31 KB
Versions: 1
Compression:
Stored size: 1.31 KB
Contents
module Aggro
  # Private: Handles invoking events on a subscriber object.
  #
  # A subscription forwards events to namespaced handler methods on the
  # subscriber ("#{namespace}_#{event.name}") when the subscriber reports
  # that it handles the event and the event's details match every filter.
  class Subscription
    # Public: Returns true once #notify_subscription_caught_up has been
    # called, false before that.
    attr_reader :caught_up

    # Public: Build a subscription.
    #
    # topic      - Value handed back to Aggro.event_bus.unsubscribe on #cancel.
    # subscriber - Object responding to handles_event?(name, namespace) and
    #              to the namespaced handler methods.
    # namespace  - Prefix used to build handler method names.
    # filters    - Key/value pairs every handled event's details must equal.
    #              nil is treated as "no filters".
    # at_version - Stored as given; not read anywhere in this class.
    def initialize(topic, subscriber, namespace, filters, at_version)
      @topic = topic
      @subscriber = subscriber
      @namespace = namespace
      @filters = filters || {}
      @at_version = at_version
      @canceled = false
      # Start from an explicit boolean so #caught_up never reads an
      # uninitialized instance variable.
      @caught_up = false
    end

    # Public: Stop delivering events. The event bus is asked to unsubscribe
    # only on the first call; later calls are no-ops.
    #
    # Returns true.
    def cancel
      Aggro.event_bus.unsubscribe @topic, self unless @canceled
      @canceled = true
    end

    # Public: Deliver an event to the subscriber if the subscription is still
    # active, the subscriber handles the event and all filters match.
    #
    # event - Object responding to name, details and occured_at.
    #
    # Returns the result of the invocation, or nil when nothing was invoked.
    def handle_event(event)
      return if @canceled

      invoke(event) if handles_event?(event) && matches_filter?(event)
    end

    # Public: Mark the subscription as caught up and, when the subscriber
    # handles :caught_up for this namespace, call "#{namespace}_caught_up".
    #
    # Returns the handler's result, or nil when there is no handler.
    def notify_subscription_caught_up
      @caught_up = true
      return unless @subscriber.handles_event? :caught_up, @namespace

      @subscriber.send "#{@namespace}_caught_up"
    end

    private

    # Internal: Build the argument hash for a handler: the event details plus
    # :now and :today derived from the event timestamp. The derived keys win
    # over same-named detail keys (Hash#merge semantics).
    def expand_event(event)
      event.details.merge now: event.occured_at, today: event.occured_at.to_date
    end

    # Internal: Ask the subscriber whether it handles this event's name
    # within the subscription's namespace.
    def handles_event?(event)
      @subscriber.handles_event? event.name, @namespace
    end

    # Internal: Call the namespaced handler through Invokr, passing the
    # expanded event as the argument source.
    def invoke(event)
      Invokr.invoke method: "#{@namespace}_#{event.name}", on: @subscriber,
                    using: expand_event(event)
    end

    # Internal: True when every filter value equals the matching entry in
    # the event details. An empty filter set matches every event.
    def matches_filter?(event)
      @filters.all? do |filter_key, filter_value|
        event.details[filter_key] == filter_value
      end
    end
  end
end
Version data entries
1 entries across 1 versions & 1 rubygems
Version | Path |
---|---|
aggro-0.0.4 | lib/aggro/subscription.rb |