Sha256: c85c54f22fdda33a0b2cc9eeda8b555de6bf91947e9d14eee1e890bdf7d518d1
Contents?: true
Size: 1.92 KB
Versions: 4
Compression:
Stored size: 1.92 KB
Contents
module ConcurrentStream attr_accessor :threads, :pids, :callback, :abort_callback, :filename, :joined def self.setup(stream, options = {}, &block) threads, pids, callback, filename = Misc.process_options options, :threads, :pids, :callback, :filename stream.extend ConcurrentStream unless ConcurrentStream === stream stream.threads ||= [] stream.pids ||= [] stream.threads.concat(Array === threads ? threads : [threads]) unless threads.nil? stream.pids.concat(Array === pids ? pids : [pids]) unless pids.nil? or pids.empty? callback = block if block_given? if stream.callback and callback old_callback = stream.callback stream.callback = Proc.new do old_callback.call callback.call end else stream.callback = callback end stream.filename = filename unless filename.nil? stream end def joined? @joined end def join_threads if @threads and @threads.any? @threads.each do |t| begin ensure t.join unless t == Thread.current end end @threads = [] end end def join_pids if @pids and @pids.any? @pids.each do |pid| begin Process.waitpid(pid, Process::WUNTRACED) raise "Error joining process #{pid} in #{self.inspect}" unless $?.success? rescue Errno::ECHILD end end @pids = [] end end def join_callback if @callback and not joined? @callback.call @callback = nil end end def join join_threads join_pids join_callback @joined = true end def abort_threads @threads.each{|t| t.raise Aborted.new unless t == Thread.current } if @threads end def abort_pids @pids.each{|pid| Process.kill :INT, pid } if @pids end def abort abort_threads abort_pids @abort_callback.call if @abort_callback @abort_callback = nil @callback = nil end end
Version data entries
4 entries across 4 versions & 1 rubygems