# fluent-plugin-bigquery

[Fluentd](http://fluentd.org) output plugin to load/insert data into Google BigQuery.

* insert data over streaming inserts
  * for continuous real-time insertions, under many limitations
  * https://developers.google.com/bigquery/streaming-data-into-bigquery#usecases
* (NOT IMPLEMENTED) load data
  * for data loading as batch jobs, for big amounts of data
  * https://developers.google.com/bigquery/loading-data-into-bigquery

The current version of this plugin supports the Google API with Service Account Authentication, but does not support the OAuth flow for installed applications.

## Configuration

### Streaming inserts

Configure insert specifications with the target table schema and your credentials. This is the minimum configuration:

```apache
type bigquery
method insert    # default

auth_method private_key   # default
email xxxxxxxxxxxx-xxxxxxxxxxxxxxxxxxxxxx@developer.gserviceaccount.com
private_key_path /home/username/.keys/00000000000000000000000000000000-privatekey.p12
# private_key_passphrase notasecret # default

project yourproject_id
dataset yourdataset_id
table   tablename

time_format %s
time_field  time

field_integer time,status,bytes
field_string  rhost,vhost,path,method,protocol,agent,referer
field_float   requestime
field_boolean bot_access,loginsession
```

For high-rate inserts over streaming inserts, you should specify flush intervals and buffer chunk options:

```apache
type bigquery
method insert    # default

flush_interval 1                 # flush as frequently as possible
buffer_chunk_records_limit 300   # default rate limit for users is 100
buffer_queue_limit 10240         # 1MB * 10240 -> 10GB!
num_threads 16

auth_method private_key   # default
email xxxxxxxxxxxx-xxxxxxxxxxxxxxxxxxxxxx@developer.gserviceaccount.com
private_key_path /home/username/.keys/00000000000000000000000000000000-privatekey.p12
# private_key_passphrase notasecret # default

project yourproject_id
dataset yourdataset_id
tables  accesslog1,accesslog2,accesslog3

time_format %s
time_field  time

field_integer time,status,bytes
field_string  rhost,vhost,path,method,protocol,agent,referer
field_float   requestime
field_boolean bot_access,loginsession
```

Important options for high-rate events are:

* `tables`
  * 2 or more tables can be listed with a ',' separator
  * `out_bigquery` uses these tables for table-sharded inserts
  * they must all have the same schema
* `buffer_chunk_records_limit`
  * the number of records per streaming inserts API call is limited to 100, per second, per table
  * the default average rate limit is 100, and the spike rate limit is 1000
  * `out_bigquery` flushes the buffer with 100 records per insert API call
* `buffer_queue_limit`
  * BigQuery streaming inserts need very small buffer chunks
  * for high-rate events, `buffer_queue_limit` should be configured with a big number
  * up to 1GB of memory may be used under network problems with the default configuration
    * `buffer_chunk_limit (default 1MB)` x `buffer_queue_limit (default 1024)`
* `num_threads`
  * threads for parallel insert API calls
  * specify this option for 100 or more records per second
  * 10 or more threads seem good for inserts over the internet
  * fewer threads may be enough for Google Compute Engine instances (with low latency to BigQuery)
* `flush_interval`
  * `1` is the lowest value without patches on Fluentd v0.10.41 or earlier
  * see `patches` below

### Authentication

Two methods are supported to fetch an access token for the service account.

1. Public-Private key pair
2. Predefined access token (Compute Engine only)

The examples above use the first one. You first need to create a service account (client ID), download its private key and deploy the key with fluentd.

On the other hand, you don't need to explicitly create a service account for fluentd when you run fluentd in Google Compute Engine. For this second authentication method, you need to add the API scope "https://www.googleapis.com/auth/bigquery" to the scope list of your Compute Engine instance, and then you can configure fluentd like this:

```apache
type bigquery

auth_method compute_engine

project yourproject_id
dataset yourdataset_id
table   tablename

time_format %s
time_field  time

field_integer time,status,bytes
field_string  rhost,vhost,path,method,protocol,agent,referer
field_float   requestime
field_boolean bot_access,loginsession
```

### Table schema

There are two methods to describe the schema of the target table.

1. List fields in fluent.conf
2. Load a schema file in JSON

The examples above use the first method. With this method, you can also specify nested fields by prefixing field names with the record they belong to.

```apache
type bigquery

...

time_format %s
time_field  time

field_integer time,response.status,response.bytes
field_string  request.vhost,request.path,request.method,request.protocol,request.agent,request.referer,remote.host,remote.ip,remote.user
field_float   request.time
field_boolean request.bot_access,request.loginsession
```

This schema accepts structured JSON data like:

```json
{
  "request":{
    "time":1391748126.7000976,
    "vhost":"www.example.com",
    "path":"/",
    "method":"GET",
    "protocol":"HTTP/1.1",
    "agent":"HotJava",
    "bot_access":false
  },
  "remote":{
    "ip": "192.0.2.1"
  },
  "response":{
    "status":200,
    "bytes":1024
  }
}
```

The second method is to specify a path to a BigQuery schema file instead of listing fields. In this case, your fluent.conf looks like:

```apache
type bigquery

...

time_format %s
time_field  time

schema_path /path/to/httpd.schema
field_integer time
```

where /path/to/httpd.schema is the path to the JSON-encoded schema file which you used for creating the table on BigQuery.

NOTE: Since JSON does not define how to encode data of the TIMESTAMP type, you are still recommended to specify types for TIMESTAMP fields in fluent.conf, as the `field_integer time` line does for the "time" field in the example.
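Such a schema file is typically a JSON array of field definitions, each with a name and a type. As a rough sketch (not taken from this repository), a /path/to/httpd.schema matching the access-log fields used in the earlier examples might look like this, with the "time" field declared as TIMESTAMP:

```json
[
  {"name": "time",         "type": "TIMESTAMP"},
  {"name": "status",       "type": "INTEGER"},
  {"name": "bytes",        "type": "INTEGER"},
  {"name": "rhost",        "type": "STRING"},
  {"name": "vhost",        "type": "STRING"},
  {"name": "path",         "type": "STRING"},
  {"name": "method",       "type": "STRING"},
  {"name": "protocol",     "type": "STRING"},
  {"name": "agent",        "type": "STRING"},
  {"name": "referer",      "type": "STRING"},
  {"name": "requestime",   "type": "FLOAT"},
  {"name": "bot_access",   "type": "BOOLEAN"},
  {"name": "loginsession", "type": "BOOLEAN"}
]
```

This is the same field-list format that BigQuery's table creation tools (such as `bq mk --schema`) accept, so a schema file used to create the table should be reusable here as-is.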
### patches

This plugin depends on `fluent-plugin-buffer-lightening`, which includes a monkey patch module for the BufferedOutput plugin to realize high-rate and low-latency flushing. With this patch, sub-second flushing is available.

To use this feature, execute fluentd with the `-r fluent/plugin/output_try_flush_interval_patch` option, and configure `flush_interval` and `try_flush_interval` with floating point values.

```apache
type bigquery
method insert    # default

flush_interval     0.2
try_flush_interval 0.05

buffer_chunk_records_limit 300   # default rate limit for users is 100
buffer_queue_limit 10240         # 1MB * 10240 -> 10GB!
num_threads 16

# credentials, project/dataset/table and schema specs.
```

With this configuration, flushing will be done within 0.25 seconds after record input in the worst case.

## TODO

* support Load API
  * with automatically configured flush/buffer options
* support optional data fields
* support NULLABLE/REQUIRED/REPEATED field options
* OAuth installed application credentials support
* Google API discovery expiration
* Error classes
* check row size limits