/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package thrift import ( "context" "fmt" "strings" ) /* TMultiplexedProtocol is a protocol-independent concrete decorator that allows a Thrift client to communicate with a multiplexing Thrift server, by prepending the service name to the function name during function calls. NOTE: THIS IS NOT USED BY SERVERS. On the server, use TMultiplexedProcessor to handle request from a multiplexing client. This example uses a single socket transport to invoke two services: socket := thrift.NewTSocketFromAddrTimeout(addr, TIMEOUT) transport := thrift.NewTFramedTransport(socket) protocol := thrift.NewTBinaryProtocolTransport(transport) mp := thrift.NewTMultiplexedProtocol(protocol, "Calculator") service := Calculator.NewCalculatorClient(mp) mp2 := thrift.NewTMultiplexedProtocol(protocol, "WeatherReport") service2 := WeatherReport.NewWeatherReportClient(mp2) err := transport.Open() if err != nil { t.Fatal("Unable to open client socket", err) } fmt.Println(service.Add(2,2)) fmt.Println(service2.GetTemperature()) */ type TMultiplexedProtocol struct { TProtocol serviceName string } const MULTIPLEXED_SEPARATOR = ":" func NewTMultiplexedProtocol(protocol TProtocol, serviceName string) *TMultiplexedProtocol { return &TMultiplexedProtocol{ TProtocol: protocol, serviceName: serviceName, } } func (t *TMultiplexedProtocol) WriteMessageBegin(ctx context.Context, name string, typeId TMessageType, seqid int32) error { if typeId == CALL || typeId == ONEWAY { return t.TProtocol.WriteMessageBegin(ctx, t.serviceName+MULTIPLEXED_SEPARATOR+name, typeId, seqid) } else { return t.TProtocol.WriteMessageBegin(ctx, name, typeId, seqid) } } /* TMultiplexedProcessor is a TProcessor allowing a single TServer to provide multiple services. To do so, you instantiate the processor and then register additional processors with it, as shown in the following example: var processor = thrift.NewTMultiplexedProcessor() firstProcessor := processor.RegisterProcessor("FirstService", firstProcessor) processor.registerProcessor( "Calculator", Calculator.NewCalculatorProcessor(&CalculatorHandler{}), ) processor.registerProcessor( "WeatherReport", WeatherReport.NewWeatherReportProcessor(&WeatherReportHandler{}), ) serverTransport, err := thrift.NewTServerSocketTimeout(addr, TIMEOUT) if err != nil { t.Fatal("Unable to create server socket", err) } server := thrift.NewTSimpleServer2(processor, serverTransport) server.Serve(); */ type TMultiplexedProcessor struct { serviceProcessorMap map[string]TProcessor DefaultProcessor TProcessor } func NewTMultiplexedProcessor() *TMultiplexedProcessor { return &TMultiplexedProcessor{ serviceProcessorMap: make(map[string]TProcessor), } } // ProcessorMap returns a mapping of "{ProcessorName}{MULTIPLEXED_SEPARATOR}{FunctionName}" // to TProcessorFunction for any registered processors. If there is also a // DefaultProcessor, the keys for the methods on that processor will simply be // "{FunctionName}". If the TMultiplexedProcessor has both a DefaultProcessor and // other registered processors, then the keys will be a mix of both formats. // // The implementation differs with other TProcessors in that the map returned is // a new map, while most TProcessors just return their internal mapping directly. // This means that edits to the map returned by this implementation of ProcessorMap // will not affect the underlying mapping within the TMultiplexedProcessor. func (t *TMultiplexedProcessor) ProcessorMap() map[string]TProcessorFunction { processorFuncMap := make(map[string]TProcessorFunction) for name, processor := range t.serviceProcessorMap { for method, processorFunc := range processor.ProcessorMap() { processorFuncName := name + MULTIPLEXED_SEPARATOR + method processorFuncMap[processorFuncName] = processorFunc } } if t.DefaultProcessor != nil { for method, processorFunc := range t.DefaultProcessor.ProcessorMap() { processorFuncMap[method] = processorFunc } } return processorFuncMap } // AddToProcessorMap updates the underlying TProcessor ProccessorMaps depending on // the format of "name". // // If "name" is in the format "{ProcessorName}{MULTIPLEXED_SEPARATOR}{FunctionName}", // then it sets the given TProcessorFunction on the inner TProcessor with the // ProcessorName component using the FunctionName component. // // If "name" is just in the format "{FunctionName}", that is to say there is no // MULTIPLEXED_SEPARATOR, and the TMultiplexedProcessor has a DefaultProcessor // configured, then it will set the given TProcessorFunction on the DefaultProcessor // using the given name. // // If there is not a TProcessor available for the given name, then this function // does nothing. This can happen when there is no TProcessor registered for // the given ProcessorName or if all that is given is the FunctionName and there // is no DefaultProcessor set. func (t *TMultiplexedProcessor) AddToProcessorMap(name string, processorFunc TProcessorFunction) { components := strings.SplitN(name, MULTIPLEXED_SEPARATOR, 2) if len(components) != 2 { if t.DefaultProcessor != nil && len(components) == 1 { t.DefaultProcessor.AddToProcessorMap(components[0], processorFunc) } return } processorName := components[0] funcName := components[1] if processor, ok := t.serviceProcessorMap[processorName]; ok { processor.AddToProcessorMap(funcName, processorFunc) } } // verify that TMultiplexedProcessor implements TProcessor var _ TProcessor = (*TMultiplexedProcessor)(nil) func (t *TMultiplexedProcessor) RegisterDefault(processor TProcessor) { t.DefaultProcessor = processor } func (t *TMultiplexedProcessor) RegisterProcessor(name string, processor TProcessor) { if t.serviceProcessorMap == nil { t.serviceProcessorMap = make(map[string]TProcessor) } t.serviceProcessorMap[name] = processor } func (t *TMultiplexedProcessor) Process(ctx context.Context, in, out TProtocol) (bool, TException) { name, typeId, seqid, err := in.ReadMessageBegin(ctx) if err != nil { return false, NewTProtocolException(err) } if typeId != CALL && typeId != ONEWAY { return false, NewTProtocolException(fmt.Errorf("Unexpected message type %v", typeId)) } //extract the service name v := strings.SplitN(name, MULTIPLEXED_SEPARATOR, 2) if len(v) != 2 { if t.DefaultProcessor != nil { smb := NewStoredMessageProtocol(in, name, typeId, seqid) return t.DefaultProcessor.Process(ctx, smb, out) } return false, NewTProtocolException(fmt.Errorf( "Service name not found in message name: %s. Did you forget to use a TMultiplexProtocol in your client?", name, )) } actualProcessor, ok := t.serviceProcessorMap[v[0]] if !ok { return false, NewTProtocolException(fmt.Errorf( "Service name not found: %s. Did you forget to call registerProcessor()?", v[0], )) } smb := NewStoredMessageProtocol(in, v[1], typeId, seqid) return actualProcessor.Process(ctx, smb, out) } //Protocol that use stored message for ReadMessageBegin type storedMessageProtocol struct { TProtocol name string typeId TMessageType seqid int32 } func NewStoredMessageProtocol(protocol TProtocol, name string, typeId TMessageType, seqid int32) *storedMessageProtocol { return &storedMessageProtocol{protocol, name, typeId, seqid} } func (s *storedMessageProtocol) ReadMessageBegin(ctx context.Context) (name string, typeId TMessageType, seqid int32, err error) { return s.name, s.typeId, s.seqid, nil }