Class: Triglav::Agent::Base::Processor

Inherits:
Object
  • Object
show all
Defined in:
lib/triglav/agent/base/processor.rb

Overview

Triglav agent processor class.

An instance is created for a `resource_uri_prefix`.

You usually do not need to customize this class, but if you want to implement your original, configure

Triglav::Agent::Configuration.processor_class

Instance Attribute Summary collapse

Class Method Summary collapse

Instance Method Summary collapse

Constructor Details

#initialize(worker, resource_uri_prefix) ⇒ Processor

Returns a new instance of Processor



17
18
19
20
21
22
23
24
25
26
27
# File 'lib/triglav/agent/base/processor.rb', line 17

def initialize(worker, resource_uri_prefix)
  @worker = worker
  @resource_uri_prefix = resource_uri_prefix
  @connection_pool = ConnectionPool.new(connection_pool_opts) {
    connection_class.new(get_connection_info(resource_uri_prefix))
  }
  @api_client_pool = ConnectionPool.new(connection_pool_opts) {
    ApiClient.new # renew connection
  }
  @mutex = Mutex.new
end

Instance Attribute Details

#resource_uri_prefixObject (readonly)

Returns the value of attribute resource_uri_prefix



15
16
17
# File 'lib/triglav/agent/base/processor.rb', line 15

def resource_uri_prefix
  @resource_uri_prefix
end

#workerObject (readonly)

Returns the value of attribute worker



15
16
17
# File 'lib/triglav/agent/base/processor.rb', line 15

def worker
  @worker
end

Class Method Details

.max_consecuitive_error_countObject



29
30
31
# File 'lib/triglav/agent/base/processor.rb', line 29

def self.max_consecuitive_error_count
  3
end

Instance Method Details

#processObject



33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
# File 'lib/triglav/agent/base/processor.rb', line 33

def process
  success_count = 0
  consecutive_error_count = 0
  Parallel.each(resources, parallel_opts) do |resource|
    raise Parallel::Break if stopped?
    events = nil
    begin
      @connection_pool.with do |connection|
        monitor = monitor_class.new(connection, resource)
        monitor.process do |_events|
          events = _events
          $logger.info { "send_messages:#{events.map(&:to_hash).to_json}" }
          @api_client_pool.with {|api_client| api_client.send_messages(events) }
        end
      end
      @mutex.synchronize do
        success_count += 1
        consecutive_error_count = 0
      end
    rescue => e
      log_error(e)
      $logger.info { "failed_events:#{events.map(&:to_hash).to_json}" } if events
      @mutex.synchronize do
        raise TooManyError if (consecutive_error_count += 1) > self.class.max_consecuitive_error_count
      end
    end
  end
  success_count
end

#total_countObject



63
64
65
# File 'lib/triglav/agent/base/processor.rb', line 63

def total_count
  resources.size
end