lib/chronicle/etl/cli/jobs.rb in chronicle-etl-0.5.5 vs lib/chronicle/etl/cli/jobs.rb in chronicle-etl-0.6.1
- old
+ new
@@ -1,60 +1,85 @@
-require 'pp'
+# frozen_string_literal: true
+
require 'tty-prompt'
module Chronicle
module ETL
module CLI
# CLI commands for working with ETL jobs
class Jobs < SubcommandBase
- default_task "start"
+ default_task 'start'
namespace :jobs
- class_option :extractor, aliases: '-e', desc: "Extractor class. Default: stdin", banner: 'NAME'
+ class_option :extractor, aliases: '-e', desc: 'Extractor class. Default: stdin', banner: 'NAME'
class_option :'extractor-opts', desc: 'Extractor options', type: :hash, default: {}
- class_option :transformer, aliases: '-t', desc: 'Transformer class. Default: null', banner: 'NAME'
- class_option :'transformer-opts', desc: 'Transformer options', type: :hash, default: {}
+ class_option :transformer,
+ aliases: '-t',
+ desc: 'Transformer identifier. Default: null',
+ banner: 'NAME',
+ type: 'array',
+ repeatable: true
class_option :loader, aliases: '-l', desc: 'Loader class. Default: table', banner: 'NAME'
class_option :'loader-opts', desc: 'Loader options', type: :hash, default: {}
# This is an array to deal with shell globbing
- class_option :input, aliases: '-i', desc: 'Input filename or directory', default: [], type: 'array', banner: 'FILENAME'
- class_option :since, desc: "Load records SINCE this date (or fuzzy time duration)", banner: 'DATE'
- class_option :until, desc: "Load records UNTIL this date (or fuzzy time duration)", banner: 'DATE'
- class_option :limit, desc: "Only extract the first LIMIT records", banner: 'N'
+ class_option :input,
+ aliases: '-i',
+ desc: 'Input filename or directory',
+ default: [],
+ type: 'array',
+ banner: 'FILENAME'
+ class_option :since, desc: 'Load records SINCE this date (or fuzzy time duration)', banner: 'DATE'
+ class_option :until, desc: 'Load records UNTIL this date (or fuzzy time duration)', banner: 'DATE'
+ class_option :limit, desc: 'Only extract the first LIMIT records', banner: 'N'
+ class_option :schema,
+ desc: 'Which Schema to transform',
+ banner: 'SCHEMA_NAME',
+ type: 'string',
+ enum: %w[chronicle activitystream schemaorg chronobase]
+ class_option :format,
+ desc: 'How to serialize results',
+ banner: 'SCHEMA_NAME',
+ type: 'string',
+ enum: %w[jsonapi jsonld]
+
class_option :output, aliases: '-o', desc: 'Output filename', type: 'string'
class_option :fields, desc: 'Output only these fields', type: 'array', banner: 'field1 field2 ...'
+ class_option :'fields-limit', desc: 'Output first N fields', type: :numeric
+ class_option :filter, desc: 'Filter records', type: 'array', banner: 'field=value'
class_option :header_row, desc: 'Output the header row of tabular output', type: 'boolean'
# Thor doesn't like `run` as a command name
map run: :start
- desc "run", "Start a job"
+ desc 'run', 'Start a job'
option :dry_run, desc: 'Only run the extraction and transform steps, not the loading', type: :boolean
long_desc <<-LONG_DESC
This will run an ETL job. Each job needs three parts:
1. #{'Extractor'.underline}: pulls data from an external source. By default, this is stdout. Other common options including pulling data from an API or reading JSON from a file.
- 2. #{'Transformer'.underline}: transforms data into a new format. If none is specified, we use the `null` transformer which does nothing to the data.
+ 2. #{'Transformers'.underline}: transform data into a new format. If none is specified, we use the `null` transformer which does nothing to the data.
3. #{'Loader'.underline}: takes that transformed data and loads it externally. This can be an API, flat files, (or by default), stdout. With the --dry-run option, this step won't be run.
If you do not want to use the command line flags, you can also configure a job with a .yml config file. You can either specify the path to this file or use the filename and place the file in ~/.config/chronicle/etl/jobs/NAME.yml and call it with `--job NAME`
-LONG_DESC
+ LONG_DESC
# Run an ETL job
- def start(name = nil)
+ def start(*args)
+ name = args.first
+
# If someone runs `$ chronicle-etl` with no arguments, show help menu.
# TODO: decide if we should check that there's nothing in stdin pipe
# in case user wants to actually run this sort of job stdin->null->stdout
if name.nil? && options[:extractor].nil?
m = Chronicle::ETL::CLI::Main.new
m.help
cli_exit
end
- cli_fail(message: "Job '#{name}' does not exist") if name && !Chronicle::ETL::Config.exists?("jobs", name)
+ cli_fail(message: "Job '#{name}' does not exist") if name && !Chronicle::ETL::Config.exists?('jobs', name)
job_definition = build_job_definition(name, options)
if job_definition.plugins_missing?
missing_plugins = job_definition.errors[:plugins]
@@ -64,75 +89,91 @@
install_missing_plugins(missing_plugins)
end
run_job(job_definition)
rescue Chronicle::ETL::JobDefinitionError => e
- message = ""
+ message = ''
job_definition.errors.each_pair do |category, errors|
message << "Problem with #{category}:\n - #{errors.map(&:to_s).join("\n - ")}"
end
cli_fail(message: "Error running job.\n#{message}", exception: e)
end
option :'skip-confirmation', aliases: '-y', type: :boolean
- desc "save", "Save a job"
+ desc 'save', 'Save a job'
# Create an ETL job
def save(name)
write_config = true
job_definition = build_job_definition(name, options)
job_definition.validate!
- if Chronicle::ETL::Config.exists?("jobs", name) && !options[:'skip-confirmation']
+ if Chronicle::ETL::Config.exists?('jobs', name) && !options[:'skip-confirmation']
prompt = TTY::Prompt.new
write_config = false
message = "Job '#{name}' exists already. Ovewrite it?"
begin
write_config = prompt.yes?(message)
rescue TTY::Reader::InputInterrupt
end
end
if write_config
- Chronicle::ETL::Config.write("jobs", name, job_definition.definition)
+ Chronicle::ETL::Config.write('jobs', name, job_definition.definition)
cli_exit(message: "Job saved. Run it with `$ chronicle-etl jobs:run #{name}`")
else
cli_fail(message: "\nJob not saved")
end
rescue Chronicle::ETL::JobDefinitionError => e
- cli_fail(message: "Job definition error", exception: e)
+ cli_fail(message: 'Job definition error', exception: e)
end
- desc "show", "Show details about a job"
+ desc 'show', 'Show details about a job'
# Show an ETL job
def show(name = nil)
- cli_fail(message: "Job '#{name}' does not exist") if name && !Chronicle::ETL::Config.exists?("jobs", name)
+ cli_fail(message: "Job '#{name}' does not exist") if name && !Chronicle::ETL::Config.exists?('jobs', name)
job_definition = build_job_definition(name, options)
job_definition.validate!
puts Chronicle::ETL::Job.new(job_definition)
rescue Chronicle::ETL::JobDefinitionError => e
- cli_fail(message: "Job definition error", exception: e)
+ cli_fail(message: 'Job definition error', exception: e)
end
- desc "list", "List all available jobs"
+ desc 'edit', 'Edit a job in default editor ($EDITOR)'
+ def edit(name = nil)
+ cli_fail(message: "Job '#{name}' does not exist") if name && !Chronicle::ETL::Config.exists?('jobs', name)
+
+ filename = Chronicle::ETL::Config.path('jobs', name)
+ system "${VISUAL:-${EDITOR:-vi}} \"#{filename}\""
+
+ definition = Chronicle::ETL::JobDefinition.new
+ definition.add_config(load_job_config(name))
+ definition.validate!
+
+ cli_exit(message: "Job '#{name}' saved")
+ rescue Chronicle::ETL::JobDefinitionError => e
+ cli_fail(message: 'Job definition error', exception: e)
+ end
+
+ desc 'list', 'List all available jobs'
# List available ETL jobs
def list
jobs = Chronicle::ETL::Config.available_jobs
job_details = jobs.map do |job|
- r = Chronicle::ETL::Config.load("jobs", job)
+ r = Chronicle::ETL::Config.load('jobs', job)
extractor = r[:extractor][:name] if r[:extractor]
transformer = r[:transformer][:name] if r[:transformer]
loader = r[:loader][:name] if r[:loader]
[job, extractor, transformer, loader]
end
- headers = ['name', 'extractor', 'transformer', 'loader'].map { |h| h.upcase.bold }
+ headers = %w[name extractor transformer loader].map { |h| h.upcase.bold }
- puts "Available jobs:"
+ puts 'Available jobs:'
table = TTY::Table.new(headers, job_details)
puts table.render(indent: 0, padding: [0, 2])
rescue Chronicle::ETL::ConfigError => e
cli_fail(message: "Config error. #{e.message}", exception: e)
end
@@ -146,22 +187,23 @@
job_definition.apply_default_secrets
job = Chronicle::ETL::Job.new(job_definition)
runner = Chronicle::ETL::Runner.new(job)
runner.run!
rescue RunnerError => e
- cli_fail(message: "#{e.message}", exception: e)
+ cli_fail(message: e.message.to_s, exception: e)
end
# TODO: probably could merge this with something in cli/plugin
def install_missing_plugins(missing_plugins)
prompt = TTY::Prompt.new
message = "Plugin#{'s' if missing_plugins.count > 1} specified by job not installed.\n"
- message += "Do you want to install "
- message += missing_plugins.map { |name| "chronicle-#{name}".bold}.join(", ")
- message += " and start the job?"
+ message += 'Do you want to install '
+ message += missing_plugins.map { |name| "chronicle-#{name}".bold }
+ .join(', ')
+ message += ' and start the job?'
will_install = prompt.yes?(message)
- cli_fail(message: "Must install #{missing_plugins.join(", ")} plugin to run job") unless will_install
+ cli_fail(message: "Must install #{missing_plugins.join(', ')} plugin to run job") unless will_install
Chronicle::ETL::CLI::Plugins.new.install(*missing_plugins)
end
# Create job definition by reading config file and then overwriting with flag options
@@ -170,46 +212,81 @@
definition.add_config(load_job_config(name))
definition.add_config(process_flag_options(options).transform_keys(&:to_sym))
definition
end
- def load_job_config name
+ def load_job_config(name)
Chronicle::ETL::Config.read_job(name)
end
# Takes flag options and turns them into a runner config
# TODO: this needs a lot of refactoring
- def process_flag_options options
- extractor_options = options[:'extractor-opts'].transform_keys(&:to_sym).merge({
- input: (options[:input] if options[:input].any?),
- since: options[:since],
- until: options[:until],
- limit: options[:limit]
- }.compact)
+ def process_flag_options(options)
+ extractor_options = options[:'extractor-opts'].transform_keys(&:to_sym).merge(
+ {
+ input: (options[:input] if options[:input].any?),
+ since: options[:since],
+ until: options[:until],
+ limit: options[:limit]
+ }.compact
+ )
- transformer_options = options[:'transformer-opts'].transform_keys(&:to_sym)
+ loader_options = options[:'loader-opts'].transform_keys(&:to_sym).merge(
+ {
+ output: options[:output],
+ header_row: options[:header_row]
+ }.compact
+ )
- loader_options = options[:'loader-opts'].transform_keys(&:to_sym).merge({
- output: options[:output],
- header_row: options[:header_row],
- fields: options[:fields]
- }.compact)
-
- {
+ processed_options = {
dry_run: options[:dry_run],
extractor: {
name: options[:extractor],
options: extractor_options
}.compact,
- transformer: {
- name: options[:transformer],
- options: transformer_options
- }.compact,
loader: {
name: options[:loader],
options: loader_options
}.compact
}
+
+ add_transformer(processed_options, 'chronicle') if options[:schema]
+ add_transformer(processed_options, options[:schema]) if options[:schema] && options[:schema] != 'chronicle'
+ add_transformers_from_option(processed_options, options[:transformer]) if options[:transformer]&.any?
+ if options[:filter]
+ add_transformer(processed_options, :filter, { filters: options[:filter].to_h do |f|
+ f.split('=')
+ end })
+ end
+ add_transformer(processed_options, :format, { format: options[:format] }) if options[:format]
+ add_transformer(processed_options, :filter_fields, { fields: options[:fields] }) if options[:fields]
+ if options[:'fields-limit']
+ add_transformer(processed_options, :fields_limit,
+ { limit: options[:'fields-limit'] })
+ end
+
+ processed_options
+ end
+
+ def add_transformer(processed_options, name, options = {})
+ processed_options[:transformers] ||= []
+ processed_options[:transformers] << { name:, options: }
+ end
+
+ def add_transformers_from_option(processed_options, transformer_option)
+ processed_options[:transformers] ||= []
+ processed_options[:transformers] += transformer_option.map do |transformer_args|
+ transformer_name, *transformer_options = transformer_args
+ transformer_options = transformer_options.filter { |opt| opt.include?('=') }
+
+ {
+ name: transformer_name,
+ options: transformer_options.to_h do |opt|
+ key, value = opt.split('=')
+ [key.to_sym, value]
+ end
+ }
+ end
end
end
end
end
end