lib/rdkafka/callbacks.rb in karafka-rdkafka-0.14.0 vs lib/rdkafka/callbacks.rb in karafka-rdkafka-0.14.1

- old
+ new

@@ -21,10 +21,100 @@ new(result_pointer) end end end + class GroupResult + attr_reader :result_error, :error_string, :result_name + def initialize(group_result_pointer) + native_error = Rdkafka::Bindings.rd_kafka_group_result_error(group_result_pointer) + + if native_error.null? + @result_error = 0 + @error_string = FFI::Pointer::NULL + else + @result_error = native_error[:code] + @error_string = native_error[:errstr] + end + + @result_name = Rdkafka::Bindings.rd_kafka_group_result_name(group_result_pointer) + end + def self.create_group_results_from_array(count, array_pointer) + (1..count).map do |index| + result_pointer = (array_pointer + (index - 1)).read_pointer + new(result_pointer) + end + end + end + + # Extracts attributes of rd_kafka_acl_result_t + # + # @private + class CreateAclResult + attr_reader :result_error, :error_string + + def initialize(acl_result_pointer) + rd_kafka_error_pointer = Bindings.rd_kafka_acl_result_error(acl_result_pointer) + @result_error = Rdkafka::Bindings.rd_kafka_error_code(rd_kafka_error_pointer) + @error_string = Rdkafka::Bindings.rd_kafka_error_string(rd_kafka_error_pointer) + end + + def self.create_acl_results_from_array(count, array_pointer) + (1..count).map do |index| + result_pointer = (array_pointer + (index - 1)).read_pointer + new(result_pointer) + end + end + end + + # Extracts attributes of rd_kafka_DeleteAcls_result_response_t + # + # @private + class DeleteAclResult + attr_reader :result_error, :error_string, :matching_acls, :matching_acls_count + + def initialize(acl_result_pointer) + @matching_acls=[] + rd_kafka_error_pointer = Rdkafka::Bindings.rd_kafka_DeleteAcls_result_response_error(acl_result_pointer) + @result_error = Rdkafka::Bindings.rd_kafka_error_code(rd_kafka_error_pointer) + @error_string = Rdkafka::Bindings.rd_kafka_error_string(rd_kafka_error_pointer) + if @result_error == 0 + # Get the number of matching acls + pointer_to_size_t = FFI::MemoryPointer.new(:int32) + @matching_acls = Rdkafka::Bindings.rd_kafka_DeleteAcls_result_response_matching_acls(acl_result_pointer, pointer_to_size_t) + @matching_acls_count = pointer_to_size_t.read_int + end + end + + def self.delete_acl_results_from_array(count, array_pointer) + (1..count).map do |index| + result_pointer = (array_pointer + (index - 1)).read_pointer + new(result_pointer) + end + end + end + + # Extracts attributes of rd_kafka_DeleteAcls_result_response_t + # + # @private + class DescribeAclResult + attr_reader :result_error, :error_string, :matching_acls, :matching_acls_count + + def initialize(event_ptr) + @matching_acls=[] + @result_error = Rdkafka::Bindings.rd_kafka_event_error(event_ptr) + @error_string = Rdkafka::Bindings.rd_kafka_event_error_string(event_ptr) + if @result_error == 0 + acl_describe_result = Rdkafka::Bindings.rd_kafka_event_DescribeAcls_result(event_ptr) + # Get the number of matching acls + pointer_to_size_t = FFI::MemoryPointer.new(:int32) + @matching_acls = Rdkafka::Bindings.rd_kafka_DescribeAcls_result_acls(acl_describe_result, pointer_to_size_t) + @matching_acls_count = pointer_to_size_t.read_int + end + end + end + # FFI Function used for Create Topic and Delete Topic callbacks BackgroundEventCallbackFunction = FFI::Function.new( :void, [:pointer, :pointer, :pointer] ) do |client_ptr, event_ptr, opaque_ptr| BackgroundEventCallback.call(client_ptr, event_ptr, opaque_ptr) @@ -38,10 +128,18 @@ process_create_topic(event_ptr) elsif event_type == Rdkafka::Bindings::RD_KAFKA_EVENT_DELETETOPICS_RESULT process_delete_topic(event_ptr) elsif event_type == Rdkafka::Bindings::RD_KAFKA_ADMIN_OP_CREATEPARTITIONS_RESULT process_create_partitions(event_ptr) + elsif event_type == Rdkafka::Bindings::RD_KAFKA_EVENT_CREATEACLS_RESULT + process_create_acl(event_ptr) + elsif event_type == Rdkafka::Bindings::RD_KAFKA_EVENT_DELETEACLS_RESULT + process_delete_acl(event_ptr) + elsif event_type == Rdkafka::Bindings::RD_KAFKA_EVENT_DESCRIBEACLS_RESULT + process_describe_acl(event_ptr) + elsif event_type == Rdkafka::Bindings::RD_KAFKA_EVENT_DELETEGROUPS_RESULT + process_delete_groups(event_ptr) end end private @@ -60,10 +158,27 @@ create_topic_handle[:result_name] = create_topic_results[0].result_name create_topic_handle[:pending] = false end end + def self.process_delete_groups(event_ptr) + delete_groups_result = Rdkafka::Bindings.rd_kafka_event_DeleteGroups_result(event_ptr) + + # Get the number of delete group results + pointer_to_size_t = FFI::MemoryPointer.new(:size_t) + delete_group_result_array = Rdkafka::Bindings.rd_kafka_DeleteGroups_result_groups(delete_groups_result, pointer_to_size_t) + delete_group_results = GroupResult.create_group_results_from_array(pointer_to_size_t.read_int, delete_group_result_array) # TODO fix this + delete_group_handle_ptr = Rdkafka::Bindings.rd_kafka_event_opaque(event_ptr) + + if (delete_group_handle = Rdkafka::Admin::DeleteGroupsHandle.remove(delete_group_handle_ptr.address)) + delete_group_handle[:response] = delete_group_results[0].result_error + delete_group_handle[:error_string] = delete_group_results[0].error_string + delete_group_handle[:result_name] = delete_group_results[0].result_name + delete_group_handle[:pending] = false + end + end + def self.process_delete_topic(event_ptr) delete_topics_result = Rdkafka::Bindings.rd_kafka_event_DeleteTopics_result(event_ptr) # Get the number of topic results pointer_to_size_t = FFI::MemoryPointer.new(:int32) @@ -93,10 +208,61 @@ create_partitions_handle[:error_string] = create_partitions_results[0].error_string create_partitions_handle[:result_name] = create_partitions_results[0].result_name create_partitions_handle[:pending] = false end end + + def self.process_create_acl(event_ptr) + create_acls_result = Rdkafka::Bindings.rd_kafka_event_CreateAcls_result(event_ptr) + + # Get the number of acl results + pointer_to_size_t = FFI::MemoryPointer.new(:int32) + create_acl_result_array = Rdkafka::Bindings.rd_kafka_CreateAcls_result_acls(create_acls_result, pointer_to_size_t) + create_acl_results = CreateAclResult.create_acl_results_from_array(pointer_to_size_t.read_int, create_acl_result_array) + create_acl_handle_ptr = Rdkafka::Bindings.rd_kafka_event_opaque(event_ptr) + + if create_acl_handle = Rdkafka::Admin::CreateAclHandle.remove(create_acl_handle_ptr.address) + create_acl_handle[:response] = create_acl_results[0].result_error + create_acl_handle[:response_string] = create_acl_results[0].error_string + create_acl_handle[:pending] = false + end + end + + def self.process_delete_acl(event_ptr) + delete_acls_result = Rdkafka::Bindings.rd_kafka_event_DeleteAcls_result(event_ptr) + + # Get the number of acl results + pointer_to_size_t = FFI::MemoryPointer.new(:int32) + delete_acl_result_responses = Rdkafka::Bindings.rd_kafka_DeleteAcls_result_responses(delete_acls_result, pointer_to_size_t) + delete_acl_results = DeleteAclResult.delete_acl_results_from_array(pointer_to_size_t.read_int, delete_acl_result_responses) + delete_acl_handle_ptr = Rdkafka::Bindings.rd_kafka_event_opaque(event_ptr) + + if delete_acl_handle = Rdkafka::Admin::DeleteAclHandle.remove(delete_acl_handle_ptr.address) + delete_acl_handle[:response] = delete_acl_results[0].result_error + delete_acl_handle[:response_string] = delete_acl_results[0].error_string + delete_acl_handle[:pending] = false + if delete_acl_results[0].result_error == 0 + delete_acl_handle[:matching_acls] = delete_acl_results[0].matching_acls + delete_acl_handle[:matching_acls_count] = delete_acl_results[0].matching_acls_count + end + end + end + + def self.process_describe_acl(event_ptr) + describe_acl = DescribeAclResult.new(event_ptr) + describe_acl_handle_ptr = Rdkafka::Bindings.rd_kafka_event_opaque(event_ptr) + + if describe_acl_handle = Rdkafka::Admin::DescribeAclHandle.remove(describe_acl_handle_ptr.address) + describe_acl_handle[:response] = describe_acl.result_error + describe_acl_handle[:response_string] = describe_acl.error_string + describe_acl_handle[:pending] = false + if describe_acl.result_error == 0 + describe_acl_handle[:acls] = describe_acl.matching_acls + describe_acl_handle[:acls_count] = describe_acl.matching_acls_count + end + end + end end # FFI Function used for Message Delivery callbacks DeliveryCallbackFunction = FFI::Function.new( @@ -125,7 +291,8 @@ opaque.call_delivery_callback(Rdkafka::Producer::DeliveryReport.new(message[:partition], message[:offset], topic_name, message[:err]), delivery_handle) end end end end + end end