Sha256: b1fcdab7d55c6b497e1bd038a818dbd9a22a742772438b9ee9ec079239a283d4

Contents?: true

Size: 1.9 KB

Versions: 2

Compression:

Stored size: 1.9 KB

Contents

class RFlow
  # Components.
  module Components
    # Component that filters messages based on Ruby defined in the RFlow config file.
    # Inbound messages will be sent out {filtered} if the predicate returns truthy,
    # {dropped} if it returns falsey, or {errored} if it raises an exception.
    #
    # Accept config parameter +filter_proc_string+ which is the text of a +lambda+
    # receiving a message +message+. For example, +message.data.data_object['foo'] > 2+.
    class RubyProcFilter < Component
      # @!attribute [r] in
      #   Receives {RFlow::Message}s.
      #   @return [Component::InputPort]
      input_port :in
      # @!attribute [r] filtered
      #   Outputs {RFlow::Message}s that pass the filter predicate.
      #   @return [Component::OutputPort]
      output_port :filtered
      # @!attribute [r] dropped
      #   Outputs {RFlow::Message}s that do not pass the filter predicate.
      #   @return [Component::OutputPort]
      output_port :dropped
      # @!attribute [r] errored
      #   Outputs {RFlow::Message}s that raise from the filter predicate.
      #   @return [Component::OutputPort]
      output_port :errored

      # RFlow-called method at startup.
      # @param config [Hash] configuration from the RFlow config file
      # @return [void]
      def configure!(config)
        @filter_proc = eval("lambda {|message| #{config['filter_proc_string']} }")
      end

      # RFlow-called method on message arrival.
      # @return [void]
      def process_message(input_port, input_port_key, connection, message)
        begin
          if @filter_proc.call(message)
            filtered.send_message message
          else
            dropped.send_message message
          end
        rescue Exception => e
          RFlow.logger.debug "#{self.class} Message caused exception: #{e.class}: #{e.message}: #{e.backtrace}"
          errored.send_message message
        end
      end
    end
  end
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
rflow-1.3.2 lib/rflow/components/ruby_proc_filter.rb
rflow-1.3.1 lib/rflow/components/ruby_proc_filter.rb