lib/logstash/inputs/jdbc.rb in logstash-input-jdbc-0.1.3 vs lib/logstash/inputs/jdbc.rb in logstash-input-jdbc-1.0.0
- old
+ new
@@ -1,71 +1,163 @@
# encoding: utf-8
require "logstash/inputs/base"
require "logstash/namespace"
require "logstash/plugin_mixins/jdbc"
+require "yaml" # persistence
-# INFORMATION
+# This plugin was created as a way to ingest data in any database
+# with a JDBC interface into Logstash. You can periodically schedule ingestion
+# using a cron syntax (see `schedule` setting) or run the query one time to load
+# data into Logstash. Each row in the resultset becomes a single event.
+# Columns in the resultset are converted into fields in the event.
#
-# This plugin was created as a way to iteratively ingest any database
-# with a JDBC interface into Logstash.
+# ==== Drivers
#
-# #### JDBC Mixin
+# This plugin does not come packaged with JDBC driver libraries. The desired
+# jdbc driver library must be explicitly passed in to the plugin using the
+# `jdbc_driver_library` configuration option.
+#
+# ==== Scheduling
#
-# This plugin utilizes a mixin that helps Logstash plugins manage JDBC connections.
-# The mixin provides its own set of configurations (some are required) to properly
-# set up the connection to the appropriate database.
+# Input from this plugin can be scheduled to run periodically according to a specific
+# schedule. This scheduling syntax is powered by [rufus-scheduler](https://github.com/jmettraux/rufus-scheduler).
+# The syntax is cron-like with some extensions specific to Rufus (e.g. timezone support ).
#
-# #### Predefined Parameters
+# Examples:
#
-# Some parameters are built-in and can be used from within your queries.
-# Here is the list:
-#
# |==========================================================
-# |sql_last_start |The time the last query executed in plugin
+# | * 5 * 1-3 * | will execute every minute of 5am every day of January through March.
+# | 0 * * * * | will execute on the 0th minute of every hour every day.
+# | 0 6 * * * America/Chicago | will execute at 6:00am (UTC/GMT -5) every day.
# |==========================================================
+#
#
-# #### Usage:
-# This is an example logstash config
+# Further documentation describing this syntax can be found [here](https://github.com/jmettraux/rufus-scheduler#parsing-cronlines-and-time-strings)
+#
+# ==== State
+#
+# The plugin will persist the `sql_last_start` parameter in the form of a
+# metadata file stored in the configured `last_run_metadata_path`. Upon shutting down,
+# this file will be updated with the current value of `sql_last_start`. Next time
+# the pipeline starts up, this value will be updated by reading from the file. If
+# `clean_run` is set to true, this value will be ignored and `sql_last_start` will be
+# set to Jan 1, 1970, as if no query has ever been executed.
+#
+# ==== Dealing With Large Result-sets
+#
+# Many JDBC drivers use the `fetch_size` parameter to limit how many
+# results are pre-fetched at a time from the cursor into the client's cache
+# before retrieving more results from the result-set. This is configured in
+# this plugin using the `jdbc_fetch_size` configuration option. No fetch size
+# is set by default in this plugin, so the specific driver's default size will
+# be used.
+#
+# ==== Usage:
+#
+# Here is an example of setting up the plugin to fetch data from a MySQL database.
+# First, we place the appropriate JDBC driver library in our current
+# path (this can be placed anywhere on your filesystem). In this example, we connect to
+# the 'mydb' database using the user: 'mysql' and wish to input all rows in the 'songs'
+# table that match a specific artist. The following examples demonstrates a possible
+# Logstash configuration for this. The `schedule` option in this example will
+# instruct the plugin to execute this input statement on the minute, every minute.
+#
# [source,ruby]
+# ----------------------------------
# input {
# jdbc {
-# jdbc_driver_class => "org.apache.derby.jdbc.EmbeddedDriver" (required; from mixin)
-# jdbc_connection_string => "jdbc:derby:memory:testdb;create=true" (required; from mixin)
-# jdbc_user => "username" (from mixin)
-# jdbc_password => "mypass" (from mixin)
-# statement => "SELECT * from table where created_at > :sql_last_start and id = :my_id" (required)
-# parameters => { "my_id" => "231" }
+# jdbc_driver_library => "mysql-connector-java-5.1.36-bin.jar"
+# jdbc_driver_class => "com.mysql.jdbc.Driver"
+# jdbc_connection_string => "jdbc:mysql://localhost:3306/mydb"
+# jdbc_user => "mysql"
+# parameters => { "favorite_artist" => "Beethoven" }
# schedule => "* * * * *"
+# statement => "SELECT * from songs where artist = :favorite_artist"
# }
# }
+# ----------------------------------
+#
+# ==== Configuring SQL statement
+#
+# A sql statement is required for this input. This can be passed-in via a
+# statement option in the form of a string, or read from a file (`statement_filepath`). File
+# option is typically used when the SQL statement is large or cumbersome to supply in the config.
+# The file option only supports one SQL statement. The plugin will only accept one of the options.
+# It cannot read a statement from a file as well as from the `statement` configuration parameter.
+#
+# ==== Predefined Parameters
+#
+# Some parameters are built-in and can be used from within your queries.
+# Here is the list:
+#
+# |==========================================================
+# |sql_last_start | The last time a statement was executed. This is set to
+# | | Thursday, 1 January 1970 before any query is run, and updated
+# | | accordingly after first query is run.
+# |==========================================================
+#
class LogStash::Inputs::Jdbc < LogStash::Inputs::Base
include LogStash::PluginMixins::Jdbc
config_name "jdbc"
# If undefined, Logstash will complain, even if codec is unused.
default :codec, "plain"
# Statement to execute
+ #
# To use parameters, use named parameter syntax.
# For example:
+ #
+ # [source, ruby]
+ # ----------------------------------
# "SELECT * FROM MYTABLE WHERE id = :target_id"
- # here ":target_id" is a named parameter
+ # ----------------------------------
#
- config :statement, :validate => :string, :required => true
+ # here, ":target_id" is a named parameter. You can configure named parameters
+ # with the `parameters` setting.
+ config :statement, :validate => :string
+ # Path of file containing statement to execute
+ config :statement_filepath, :validate => :path
+
# Hash of query parameter, for example `{ "target_id" => "321" }`
config :parameters, :validate => :hash, :default => {}
# Schedule of when to periodically run statement, in Cron format
# for example: "* * * * *" (execute query every minute, on the minute)
+ #
+ # There is no schedule by default. If no schedule is given, then the statement is run
+ # exactly once.
config :schedule, :validate => :string
+ # Path to file with last run time
+ config :last_run_metadata_path, :validate => :string, :default => "#{ENV['HOME']}/.logstash_jdbc_last_run"
+
+ # Whether the previous run state should be preserved
+ config :clean_run, :validate => :boolean, :default => false
+
+ # Whether to save state or not in last_run_metadata_path
+ config :record_last_run, :validate => :boolean, :default => true
+
public
def register
require "rufus/scheduler"
- prepare_jdbc_connection()
+ prepare_jdbc_connection
+
+ # load sql_last_start from file if exists
+ if @clean_run && File.exist?(@last_run_metadata_path)
+ File.delete(@last_run_metadata_path)
+ elsif File.exist?(@last_run_metadata_path)
+ @sql_last_start = YAML.load(File.read(@last_run_metadata_path))
+ end
+
+ unless @statement.nil? ^ @statement_filepath.nil?
+ raise(LogStash::ConfigurationError, "Must set either :statement or :statement_filepath. Only one may be set at a time.")
+ end
+
+ @statement = File.read(@statement_filepath) if @statement_filepath
end # def register
def run(queue)
if @schedule
@scheduler = Rufus::Scheduler.new
@@ -77,16 +169,21 @@
execute_query(queue)
end
end # def run
def teardown
- if @scheduler
- @scheduler.stop
+ @scheduler.stop if @scheduler
+
+ # update state file for next run
+ if @record_last_run
+ File.write(@last_run_metadata_path, YAML.dump(@sql_last_start))
end
- close_jdbc_connection()
+
+ close_jdbc_connection
end # def teardown
private
+
def execute_query(queue)
# update default parameters
@parameters['sql_last_start'] = @sql_last_start
execute_statement(@statement, @parameters) do |row|
event = LogStash::Event.new(row)