lib/avro/ipc.rb in avro-1.7.2 vs lib/avro/ipc.rb in avro-1.7.3
- old
+ new
@@ -240,19 +240,19 @@
protocol_cache[local_hash] = local_protocol
end
# Called by a server to deserialize a request, compute and serialize
# a response or error. Compare to 'handle()' in Thrift.
- def respond(call_request)
+ def respond(call_request, transport=nil)
buffer_decoder = Avro::IO::BinaryDecoder.new(StringIO.new(call_request))
buffer_writer = StringIO.new('', 'w+')
buffer_encoder = Avro::IO::BinaryEncoder.new(buffer_writer)
error = nil
response_metadata = {}
begin
- remote_protocol = process_handshake(buffer_decoder, buffer_encoder)
+ remote_protocol = process_handshake(buffer_decoder, buffer_encoder, transport)
# handshake failure
unless remote_protocol
return buffer_writer.string
end
@@ -300,11 +300,14 @@
self.write_error(SYSTEM_ERROR_SCHEMA, error, buffer_encoder)
end
buffer_writer.string
end
- def process_handshake(decoder, encoder)
+ def process_handshake(decoder, encoder, connection=nil)
+ if connection && connection.is_connected?
+ return connection.protocol
+ end
handshake_request = HANDSHAKE_RESPONDER_READER.read(decoder)
handshake_response = {}
# determine the remote protocol
client_hash = handshake_request['clientHash']
@@ -336,10 +339,15 @@
handshake_response['serverProtocol'] = local_protocol.to_s
handshake_response['serverHash'] = local_hash
end
HANDSHAKE_RESPONDER_WRITER.write(handshake_response, encoder)
+
+ if connection && handshake_response['match'] != 'NONE'
+ connection.protocol = remote_protocol
+ end
+
remote_protocol
end
def call(local_message, request)
# Actual work done by server: cf. handler in thrift.
@@ -364,12 +372,18 @@
class SocketTransport
# A simple socket-based Transport implementation.
attr_reader :sock, :remote_name
+ attr_accessor :protocol
def initialize(sock)
@sock = sock
+ @protocol = nil
+ end
+
+ def is_connected?()
+ !!@protocol
end
def transceive(request)
write_framed_message(request)
read_framed_message