Class: Triglav::Agent::Base::Processor
- Inherits:
- Object
- Object
- Triglav::Agent::Base::Processor
- Defined in:
- lib/triglav/agent/base/processor.rb
Overview
Triglav agent processor class.
One instance is created per `resource_uri_prefix`.
You usually do not need to customize this class. To use your own implementation, configure it via `Triglav::Agent::Configuration.processor_class`.
Instance Attribute Summary
- #resource_uri_prefix ⇒ Object (readonly)
  Returns the value of attribute resource_uri_prefix.
- #worker ⇒ Object (readonly)
  Returns the value of attribute worker.
Class Method Summary
- .max_consecuitive_error_count ⇒ Object
Instance Method Summary
- #initialize(worker, resource_uri_prefix) ⇒ Processor (constructor)
  A new instance of Processor.
- #process ⇒ Object
- #total_count ⇒ Object
Constructor Details
#initialize(worker, resource_uri_prefix) ⇒ Processor
Returns a new instance of Processor.

    # File 'lib/triglav/agent/base/processor.rb', line 17
    def initialize(worker, resource_uri_prefix)
      @worker = worker
      @resource_uri_prefix = resource_uri_prefix
    end
Instance Attribute Details
#resource_uri_prefix ⇒ Object (readonly)
Returns the value of attribute resource_uri_prefix.

    # File 'lib/triglav/agent/base/processor.rb', line 15
    def resource_uri_prefix
      @resource_uri_prefix
    end
#worker ⇒ Object (readonly)
Returns the value of attribute worker.

    # File 'lib/triglav/agent/base/processor.rb', line 15
    def worker
      @worker
    end
Class Method Details
.max_consecuitive_error_count ⇒ Object

    # File 'lib/triglav/agent/base/processor.rb', line 22
    def self.max_consecuitive_error_count
      3
    end
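Since `#process` reads the threshold through `self.class.max_consecuitive_error_count`, a subclass can presumably change it by overriding this class method. The following is a minimal sketch of that idea. It uses a stand-in `BaseProcessor` instead of the real class so that it runs without the gem; `BaseProcessor`, `LenientProcessor`, and `error_threshold` are hypothetical names used only for illustration.

```ruby
# Stand-in for Triglav::Agent::Base::Processor (hypothetical; only the
# pieces needed to show the override are reproduced here).
class BaseProcessor
  attr_reader :worker, :resource_uri_prefix

  def initialize(worker, resource_uri_prefix)
    @worker = worker
    @resource_uri_prefix = resource_uri_prefix
  end

  # Same default value as the documented class method.
  def self.max_consecuitive_error_count
    3
  end

  # Hypothetical helper: looks the threshold up through the instance's
  # class, the same way #process does.
  def error_threshold
    self.class.max_consecuitive_error_count
  end
end

# Hypothetical subclass that tolerates more consecutive failures.
class LenientProcessor < BaseProcessor
  def self.max_consecuitive_error_count
    10
  end
end
```

With these definitions, `BaseProcessor.new(nil, "prefix").error_threshold` evaluates to 3 and `LenientProcessor.new(nil, "prefix").error_threshold` evaluates to 10, because the lookup dispatches on the instance's class.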
Instance Method Details
#process ⇒ Object

    # File 'lib/triglav/agent/base/processor.rb', line 26
    def process
      before_process
      success_count = 0
      consecutive_error_count = 0
      Parallel.each(resources, parallel_opts) do |resource|
        raise Parallel::Break if stopped?
        events = nil
        begin
          @connection_pool.with do |connection|
            monitor = monitor_class.new(connection, resource)
            monitor.process do |_events|
              events = _events
              $logger.info { "send_messages:#{events.map(&:to_hash).to_json}" }
              @api_client_pool.with {|api_client| api_client.(events) }
            end
          end
          @mutex.synchronize do
            success_count += 1
            consecutive_error_count = 0
          end
        rescue => e
          log_error(e)
          $logger.info { "failed_events:#{events.map(&:to_hash).to_json}" } if events
          @mutex.synchronize do
            raise TooManyError if (consecutive_error_count += 1) > self.class.max_consecuitive_error_count
          end
        end
      end
      success_count
    ensure
      after_process
    end
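The error handling in `#process` comes down to a counter that resets on every success and raises once the number of failures in a row exceeds the threshold. Note the `>` comparison: with a threshold of 3, it is the fourth consecutive failure that raises. The sketch below isolates that logic in plain Ruby. It runs jobs sequentially rather than through `Parallel.each`, and `run_jobs`, `MAX_CONSECUTIVE_ERRORS`, and the local `TooManyError` are hypothetical stand-ins, not part of triglav-agent.

```ruby
# Hypothetical stand-ins; none of these definitions come from triglav-agent.
class TooManyError < StandardError; end

MAX_CONSECUTIVE_ERRORS = 3

# Calls each job in order and returns the number of successful jobs.
# Raises TooManyError once more than MAX_CONSECUTIVE_ERRORS jobs fail in a row.
def run_jobs(jobs)
  mutex = Mutex.new # guards the counters, as @mutex does in #process
  success_count = 0
  consecutive_error_count = 0

  jobs.each do |job|
    begin
      job.call
      mutex.synchronize do
        success_count += 1
        consecutive_error_count = 0 # any success resets the streak
      end
    rescue StandardError
      mutex.synchronize do
        consecutive_error_count += 1
        # Raising from inside a rescue clause propagates to the caller.
        raise TooManyError if consecutive_error_count > MAX_CONSECUTIVE_ERRORS
      end
    end
  end

  success_count
end
```

For example, with `ok = -> { :ok }` and `bad = -> { raise "boom" }`, `run_jobs([ok, bad, bad, bad, ok])` returns 2 because three failures in a row are still tolerated, while `run_jobs([bad, bad, bad, bad])` raises `TooManyError` on the fourth failure.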
#total_count ⇒ Object

    # File 'lib/triglav/agent/base/processor.rb', line 59
    def total_count
      resources.size
    end