Sha256: 60e5a0a4d13061312b2829cdf21f7313951abb50391c1f52207da77018847b46

Contents?: true

Size: 1.72 KB

Versions: 1

Compression:

Stored size: 1.72 KB

Contents

require 'parallel'
require 'datapipes/composable'
require 'datapipes/source'
require 'datapipes/tube'
require 'datapipes/sink'
require 'datapipes/pipe'
require 'datapipes/version'

Thread.abort_on_exception = true

class Datapipes
  # Pass datapipes components instances.
  # Each component can be composed. See detail in examples.
  #
  # Pass parameters as a hash:
  #
  #   datapipe = Datapipes.new(source: my_source, sink: my_sink)
  #
  # Or pass all:
  #
  #   datapipe = Datapipes.new(
  #     source: my_source,
  #     sink: my_sink,
  #     tube: my_tube,
  #     pipe: my_pipe
  #   )
  #
  # All arguments are  optional. But in most case, you specify
  # source and sink.
  def initialize(args = {})
    @source, @sink, @tube, @pipe = *ArgParser.extract(args)
  end

  # Run sources, data flow via pipe, tubes and sinks work.
  # Everything work with just call this method.
  #
  # When all sources finished producing, and all sinks did their jobs,
  # this method returns.
  def run_resource
    @source.pipe = @pipe
    runners = @source.run_all

    sink = run_sink
    runners.each(&:join)

    notify_resource_ending
    sink.join
  end

  private

  def run_sink
    Thread.new do
      loop do
        data = @pipe.pour_out
        break if resource_ended?(data)

        @sink.run_all(@tube.run(data))
      end
    end
  end

  def notify_resource_ending
    @pipe.pour_in Notification.new
  end

  def resource_ended?(data)
    data.is_a? Notification
  end

  Notification = Class.new

  module ArgParser
    def self.extract(args)
      source = args[:source] || Source.new
      sink = args[:sink] || Sink.new
      tube = args[:tube] || Tube.new
      pipe = args[:pipe] || Pipe.new
      [source, sink, tube, pipe]
    end
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
datapipes-0.1.5 lib/datapipes.rb