lib/chronicle/etl/cli/jobs.rb in chronicle-etl-0.4.4 vs lib/chronicle/etl/cli/jobs.rb in chronicle-etl-0.5.0

- old
+ new

@@ -18,12 +18,12 @@ class_option :loader, aliases: '-l', desc: 'Loader class. Default: table', banner: 'NAME' class_option :'loader-opts', desc: 'Loader options', type: :hash, default: {} # This is an array to deal with shell globbing class_option :input, aliases: '-i', desc: 'Input filename or directory', default: [], type: 'array', banner: 'FILENAME' - class_option :since, desc: "Load records SINCE this date", banner: 'DATE' - class_option :until, desc: "Load records UNTIL this date", banner: 'DATE' + class_option :since, desc: "Load records SINCE this date (or fuzzy time duration)", banner: 'DATE' + class_option :until, desc: "Load records UNTIL this date (or fuzzy time duration)", banner: 'DATE' class_option :limit, desc: "Only extract the first LIMIT records", banner: 'N' class_option :output, aliases: '-o', desc: 'Output filename', type: 'string' class_option :fields, desc: 'Output only these fields', type: 'array', banner: 'field1 field2 ...' class_option :header_row, desc: 'Output the header row of tabular output', type: 'boolean' @@ -47,29 +47,32 @@ def start job_definition = build_job_definition(options) if job_definition.plugins_missing? missing_plugins = job_definition.errors[:plugins] - .select { |error| error.is_a?(Chronicle::ETL::PluginLoadError) } + .select { |error| error.is_a?(Chronicle::ETL::PluginNotInstalledError) } .map(&:name) .uniq install_missing_plugins(missing_plugins) end run_job(job_definition) rescue Chronicle::ETL::JobDefinitionError => e - cli_fail(message: "Error running job.\n#{job_definition.errors}", exception: e) + message = "" + job_definition.errors.each_pair do |category, errors| + message << "Problem with #{category}:\n - #{errors.map(&:to_s).join("\n -")}" + end + cli_fail(message: "Error running job.\n#{message}", exception: e) end desc "create", "Create a job" # Create an ETL job def create job_definition = build_job_definition(options) job_definition.validate! - path = File.join('chronicle', 'etl', 'jobs', options[:name]) - Chronicle::ETL::Config.write(path, job_definition.definition) + Chronicle::ETL::Config.write("jobs", options[:name], job_definition.definition) rescue Chronicle::ETL::JobDefinitionError => e cli_fail(message: "Job definition error", exception: e) end desc "show", "Show details about a job" @@ -86,11 +89,11 @@ # List available ETL jobs def list jobs = Chronicle::ETL::Config.available_jobs job_details = jobs.map do |job| - r = Chronicle::ETL::Config.load("chronicle/etl/jobs/#{job}.yml") + r = Chronicle::ETL::Config.load("jobs", job) extractor = r[:extractor][:name] if r[:extractor] transformer = r[:transformer][:name] if r[:transformer] loader = r[:loader][:name] if r[:loader] @@ -107,10 +110,13 @@ end private def run_job(job_definition) + # FIXME: clumsy to make CLI responsible for setting secrets here. Think about a better way to do this + job_definition.apply_default_secrets + job = Chronicle::ETL::Job.new(job_definition) runner = Chronicle::ETL::Runner.new(job) runner.run! end @@ -134,24 +140,25 @@ definition.add_config(process_flag_options(options).transform_keys(&:to_sym)) definition end def load_job_config name - Chronicle::ETL::Config.load_job_from_config(name) + Chronicle::ETL::Config.read_job(name) end # Takes flag options and turns them into a runner config + # TODO: this needs a lot of refactoring def process_flag_options options - extractor_options = options[:'extractor-opts'].merge({ + extractor_options = options[:'extractor-opts'].transform_keys(&:to_sym).merge({ input: (options[:input] if options[:input].any?), since: options[:since], until: options[:until], - limit: options[:limit], + limit: options[:limit] }.compact) - transformer_options = options[:'transformer-opts'] + transformer_options = options[:'transformer-opts'].transform_keys(&:to_sym) - loader_options = options[:'loader-opts'].merge({ + loader_options = options[:'loader-opts'].transform_keys(&:to_sym).merge({ output: options[:output], header_row: options[:header_row], fields: options[:fields] }.compact)