Sha256: 35191a5cafebdead5a19dc5f885dbe56bc2561f6e934e641c585c366c725c44d

Contents?: true

Size: 1.24 KB

Versions: 1

Compression:

Stored size: 1.24 KB

Contents

require_relative "blocked_pipe"

module Plumbing
  # A pipe that filters events from a source pipe
  class Filter < BlockedPipe
    class InvalidFilter < Error; end

    # Chain this pipe to the source pipe
    # @param source [Plumbing::BlockedPipe]
    # @param accepts [Array[String]] event types that this filter will allow through (or pass [] to allow all)
    # @param rejects [Array[String]] event types that this filter will not allow through
    def initialize source:, accepts: [], rejects: []
      super()
      raise InvalidFilter.new "source must be a Plumbing::BlockedPipe descendant" unless source.is_a? Plumbing::BlockedPipe
      raise InvalidFilter.new "accepts and rejects must be arrays" unless accepts.is_a?(Array) && rejects.is_a?(Array)
      @accepted_event_types = accepts
      @rejected_event_types = rejects
      source.add_observer do |event|
        filter_and_republish(event)
      end
    end

    private

    def filter_and_republish event
      raise InvalidEvent.new "event is not a Plumbing::Event" unless event.is_a? Plumbing::Event
      return nil if @accepted_event_types.any? && !@accepted_event_types.include?(event.type)
      return nil if @rejected_event_types.include? event.type
      dispatch event
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
standard-procedure-plumbing-0.1.2 lib/plumbing/filter.rb