lib/logstash/inputs/jdbc.rb in logstash-input-jdbc-0.1.3 vs lib/logstash/inputs/jdbc.rb in logstash-input-jdbc-1.0.0

- old
+ new

@@ -1,71 +1,163 @@
 # encoding: utf-8
 require "logstash/inputs/base"
 require "logstash/namespace"
 require "logstash/plugin_mixins/jdbc"
+require "yaml" # persistence
-# INFORMATION
+# This plugin was created as a way to ingest data in any database
+# with a JDBC interface into Logstash. You can periodically schedule ingestion
+# using a cron syntax (see `schedule` setting) or run the query one time to load
+# data into Logstash. Each row in the resultset becomes a single event.
+# Columns in the resultset are converted into fields in the event.
 #
-# This plugin was created as a way to iteratively ingest any database
-# with a JDBC interface into Logstash.
 #
-# #### JDBC Mixin
+# ==== Drivers
 #
+# This plugin does not come packaged with JDBC driver libraries. The desired
+# jdbc driver library must be explicitly passed in to the plugin using the
+# `jdbc_driver_library` configuration option.
+#
+# ==== Scheduling
 #
-# This plugin utilizes a mixin that helps Logstash plugins manage JDBC connections.
-# The mixin provides its own set of configurations (some are required) to properly
-# set up the connection to the appropriate database.
+# Input from this plugin can be scheduled to run periodically according to a specific
+# schedule. This scheduling syntax is powered by [rufus-scheduler](https://github.com/jmettraux/rufus-scheduler).
+# The syntax is cron-like with some extensions specific to Rufus (e.g. timezone support).
 #
-# #### Predefined Parameters
+# Examples:
 #
-# Some parameters are built-in and can be used from within your queries.
-# Here is the list:
-#
 # |==========================================================
-# |sql_last_start |The time the last query executed in plugin
+# | * 5 * 1-3 * | will execute every minute of 5am every day of January through March.
+# | 0 * * * * | will execute on the 0th minute of every hour every day.
+# | 0 6 * * * America/Chicago | will execute at 6:00am (UTC/GMT -5) every day.
 # |==========================================================
+#
 #
-# #### Usage:
-# This is an example logstash config
+# Further documentation describing this syntax can be found [here](https://github.com/jmettraux/rufus-scheduler#parsing-cronlines-and-time-strings)
+#
+# ==== State
+#
+# The plugin will persist the `sql_last_start` parameter in the form of a
+# metadata file stored in the configured `last_run_metadata_path`. Upon shutting down,
+# this file will be updated with the current value of `sql_last_start`. Next time
+# the pipeline starts up, this value will be updated by reading from the file. If
+# `clean_run` is set to true, this value will be ignored and `sql_last_start` will be
+# set to Jan 1, 1970, as if no query has ever been executed.
+#
+# ==== Dealing With Large Result-sets
+#
+# Many JDBC drivers use the `fetch_size` parameter to limit how many
+# results are pre-fetched at a time from the cursor into the client's cache
+# before retrieving more results from the result-set. This is configured in
+# this plugin using the `jdbc_fetch_size` configuration option. No fetch size
+# is set by default in this plugin, so the specific driver's default size will
+# be used.
+#
+# ==== Usage:
+#
+# Here is an example of setting up the plugin to fetch data from a MySQL database.
+# First, we place the appropriate JDBC driver library in our current
+# path (this can be placed anywhere on your filesystem). In this example, we connect to
+# the 'mydb' database using the user: 'mysql' and wish to input all rows in the 'songs'
+# table that match a specific artist. The following examples demonstrates a possible
+# Logstash configuration for this. The `schedule` option in this example will
+# instruct the plugin to execute this input statement on the minute, every minute.
+#
 # [source,ruby]
+# ----------------------------------
 # input {
 #   jdbc {
-#     jdbc_driver_class => "org.apache.derby.jdbc.EmbeddedDriver" (required; from mixin)
-#     jdbc_connection_string => "jdbc:derby:memory:testdb;create=true" (required; from mixin)
-#     jdbc_user => "username" (from mixin)
-#     jdbc_password => "mypass" (from mixin)
-#     statement => "SELECT * from table where created_at > :sql_last_start and id = :my_id" (required)
-#     parameters => { "my_id" => "231" }
+#     jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar"
+#     jdbc_driver_class => "com.mysql.jdbc.Driver"
+#     jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
+#     jdbc_user => "mysql"
+#     parameters => { "favorite_artist" => "Beethoven" }
 #     schedule => "* * * * *"
+#     statement => "SELECT * from songs where artist = :favorite_artist"
 #   }
 # }
+# ----------------------------------
+#
+# ==== Configuring SQL statement
+#
+# A sql statement is required for this input. This can be passed-in via a
+# statement option in the form of a string, or read from a file (`statement_filepath`). File
+# option is typically used when the SQL statement is large or cumbersome to supply in the config.
+# The file option only supports one SQL statement. The plugin will only accept one of the options.
+# It cannot read a statement from a file as well as from the `statement` configuration parameter.
+#
+# ==== Predefined Parameters
+#
+# Some parameters are built-in and can be used from within your queries.
+# Here is the list:
+#
+# |==========================================================
+# |sql_last_start | The last time a statement was executed. This is set to
+# |               | Thursday, 1 January 1970 before any query is run, and updated
+# |               | accordingly after first query is run.
+# |==========================================================
+#
 class LogStash::Inputs::Jdbc < LogStash::Inputs::Base
   include LogStash::PluginMixins::Jdbc
   config_name "jdbc"

   # If undefined, Logstash will complain, even if codec is unused.
   default :codec, "plain"

   # Statement to execute
+  #
   # To use parameters, use named parameter syntax.
   # For example:
+  #
+  # [source, ruby]
+  # ----------------------------------
   # "SELECT * FROM MYTABLE WHERE id = :target_id"
-  # here ":target_id" is a named parameter
+  # ----------------------------------
   #
-  config :statement, :validate => :string, :required => true
+  # here, ":target_id" is a named parameter. You can configure named parameters
+  # with the `parameters` setting.
+  config :statement, :validate => :string

+  # Path of file containing statement to execute
+  config :statement_filepath, :validate => :path
+
   # Hash of query parameter, for example `{ "target_id" => "321" }`
   config :parameters, :validate => :hash, :default => {}

   # Schedule of when to periodically run statement, in Cron format
   # for example: "* * * * *" (execute query every minute, on the minute)
+  #
+  # There is no schedule by default. If no schedule is given, then the statement is run
+  # exactly once.
   config :schedule, :validate => :string

+  # Path to file with last run time
+  config :last_run_metadata_path, :validate => :string, :default => "#{ENV['HOME']}/.logstash_jdbc_last_run"
+
+  # Whether the previous run state should be preserved
+  config :clean_run, :validate => :boolean, :default => false
+
+  # Whether to save state or not in last_run_metadata_path
+  config :record_last_run, :validate => :boolean, :default => true
+
   public

   def register
     require "rufus/scheduler"
-    prepare_jdbc_connection()
+    prepare_jdbc_connection
+
+    # load sql_last_start from file if exists
+    if @clean_run && File.exist?(@last_run_metadata_path)
+      File.delete(@last_run_metadata_path)
+    elsif File.exist?(@last_run_metadata_path)
+      @sql_last_start = YAML.load(File.read(@last_run_metadata_path))
+    end
+
+    unless @statement.nil? ^ @statement_filepath.nil?
+      raise(LogStash::ConfigurationError, "Must set either :statement or :statement_filepath. Only one may be set at a time.")
+    end
+
+    @statement = File.read(@statement_filepath) if @statement_filepath
   end # def register

   def run(queue)
     if @schedule
       @scheduler = Rufus::Scheduler.new
@@ -77,16 +169,21 @@
       execute_query(queue)
     end
   end # def run

   def teardown
-    if @scheduler
-      @scheduler.stop
+    @scheduler.stop if @scheduler
+
+    # update state file for next run
+    if @record_last_run
+      File.write(@last_run_metadata_path, YAML.dump(@sql_last_start))
     end
-    close_jdbc_connection()
+
+    close_jdbc_connection
   end # def teardown

   private
+
   def execute_query(queue)
     # update default parameters
     @parameters['sql_last_start'] = @sql_last_start
     execute_statement(@statement, @parameters) do |row|
       event = LogStash::Event.new(row)
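
The 1.0.0 options that appear in this diff can be combined in a single input block. The sketch below is illustrative only: the driver jar, connection string, credentials, file paths, fetch size, and schedule are placeholder values, and the query file is assumed to contain a single statement that references :sql_last_start.

----------------------------------
input {
  jdbc {
    # Driver must be supplied explicitly (see ==== Drivers above); the path is a placeholder.
    jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar"
    jdbc_driver_class => "com.mysql.jdbc.Driver"
    jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
    jdbc_user => "mysql"

    # Limit how many rows are pre-fetched per cursor read (==== Dealing With Large Result-sets).
    jdbc_fetch_size => 1000

    # Run at the top of every hour instead of every minute.
    schedule => "0 * * * *"

    # Read the single SQL statement from a file instead of the `statement` option.
    statement_filepath => "/etc/logstash/queries/songs.sql"

    # State handling (==== State): persist sql_last_start between pipeline runs.
    last_run_metadata_path => "/var/lib/logstash/.logstash_jdbc_last_run"
    record_last_run => true
    clean_run => false
  }
}
----------------------------------

Because statement_filepath is set, statement must be left unset; the check in register rejects configurations that provide both or neither.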
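The ==== State section and the register/teardown changes boil down to a YAML round-trip of a single timestamp. The standalone Ruby sketch below mirrors that flow under stated assumptions: the metadata path is the plugin's documented default, and the value is serialized as an ISO-8601 string rather than the raw Time object the plugin itself dumps, so the example round-trips cleanly on any Psych version.

----------------------------------
require "yaml"
require "time"

# Stand-in for the plugin's default `last_run_metadata_path`.
metadata_path = File.join(Dir.home, ".logstash_jdbc_last_run")

# Startup (mirrors `register`): resume from the stored marker if present,
# otherwise behave as if no query has ever run and start at the epoch,
# which is also the value left behind by `clean_run => true`.
sql_last_start =
  if File.exist?(metadata_path)
    Time.parse(YAML.load(File.read(metadata_path)))
  else
    Time.at(0).utc
  end
puts "next query binds :sql_last_start to #{sql_last_start}"

# ... the SQL statement would execute here with :sql_last_start bound ...

# Shutdown (mirrors `teardown` when `record_last_run => true`):
# persist the marker so the next pipeline start resumes where this one stopped.
File.write(metadata_path, YAML.dump(Time.now.utc.iso8601))
----------------------------------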
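One subtle line in register is the exclusivity check, `unless @statement.nil? ^ @statement_filepath.nil?`: the XOR passes only when exactly one of the two options is set. The hypothetical helper below just restates that truth table; the method name and sample values are illustrative and not part of the plugin.

----------------------------------
# Restates the `register` guard: exactly one of statement / statement_filepath
# must be provided, otherwise a LogStash::ConfigurationError is raised.
def valid_statement_config?(statement, statement_filepath)
  statement.nil? ^ statement_filepath.nil?
end

p valid_statement_config?("SELECT 1", nil)            # => true  (inline statement only)
p valid_statement_config?(nil, "/tmp/query.sql")      # => true  (file only)
p valid_statement_config?(nil, nil)                   # => false (neither, rejected)
p valid_statement_config?("SELECT 1", "/tmp/q.sql")   # => false (both, rejected)
----------------------------------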