Sha256: be45d5cca69e6c90ea30e87c42ab85ebfdc5e7401da3d6feb9607109d014d99d
Contents?: true
Size: 1021 Bytes
Versions: 2
Compression:
Stored size: 1021 Bytes
Contents
require 'datapipes/composable' require 'datapipes/source' require 'datapipes/tube' require 'datapipes/sink' require 'datapipes/pipe' require 'datapipes/version' class Datapipes def initialize(source, tube, sink, pipe) @source = source @tube = tube @sink = sink @pipe = pipe Thread.abort_on_exception = true @flag = Queue.new end def run_resource @source.pipe = @pipe runners = @source.run_all consumer = run_comsumer runners.each(&:join) notify_resource_ending graceful_down(consumer) end private def run_comsumer Thread.new do loop do break if resource_ended? && @pipe.empty? data = @tube.run_all(@pipe.pull) @sink.run_all(data) end Thread.current.kill end end def notify_resource_ending @flag.enq true Thread.pass end def resource_ended? !@flag.empty? end def graceful_down(consumer) sleep 0.1 consumer.kill if consumer.status == 'sleep' consumer.join end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
datapipes-0.0.4 | lib/datapipes.rb |
datapipes-0.0.3 | lib/datapipes.rb |