/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package thrift import ( "bufio" "context" "io" ) // StreamTransport is a Transport made of an io.Reader and/or an io.Writer type StreamTransport struct { io.Reader io.Writer isReadWriter bool closed bool } type StreamTransportFactory struct { Reader io.Reader Writer io.Writer isReadWriter bool } func (p *StreamTransportFactory) GetTransport(trans TTransport) (TTransport, error) { if trans != nil { t, ok := trans.(*StreamTransport) if ok { if t.isReadWriter { return NewStreamTransportRW(t.Reader.(io.ReadWriter)), nil } if t.Reader != nil && t.Writer != nil { return NewStreamTransport(t.Reader, t.Writer), nil } if t.Reader != nil && t.Writer == nil { return NewStreamTransportR(t.Reader), nil } if t.Reader == nil && t.Writer != nil { return NewStreamTransportW(t.Writer), nil } return &StreamTransport{}, nil } } if p.isReadWriter { return NewStreamTransportRW(p.Reader.(io.ReadWriter)), nil } if p.Reader != nil && p.Writer != nil { return NewStreamTransport(p.Reader, p.Writer), nil } if p.Reader != nil && p.Writer == nil { return NewStreamTransportR(p.Reader), nil } if p.Reader == nil && p.Writer != nil { return NewStreamTransportW(p.Writer), nil } return &StreamTransport{}, nil } func NewStreamTransportFactory(reader io.Reader, writer io.Writer, isReadWriter bool) *StreamTransportFactory { return &StreamTransportFactory{Reader: reader, Writer: writer, isReadWriter: isReadWriter} } func NewStreamTransport(r io.Reader, w io.Writer) *StreamTransport { return &StreamTransport{Reader: bufio.NewReader(r), Writer: bufio.NewWriter(w)} } func NewStreamTransportR(r io.Reader) *StreamTransport { return &StreamTransport{Reader: bufio.NewReader(r)} } func NewStreamTransportW(w io.Writer) *StreamTransport { return &StreamTransport{Writer: bufio.NewWriter(w)} } func NewStreamTransportRW(rw io.ReadWriter) *StreamTransport { bufrw := bufio.NewReadWriter(bufio.NewReader(rw), bufio.NewWriter(rw)) return &StreamTransport{Reader: bufrw, Writer: bufrw, isReadWriter: true} } func (p *StreamTransport) IsOpen() bool { return !p.closed } // implicitly opened on creation, can't be reopened once closed func (p *StreamTransport) Open() error { if !p.closed { return NewTTransportException(ALREADY_OPEN, "StreamTransport already open.") } else { return NewTTransportException(NOT_OPEN, "cannot reopen StreamTransport.") } } // Closes both the input and output streams. func (p *StreamTransport) Close() error { if p.closed { return NewTTransportException(NOT_OPEN, "StreamTransport already closed.") } p.closed = true closedReader := false if p.Reader != nil { c, ok := p.Reader.(io.Closer) if ok { e := c.Close() closedReader = true if e != nil { return e } } p.Reader = nil } if p.Writer != nil && (!closedReader || !p.isReadWriter) { c, ok := p.Writer.(io.Closer) if ok { e := c.Close() if e != nil { return e } } p.Writer = nil } return nil } // Flushes the underlying output stream if not null. func (p *StreamTransport) Flush(ctx context.Context) error { if p.Writer == nil { return NewTTransportException(NOT_OPEN, "Cannot flush null outputStream") } f, ok := p.Writer.(Flusher) if ok { err := f.Flush() if err != nil { return NewTTransportExceptionFromError(err) } } return nil } func (p *StreamTransport) Read(c []byte) (n int, err error) { n, err = p.Reader.Read(c) if err != nil { err = NewTTransportExceptionFromError(err) } return } func (p *StreamTransport) ReadByte() (c byte, err error) { f, ok := p.Reader.(io.ByteReader) if ok { c, err = f.ReadByte() } else { c, err = readByte(p.Reader) } if err != nil { err = NewTTransportExceptionFromError(err) } return } func (p *StreamTransport) Write(c []byte) (n int, err error) { n, err = p.Writer.Write(c) if err != nil { err = NewTTransportExceptionFromError(err) } return } func (p *StreamTransport) WriteByte(c byte) (err error) { f, ok := p.Writer.(io.ByteWriter) if ok { err = f.WriteByte(c) } else { err = writeByte(p.Writer, c) } if err != nil { err = NewTTransportExceptionFromError(err) } return } func (p *StreamTransport) WriteString(s string) (n int, err error) { f, ok := p.Writer.(stringWriter) if ok { n, err = f.WriteString(s) } else { n, err = p.Writer.Write([]byte(s)) } if err != nil { err = NewTTransportExceptionFromError(err) } return } func (p *StreamTransport) RemainingBytes() (num_bytes uint64) { const maxSize = ^uint64(0) return maxSize // the truth is, we just don't know unless framed is used } // SetTConfiguration implements TConfigurationSetter for propagation. func (p *StreamTransport) SetTConfiguration(conf *TConfiguration) { PropagateTConfiguration(p.Reader, conf) PropagateTConfiguration(p.Writer, conf) } var _ TConfigurationSetter = (*StreamTransport)(nil)