Sha256: 0b7ced4da849a5b277d35204e372e24a92d354348666fcd44561d811aa246635

Contents?: true

Size: 1.92 KB

Versions: 23

Compression:

Stored size: 1.92 KB

Contents

# frozen_string_literal: true

module Mutant
  # Pipe abstraction
  #
  # Value object holding the reader and writer ends returned by `io.pipe`.
  class Pipe
    include Adamantium, Anima.new(:reader, :writer)

    # Run block with pipe in binmode
    #
    # The pipe is created via the block form of `io.pipe`, so its lifetime
    # is managed by that call (presumably ::IO.pipe, which closes both ends
    # after the block returns -- confirm against the caller).
    #
    # @param io [#pipe] object used to create the pipe
    #
    # @yieldparam pipe [Pipe]
    #
    # @return [undefined]
    def self.with(io)
      io.pipe(binmode: true) do |(reader, writer)|
        yield new(reader: reader, writer: writer)
      end
    end

    # Build pipe in binmode without block scoping
    #
    # Neither end is closed here; see #to_reader and #to_writer.
    #
    # @param io [#pipe] object used to create the pipe
    #
    # @return [Pipe]
    def self.from_io(io)
      reader, writer = io.pipe(binmode: true)
      new(reader: reader, writer: writer)
    end

    # Writer end of the pipe
    #
    # Closes the reader end before returning the writer.
    #
    # @return [IO]
    def to_writer
      reader.close
      writer
    end

    # Parent reader end of the pipe
    #
    # Closes the writer end before returning the reader.
    #
    # @return [IO]
    def to_reader
      writer.close
      reader
    end

    # Bidirectional message connection built from a reader and a writer frame
    #
    # Values are serialized with the injected `marshal` object, which has to
    # respond to `dump` and `load`.
    class Connection
      include Anima.new(:marshal, :reader, :writer)

      Error = Class.new(RuntimeError)

      # Length prefixed message framing on top of an IO
      #
      # A message is a 4 byte header carrying the body size, followed by the
      # body bytes.
      class Frame
        include Concord.new(:io)

        # Array#pack directive: 32-bit unsigned integer, big-endian
        HEADER_FORMAT = 'N'
        # Largest body size the 4 byte header can represent
        MAX_BYTES     = (2**32).pred
        # Number of bytes occupied by the packed header
        HEADER_SIZE   = 4

        # Read one framed message
        #
        # @return [String] the message body
        #
        # @raise [Error]
        #   when the IO ends before a full header or body was read
        def receive_value
          header = read(HEADER_SIZE)
          # Util.one presumably unwraps the single element array produced by
          # unpack -- helper is defined outside this file.
          read(Util.one(header.unpack(HEADER_FORMAT)))
        end

        # Write one framed message
        #
        # @param body [String] the message body
        #
        # @return [undefined]
        #
        # @raise [Error]
        #   when body is larger than MAX_BYTES
        def send_value(body)
          bytesize = body.bytesize

          fail Error, 'message to big' if bytesize > MAX_BYTES

          io.binmode
          io.write([bytesize].pack(HEADER_FORMAT))
          io.write(body)
        end

      private

        # Read exactly the requested number of bytes
        #
        # IO#read returns nil on immediate EOF, but a *shorter* string when
        # EOF is hit after some bytes were read. Both are treated as a
        # truncated frame, otherwise a short header would unpack to nil and
        # a short body would be handed on as if it were complete.
        #
        # @param bytes [Integer]
        #
        # @return [String]
        #
        # @raise [Error]
        #   when fewer than the requested bytes are available
        def read(bytes)
          io.binmode
          data = io.read(bytes)

          fail Error, 'Unexpected EOF' unless data && data.bytesize == bytes

          data
        end
      end

      # Send payload and wait for the response value
      #
      # @param payload [Object]
      #
      # @return [Object] the value received in response
      def call(payload)
        send_value(payload)
        receive_value
      end

      # Receive one value from the reader frame
      #
      # NOTE(review): `marshal.load` is applied to whatever bytes arrive on
      # the reader; presumably both ends are trusted processes -- confirm.
      #
      # @return [Object]
      def receive_value
        marshal.load(reader.receive_value)
      end

      # Send one value through the writer frame
      #
      # @param value [Object]
      #
      # @return [self]
      def send_value(value)
        writer.send_value(marshal.dump(value))
        self
      end

      # Build connection from two pipes
      #
      # Takes the reader end of `reader` and the writer end of `writer`,
      # which closes the respective opposite end of each pipe.
      #
      # @param marshal [#dump, #load]
      # @param reader [Pipe]
      # @param writer [Pipe]
      #
      # @return [Connection]
      def self.from_pipes(marshal:, reader:, writer:)
        new(
          marshal: marshal,
          reader:  Frame.new(reader.to_reader),
          writer:  Frame.new(writer.to_writer)
        )
      end
    end
  end # Pipe
end # Mutant

Version data entries

23 entries across 23 versions & 1 rubygems

Version Path
mutant-0.11.18 lib/mutant/pipe.rb
mutant-0.11.17 lib/mutant/pipe.rb
mutant-0.11.16 lib/mutant/pipe.rb
mutant-0.11.15 lib/mutant/pipe.rb
mutant-0.11.14 lib/mutant/pipe.rb
mutant-0.11.13 lib/mutant/pipe.rb
mutant-0.11.12 lib/mutant/pipe.rb
mutant-0.11.11 lib/mutant/pipe.rb
mutant-0.11.10 lib/mutant/pipe.rb
mutant-0.11.9 lib/mutant/pipe.rb
mutant-0.11.8 lib/mutant/pipe.rb
mutant-0.11.7 lib/mutant/pipe.rb
mutant-0.11.6 lib/mutant/pipe.rb
mutant-0.11.5 lib/mutant/pipe.rb
mutant-0.11.4 lib/mutant/pipe.rb
mutant-0.11.3 lib/mutant/pipe.rb
mutant-0.11.2 lib/mutant/pipe.rb
mutant-0.11.1 lib/mutant/pipe.rb
mutant-0.11.0 lib/mutant/pipe.rb
mutant-0.10.35 lib/mutant/pipe.rb