Sha256: 21d9fb3e953ae2cd53ae3c981c26119ce99604ec255ba3b77c597a31d131e1b2

Contents?: true

Size: 1.57 KB

Versions: 3

Compression:

Stored size: 1.57 KB

Contents

# frozen_string_literal: true

module MultiProcess
  # Can handle input from multiple processes and run custom
  # actions on event and output.
  #
  class Receiver
    # Request a new pipe writer for given process and name.
    #
    # @param process [ Process ] Process requesting pipe.
    # @param name [ Symbol ] Name associated to pipe e.g.
    #   `:out` or `:err`.
    #
    def pipe(process, name)
      reader, writer = IO.pipe

      Loop.instance.watch(reader) do |action, monitor|
        case action
          when :registered
            connected(process, name)
          when :ready
            received(process, name, read(monitor.io))
          when :eof
            removed(process, name)
        end
      end

      writer
    end

    # Send a custom messages.
    #
    def message(process, name, message)
      received process, name, message
    end

    protected

    # Will be called when content is received for given
    # process and name.
    #
    # Must be overridden by subclass.
    #
    def received(_process, _name, _message)
      raise NotImplementedError.new 'Subclass responsibility.'
    end

    # Read content from pipe. Can be used to provide custom reading
    # like reading lines instead of byte ranges.
    #
    # Should be non blocking.
    #
    def read(reader)
      reader.read_nonblock 4096
    end

    # Called after pipe for process and name was removed because it
    # reached EOF.
    #
    def removed(_process, _name); end

    # Called after new pipe for process and name was created.
    #
    def connected(_process, _name); end
  end
end

Version data entries

3 entries across 3 versions & 1 rubygems

Version Path
multi_process-1.3.0 lib/multi_process/receiver.rb
multi_process-1.2.1 lib/multi_process/receiver.rb
multi_process-1.2.0 lib/multi_process/receiver.rb