lib/kafka/protocol.rb in ruby-kafka-0.5.2 vs lib/kafka/protocol.rb in ruby-kafka-0.5.3
- old
+ new
@@ -24,10 +24,13 @@
LEAVE_GROUP_API = 13
SYNC_GROUP_API = 14
SASL_HANDSHAKE_API = 17
API_VERSIONS_API = 18
CREATE_TOPICS_API = 19
+ DELETE_TOPICS_API = 20
+ DESCRIBE_CONFIGS_API = 32
+ CREATE_PARTITIONS_API = 37
# A mapping from numeric API keys to symbolic API names.
APIS = {
PRODUCE_API => :produce,
FETCH_API => :fetch,
@@ -41,10 +44,13 @@
LEAVE_GROUP_API => :leave_group,
SYNC_GROUP_API => :sync_group,
SASL_HANDSHAKE_API => :sasl_handshake,
API_VERSIONS_API => :api_versions,
CREATE_TOPICS_API => :create_topics,
+ DELETE_TOPICS_API => :delete_topics,
+ DESCRIBE_CONFIGS_API => :describe_configs_api,
+ CREATE_PARTITIONS_API => :create_partitions
}
# A mapping from numeric error codes to exception classes.
ERRORS = {
-1 => UnknownError,
@@ -85,23 +91,42 @@
40 => InvalidConfig,
41 => NotController,
42 => InvalidRequest
}
+ # A mapping from int to corresponding resource type in symbol.
+ # https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/resource/ResourceType.java
+ RESOURCE_TYPE_UNKNOWN = 0
+ RESOURCE_TYPE_ANY = 1
+ RESOURCE_TYPE_TOPIC = 2
+ RESOURCE_TYPE_GROUP = 3
+ RESOURCE_TYPE_CLUSTER = 4
+ RESOURCE_TYPE_TRANSACTIONAL_ID = 5
+ RESOURCE_TYPE_DELEGATION_TOKEN = 6
+ RESOURCE_TYPES = {
+ RESOURCE_TYPE_UNKNOWN => :unknown,
+ RESOURCE_TYPE_ANY => :any,
+ RESOURCE_TYPE_TOPIC => :topic,
+ RESOURCE_TYPE_GROUP => :group,
+ RESOURCE_TYPE_CLUSTER => :cluster,
+ RESOURCE_TYPE_TRANSACTIONAL_ID => :transactional_id,
+ RESOURCE_TYPE_DELEGATION_TOKEN => :delegation_token,
+ }
+
# Handles an error code by either doing nothing (if there was no error) or
# by raising an appropriate exception.
#
# @param error_code Integer
# @raise [ProtocolError]
# @return [nil]
- def self.handle_error(error_code)
+ def self.handle_error(error_code, error_message = nil)
if error_code == 0
# No errors, yay!
elsif error = ERRORS[error_code]
- raise error
+ raise error, error_message
else
- raise UnknownError, "Unknown error with code #{error_code}"
+ raise UnknownError, "Unknown error with code #{error_code} #{error_message}"
end
end
# Returns the symbolic name for an API key.
#
@@ -139,5 +164,11 @@
require "kafka/protocol/api_versions_response"
require "kafka/protocol/sasl_handshake_request"
require "kafka/protocol/sasl_handshake_response"
require "kafka/protocol/create_topics_request"
require "kafka/protocol/create_topics_response"
+require "kafka/protocol/delete_topics_request"
+require "kafka/protocol/delete_topics_response"
+require "kafka/protocol/describe_configs_request"
+require "kafka/protocol/describe_configs_response"
+require "kafka/protocol/create_partitions_request"
+require "kafka/protocol/create_partitions_response"