Sha256: 14b7178051ac0d845f6fc3aa48c8da907a52596effa29848d3d61742cf9ef37c

Contents?: true

Size: 911 Bytes

Versions: 2

Compression:

Stored size: 911 Bytes

Contents

require 'parallel'
require 'datapipes/composable'
require 'datapipes/source'
require 'datapipes/tube'
require 'datapipes/sink'
require 'datapipes/pipe'
require 'datapipes/version'

Thread.abort_on_exception = true

class Datapipes
  def initialize(source, sink, tube: Tube.new, pipe: Pipe.new)
    @source = source
    @tube = tube
    @sink = sink
    @pipe = pipe
  end

  def run_resource
    @source.pipe = @pipe
    runners = @source.run_all

    consumer = run_comsumer
    runners.each(&:join)

    notify_resource_ending
    consumer.join
  end

  private

  def run_comsumer
    Thread.new do
      loop do
        data = @pipe.pull
        break if resource_ended?(data)

        @sink.run_all(@tube.run(data))
      end
    end
  end

  def notify_resource_ending
    @pipe.recieve Notification.new
  end

  def resource_ended?(data)
    data.is_a? Notification
  end

  Notification = Class.new
end

Version data entries

2 entries across 2 versions & 1 rubygems

Version Path
datapipes-0.1.3 lib/datapipes.rb
datapipes-0.1.2 lib/datapipes.rb