# encoding: utf-8
require "thread"
require "stud/interval"
require "concurrent"
require "logstash/namespace"
require "logstash/errors"
require "logstash/event"
require "logstash/config/file"
require "logstash/filters/base"
require "logstash/inputs/base"
require "logstash/outputs/base"
require "logstash/shutdown_watcher"
require "logstash/util/wrapped_synchronous_queue"
require "logstash/pipeline_reporter"
require "logstash/instrument/metric"
require "logstash/instrument/namespaced_metric"
require "logstash/instrument/null_metric"
require "logstash/instrument/collector"
require "logstash/output_delegator"
require "logstash/filter_delegator"

module LogStash; class Pipeline
  # A Pipeline compiles a Logstash config string into runnable code, then runs
  # input threads feeding a synchronous queue that is drained by worker threads
  # which filter and output events. Lifecycle: #initialize -> #run -> #shutdown.
  attr_reader :inputs,
    :filters,
    :outputs,
    :worker_threads,
    :events_consumed,
    :events_filtered,
    :reporter,
    :pipeline_id,
    :logger,
    :started_at,
    :thread,
    :config_str,
    :settings

  attr_accessor :metric

  # Warn when batch_size * workers exceeds this many in-memory events (see #start_workers).
  MAX_INFLIGHT_WARN_THRESHOLD = 10_000

  # Plugins (by class name) that cannot survive a pipeline reload; see #non_reloadable_plugins.
  RELOAD_INCOMPATIBLE_PLUGINS = [
    "LogStash::Inputs::Stdin"
  ]

  # Parse and compile the given config string, eval the generated code (which
  # instantiates all plugins and defines filter_func/output_func on this
  # instance), and set up queues, counters and state flags.
  #
  # @param config_str [String] the Logstash pipeline configuration text
  # @param settings [LogStash::Settings] settings source (defaults to global SETTINGS)
  # @raise [LogStash::ConfigurationError] if the config string does not parse
  def initialize(config_str, settings = LogStash::SETTINGS)
    @config_str = config_str
    @logger = Cabin::Channel.get(LogStash)
    @settings = settings
    @pipeline_id = @settings.get_value("pipeline.id") || self.object_id
    @reporter = LogStash::PipelineReporter.new(@logger, self)

    # Populated by the eval'd config code below.
    @inputs = nil
    @filters = nil
    @outputs = nil

    @worker_threads = []

    # Metric object should be passed upstream, multiple pipeline share the same metric
    # and collector only the namespace will changes.
    # If no metric is given, we use a `NullMetric` for all internal calls.
    # We also do this to make the changes backward compatible with previous testing of the
    # pipeline.
    #
    # This needs to be configured before we evaluate the code to make
    # sure the metric instance is correctly sent to the plugin.
    # NOTE: It is the responsibility of the Agent to set this externally with a setter
    # if there's an intent of this not being a NullMetric
    @metric = Instrument::NullMetric.new

    grammar = LogStashConfigParser.new
    @config = grammar.parse(config_str)
    if @config.nil?
      raise LogStash::ConfigurationError, grammar.failure_reason
    end
    # This will compile the config to ruby and evaluate the resulting code.
    # The code will initialize all the plugins and define the
    # filter and output methods.
    code = @config.compile
    @code = code

    # The config code is hard to represent as a log message...
    # So just print it.
    if @settings.get("config.debug") && logger.debug?
      logger.debug("Compiled pipeline code", :code => code)
    end

    # NOTE(review): this begin/rescue only re-raises and adds nothing; it looks
    # like a leftover from a removed rescue body — candidate for cleanup.
    begin
      eval(code)
    rescue => e
      raise
    end

    @input_queue = LogStash::Util::WrappedSynchronousQueue.new
    @events_filtered = Concurrent::AtomicFixnum.new(0)
    @events_consumed = Concurrent::AtomicFixnum.new(0)

    # We generally only want one thread at a time able to access pop/take/poll operations
    # from this queue. We also depend on this to be able to block consumers while we snapshot
    # in-flight buffers
    @input_queue_pop_mutex = Mutex.new
    @input_threads = []
    # @ready requires thread safety since it is typically polled from outside the pipeline thread
    @ready = Concurrent::AtomicBoolean.new(false)
    @running = Concurrent::AtomicBoolean.new(false)
    @flushing = Concurrent::AtomicReference.new(false)
  end # def initialize

  # @return [Boolean] true once #start_workers has completed its startup sequence
  def ready?
    @ready.value
  end

  # Determine how many worker threads to run, forcing 1 (or warning on manual
  # override) when any filter is not thread-safe.
  #
  # @return [Integer] the worker thread count to use
  def safe_pipeline_worker_count
    default = @settings.get_default("pipeline.workers")
    pipeline_workers = @settings.get("pipeline.workers") # override from args "-w 8" or config
    safe_filters, unsafe_filters = @filters.partition(&:threadsafe?)
    plugins = unsafe_filters.collect { |f| f.config_name }

    return pipeline_workers if unsafe_filters.empty?

    if @settings.set?("pipeline.workers")
      # user explicitly set a count: honor it, but warn if > 1 with unsafe filters
      if pipeline_workers > 1
        @logger.warn("Warning: Manual override - there are filters that might not work with multiple worker threads",
                     :worker_threads => pipeline_workers, :filters => plugins)
      end
    else # user did not specify a worker thread count
      # warn if the default is multiple
      if default > 1
        @logger.warn("Defaulting pipeline worker threads to 1 because there are some filters that might not work with multiple worker threads",
                     :count_was => default, :filters => plugins)
        return 1 # can't allow the default value to propagate if there are unsafe filters
      end
    end
    pipeline_workers
  end

  # @return [Boolean] true if the pipeline has at least one filter
  def filters?
    return @filters.any?
  end

  # Run the pipeline to completion: start workers and inputs, block until all
  # inputs stop (typically triggered by #shutdown from another thread), then
  # tear down the flusher and workers.
  #
  # @return [Integer] 0 as the exit code
  def run
    @started_at = Time.now

    @thread = Thread.current
    LogStash::Util.set_thread_name("[#{pipeline_id}]-pipeline-manager")

    start_workers

    @logger.log("Pipeline #{@pipeline_id} started")

    # Block until all inputs have stopped
    # Generally this happens if SIGINT is sent and `shutdown` is called from an external thread

    transition_to_running
    start_flusher # Launches a non-blocking thread for flush events
    wait_inputs
    transition_to_stopped

    @logger.info("Input plugins stopped! Will shutdown filter/output workers.")

    shutdown_flusher
    shutdown_workers

    @logger.log("Pipeline #{@pipeline_id} has been shutdown")

    # exit code
    return 0
  end # def run

  def transition_to_running
    @running.make_true
  end

  def transition_to_stopped
    @running.make_false
  end

  def running?
    @running.true?
  end

  def stopped?
    @running.false?
  end

  # Register all plugins, start the input threads, then spawn the worker
  # threads. Always flips @ready to true (even on failure) so a concurrent
  # #shutdown waiting on ready? is not blocked forever.
  def start_workers
    @inflight_batches = {}

    @worker_threads.clear # In case we're restarting the pipeline
    begin
      start_inputs
      @outputs.each {|o| o.register }
      @filters.each {|f| f.register }

      pipeline_workers = safe_pipeline_worker_count
      batch_size = @settings.get("pipeline.batch.size")
      batch_delay = @settings.get("pipeline.batch.delay")
      max_inflight = batch_size * pipeline_workers

      @logger.info("Starting pipeline",
                   "id" => self.pipeline_id,
                   "pipeline.workers" => pipeline_workers,
                   "pipeline.batch.size" => batch_size,
                   "pipeline.batch.delay" => batch_delay,
                   "pipeline.max_inflight" => max_inflight)
      if max_inflight > MAX_INFLIGHT_WARN_THRESHOLD
        @logger.warn "CAUTION: Recommended inflight events max exceeded! Logstash will run with up to #{max_inflight} events in memory in your current configuration. If your message sizes are large this may cause instability with the default heap size. Please consider setting a non-standard heap size, changing the batch size (currently #{batch_size}), or changing the number of pipeline workers (currently #{pipeline_workers})"
      end

      pipeline_workers.times do |t|
        @worker_threads << Thread.new do
          LogStash::Util.set_thread_name("[#{pipeline_id}]>worker#{t}")
          worker_loop(batch_size, batch_delay)
        end
      end
    ensure
      # it is important to guarantee @ready to be true after the startup sequence has been completed
      # to potentially unblock the shutdown method which may be waiting on @ready to proceed
      @ready.make_true
    end
  end

  # Main body of what a worker thread does
  # Repeatedly takes batches off the queue, filters, then outputs them
  #
  # @param batch_size [Integer] max events to accumulate per batch
  # @param batch_delay [Numeric] poll timeout used while filling a batch
  def worker_loop(batch_size, batch_delay)
    running = true

    namespace_events = metric.namespace([:stats, :events])
    namespace_pipeline = metric.namespace([:stats, :pipelines, pipeline_id.to_s.to_sym, :events])

    while running
      # To understand the purpose behind this synchronize please read the body of take_batch
      input_batch, signal = @input_queue_pop_mutex.synchronize { take_batch(batch_size, batch_delay) }
      # A SHUTDOWN signal ends this loop after the current batch is processed.
      running = false if signal == LogStash::SHUTDOWN

      @events_consumed.increment(input_batch.size)
      namespace_events.increment(:in, input_batch.size)
      namespace_pipeline.increment(:in, input_batch.size)

      filtered_batch = filter_batch(input_batch)

      if signal # Flush on SHUTDOWN or FLUSH
        flush_options = (signal == LogStash::SHUTDOWN) ? {:final => true} : {}
        flush_filters_to_batch(filtered_batch, flush_options)
      end

      @events_filtered.increment(filtered_batch.size)
      namespace_events.increment(:filtered, filtered_batch.size)
      namespace_pipeline.increment(:filtered, filtered_batch.size)

      output_batch(filtered_batch)
      namespace_events.increment(:out, filtered_batch.size)
      namespace_pipeline.increment(:out, filtered_batch.size)

      # Batch fully delivered; clear this thread's in-flight record.
      inflight_batches_synchronize { set_current_thread_inflight_batch(nil) }
    end
  end

  # Pull up to batch_size events off the input queue. Blocks on the first
  # event, then polls with batch_delay for the rest. Must be called under
  # @input_queue_pop_mutex (see worker_loop).
  #
  # @return [Array(Array<LogStash::Event>, Object)] the batch plus a signal:
  #   false, LogStash::SHUTDOWN or LogStash::FLUSH
  def take_batch(batch_size, batch_delay)
    batch = []
    # Since this is externally synchronized in `worker_loop` we can guarantee that the visibility of an in-flight batch
    # is guaranteed to be a full batch, not a partial batch
    set_current_thread_inflight_batch(batch)

    signal = false
    batch_size.times do |t|
      # Block for the first event so idle workers don't spin; poll afterwards.
      event = (t == 0) ? @input_queue.take : @input_queue.poll(batch_delay)

      if event.nil?
        next
      elsif event == LogStash::SHUTDOWN || event == LogStash::FLUSH
        # We MUST break here. If a batch consumes two SHUTDOWN events
        # then another worker may have its SHUTDOWN 'stolen', thus blocking
        # the pipeline. We should stop doing work after flush as well.
        signal = event
        break
      else
        batch << event
      end
    end

    [batch, signal]
  end

  # Run the compiled filter chain over a batch, dropping cancelled events.
  #
  # @param batch [Array<LogStash::Event>]
  # @return [Array<LogStash::Event>] the filtered, non-cancelled events
  def filter_batch(batch)
    batch.reduce([]) do |acc,e|
      if e.is_a?(LogStash::Event)
        filtered = filter_func(e)
        filtered.each {|fe| acc << fe unless fe.cancelled?}
      end
      acc
    end
  rescue Exception => e
    # Plugins authors should manage their own exceptions in the plugin code
    # but if an exception is raised up to the worker thread they are considered
    # fatal and logstash will not recover from this situation.
    #
    # Users need to check their configuration or see if there is a bug in the
    # plugin.
    # NOTE(review): rescuing Exception is deliberate here — the error is logged
    # and immediately re-raised, not swallowed.
    @logger.error("Exception in pipelineworker, the pipeline stopped processing new events, please check your filter configuration and restart Logstash.",
                  "exception" => e, "backtrace" => e.backtrace)
    raise
  end

  # Take an array of events and send them to the correct output
  def output_batch(batch)
    # Build a mapping of { output_plugin => [events...]}
    outputs_events = batch.reduce(Hash.new { |h, k| h[k] = [] }) do |acc, event|
      # We ask the AST to tell us which outputs to send each event to
      # Then, we stick it in the correct bin

      # output_func should never return anything other than an Array but we have lots of legacy specs
      # that monkeypatch it and return nil. We can deprecate "|| []" after fixing these specs
      outputs_for_event = output_func(event) || []

      outputs_for_event.each { |output| acc[output] << event }
      acc
    end

    # Now that we have our output to event mapping we can just invoke each output
    # once with its list of events
    outputs_events.each { |output, events| output.multi_receive(events) }
  end

  # Record the batch currently being processed by this worker thread
  # (nil once delivery completes); used for in-flight snapshotting.
  def set_current_thread_inflight_batch(batch)
    @inflight_batches[Thread.current] = batch
  end

  # Yield the in-flight batches map while holding the queue-pop mutex so no
  # worker can start a new batch during the block.
  def inflight_batches_synchronize
    @input_queue_pop_mutex.synchronize do
      yield(@inflight_batches)
    end
  end

  # Block until every input thread has finished.
  def wait_inputs
    @input_threads.each(&:join)
  end

  # Clone multi-threaded inputs to their requested thread count, then register
  # and start every input in its own thread.
  def start_inputs
    moreinputs = []
    @inputs.each do |input|
      if input.threadable && input.threads > 1
        (input.threads - 1).times do |i|
          moreinputs << input.clone
        end
      end
    end
    @inputs += moreinputs

    @inputs.each do |input|
      input.register
      start_input(input)
    end
  end

  def start_input(plugin)
    @input_threads << Thread.new { inputworker(plugin) }
  end

  # Body of an input thread: run the plugin against the input queue, retrying
  # after transient errors and always closing the plugin on the way out.
  #
  # @param plugin [LogStash::Inputs::Base] the input plugin instance to run
  def inputworker(plugin)
    LogStash::Util::set_thread_name("[#{pipeline_id}]<#{plugin.class.config_name}")
    begin
      plugin.run(@input_queue)
    rescue => e
      # Exceptions raised while we are shutting down are expected noise.
      if plugin.stop?
        @logger.debug("Input plugin raised exception during shutdown, ignoring it.",
                      :plugin => plugin.class.config_name, :exception => e,
                      :backtrace => e.backtrace)
        return
      end

      # otherwise, report error and restart
      if @logger.debug?
        @logger.error(I18n.t("logstash.pipeline.worker-error-debug",
                             :plugin => plugin.inspect, :error => e.to_s,
                             :exception => e.class,
                             :stacktrace => e.backtrace.join("\n")))
      else
        @logger.error(I18n.t("logstash.pipeline.worker-error",
                             :plugin => plugin.inspect, :error => e))
      end

      # Assuming the failure that caused this exception is transient,
      # let's sleep for a bit and execute #run again
      sleep(1)
      retry
    ensure
      plugin.do_close
    end
  end # def inputworker

  # initiate the pipeline shutdown sequence
  # this method is intended to be called from outside the pipeline thread
  # @param before_stop [Proc] code block called before performing stop operation on input plugins
  def shutdown(&before_stop)
    # shutdown can only start once the pipeline has completed its startup.
    # avoid potential race condition between the startup sequence and this
    # shutdown method which can be called from another thread at any time
    sleep(0.1) while !ready?

    # TODO: should we also check against calling shutdown multiple times concurrently?

    before_stop.call if block_given?

    @logger.info "Closing inputs"
    @inputs.each(&:do_stop)
    @logger.info "Closed inputs"
  end # def shutdown

  # After `shutdown` is called from an external thread this is called from the main thread to
  # tell the worker threads to stop and then block until they've fully stopped
  # This also stops all filter and output plugins
  def shutdown_workers
    # Each worker thread will receive this exactly once!
    @worker_threads.each do |t|
      @logger.debug("Pushing shutdown", :thread => t.inspect)
      @input_queue.push(LogStash::SHUTDOWN)
    end

    @worker_threads.each do |t|
      @logger.debug("Shutdown waiting for worker thread #{t}")
      t.join
    end

    @filters.each(&:do_close)
    @outputs.each(&:do_close)
  end

  # Factory invoked by the compiled config code to build plugin instances,
  # wrapping outputs and filters in their metric-aware delegators.
  #
  # @param plugin_type [String] "input", "filter", "output" or "codec"
  # @param name [String] the plugin's config name
  # @param args [Array] plugin constructor arguments (a params hash, usually)
  def plugin(plugin_type, name, *args)
    args << {} if args.empty?

    pipeline_scoped_metric = metric.namespace([:stats, :pipelines, pipeline_id.to_s.to_sym, :plugins])

    klass = LogStash::Plugin.lookup(plugin_type, name)

    if plugin_type == "output"
      LogStash::OutputDelegator.new(@logger, klass, @settings.get("pipeline.output.workers"), pipeline_scoped_metric.namespace(:outputs), *args)
    elsif plugin_type == "filter"
      LogStash::FilterDelegator.new(@logger, klass, pipeline_scoped_metric.namespace(:filters), *args)
    else
      klass.new(*args)
    end
  end

  # for backward compatibility in devutils for the rspec helpers, this method is not used
  # in the pipeline anymore.
  def filter(event, &block)
    # filter_func returns all filtered events, including cancelled ones
    filter_func(event).each { |e| block.call(e) }
  end

  # perform filters flush and yield flushed event to the passed block
  # @param options [Hash]
  # @option options [Boolean] :final => true to signal a final shutdown flush
  def flush_filters(options = {}, &block)
    # @shutdown_flushers/@periodic_flushers are populated by the eval'd config
    # code — TODO confirm; they are not assigned anywhere in this file.
    flushers = options[:final] ? @shutdown_flushers : @periodic_flushers

    flushers.each do |flusher|
      flusher.call(options, &block)
    end
  end

  # Start the background thread that periodically (every 5s) pushes a FLUSH
  # signal into the queue until the pipeline stops.
  def start_flusher
    # Invariant to help detect improper initialization
    raise "Attempted to start flusher on a stopped pipeline!" if stopped?

    @flusher_thread = Thread.new do
      while Stud.stoppable_sleep(5, 0.1) { stopped? }
        flush
        break if stopped?
      end
    end
  end

  def shutdown_flusher
    @flusher_thread.join
  end

  # Enqueue a FLUSH signal unless a flush is already pending
  # (@flushing is reset in flush_filters_to_batch).
  def flush
    if @flushing.compare_and_set(false, true)
      @logger.debug? && @logger.debug("Pushing flush onto pipeline")
      @input_queue.push(LogStash::FLUSH)
    end
  end

  # Calculate the uptime in milliseconds
  #
  # @return [Fixnum] Uptime in milliseconds, 0 if the pipeline is not started
  def uptime
    return 0 if started_at.nil?
    ((Time.now.to_f - started_at.to_f) * 1000.0).to_i
  end

  # perform filters flush into the output queue
  # @param options [Hash]
  # @option options [Boolean] :final => true to signal a final shutdown flush
  def flush_filters_to_batch(batch, options = {})
    flush_filters(options) do |event|
      unless event.cancelled?
        @logger.debug? and @logger.debug("Pushing flushed events", :event => event)
        batch << event
      end
    end

    # Allow the next #flush to enqueue a new FLUSH signal.
    @flushing.set(false)
  end # flush_filters_to_output!

  # @return [Array<Hash>] thread_info for every live input and worker thread
  def plugin_threads_info
    input_threads = @input_threads.select {|t| t.alive? }
    worker_threads = @worker_threads.select {|t| t.alive? }
    (input_threads + worker_threads).map {|t| LogStash::Util.thread_info(t) }
  end

  # Like plugin_threads_info but filtered down to threads that look stalled,
  # with noisy fields stripped for reporting.
  def stalling_threads_info
    plugin_threads_info
      .reject {|t| t["blocked_on"] } # known benign blocking statuses
      .each {|t| t.delete("backtrace") }
      .each {|t| t.delete("blocked_on") }
      .each {|t| t.delete("status") }
  end

  # @return [Array] plugins in this pipeline that cannot be hot-reloaded
  def non_reloadable_plugins
    (inputs + filters + outputs).select do |plugin|
      RELOAD_INCOMPATIBLE_PLUGINS.include?(plugin.class.name)
    end
  end

  # Sometimes we log stuff that will dump the pipeline which may contain
  # sensitive information (like the raw syntax tree which can contain passwords)
  # We want to hide most of what's in here
  def inspect
    {
      :pipeline_id => @pipeline_id,
      :settings => @settings.inspect,
      :ready => @ready,
      :running => @running,
      :flushing => @flushing
    }
  end

end end