Class: Triglav::Agent::Base::Processor
- Inherits:
-
Object
- Object
- Triglav::Agent::Base::Processor
- Defined in:
- lib/triglav/agent/base/processor.rb
Overview
Triglav agent processor class.
An instance is created for a `resource_uri_prefix`.
You usually do not need to customize this class, but if you want to implement your original, configure
Triglav::Agent::Configuration.processor_class
Instance Attribute Summary collapse
-
#resource_uri_prefix ⇒ Object
readonly
Returns the value of attribute resource_uri_prefix.
-
#worker ⇒ Object
readonly
Returns the value of attribute worker.
Class Method Summary collapse
Instance Method Summary collapse
-
#initialize(worker, resource_uri_prefix) ⇒ Processor
constructor
A new instance of Processor.
- #process ⇒ Object
- #total_count ⇒ Object
Constructor Details
#initialize(worker, resource_uri_prefix) ⇒ Processor
Returns a new instance of Processor
17 18 19 20 21 22 23 24 25 26 27 |
# File 'lib/triglav/agent/base/processor.rb', line 17 def initialize(worker, resource_uri_prefix) @worker = worker @resource_uri_prefix = resource_uri_prefix @connection_pool = ConnectionPool.new(connection_pool_opts) { connection_class.new(get_connection_info(resource_uri_prefix)) } @api_client_pool = ConnectionPool.new(connection_pool_opts) { ApiClient.new # renew connection } @mutex = Mutex.new end |
Instance Attribute Details
#resource_uri_prefix ⇒ Object (readonly)
Returns the value of attribute resource_uri_prefix
15 16 17 |
# File 'lib/triglav/agent/base/processor.rb', line 15 def resource_uri_prefix @resource_uri_prefix end |
#worker ⇒ Object (readonly)
Returns the value of attribute worker
15 16 17 |
# File 'lib/triglav/agent/base/processor.rb', line 15 def worker @worker end |
Class Method Details
.max_consecuitive_error_count ⇒ Object
29 30 31 |
# File 'lib/triglav/agent/base/processor.rb', line 29 def self.max_consecuitive_error_count 3 end |
Instance Method Details
#process ⇒ Object
33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
# File 'lib/triglav/agent/base/processor.rb', line 33 def process success_count = 0 consecutive_error_count = 0 Parallel.each(resources, parallel_opts) do |resource| raise Parallel::Break if stopped? events = nil begin @connection_pool.with do |connection| monitor = monitor_class.new(connection, resource) monitor.process do |_events| events = _events $logger.info { "send_messages:#{events.map(&:to_hash).to_json}" } @api_client_pool.with {|api_client| api_client.(events) } end end @mutex.synchronize do success_count += 1 consecutive_error_count = 0 end rescue => e log_error(e) $logger.info { "failed_events:#{events.map(&:to_hash).to_json}" } if events @mutex.synchronize do raise TooManyError if (consecutive_error_count += 1) > self.class.max_consecuitive_error_count end end end success_count end |
#total_count ⇒ Object
63 64 65 |
# File 'lib/triglav/agent/base/processor.rb', line 63 def total_count resources.size end |