lib/chronicle/etl/cli/jobs.rb in chronicle-etl-0.4.4 vs lib/chronicle/etl/cli/jobs.rb in chronicle-etl-0.5.0
- old
+ new
@@ -18,12 +18,12 @@
class_option :loader, aliases: '-l', desc: 'Loader class. Default: table', banner: 'NAME'
class_option :'loader-opts', desc: 'Loader options', type: :hash, default: {}
# This is an array to deal with shell globbing
class_option :input, aliases: '-i', desc: 'Input filename or directory', default: [], type: 'array', banner: 'FILENAME'
- class_option :since, desc: "Load records SINCE this date", banner: 'DATE'
- class_option :until, desc: "Load records UNTIL this date", banner: 'DATE'
+ class_option :since, desc: "Load records SINCE this date (or fuzzy time duration)", banner: 'DATE'
+ class_option :until, desc: "Load records UNTIL this date (or fuzzy time duration)", banner: 'DATE'
class_option :limit, desc: "Only extract the first LIMIT records", banner: 'N'
class_option :output, aliases: '-o', desc: 'Output filename', type: 'string'
class_option :fields, desc: 'Output only these fields', type: 'array', banner: 'field1 field2 ...'
class_option :header_row, desc: 'Output the header row of tabular output', type: 'boolean'
@@ -47,29 +47,32 @@
def start
job_definition = build_job_definition(options)
if job_definition.plugins_missing?
missing_plugins = job_definition.errors[:plugins]
- .select { |error| error.is_a?(Chronicle::ETL::PluginLoadError) }
+ .select { |error| error.is_a?(Chronicle::ETL::PluginNotInstalledError) }
.map(&:name)
.uniq
install_missing_plugins(missing_plugins)
end
run_job(job_definition)
rescue Chronicle::ETL::JobDefinitionError => e
- cli_fail(message: "Error running job.\n#{job_definition.errors}", exception: e)
+ message = ""
+ job_definition.errors.each_pair do |category, errors|
+ message << "Problem with #{category}:\n - #{errors.map(&:to_s).join("\n -")}"
+ end
+ cli_fail(message: "Error running job.\n#{message}", exception: e)
end
desc "create", "Create a job"
# Create an ETL job
def create
job_definition = build_job_definition(options)
job_definition.validate!
- path = File.join('chronicle', 'etl', 'jobs', options[:name])
- Chronicle::ETL::Config.write(path, job_definition.definition)
+ Chronicle::ETL::Config.write("jobs", options[:name], job_definition.definition)
rescue Chronicle::ETL::JobDefinitionError => e
cli_fail(message: "Job definition error", exception: e)
end
desc "show", "Show details about a job"
@@ -86,11 +89,11 @@
# List available ETL jobs
def list
jobs = Chronicle::ETL::Config.available_jobs
job_details = jobs.map do |job|
- r = Chronicle::ETL::Config.load("chronicle/etl/jobs/#{job}.yml")
+ r = Chronicle::ETL::Config.load("jobs", job)
extractor = r[:extractor][:name] if r[:extractor]
transformer = r[:transformer][:name] if r[:transformer]
loader = r[:loader][:name] if r[:loader]
@@ -107,10 +110,13 @@
end
private
def run_job(job_definition)
+ # FIXME: clumsy to make CLI responsible for setting secrets here. Think about a better way to do this
+ job_definition.apply_default_secrets
+
job = Chronicle::ETL::Job.new(job_definition)
runner = Chronicle::ETL::Runner.new(job)
runner.run!
end
@@ -134,24 +140,25 @@
definition.add_config(process_flag_options(options).transform_keys(&:to_sym))
definition
end
def load_job_config name
- Chronicle::ETL::Config.load_job_from_config(name)
+ Chronicle::ETL::Config.read_job(name)
end
# Takes flag options and turns them into a runner config
+ # TODO: this needs a lot of refactoring
def process_flag_options options
- extractor_options = options[:'extractor-opts'].merge({
+ extractor_options = options[:'extractor-opts'].transform_keys(&:to_sym).merge({
input: (options[:input] if options[:input].any?),
since: options[:since],
until: options[:until],
- limit: options[:limit],
+ limit: options[:limit]
}.compact)
- transformer_options = options[:'transformer-opts']
+ transformer_options = options[:'transformer-opts'].transform_keys(&:to_sym)
- loader_options = options[:'loader-opts'].merge({
+ loader_options = options[:'loader-opts'].transform_keys(&:to_sym).merge({
output: options[:output],
header_row: options[:header_row],
fields: options[:fields]
}.compact)