lib/rdkafka/callbacks.rb in karafka-rdkafka-0.14.0 vs lib/rdkafka/callbacks.rb in karafka-rdkafka-0.14.1
- old
+ new
@@ -21,10 +21,100 @@
new(result_pointer)
end
end
end
+ class GroupResult
+ attr_reader :result_error, :error_string, :result_name
+ def initialize(group_result_pointer)
+ native_error = Rdkafka::Bindings.rd_kafka_group_result_error(group_result_pointer)
+
+ if native_error.null?
+ @result_error = 0
+ @error_string = FFI::Pointer::NULL
+ else
+ @result_error = native_error[:code]
+ @error_string = native_error[:errstr]
+ end
+
+ @result_name = Rdkafka::Bindings.rd_kafka_group_result_name(group_result_pointer)
+ end
+ def self.create_group_results_from_array(count, array_pointer)
+ (1..count).map do |index|
+ result_pointer = (array_pointer + (index - 1)).read_pointer
+ new(result_pointer)
+ end
+ end
+ end
+
+ # Extracts attributes of rd_kafka_acl_result_t
+ #
+ # @private
+ class CreateAclResult
+ attr_reader :result_error, :error_string
+
+ def initialize(acl_result_pointer)
+ rd_kafka_error_pointer = Bindings.rd_kafka_acl_result_error(acl_result_pointer)
+ @result_error = Rdkafka::Bindings.rd_kafka_error_code(rd_kafka_error_pointer)
+ @error_string = Rdkafka::Bindings.rd_kafka_error_string(rd_kafka_error_pointer)
+ end
+
+ def self.create_acl_results_from_array(count, array_pointer)
+ (1..count).map do |index|
+ result_pointer = (array_pointer + (index - 1)).read_pointer
+ new(result_pointer)
+ end
+ end
+ end
+
+ # Extracts attributes of rd_kafka_DeleteAcls_result_response_t
+ #
+ # @private
+ class DeleteAclResult
+ attr_reader :result_error, :error_string, :matching_acls, :matching_acls_count
+
+ def initialize(acl_result_pointer)
+ @matching_acls=[]
+ rd_kafka_error_pointer = Rdkafka::Bindings.rd_kafka_DeleteAcls_result_response_error(acl_result_pointer)
+ @result_error = Rdkafka::Bindings.rd_kafka_error_code(rd_kafka_error_pointer)
+ @error_string = Rdkafka::Bindings.rd_kafka_error_string(rd_kafka_error_pointer)
+ if @result_error == 0
+ # Get the number of matching acls
+ pointer_to_size_t = FFI::MemoryPointer.new(:int32)
+ @matching_acls = Rdkafka::Bindings.rd_kafka_DeleteAcls_result_response_matching_acls(acl_result_pointer, pointer_to_size_t)
+ @matching_acls_count = pointer_to_size_t.read_int
+ end
+ end
+
+ def self.delete_acl_results_from_array(count, array_pointer)
+ (1..count).map do |index|
+ result_pointer = (array_pointer + (index - 1)).read_pointer
+ new(result_pointer)
+ end
+ end
+ end
+
+ # Extracts attributes of rd_kafka_DeleteAcls_result_response_t
+ #
+ # @private
+ class DescribeAclResult
+ attr_reader :result_error, :error_string, :matching_acls, :matching_acls_count
+
+ def initialize(event_ptr)
+ @matching_acls=[]
+ @result_error = Rdkafka::Bindings.rd_kafka_event_error(event_ptr)
+ @error_string = Rdkafka::Bindings.rd_kafka_event_error_string(event_ptr)
+ if @result_error == 0
+ acl_describe_result = Rdkafka::Bindings.rd_kafka_event_DescribeAcls_result(event_ptr)
+ # Get the number of matching acls
+ pointer_to_size_t = FFI::MemoryPointer.new(:int32)
+ @matching_acls = Rdkafka::Bindings.rd_kafka_DescribeAcls_result_acls(acl_describe_result, pointer_to_size_t)
+ @matching_acls_count = pointer_to_size_t.read_int
+ end
+ end
+ end
+
# FFI Function used for Create Topic and Delete Topic callbacks
BackgroundEventCallbackFunction = FFI::Function.new(
:void, [:pointer, :pointer, :pointer]
) do |client_ptr, event_ptr, opaque_ptr|
BackgroundEventCallback.call(client_ptr, event_ptr, opaque_ptr)
@@ -38,10 +128,18 @@
process_create_topic(event_ptr)
elsif event_type == Rdkafka::Bindings::RD_KAFKA_EVENT_DELETETOPICS_RESULT
process_delete_topic(event_ptr)
elsif event_type == Rdkafka::Bindings::RD_KAFKA_ADMIN_OP_CREATEPARTITIONS_RESULT
process_create_partitions(event_ptr)
+ elsif event_type == Rdkafka::Bindings::RD_KAFKA_EVENT_CREATEACLS_RESULT
+ process_create_acl(event_ptr)
+ elsif event_type == Rdkafka::Bindings::RD_KAFKA_EVENT_DELETEACLS_RESULT
+ process_delete_acl(event_ptr)
+ elsif event_type == Rdkafka::Bindings::RD_KAFKA_EVENT_DESCRIBEACLS_RESULT
+ process_describe_acl(event_ptr)
+ elsif event_type == Rdkafka::Bindings::RD_KAFKA_EVENT_DELETEGROUPS_RESULT
+ process_delete_groups(event_ptr)
end
end
private
@@ -60,10 +158,27 @@
create_topic_handle[:result_name] = create_topic_results[0].result_name
create_topic_handle[:pending] = false
end
end
+ def self.process_delete_groups(event_ptr)
+ delete_groups_result = Rdkafka::Bindings.rd_kafka_event_DeleteGroups_result(event_ptr)
+
+ # Get the number of delete group results
+ pointer_to_size_t = FFI::MemoryPointer.new(:size_t)
+ delete_group_result_array = Rdkafka::Bindings.rd_kafka_DeleteGroups_result_groups(delete_groups_result, pointer_to_size_t)
+ delete_group_results = GroupResult.create_group_results_from_array(pointer_to_size_t.read_int, delete_group_result_array) # TODO fix this
+ delete_group_handle_ptr = Rdkafka::Bindings.rd_kafka_event_opaque(event_ptr)
+
+ if (delete_group_handle = Rdkafka::Admin::DeleteGroupsHandle.remove(delete_group_handle_ptr.address))
+ delete_group_handle[:response] = delete_group_results[0].result_error
+ delete_group_handle[:error_string] = delete_group_results[0].error_string
+ delete_group_handle[:result_name] = delete_group_results[0].result_name
+ delete_group_handle[:pending] = false
+ end
+ end
+
def self.process_delete_topic(event_ptr)
delete_topics_result = Rdkafka::Bindings.rd_kafka_event_DeleteTopics_result(event_ptr)
# Get the number of topic results
pointer_to_size_t = FFI::MemoryPointer.new(:int32)
@@ -93,10 +208,61 @@
create_partitions_handle[:error_string] = create_partitions_results[0].error_string
create_partitions_handle[:result_name] = create_partitions_results[0].result_name
create_partitions_handle[:pending] = false
end
end
+
+ def self.process_create_acl(event_ptr)
+ create_acls_result = Rdkafka::Bindings.rd_kafka_event_CreateAcls_result(event_ptr)
+
+ # Get the number of acl results
+ pointer_to_size_t = FFI::MemoryPointer.new(:int32)
+ create_acl_result_array = Rdkafka::Bindings.rd_kafka_CreateAcls_result_acls(create_acls_result, pointer_to_size_t)
+ create_acl_results = CreateAclResult.create_acl_results_from_array(pointer_to_size_t.read_int, create_acl_result_array)
+ create_acl_handle_ptr = Rdkafka::Bindings.rd_kafka_event_opaque(event_ptr)
+
+ if create_acl_handle = Rdkafka::Admin::CreateAclHandle.remove(create_acl_handle_ptr.address)
+ create_acl_handle[:response] = create_acl_results[0].result_error
+ create_acl_handle[:response_string] = create_acl_results[0].error_string
+ create_acl_handle[:pending] = false
+ end
+ end
+
+ def self.process_delete_acl(event_ptr)
+ delete_acls_result = Rdkafka::Bindings.rd_kafka_event_DeleteAcls_result(event_ptr)
+
+ # Get the number of acl results
+ pointer_to_size_t = FFI::MemoryPointer.new(:int32)
+ delete_acl_result_responses = Rdkafka::Bindings.rd_kafka_DeleteAcls_result_responses(delete_acls_result, pointer_to_size_t)
+ delete_acl_results = DeleteAclResult.delete_acl_results_from_array(pointer_to_size_t.read_int, delete_acl_result_responses)
+ delete_acl_handle_ptr = Rdkafka::Bindings.rd_kafka_event_opaque(event_ptr)
+
+ if delete_acl_handle = Rdkafka::Admin::DeleteAclHandle.remove(delete_acl_handle_ptr.address)
+ delete_acl_handle[:response] = delete_acl_results[0].result_error
+ delete_acl_handle[:response_string] = delete_acl_results[0].error_string
+ delete_acl_handle[:pending] = false
+ if delete_acl_results[0].result_error == 0
+ delete_acl_handle[:matching_acls] = delete_acl_results[0].matching_acls
+ delete_acl_handle[:matching_acls_count] = delete_acl_results[0].matching_acls_count
+ end
+ end
+ end
+
+ def self.process_describe_acl(event_ptr)
+ describe_acl = DescribeAclResult.new(event_ptr)
+ describe_acl_handle_ptr = Rdkafka::Bindings.rd_kafka_event_opaque(event_ptr)
+
+ if describe_acl_handle = Rdkafka::Admin::DescribeAclHandle.remove(describe_acl_handle_ptr.address)
+ describe_acl_handle[:response] = describe_acl.result_error
+ describe_acl_handle[:response_string] = describe_acl.error_string
+ describe_acl_handle[:pending] = false
+ if describe_acl.result_error == 0
+ describe_acl_handle[:acls] = describe_acl.matching_acls
+ describe_acl_handle[:acls_count] = describe_acl.matching_acls_count
+ end
+ end
+ end
end
# FFI Function used for Message Delivery callbacks
DeliveryCallbackFunction = FFI::Function.new(
@@ -125,7 +291,8 @@
opaque.call_delivery_callback(Rdkafka::Producer::DeliveryReport.new(message[:partition], message[:offset], topic_name, message[:err]), delivery_handle)
end
end
end
end
+
end
end