lib/elastic_apm/transport/worker.rb in elastic-apm-2.6.1 vs lib/elastic_apm/transport/worker.rb in elastic-apm-2.7.0
- old
+ new
@@ -20,56 +20,51 @@
conn_adapter: Connection
)
@config = config
@queue = queue
- @stopping = false
-
@serializers = serializers
@filters = filters
metadata = serializers.serialize(Metadata.new(config))
@connection = conn_adapter.new(config, metadata)
end
attr_reader :queue, :filters, :name, :connection, :serializers
- def stop
- @stopping = true
- end
-
- def stopping?
- @stopping
- end
-
# rubocop:disable Metrics/MethodLength
def work_forever
while (msg = queue.pop)
case msg
when StopMessage
- stop
+ debug 'Stopping worker -- %s', self
+ connection.flush(:halt)
+ break
else
process msg
end
-
- next unless stopping?
-
- debug 'Stopping worker -- %s', self
- @connection.flush
- break
end
rescue Exception => e
warn 'Worker died with exception: %s', e.inspect
debug e.backtrace.join("\n")
end
# rubocop:enable Metrics/MethodLength
+ def process(resource)
+ return unless (json = serialize_and_filter(resource))
+ connection.write(json)
+ end
+
private
- def process(resource)
+ def serialize_and_filter(resource)
serialized = serializers.serialize(resource)
@filters.apply!(serialized)
- @connection.write(serialized.to_json)
+ JSON.fast_generate(serialized)
+ rescue Exception
+ error format('Failed converting event to JSON: %s', resource.inspect)
+ error serialized.inspect
+ nil
end
end
end
end