Sha256: 118ae1a75684d21d585eebd99a5b8ed39ad607952e155013af4c8ad112e927cc

Contents?: true

Size: 758 Bytes

Versions: 11

Compression:

Stored size: 758 Bytes

Contents

class RFlow
  module Components
    class RubyProcFilter < Component
      input_port :in
      output_port :filtered
      output_port :dropped
      output_port :errored

      def configure!(config)
        @filter_proc = eval("lambda {|message| #{config['filter_proc_string']} }")
      end

      def process_message(input_port, input_port_key, connection, message)
        begin
          if @filter_proc.call(message)
            filtered.send_message message
          else
            dropped.send_message message
          end
        rescue Exception => e
          RFlow.logger.debug "#{self.class} Message caused exception: #{e.class}: #{e.message}: #{e.backtrace}"
          errored.send_message message
        end
      end
    end
  end
end

Version data entries

11 entries across 11 versions & 1 rubygems

Version Path
rflow-1.3.0 lib/rflow/components/ruby_proc_filter.rb
rflow-1.3.0a1 lib/rflow/components/ruby_proc_filter.rb
rflow-1.2.0 lib/rflow/components/ruby_proc_filter.rb
rflow-1.1.0 lib/rflow/components/ruby_proc_filter.rb
rflow-1.0.1 lib/rflow/components/ruby_proc_filter.rb
rflow-1.0.0 lib/rflow/components/ruby_proc_filter.rb
rflow-1.0.0a6 lib/rflow/components/ruby_proc_filter.rb
rflow-1.0.0a5 lib/rflow/components/ruby_proc_filter.rb
rflow-1.0.0a4 lib/rflow/components/ruby_proc_filter.rb
rflow-1.0.0a3 lib/rflow/components/ruby_proc_filter.rb
rflow-1.0.0a2 lib/rflow/components/ruby_proc_filter.rb