# encoding: utf-8 require 'logstash/outputs/base' require 'logstash/namespace' require 'logstash/errors' class LogStash::Outputs::Kusto < LogStash::Outputs::Base ## # This handles the overall logic and communication with Kusto # class Ingestor require 'logstash-output-kusto_jars' RETRY_DELAY_SECONDS = 3 DEFAULT_THREADPOOL = Concurrent::ThreadPoolExecutor.new( min_threads: 1, max_threads: 8, max_queue: 1, fallback_policy: :caller_runs ) LOW_QUEUE_LENGTH = 3 FIELD_REF = /%\{[^}]+\}/ def initialize(ingest_url, app_id, app_key, app_tenant, managed_identity_id, database, table, json_mapping, delete_local, proxy_host , proxy_port , proxy_protocol,logger, threadpool = DEFAULT_THREADPOOL) @workers_pool = threadpool @logger = logger validate_config(database, table, json_mapping,proxy_protocol,app_id, app_key, managed_identity_id) @logger.info('Preparing Kusto resources.') kusto_java = Java::com.microsoft.azure.kusto apache_http = Java::org.apache.http # kusto_connection_string = kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant) # If there is managed identity, use it. This means the AppId and AppKey are empty/nil is_managed_identity = (app_id.nil? && app_key.nil?) # If it is system managed identity, propagate the system identity is_system_assigned_managed_identity = is_managed_identity && 0 == "system".casecmp(managed_identity_id) # Is it direct connection is_direct_conn = (proxy_host.nil? || proxy_host.empty?) # Create a connection string kusto_connection_string = if is_managed_identity if is_system_assigned_managed_identity @logger.info('Using system managed identity.') kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url) else @logger.info('Using user managed identity.') kusto_java.data.auth.ConnectionStringBuilder.createWithAadManagedIdentity(ingest_url, managed_identity_id) end else kusto_java.data.auth.ConnectionStringBuilder.createWithAadApplicationCredentials(ingest_url, app_id, app_key.value, app_tenant) end # @logger.debug(Gem.loaded_specs.to_s) # Unfortunately there's no way to avoid using the gem/plugin name directly... name_for_tracing = "logstash-output-kusto:#{Gem.loaded_specs['logstash-output-kusto']&.version || "unknown"}" @logger.debug("Client name for tracing: #{name_for_tracing}") tuple_utils = Java::org.apache.commons.lang3.tuple # kusto_connection_string.setClientVersionForTracing(name_for_tracing) version_for_tracing=Gem.loaded_specs['logstash-output-kusto']&.version || "unknown" kusto_connection_string.setConnectorDetails("Logstash",version_for_tracing.to_s,"","",false,"", tuple_utils.Pair.emptyArray()); @kusto_client = begin if is_direct_conn kusto_java.ingest.IngestClientFactory.createClient(kusto_connection_string) else kusto_http_client_properties = kusto_java.data.HttpClientProperties.builder().proxy(apache_http.HttpHost.new(proxy_host,proxy_port,proxy_protocol)).build() kusto_java.ingest.IngestClientFactory.createClient(kusto_connection_string, kusto_http_client_properties) end end @ingestion_properties = kusto_java.ingest.IngestionProperties.new(database, table) @ingestion_properties.setIngestionMapping(json_mapping, kusto_java.ingest.IngestionMapping::IngestionMappingKind::JSON) @ingestion_properties.setDataFormat(kusto_java.ingest.IngestionProperties::DataFormat::JSON) @delete_local = delete_local @logger.debug('Kusto resources are ready.') end def validate_config(database, table, json_mapping, proxy_protocol, app_id, app_key, managed_identity_id) # Add an additional validation and fail this upfront if app_id.nil? && app_key.nil? && managed_identity_id.nil? @logger.error('managed_identity_id is not provided and app_id/app_key is empty.') raise LogStash::ConfigurationError.new('managed_identity_id is not provided and app_id/app_key is empty.') end if database =~ FIELD_REF @logger.error('database config value should not be dynamic.', database) raise LogStash::ConfigurationError.new('database config value should not be dynamic.') end if table =~ FIELD_REF @logger.error('table config value should not be dynamic.', table) raise LogStash::ConfigurationError.new('table config value should not be dynamic.') end if json_mapping =~ FIELD_REF @logger.error('json_mapping config value should not be dynamic.', json_mapping) raise LogStash::ConfigurationError.new('json_mapping config value should not be dynamic.') end if not(["https", "http"].include? proxy_protocol) @logger.error('proxy_protocol has to be http or https.', proxy_protocol) raise LogStash::ConfigurationError.new('proxy_protocol has to be http or https.') end end def upload_async(path, delete_on_success) if @workers_pool.remaining_capacity <= LOW_QUEUE_LENGTH @logger.warn("Ingestor queue capacity is running low with #{@workers_pool.remaining_capacity} free slots.") end @workers_pool.post do LogStash::Util.set_thread_name("Kusto to ingest file: #{path}") upload(path, delete_on_success) end rescue Exception => e @logger.error('StandardError.', exception: e.class, message: e.message, path: path, backtrace: e.backtrace) raise e end def upload(path, delete_on_success) file_size = File.size(path) @logger.debug("Sending file to kusto: #{path}. size: #{file_size}") # TODO: dynamic routing # file_metadata = path.partition('.kusto.').last # file_metadata_parts = file_metadata.split('.') # if file_metadata_parts.length == 3 # # this is the number we expect - database, table, json_mapping # database = file_metadata_parts[0] # table = file_metadata_parts[1] # json_mapping = file_metadata_parts[2] # local_ingestion_properties = Java::KustoIngestionProperties.new(database, table) # local_ingestion_properties.addJsonMappingName(json_mapping) # end if file_size > 0 file_source_info = Java::com.microsoft.azure.kusto.ingest.source.FileSourceInfo.new(path, 0); # 0 - let the sdk figure out the size of the file @kusto_client.ingestFromFile(file_source_info, @ingestion_properties) else @logger.warn("File #{path} is an empty file and is not ingested.") end File.delete(path) if delete_on_success @logger.debug("File #{path} sent to kusto.") rescue Errno::ENOENT => e @logger.error("File doesn't exist! Unrecoverable error.", exception: e.class, message: e.message, path: path, backtrace: e.backtrace) rescue Java::JavaNioFile::NoSuchFileException => e @logger.error("File doesn't exist! Unrecoverable error.", exception: e.class, message: e.message, path: path, backtrace: e.backtrace) rescue => e # When the retry limit is reached or another error happen we will wait and retry. # # Thread might be stuck here, but I think its better than losing anything # its either a transient errors or something bad really happened. @logger.error('Uploading failed, retrying.', exception: e.class, message: e.message, path: path, backtrace: e.backtrace) sleep RETRY_DELAY_SECONDS retry end def stop @workers_pool.shutdown @workers_pool.wait_for_termination(nil) # block until its done end end end