Sha256: f2d7abe657a4fd881e119439e838d85c8a3171fbce1e14a14df9d3a7f799d6b8

Contents?: true

Size: 1.69 KB

Versions: 7

Compression:

Stored size: 1.69 KB

Contents

# frozen_string_literal: true

module Ductr
  module ETL
    #
    # Base class to implement buffered transforms.
    #
    class BufferedTransform < Transform
      # @return [Array] The row buffer
      attr_reader :buffer

      #
      # The buffer size option, default to 10_000.
      #
      # @return [Integer] The buffer size
      #
      def buffer_size
        @options[:buffer_size] || 10_000
      end

      #
      # Pushes the row inside the buffer or flushes it when full.
      #
      # @param [Object] row The row to process
      # @yield [row] The row yielder
      #
      # @return [nil] Returning nil to complies with kiba
      #
      def process(row, &)
        @buffer ||= []

        @buffer.push row
        flush_buffer(&) if @buffer.size == buffer_size

        # avoid returning a row, see
        # https://github.com/thbar/kiba/wiki/Implementing-ETL-transforms#generating-more-than-one-output-row-per-input-row-aka-yielding-transforms
        nil
      end

      #
      # Called when the last row is reached.
      #
      # @yield [row] The row yielder
      #
      # @return [void]
      #
      def close(&)
        flush_buffer(&) unless @buffer.empty?
        super
      end

      #
      # Calls #on_flush and reset the buffer.
      #
      # @yield [row] The row yielder
      #
      # @return [void]
      #
      def flush_buffer(&)
        on_flush(&)
        @buffer = []
      end

      #
      # Called each time the buffer have to be emptied.
      #
      # @yield [row] The row yielder
      #
      # @return [void]
      #
      def on_flush(&)
        raise NotImplementedError, "A buffered transform must implement the `#on_flush` method"
      end
    end
  end
end

Version data entries

7 entries across 7 versions & 1 rubygems

Version Path
ductr-0.2.3 lib/ductr/etl/controls/buffered_transform.rb
ductr-0.2.2 lib/ductr/etl/controls/buffered_transform.rb
ductr-0.2.1 lib/ductr/etl/controls/buffered_transform.rb
ductr-0.2.0 lib/ductr/etl/controls/buffered_transform.rb
ductr-0.1.2 lib/ductr/etl/controls/buffered_transform.rb
ductr-0.1.1 lib/ductr/etl/controls/buffered_transform.rb
ductr-0.1.0 lib/ductr/etl/controls/buffered_transform.rb