require 'thread/pool'

# Command-line task that downloads one or more datasets by quandl code and
# prints each one as QDF.
#
# Codes are taken from the command arguments when any are given, otherwise
# one code per line is read from stdin. Each download is handed to `pool`;
# `pool`, `mutex`, `args`, `error`, `table`, `debug` and `info` are not
# defined in this class (presumably provided by Tasks::Base).
class Quandl::Command::Tasks::Download < Quandl::Command::Tasks::Base
  syntax "quandl download (SOURCE_CODE/)CODE"
  description "Download a dataset using its quandl code."
  options({
    String => {
      order: "Return rows in either ASC or DESC order",
      transform: "Transform data using one of: #{Quandl::Operation::Transform.valid_transformations.join(', ')}",
      collapse: "Collapse data to one of: #{Quandl::Operation::Collapse.valid_collapses.join(', ')}",
      trim_start: "Exclude rows that are older than trim_start",
      trim_end: "Exclude rows that are newer than trim_end",
    },
    Integer => {
      threads: "How many workers to use during download.",
      limit: "Limit the number of rows returned",
      column: "Pluck a specific column by index",
      row: "Pluck a specific row by index",
      offset: "Offset the start of the rows",
    }
  })

  warn_unauthenticated_users

  validates :trim_start, :trim_end,
    allow_nil: true,
    format: {
      with: Quandl::Pattern.dataset_date,
      message: "is invalid. Expected format: #{Quandl::Pattern.dataset_date.to_example}"
    }
  validates :order,
    allow_nil: true,
    inclusion: { in: %w[asc desc], message: "must be one of: asc, desc" }
  validates :collapse,
    allow_nil: true,
    inclusion: {
      in: Quandl::Operation::Collapse.valid_collapses.map(&:to_s),
      message: "is not included in the list: #{Quandl::Operation::Collapse.valid_collapses.join(", ")}"
    }
  validates :transform,
    allow_nil: true,
    inclusion: {
      in: Quandl::Operation::Transform.valid_transformations.map(&:to_s),
      message: "is not included in the list: #{Quandl::Operation::Transform.valid_transformations.join(", ")}"
    }

  # Entry point: download every code given as an argument, or fall back to
  # reading codes from stdin when no argument is present.
  def execute
    # download using arguments when present
    return download_each_argument if args.first.present?
    # otherwise download using stdin
    download_each_stdin
  end

  # Queues one download per command-line argument, then shuts the pool down.
  def download_each_argument
    args.each do |code|
      pool.process { download(code) }
    end
    pool.shutdown
  end

  # Queues one download per non-blank line of stdin, then shuts the pool down.
  def download_each_stdin
    $stdin.each_line do |line|
      code = line.strip
      # A blank line (e.g. a trailing newline in a piped list) is not a
      # dataset code; skip it instead of issuing a lookup for "".
      next if code.empty?
      pool.process { download(code) }
    end
    pool.shutdown
  end

  # Downloads a single dataset and prints it as QDF.
  #
  # code - the quandl code passed to Quandl::Client::Dataset.find.
  #
  # Returns true after the dataset has been printed; otherwise returns
  # whatever `error` returns for the failure row.
  def download(code)
    # NOTE(review): Time#elapsed_ms is not core Ruby; presumably a project
    # extension loaded elsewhere.
    timer = Time.now
    # find dataset
    dataset = Quandl::Client::Dataset.find(code)
    # fail fast
    return error(table(Quandl::Client::HTTP_STATUS_CODES[404], code)) if dataset.nil?
    # set data operations
    dataset.data.assign_attributes(data_params) unless dataset.blank?
    # send request & check for errors.
    if !dataset.exists? && !dataset.valid?
      # raise detailed error
      return error(table(dataset.human_status, code, dataset.elapsed_request_time_ms))
    end
    # generate qdf (elapsed is captured before the QDF is built)
    elapsed = timer.elapsed_ms
    qdf = dataset.to_qdf
    # write to STDOUT while holding the mutex, so the debug line and the QDF
    # of one dataset are emitted together
    mutex.synchronize do
      debug("# #{dataset.try(:full_url)} downloaded in #{elapsed}")
      info(qdf)
    end
    true
  end

  # Readers exposing option values under the attribute names used by the
  # `validates` declarations above.
  def trim_start
    options.trim_start
  end

  def trim_end
    options.trim_end
  end

  def order
    options.order
  end

  def collapse
    options.collapse
  end

  def transform
    options.transform
  end

  # Builds the attribute hash assigned to `dataset.data`, keyed by every
  # option name declared on the class.
  #
  # When the options object is an OpenStruct every declared option is copied
  # (including nil values); otherwise only options with a present value are
  # copied.
  def data_params
    params = {}
    # The options container does not change while iterating; check it once.
    struct_options = options.is_a?(OpenStruct)
    self.class.options.each do |_type, opts|
      opts.each do |name, _desc|
        if struct_options
          params[name] = options.send(name)
        elsif options[name].present?
          params[name] = options[name]
        end
      end
    end
    params
  end
end