# frozen_string_literal: true module Ductr module ETL # # Glues ETL controls and the associated fibers together. # class FiberControl extend Forwardable # # @!method resume # Resumes the control's fiber. # @param [Object] row The row to pass to right fiber controls # @return [void] def_delegators :fiber, :resume # @return [Array<FiberControl>] The next fiber controls attr_accessor :right # @return [Control] The ETL control instance attr_reader :control # # Creates a new fiber control with the given control and control type. # # @param [Control] control The ETL control to work with in the fiber # @param [Symbol] type The ETL control type, one of [:source, :transform, :destination] # def initialize(control, type:) @control = control @type = type @right = [] end # # Memoizes the fiber to be associated with the ETL control based on its type. # # @return [Fiber] The fiber in charge of executing the control's logic # def fiber @fiber ||= send(@type) end private # # Creates the fiber to run ETL sources. # # @return [Fiber] # def source Fiber.new do control.each do |row| resume_right_fibers(row) end resume_right_fibers(:end) end end # # Creates the fiber to run ETL transforms. # # @return [Fiber] # def transform resume_control(Fiber.new do loop do row_in = Fiber.yield next close_transform if row_in == :end row_out = control.process(row_in) do |r| resume_right_fibers(r) end resume_right_fibers(row_out) if row_out end end) end # # Creates the fiber to run ETL Destinations. # # @return [Fiber] # def destination resume_control(Fiber.new do loop do row = Fiber.yield next control.close if row == :end control.write(row) end end) end # # Call #close on control, resume resulting rows then ends following fibers. # # @return [void] # def close_transform control.close do |row| resume_right_fibers(row) end resume_right_fibers(:end) end # # Resumes all fibers at the right of the current one. # # @param [Object] row The row to pass to the next fibers # # @return [void] # def resume_right_fibers(row) right.each do |fiber| fiber.resume(row) end end # # Resumes the given fiber and returns it. # # @param [Fiber] fiber The fiber to resume # # @return [Fiber] The resumed fiber # def resume_control(fiber) fiber.resume fiber end end end end