# # Fluentd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. # require 'fluent/config' require 'fluent/plugin' module Fluent # Static Analysis means analysing all plugins and Fluent::Element without invokeing Plugin#configure class StaticConfigAnalysis module Elem Input = Struct.new(:plugin, :config) Output = Struct.new(:plugin, :config) Filter = Struct.new(:plugin, :config) Label = Struct.new(:name, :config, :nodes) Worker = Struct.new(:ids, :config, :nodes) end Result = Struct.new(:tree, :outputs, :inputs, :filters, :labels) do def all_plugins (outputs + inputs + filters).map(&:plugin) end end # @param workers [Integer] Number of workers # @return [Fluent::StaticConfigAnalysis::Result] def self.call(conf, workers: 1) new(workers).call(conf) end def initialize(workers) @workers = workers reset end def call(config) reset tree = [ static_worker_analyse(config), static_label_analyse(config), static_filter_and_output_analyse(config), static_input_analyse(config), ].flatten Result.new(tree, @outputs, @inputs, @filters, @labels.values) end private def reset @outputs = [] @inputs = [] @filters = [] @labels = {} end def static_worker_analyse(conf) available_worker_ids = [*0...@workers] ret = [] conf.elements(name: 'worker').each do |config| ids = parse_worker_id(config) ids.each do |id| if available_worker_ids.include?(id) available_worker_ids.delete(id) else raise Fluent::ConfigError, "specified worker_id<#{id}> collisions is detected on directive. Available worker id(s): #{available_worker_ids}" end end config.elements.each do |elem| unless %w[source match filter label].include?(elem.name) raise Fluent::ConfigError, " section cannot have <#{elem.name}> directive" end end nodes = [ static_label_analyse(config), static_filter_and_output_analyse(config), static_input_analyse(config), ].flatten ret << Elem::Worker.new(ids, config, nodes) end ret end def parse_worker_id(conf) worker_id_str = conf.arg if worker_id_str.empty? raise Fluent::ConfigError, 'Missing worker id on directive' end l, r = begin worker_id_str.split('-', 2).map { |v| Integer(v) } rescue TypeError, ArgumentError raise Fluent::ConfigError, "worker id should be integer: #{worker_id_str}" end if l < 0 || l >= @workers raise Fluent::ConfigError, "worker id #{l} specified by directive is not allowed. Available worker id is between 0 and #{@workers-1}" end # e.g. specified one worker id like `` if r.nil? return [l] end if r < 0 || r >= @workers raise Fluent::ConfigError, "worker id #{r} specified by directive is not allowed. Available worker id is between 0 and #{@workers-1}" end if l > r raise Fluent::ConfigError, "greater first_worker_id<#{l}> than last_worker_id<#{r}> specified by directive is not allowed. Available multi worker assign syntax is -" end [l, r] end def static_label_analyse(conf) ret = [] conf.elements(name: 'label').each do |e| name = e.arg if name.empty? raise ConfigError, 'Missing symbol argument on