Sha256: 9937ef06410aa5ad772a50e7332f24a96156119839cd7d7b6599526dfa4fb89e

Contents?: true

Size: 1.31 KB

Versions: 1

Compression:

Stored size: 1.31 KB

Contents

module Aggro
  # Private: Handles invoking events on a subscriber object.
  #
  # A subscription forwards an event to its subscriber only when the
  # subscription has not been canceled, the subscriber reports that it
  # handles the event under the configured namespace, and every filter
  # matches the event's details.
  class Subscription
    # Public: Returns false until notify_subscription_caught_up has been
    # called, true afterwards.
    attr_reader :caught_up

    # Public: Build a subscription.
    #
    # topic      - Topic handed back to the event bus when unsubscribing.
    # subscriber - Object that answers handles_event?(name, namespace) and
    #              defines the "<namespace>_<event name>" handler methods.
    # namespace  - Prefix used to build handler method names.
    # filters    - Hash of event detail key => required value. nil is
    #              treated as "no filters".
    # at_version - Stored as given; not read anywhere else in this class.
    def initialize(topic, subscriber, namespace, filters, at_version)
      @topic = topic
      @subscriber = subscriber
      @namespace = namespace
      # Normalise nil so matches_filter? can always enumerate the filters.
      @filters = filters || {}
      @at_version = at_version
      @canceled = false
      # Explicit false keeps the reader boolean before catch-up happens.
      @caught_up = false
    end

    # Public: Unsubscribe from the event bus. Only the first call reaches
    # the bus; later calls just leave the subscription canceled.
    def cancel
      Aggro.event_bus.unsubscribe(@topic, self) unless @canceled
      @canceled = true
    end

    # Public: Deliver an event to the subscriber if it qualifies.
    #
    # event - Object exposing name, details and occured_at.
    #
    # Does nothing once the subscription has been canceled.
    def handle_event(event)
      return if @canceled

      invoke(event) if handles_event?(event) && matches_filter?(event)
    end

    # Public: Mark the subscription as caught up and, when the subscriber
    # handles :caught_up for this namespace, call its
    # "<namespace>_caught_up" method.
    def notify_subscription_caught_up
      @caught_up = true

      return unless @subscriber.handles_event?(:caught_up, @namespace)

      # send (not public_send) is kept so non-public handlers still work.
      @subscriber.send("#{@namespace}_caught_up")
    end

    private

    # Internal: Build the argument hash offered to the handler: the event
    # details plus :now and :today derived from the event's timestamp.
    def expand_event(event)
      # occured_at (sic) is the accessor name the event object exposes.
      occurred = event.occured_at

      event.details.merge(now: occurred, today: occurred.to_date)
    end

    # Internal: Ask the subscriber whether it handles this event's name
    # under the subscription's namespace.
    def handles_event?(event)
      @subscriber.handles_event?(event.name, @namespace)
    end

    # Internal: Call the "<namespace>_<event name>" handler on the
    # subscriber through Invokr, passing the expanded event details.
    def invoke(event)
      Invokr.invoke method: "#{@namespace}_#{event.name}", on: @subscriber,
                    using: expand_event(event)
    end

    # Internal: True when every filter value equals the corresponding
    # event detail. An empty filter set matches every event.
    def matches_filter?(event)
      @filters.all? do |filter_key, filter_value|
        event.details[filter_key] == filter_value
      end
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
aggro-0.0.4 lib/aggro/subscription.rb