Sha256: 54222056a51f8bceeea68de6dd164af31e8814bef7a4584249dc662b98d45780

Contents?: true

Size: 840 Bytes

Versions: 4

Compression:

Stored size: 840 Bytes

Contents

module Streamer
  # StreamBuilder takes a stream configuration and builds the stream defined
  # in it. StreamBuilder then performs the stream, given a document.
  class StreamBuilder
    attr_reader :config, :payload
    def initialize(config)
      @config = config
    end

    def process(payload)
      @payload = payload
      stream.payload['filter_value'] = filter_value
      transform if stream.payload['filter_value']
      stream
    end

    def transform
      config[:transform].each do |tx|
        tx.each do |k, v|
          stream.send(k, v)
        end
      end
    end

    def filter_value
      config[:filter].inject(true) do |val, f|
        val &&= stream.filter(function: f[:function]).payload['filter_value']
        val
      end
    end

    def stream
      @stream ||= Stream.new(payload)
    end
  end
end

Version data entries

4 entries across 4 versions & 1 rubygems

Version Path
streamer-0.2.1 lib/streamer/stream_builder.rb
streamer-0.2.0 lib/streamer/stream_builder.rb
streamer-0.1.1 lib/streamer/stream_builder.rb
streamer-0.1.0 lib/streamer/stream_builder.rb