Sha256: 6fed7e5f2dcc7fbe3fa43e749e6870fab75b3215d90e0c3cfd5f387ea6b529f2
Contents?: true
Size: 1.91 KB
Versions: 3
Compression:
Stored size: 1.91 KB
Contents
# encoding: utf-8
require "logstash/pipeline_action/base"
require "logstash/pipeline"
require "logstash/java_pipeline"
require "logstash/converge_result"
require "logstash/util/loggable"

module LogStash
  module PipelineAction
    # Action that instantiates a pipeline from a pipeline config, starts it and
    # records it in the pipelines map handed to #execute.
    class Create < Base
      include LogStash::Util::Loggable

      # We currently pass the metric object around because it is needed to
      # correctly create a pipeline. In a future PR we could pass a factory
      # instead, so that the logic to create the pipeline is what gets passed.
      #
      # @param pipeline_config the configuration the pipeline is built from
      # @param metric the metric object forwarded to the pipeline constructor
      def initialize(pipeline_config, metric)
        @pipeline_config = pipeline_config
        @metric = metric
      end

      # @return the id of the pipeline this action creates, as reported by the
      #   pipeline config
      def pipeline_id
        @pipeline_config.pipeline_id
      end

      # Make sure we execute system pipelines (like monitoring) before any
      # user defined pipelines: system pipelines register hooks into the system
      # that will be triggered by the user defined pipelines.
      #
      # @return the inherited priority, negated when the config is a system
      #   pipeline
      def execution_priority
        default_priority = super
        @pipeline_config.system? ? default_priority * -1 : default_priority
      end

      # Builds the pipeline, starts it and stores it under #pipeline_id.
      #
      # This method assumes that thread-safe access to the pipelines map is
      # managed by the caller.
      #
      # @param agent the agent forwarded to the pipeline constructor
      # @param pipelines map-like object responding to #compute(key) { |key, current| ... }
      #   (presumably a java.util.concurrent.ConcurrentHashMap; confirm against the caller)
      # @return the ActionResult built from the status returned by pipeline.start
      def execute(agent, pipelines)
        # The "pipeline.java_execution" setting selects the pipeline implementation.
        pipeline =
          if @pipeline_config.settings.get_value("pipeline.java_execution")
            LogStash::JavaPipeline.new(@pipeline_config, @metric, agent)
          else
            LogStash::Pipeline.new(@pipeline_config, @metric, agent)
          end

        status = nil
        pipelines.compute(pipeline_id) do |_id, _existing|
          # Blocks until the pipeline is correctly started or has crashed.
          status = pipeline.start

          # NOTE(review): the pipeline is handed back to the map regardless of
          # `status`, and regardless of whether an entry already existed for
          # this id. Confirm whether a failed start should leave the map untouched.
          pipeline
        end

        LogStash::ConvergeResult::ActionResult.create(self, status)
      end

      def to_s
        "PipelineAction::Create<#{pipeline_id}>"
      end
    end
  end
end
Version data entries
3 entries across 3 versions & 1 rubygems