Sha256: b1dea53aea7f86cd38e07408ec9a23297a810b78d2f6c57e540b0faa5967c123
Contents?: true
Size: 1.46 KB
Versions: 2
Compression:
Stored size: 1.46 KB
Contents
# frozen_string_literal: true module ElasticAPM module Transport # @api private class Worker include Logging # @api private class StopMessage; end # @api private class FlushMessage; end def initialize( config, queue, serializers:, filters:, conn_adapter: Connection ) @config = config @queue = queue @stopping = false @connection = conn_adapter.new(config) @serializers = serializers @filters = filters end attr_reader :queue, :filters, :name, :connection, :serializers def stop @stopping = true end def stopping? @stopping end # rubocop:disable Metrics/MethodLength, Metrics/AbcSize def work_forever while (msg = queue.pop) case msg when StopMessage stop else process msg end next unless stopping? debug 'Stopping worker -- %s', self @connection.flush break end rescue Exception => e warn 'Worker died with exception: %s', e.inspect debug e.backtrace.join("\n") end # rubocop:enable Metrics/MethodLength, Metrics/AbcSize private def process(resource) serialized = serializers.serialize(resource) @filters.apply!(serialized) @connection.write(serialized.to_json) end end end end
Version data entries
2 entries across 2 versions & 1 rubygems
Version | Path |
---|---|
elastic-apm-2.0.1 | lib/elastic_apm/transport/worker.rb |
elastic-apm-2.0.0 | lib/elastic_apm/transport/worker.rb |