lib/wukong/driver.rb in wukong-3.0.1 vs lib/wukong/driver.rb in wukong-4.0.0
- old
+ new
@@ -1,103 +1,214 @@
module Wukong
+ # A Driver is a class including the DriverMethods module which
+ # connects a Dataflow or Processor to the external world of inputs
+ # and outputs.
+ #
+ # @example Minimal Driver class
+ #
+ # class MinimalDriver
+ # include Wukong::DriverMethods
+ # def initialize(label, settings)
+ # construct_dataflow(label, settings)
+ # end
+ # def process record
+ # puts record
+ # end
+ # end
+ #
+ # The MinimalDriver#send_through_dataflow method can be called on an
+ # instance of MinimalDriver with any input record.
+ #
+ # This record will be passed through the dataflow, starting from its
+ # root, and each record yielded at the leaves of the dataflow will
+ # be passed to the driver's #process method.
+ #
+ # The #process method of an implementing driver should *not* yield,
+ # unlike the process method of a Processor class. Instead, it
+ # should treat its argument as an output of the dataflow and do
+ # something appropriate to the driver (write to file, database,
+ # terminal, &c.).
+ #
+ # Drivers are also responsible for implementing the lifecycle of
+ # processors and dataflows they drive. A more complete version of
+ # the above driver class would:
+ #
+ # * call the #setup_dataflow method when ready to trigger the
+ # Processor#setup method on each processor in the dataflow
+ #
+ # * call the #finalize_dataflow method when indicating that the
+ # dataflow should consider a batch of records complete
+ #
+ # * call the #finalize_and_stop_dataflow method to indicate the
+ # last batch of records and to trigger the Processor#stop method
+ # on each processor in the dataflow
+ #
+ # Driver instances are started by Runners which should delegate to
+ # the `start` method driver class itself.
+ #
+ # @see Wukong::Local::StdioDriver for a complete example of a driver.
+ # @see Wukong::Local::Runner for an example of how runners call drivers.
module DriverMethods
- attr_accessor :dataflow
+ attr_accessor :label
attr_accessor :settings
+ attr_accessor :dataflow
- def driver
- @driver ||=
+ # Classes including DriverMethods should override this method with
+ # some way of handling the `output_record` that is appropriate for
+ # the driver.
+ #
+ # @param [Object] output_record
+ def process output_record
+ raise"Define the #{self.class}#process method to handle output records from the dataflow")
- def lookup(label)
- raise"could not find definition for <#{label}>") unless Wukong.registry.registered?(label.to_sym)
- Wukong.registry.retrieve(label.to_sym)
+ # Construct a dataflow from the given `label` and `settings`.
+ #
+ # This method does **not** cause Processor#setup to be called on
+ # any of the processors in the dataflow. Call the #setup_dataflow
+ # method to explicitly have setup occur. This distinction is
+ # useful for drivers which themselves need to do complex
+ # initialization before letting processors in the dataflow
+ # initialize.
+ #
+ # @param [Symbol] label the name of the dataflow (or processor) to build
+ # @param [Hash] settings
+ # @param settings [String] :to Serialize all output via the named serializer (json, tsv)
+ # @param settings [String] :from Deserialize all input via the named deserializer (json, tsv)
+ # @param settings [String] :as Recordize each input as instances of the given class
+ #
+ # @see #setup_dataflow
+ def construct_dataflow(label, settings={})
+ self.label = label
+ self.settings = settings
+ prepend(:recordize) if settings[:as]
+ prepend("from_#{settings[:from]}".to_sym) if settings[:from]
+ append("to_#{settings[:to]}".to_sym) if settings[:to]
+ build_dataflow
- def lookup_and_build(label, options = {})
- lookup(label).build(options)
+ # Set up this driver. Called before setting up any of the
+ # dataflow stages.
+ def setup
- def build_serializer(direction, label, options)
- lookup_and_build("#{direction}_#{label}", options)
- end
- def add_serialization(dataflow, direction, label, options)
- case direction
- when :to then dataflow.push build_serializer(direction, label, options)
- when :from then dataflow.unshift build_serializer(direction, label, options)
+ # Walks the dataflow and calls Processor#setup on each of the
+ # processors.
+ def setup_dataflow
+ setup
+ dataflow.each_stage do |stage|
+ stage.setup
- def setup_dataflow
- dataflow.each(&:setup)
+ # Send the given `record` through the dataflow.
+ #
+ # @param [Object] record
+ def send_through_dataflow(record)
+ wiring.start_with(dataflow.root).call(record)
+ # Perform finalization code for this driver. Runs after #setup
+ # and before #stop.
+ def finalize
+ end
+ # Indicate a full batch of records has already been sent through
+ # and any batch-oriented or accumulative operations should trigger
+ # (e.g. - counting).
+ #
+ # Walks the dataflow calling Processor#finalize on each processor.
+ #
+ # On the *last* batch, the #finalize_and_stop_dataflow method
+ # should be called instead.
+ #
+ # @see #finalize_and_stop_dataflow
def finalize_dataflow
- dataflow.each do |stage|
- stage.finalize(&driver.advance(stage)) if stage.respond_to?(:finalize)
+ finalize
+ dataflow.each_stage do |stage|
+ stage.finalize(&wiring.advance(stage))
+ # Works similar to #finalize_dataflow but calls Processor#stop
+ # after calling Processor#finalize on each processor.
def finalize_and_stop_dataflow
- dataflow.each do |stage|
- stage.finalize(&driver.advance(stage)) if stage.respond_to?(:finalize)
+ finalize
+ dataflow.each_stage do |stage|
+ stage.finalize(&wiring.advance(stage))
- end
+ end
+ stop
- # So pretty...
- def construct_dataflow(label, options)
- dataflow = lookup_and_build(label, options)
- dataflow = dataflow.respond_to?(:stages) ?{ |name| dataflow.stages[name] } : [ dataflow ]
- expected_input_model = (options[:consumes].constantize rescue nil) || dataflow.first.expected_record_type(:consumes)
- dataflow.unshift lookup_and_build(:recordize, model: expected_input_model) if expected_input_model
- expected_output_model = (options[:produces].constantize rescue nil) || dataflow.first.expected_record_type(:produces)
- dataflow.push lookup_and_build(:recordize, model: expected_output_model) if expected_output_model
- expected_input_serialization = options[:from] || dataflow.last.expected_serialization(:from)
- add_serialization(dataflow, :from, expected_input_serialization, options) if expected_input_serialization
- expected_output_serialization = options[:to] || dataflow.last.expected_serialization(:to)
- add_serialization(dataflow, :to, expected_output_serialization, options) if expected_output_serialization
- dataflow.push self
- end
- end
+ # Perform shutdown code for this driver. Called after #finalize
+ # and after all stages have been finalized and stopped.
+ def stop
+ end
- class Driver
- attr_accessor :dataflow
+ protected
- def initialize(dataflow)
- @dataflow = dataflow
+ # The builder for this driver's `label`, either for a Processor or
+ # a Dataflow.
+ #
+ # @return [Wukong::ProcessorBuilder, Wukong::DataflowBuilder]
+ def builder
+ return @builder if @builder
+ raise"could not find definition for <#{label}>") unless Wukong.registry.registered?(label.to_sym)
+ @builder = Wukong.registry.retrieve(label.to_sym)
- def to_proc
- return @wiring if @wiring
- @wiring = do |stage, record|
- stage.process(record, &advance(stage)) if stage
- end
+ # Return the builder for this driver's dataflow.
+ #
+ # Even if a Processor was originally named by this driver's
+ # `label`, a DataflowBuilder will be returned here. The
+ # DataflowBuilder is itself built from just the ProcessorBuilder
+ # alone.
+ #
+ # @return [Wukong::DataflowBuilder]
+ # @see #builder
+ def dataflow_builder
+ @dataflow_builder ||= (builder.is_a?(DataflowBuilder) ? builder : Wukong::DataflowBuilder.receive(for_class:, stages: {label.to_sym => builder}))
- def send_through_dataflow(record)
- start_with(dataflow.first).call(record)
+ # Build the dataflow using the #dataflow_builder and the supplied
+ # `settings`.
+ #
+ # @return [Wukong::Dataflow]
+ def build_dataflow
+ self.dataflow =
- def start_with(stage)
+ # Add the processor with the given `new_label` in front of this
+ # driver's dataflow, making it into the new root of the dataflow.
+ #
+ # @param [Symbol] new_label
+ def prepend new_label
+ raise"could not find processor <#{new_label}> to prepend") unless Wukong.registry.registered?(new_label)
+ dataflow_builder.prepend(Wukong.registry.retrieve(new_label))
- def advance(stage)
- next_stage = stage_iterator(stage)
- start_with(next_stage)
+ # Add the processor with the given `new_label` at the end of each
+ # of this driver's dataflow's leaves.
+ #
+ # @param [Symbol] new_label
+ def append new_label
+ raise"could not find processor <#{new_label}> to append") unless Wukong.registry.registered?(new_label)
+ dataflow_builder.append(Wukong.registry.retrieve(new_label))
- # This should properly be defined on dataflow/builder
- def stage_iterator(stage)
- position = dataflow.find_index(stage)
- dataflow[position + 1]
- end
- def call(*args)
+ # Returns the underlying Wiring object that will coordinate
+ # transfer of records from the driver to the dataflow and back to
+ # the driver.
+ #
+ # @return [Wiring]
+ def wiring
+ @wiring ||=, dataflow)