# Fluent-plugin-openlineage, a plugin for [Fluentd](https://www.fluentd.org)

[![Gem Version](https://badge.fury.io/rb/fluent-plugin-openlineage.svg)](https://badge.fury.io/rb/fluent-plugin-openlineage)

fluent-plugin-openlineage is a Fluentd plugin that verifies whether a JSON event matches the OpenLineage schema. It is intended to be used together with a [Fluentd application](https://github.com/fluent/fluentd).

## Requirements

| fluent-plugin-openlineage | fluentd    | ruby   |
|---------------------------|------------|--------|
| 1.x.y                     | >= v1.9.1  | >= 2.4 |
| 1.[0-7].y                 | >= v0.14.8 | >= 2.1 |
| 0.x.y                     | >= v0.12.0 | >= 1.9 |

## Installation

Add this line to your application's Gemfile:

```ruby
gem 'fluent-plugin-openlineage'
```

And then execute:

    $ bundle

Or install it yourself using one of the following:

    $ gem install fluent-plugin-openlineage
    $ fluent-gem install fluent-plugin-openlineage

## Usage

fluent-plugin-openlineage includes only one plugin:

- `openlineage` parser plugin

## Why are Fluentd and OpenLineage a perfect match?

This plugin is part of the OpenLineage project repository: https://github.com/OpenLineage/OpenLineage/tree/main/proxy/fluentd

Modern data collectors (Fluentd, Logstash, Vector, etc.) can be extremely useful when designing production-grade architectures for processing OpenLineage events. They can be used for features such as:

* a server-side proxy in front of the OpenLineage backend (like Marquez) to handle load spikes and buffer incoming events when the backend is down (e.g., due to a maintenance window),
* copying each event to multiple backends such as HTTP, Kafka, or cloud object storage.

Data collectors implement these features out-of-the-box. They have great potential except for a single missing feature: *the ability to parse and validate OpenLineage events at the point of HTTP input*. This is important because one would like to get a `Bad Request` response immediately when sending invalid OpenLineage events to an endpoint. Fortunately, this missing feature can be implemented as a plugin. We decided to implement an OpenLineage parser plugin for Fluentd because:

* Fluentd has a small footprint in terms of resource utilization and does not require a JVM to be installed,
* Fluentd plugins can be installed from local files (no need to register them in a plugin repository).

As a side effect, the Fluentd integration can also be used as an OpenLineage HTTP validation backend for development purposes.

## Fluentd features

Some interesting Fluentd features are available according to the [official documentation](https://docs.fluentd.org/):

* [buffering/retrying parameters](https://docs.fluentd.org/output#buffering-retrying-parameters),
* useful output plugins:
  * [Kafka output plugin](https://docs.fluentd.org/output/kafka),
  * [S3 output plugin](https://docs.fluentd.org/output/s3),
  * [copy output plugin](https://docs.fluentd.org/output/copy),
  * [HTTP output plugin](https://docs.fluentd.org/output/http) with options such as [retryable_response_codes](https://docs.fluentd.org/output/http#retryable_response_codes) to specify the backend response codes that should cause a retry,
* [buffer configuration](https://docs.fluentd.org/configuration/buffer-section),
* [embedding Ruby expressions in config files](https://docs.fluentd.org/configuration/config-file#embedding-ruby-expressions), e.g., to read environment variables.

The official Fluentd documentation does not make any guarantees about event ordering. However, retrieving OpenLineage events and buffering them in a file or in memory should be a millisecond-long operation, and no HTTP backend can guarantee ordering in such a case anyway. Note also that the number of threads used to flush the buffer defaults to 1 and is configurable via [flush_thread_count](https://docs.fluentd.org/output#flush_thread_count).
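For example, a buffered HTTP output combining several of these options might look roughly like the sketch below. This is an illustration only: the tag, endpoint, buffer path, and response codes are placeholders to adapt to your setup.

```xml
<match openlineage.**>
  @type http
  endpoint http://marquez:5000/api/v1/lineage
  # retry on these backend response codes instead of discarding events
  retryable_response_codes [500, 502, 503]
  <buffer>
    @type file
    path /var/log/fluentd/openlineage-buffer
    flush_interval 1s
    # a single flush thread (the default) keeps flushes from reordering events
    flush_thread_count 1
  </buffer>
</match>
```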
## Quickstart with Docker

Please refer to the [`Dockerfile`](docker/Dockerfile) and [`fluent.conf`](docker/conf/fluent.conf) to see how to build and install the plugin, with an example usage scenario provided in [`docker-compose.yml`](docker/docker-compose.yml).

To run the example setup, go to the `docker` directory and execute the following command:

```shell
docker-compose up
```

After all the containers have started, send some HTTP requests:

```shell
curl -X POST \
  -d '{"test":"test"}' \
  -H 'Content-Type: application/json' \
  http://localhost:9880/api/v1/lineage
```

In response, you should see the following message:

`Openlineage validation failed: path "/": "run" is a required property, path "/": "job" is a required property, path "/": "eventTime" is a required property, path "/": "producer" is a required property, path "/": "schemaURL" is a required property`

Next, send some valid requests:

```shell
curl -X POST \
  -d "$(cat test-start.json)" \
  -H 'Content-Type: application/json' \
  http://localhost:9880/api/v1/lineage
```

```shell
curl -X POST \
  -d "$(cat test-complete.json)" \
  -H 'Content-Type: application/json' \
  http://localhost:9880/api/v1/lineage
```

After that you should see entities in Marquez (http://localhost:3000/) in the `my-namespace` namespace.

To clean up, run:

```shell
docker-compose down
```

### Configuration

Although the OpenLineage event format is specified as a JSON Schema, real-life validation may vary, and backends like Marquez may take a less strict approach to validating certain types of facets. For example, Marquez accepts an invalid `DataQualityMetricsInputDatasetFacet`. To give you more flexibility, the fluentd parser supports the following configuration parameters:

```ruby
validate_input_dataset_facets => true/false
validate_output_dataset_facets => true/false
validate_dataset_facets => true/false
validate_run_facets => true/false
validate_job_facets => true/false
```

By default, only `validate_run_facets` and `validate_job_facets` are set to `true`.
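Since `openlineage` is a parser plugin, these parameters would be set inside the `<parse>` section of the HTTP source that receives the events. A minimal sketch, assuming the port used in the quickstart above; the explicit values shown simply restate the defaults:

```xml
<source>
  @type http
  bind 0.0.0.0
  port 9880
  <parse>
    @type openlineage
    # facet validation toggles; run and job facets are validated by default
    validate_run_facets true
    validate_job_facets true
    validate_input_dataset_facets false
    validate_output_dataset_facets false
    validate_dataset_facets false
  </parse>
</source>
```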
### Development

To install dependencies:

```shell
bundle install
```

To run the tests:

```shell
bundle exec rake test
```

#### Installation

The easiest way to install the plugin is to install the main Fluentd application along with these external packages, for example in a Dockerfile:

* `rusty_json_schema` provides JSON Schema validation via Ruby bindings to a Rust library,
* `fluent-plugin-out-http` allows non-bulk HTTP output requests (sending each OpenLineage event in a separate request).

```shell
fluent-gem install rusty_json_schema
fluent-gem install fluent-plugin-out-http
fluent-gem install fluent-plugin-openlineage
```

## Fluentd proxy setup

### Monitoring with Prometheus

You may also want to check how the Fluentd application itself is doing using Prometheus. For that, add the [fluent-plugin-prometheus](https://github.com/fluent/fluent-plugin-prometheus) plugin and include the following setup in your prometheus.yml file:

```yml
global:
  scrape_interval: 10s # Set the scrape interval to every 10 seconds. Default is every 1 minute.

# A scrape configuration containing exactly one endpoint to scrape:
# Here it's Prometheus itself.
scrape_configs:
  - job_name: 'fluentd'
    static_configs:
      - targets: ['localhost:24231']
```

You may also want to include the following additional parameters in your fluent.conf file (adapted from the fluent-plugin-prometheus documentation):

```xml
# source
<source>
  @type forward
  bind 0.0.0.0
  port 24224
</source>

# count the number of incoming records per tag
<filter **>
  @type prometheus
  <metric>
    name fluentd_input_status_num_records_total
    type counter
    desc The total number of incoming records
    <labels>
      tag ${tag}
      hostname ${hostname}
    </labels>
  </metric>
</filter>

# count the number of outgoing records per tag
<match **>
  @type copy
  <store>
    @type forward
    <server>
      name myserver1
      host 192.168.1.3
      port 24224
      weight 60
    </server>
  </store>
  <store>
    @type prometheus
    <metric>
      name fluentd_output_status_num_records_total
      type counter
      desc The total number of outgoing records
      <labels>
        tag ${tag}
        hostname ${hostname}
      </labels>
    </metric>
  </store>
</match>

# expose metrics in prometheus format
<source>
  @type prometheus
  bind 0.0.0.0
  port 24231
  metrics_path /metrics
</source>

<source>
  @type prometheus_output_monitor
  interval 10
  <labels>
    hostname ${hostname}
  </labels>
</source>
```

### Other configuration examples

You can check the docker directory for some other configuration examples. For any additional information, see the official Fluentd documentation at https://docs.fluentd.org/monitoring-fluentd/monitoring-prometheus#example-prometheus-queries.
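For instance, the fan-out to multiple backends mentioned earlier could be sketched as follows. This is a hedged sketch, not taken from the docker example: the tag, endpoint, bucket, region, and paths are placeholders, and the second store assumes the `fluent-plugin-s3` gem is installed.

```xml
<match openlineage.**>
  @type copy
  # primary destination: the OpenLineage backend
  <store>
    @type http
    endpoint http://marquez:5000/api/v1/lineage
    retryable_response_codes [503]
  </store>
  # secondary destination: archive raw events in object storage
  # (AWS credentials omitted; supply aws_key_id/aws_sec_key or an instance profile)
  <store>
    @type s3
    s3_bucket my-lineage-archive
    s3_region us-east-1
    path openlineage/
    <buffer time>
      @type file
      path /var/log/fluentd/s3-buffer
      timekey 3600
      timekey_wait 10m
    </buffer>
  </store>
</match>
```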