Sha256: 511472e7682f65e3a543a9c82837fa85009d2a0021817cae9b6195016e17d37e

Contents?: true

Size: 880 Bytes

Versions: 1

Compression:

Stored size: 880 Bytes

Contents

require 'datapipes/composable'
require 'datapipes/source'
require 'datapipes/tube'
require 'datapipes/sink'
require 'datapipes/pipe'
require 'datapipes/version'

class Datapipes
  def initialize(source, tube, sink, pipe)
    @source = source
    @tube = tube
    @sink = sink
    @pipe = pipe

    Thread.abort_on_exception = true
    @flag = Queue.new
  end

  def run_resource
    @source.pipe = @pipe
    runners = @source.run_all

    consumer = run_comsumer
    runners.each(&:join)

    notify_resource_ending
    consumer.join if consumer.status == "run"
  end

  private

  def run_comsumer
    Thread.new do
      loop do
        break if resource_ended? && @pipe.empty?

        data = @tube.run_all(@pipe.pull)
        @sink.run_all(data)
      end
    end
  end

  def notify_resource_ending
    @flag.enq true
  end

  def resource_ended?
    !@flag.empty?
  end
end

Version data entries

1 entries across 1 versions & 1 rubygems

Version Path
datapipes-0.0.2 lib/datapipes.rb