Sha256: 595e2184c90ee5f223236ef99f24bf0398cd30610b526ce70f5aa57c1308540c

Contents?: true

Size: 1.96 KB

Versions: 31

Compression:

Stored size: 1.96 KB

Contents

module HybridPlatformsConductor

  # Simple router of IO and queue streams from som inputs to outputs, asynchronous
  class IoRouter

    # Create an IO router and make sure it is freed when client code has finished
    #
    # Parameters::
    # * *routes* (Hash<IO or Queue, Array<IO> >):  List of destination IOs that should receive content per source IO.
    # * Proc: Client code
    def self.with_io_router(routes)
      io_router = IoRouter.new(routes)
      begin
        io_router.start
        yield
      ensure
        io_router.stop
      end
    end

    # Constructor
    #
    # Parameters::
    # * *routes* (Hash<IO or Queue, Array<IO> >):  List of destination IOs that should receive content per source IO.
    def initialize(routes)
      @routes = routes
      @reading_thread = nil
    end

    # Start routing messages asynchronously
    def start
      raise 'IO router is already started. Can\'t start it again.' unless @reading_thread.nil?

      @end_read = false
      # Create a thread to handle routes asynchronously
      @reading_thread = Thread.new do
        loop do
          need_to_stop = @end_read.clone
          data_found = false
          @routes.each do |src_io, dst_ios|
            raise "Unknown type of source IO: #{src_io}" unless src_io.is_a?(Queue)

            queue_size = src_io.size
            next unless queue_size.positive?

            # There is data to be read from src_io
            data_found = true
            data_chunk_str = queue_size.times.map { src_io.pop }.join
            dst_ios.each do |dst_io|
              dst_io << data_chunk_str
              dst_io.flush if dst_io.respond_to?(:flush)
            end
          end
          break if need_to_stop && !data_found

          sleep 0.1
        end
      end
    end

    # Stop routing messages asynchronously
    def stop
      raise 'IO router is not started. Can\'t stop it.' if @reading_thread.nil?

      @end_read = true
      @reading_thread.join
    end

  end

end

Version data entries

31 entries across 31 versions & 1 rubygems

Version Path
hybrid_platforms_conductor-33.9.5 lib/hybrid_platforms_conductor/io_router.rb
hybrid_platforms_conductor-33.9.4 lib/hybrid_platforms_conductor/io_router.rb
hybrid_platforms_conductor-33.9.2 lib/hybrid_platforms_conductor/io_router.rb
hybrid_platforms_conductor-33.9.1 lib/hybrid_platforms_conductor/io_router.rb
hybrid_platforms_conductor-33.9.0 lib/hybrid_platforms_conductor/io_router.rb
hybrid_platforms_conductor-33.8.4 lib/hybrid_platforms_conductor/io_router.rb
hybrid_platforms_conductor-33.8.3 lib/hybrid_platforms_conductor/io_router.rb
hybrid_platforms_conductor-33.8.2 lib/hybrid_platforms_conductor/io_router.rb
hybrid_platforms_conductor-33.8.1 lib/hybrid_platforms_conductor/io_router.rb
hybrid_platforms_conductor-33.8.0 lib/hybrid_platforms_conductor/io_router.rb
hybrid_platforms_conductor-33.7.4 lib/hybrid_platforms_conductor/io_router.rb
hybrid_platforms_conductor-33.7.3 lib/hybrid_platforms_conductor/io_router.rb
hybrid_platforms_conductor-33.7.2 lib/hybrid_platforms_conductor/io_router.rb
hybrid_platforms_conductor-33.7.1 lib/hybrid_platforms_conductor/io_router.rb
hybrid_platforms_conductor-33.7.0 lib/hybrid_platforms_conductor/io_router.rb
hybrid_platforms_conductor-33.6.0 lib/hybrid_platforms_conductor/io_router.rb
hybrid_platforms_conductor-33.5.1 lib/hybrid_platforms_conductor/io_router.rb
hybrid_platforms_conductor-33.5.0 lib/hybrid_platforms_conductor/io_router.rb
hybrid_platforms_conductor-33.4.0 lib/hybrid_platforms_conductor/io_router.rb
hybrid_platforms_conductor-33.3.0 lib/hybrid_platforms_conductor/io_router.rb