#
# Copyright (c) 2010-2017 GoodData Corporation. All rights reserved.
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

require 'open3'
require 'pathname'
require 'tempfile'
require 'tmpdir'
require 'zip'
require 'uri'

require_relative '../helpers/global_helpers'
require_relative '../rest/resource'

require_relative 'execution_detail'
require_relative 'schedule'

APP_STORE_URL ||= 'https://github.com/gooddata/app_store'

module GoodData
  # A dataload process deployed into a GoodData project.
  #
  # Class methods look processes up and deploy new ones (zipped graph/Ruby
  # packages, app-store bricks, app-store paths, components). Instance methods
  # read the process JSON held in +data+ and act on an existing process
  # (redeploy, download, execute, schedule, notification rules, delete).
  class Process < Rest::Resource
    attr_reader :data

    alias_method :raw_data, :data
    alias_method :json, :data
    alias_method :to_hash, :data

    class << self
      # Looks up one process or all processes.
      #
      # @param id [String, Symbol] process id, or +:all+ to list processes
      # @param options [Hash]
      # @option options [GoodData::Project] :project project to look in; with +:all+ and
      #   no project, every process listed on the current user's profile is returned
      # @option options [GoodData::Rest::Client] :client client to use (falls back to the project's client)
      # @return [GoodData::Process, Array<GoodData::Process>]
      # @raise [RuntimeError] when no client is available, or when a single id is
      #   requested without a project
      def [](id, options = { :client => GoodData.connection })
        project = options[:project]
        client = options[:client] || (project && project.client)
        fail 'Client has to be specified in options' unless client

        if id == :all && project
          uri = "/gdc/projects/#{project.pid}/dataload/processes"
          data = client.get(uri)
          data['processes']['items'].map do |process_data|
            client.create(Process, process_data, project: project)
          end
        elsif id == :all
          # No project given: list the user's processes, load each distinct project
          # referenced by the processes' self links once, and attach it.
          uri = "/gdc/account/profile/#{client.user.obj_id}/dataload/processes"
          data = client.get(uri)
          items = data['processes']['items']
          pids = items.map { |process_data| project_id_of(process_data) }.uniq
          projects_lookup = pids.pmap { |pid| client.projects(pid) }.each_with_object({}) do |proj, lookup|
            lookup[proj.pid] = proj
          end
          items.map do |process_data|
            client.create(Process, process_data, project: projects_lookup[project_id_of(process_data)])
          end
        else
          # Without this guard a missing project surfaced as NoMethodError on nil.
          fail 'Project has to be specified in options' unless project

          uri = "/gdc/projects/#{project.pid}/dataload/processes/#{id}"
          client.create(Process, client.get(uri), project: project)
        end
      end

      # @return [Array<GoodData::Process>] result of +Process[:all]+ with default options
      def all
        Process[:all]
      end

      # Deploys a process inside +GoodData.with_project+.
      #
      # When a block is given, the deployed process is yielded and deleted
      # afterwards. Errors raised by the deploy or the block are logged, not re-raised.
      #
      # @param dir [String] path handed to {.deploy}
      # @param options [Hash] deploy options; the single path in +:params+ (if any) is
      #   passed as +:files_to_exclude+
      # @yieldparam process [GoodData::Process] the freshly deployed process
      def with_deploy(dir, options = {}, &block)
        _client, project = GoodData.get_client_and_project(options)

        GoodData.with_project(project) do
          params = options[:params].nil? ? [] : [options[:params]]
          if block
            begin
              res = GoodData::Process.deploy(dir, options.merge(:files_to_exclude => params))
              block.call(res)
            rescue StandardError => e
              GoodData.logger.error(e.inspect)
            ensure
              res.delete if res
            end
          else
            GoodData::Process.deploy(dir, options.merge(:files_to_exclude => params))
          end
        end
      end

      # Packages +path+ (see +zip_and_upload+) and uploads it to the user's WebDAV.
      #
      # @param path [Pathname] file or directory to upload
      # @param files_to_exclude [Array<Pathname, String>] entries skipped when zipping a directory
      # @return [String] local archive path, or the randomized remote name for a plain file
      def upload_package(path, files_to_exclude, opts = { :client => GoodData.connection })
        GoodData.get_client_and_project(opts)
        zip_and_upload(path, files_to_exclude, opts)
      end

      # Deploy a new process or redeploy existing one.
      #
      # @param path [String] Path to ZIP archive or to a directory containing files that should be ZIPed
      # @option options [String] :files_to_exclude
      # @option options [String] :type ('GRAPH') Type of process - GRAPH or RUBY
      # @option options [String] :name Readable name of the process
      # @option options [String] :process_id ID of a process to be redeployed (do not set if you want to create a new process)
      # @option options [Boolean] :verbose (false) Switch on verbose mode for detailed logging
      def deploy(path, options = { client: GoodData.client, project: GoodData.project })
        if path.is_a?(Hash) && path[:component]
          deploy_component path, options
        elsif path.to_s.start_with?(APP_STORE_URL)
          deploy_brick path, options
        elsif path.to_s =~ %r{\${.*}:(.*)\/(.*):\/}
          deploy_from_appstore path.to_s, options
        else
          deploy_simple_process path, options
        end
      end

      # Uploads a local file/directory package and creates (or, with
      # +options[:process_id]+, updates) a process pointing at it.
      #
      # @raise [RuntimeError] when +path+ is nil/false
      # @raise [ArgumentError] when +options[:name]+ is an empty string
      def deploy_simple_process(path, options = { client: GoodData.client, project: GoodData.project })
        client, project = GoodData.get_client_and_project(options)

        fail 'Path is not specified' unless path

        # Pathname() either returns a Pathname or raises TypeError; it never returns nil.
        path = Pathname(path)
        files_to_exclude = Array(options[:files_to_exclude]).map { |pname| Pathname(pname) }

        type = options[:type] || 'GRAPH'
        deploy_name = options[:name] || "Process of #{path} script"
        fail ArgumentError, 'options[:name] can not be nil or empty!' if deploy_name.nil? || deploy_name.empty?

        verbose = options[:verbose] || false
        GoodData.logger.info("Deploying #{path}") if verbose

        deployed_path = Process.upload_package(path, files_to_exclude, client: client, project: project)
        data_sources = options[:data_sources] || []
        data = {
          :process => {
            :name => deploy_name,
            :path => "/uploads/#{File.basename(deployed_path)}",
            :type => type,
            :dataSources => data_sources
          }
        }

        save(data, options)
      end

      # Deploys a Ruby brick from the app store git repository.
      #
      # The URL path is split on '/': segment 4 is the git ref to check out and
      # segments 5.. form the brick directory inside the repository; the last
      # segment becomes the process name.
      # NOTE(review): presumably a GitHub ".../tree/<ref>/<dir>" URL — confirm.
      #
      # @raise [RuntimeError] when cloning/checkout fails or the brick directory is missing
      def deploy_brick(path, options = { :client => GoodData.client, :project => GoodData.project })
        client, project = GoodData.get_client_and_project(options)

        brick_uri_parts = URI(path).path.split('/')
        ref = brick_uri_parts[4]
        brick_name = brick_uri_parts.last
        brick_path = brick_uri_parts[5..-1].join('/')

        Dir.mktmpdir do |dir|
          # git is run with an argument vector (no shell) and an explicit working
          # directory: the ref comes from a caller-supplied URL, and the exit status
          # is read from the returned Process::Status instead of $CHILD_STATUS
          # (which is only defined once 'English' is required).
          clone_output, status = Open3.capture2e('git', 'clone', APP_STORE_URL, chdir: dir)
          fail "Cloning #{APP_STORE_URL} failed! #{clone_output}" unless status.success?

          repo_dir = File.join(dir, 'app_store')
          if ref
            _checkout_output, status = Open3.capture2e('git', 'checkout', ref, chdir: repo_dir)
            fail 'Wrong branch or tag specified!' unless status.success?
          end

          opts = { :client => client, :project => project, :name => brick_name, :type => 'RUBY' }
          full_brick_path = File.join(repo_dir, brick_path)

          fail "Invalid brick name specified - '#{brick_name}'" unless File.exist?(full_brick_path)

          # Deploy while the temporary clone still exists; mktmpdir returns this value.
          deploy(full_brick_path, opts)
        end
      end

      # Creates/updates a RUBY process whose path is an app-store reference
      # (passed through verbatim, nothing is uploaded).
      def deploy_from_appstore(path, options = { :client => GoodData.client, :project => GoodData.project })
        deploy_name = options[:name] || "Process of #{path}"
        verbose = options[:verbose] || false
        GoodData.logger.info("Deploying #{path}") if verbose

        data_sources = options[:data_sources] || []
        data = {
          process: {
            name: deploy_name,
            path: path,
            dataSources: data_sources,
            type: 'RUBY'
          }
        }

        save(data, options)
      end

      # Creates/updates a component process. Only whitelisted keys of the
      # process and component hashes are sent.
      def deploy_component(data, options = { client: GoodData.client, project: GoodData.project })
        GoodData.get_client_and_project(options)

        data = { process: data } unless data[:process]
        data[:process] = GoodData::Helpers.symbolize_keys(data[:process]).select { |k| %i[type name component dataSources].include? k }
        data[:process][:component] = GoodData::Helpers.symbolize_keys(data[:process][:component]).select { |k| %i[name version configLocation config].include? k }

        save(data, options)
      end

      private

      # Extracts the project id from a process JSON's self link.
      def project_id_of(process_data)
        process_data['process']['links']['self'].match(%r{/gdc/projects/(\w*)/})[1]
      end

      # POSTs a new process (or PUTs over +options[:process_id]+), polls an
      # asyncTask response to completion, and wraps the result.
      def save(data, options = { client: GoodData.client, project: GoodData.project })
        client, project = GoodData.get_client_and_project(options)
        process_id = options[:process_id]

        res =
          if process_id.nil?
            client.post("/gdc/projects/#{project.pid}/dataload/processes", data)
          else
            client.put("/gdc/projects/#{project.pid}/dataload/processes/#{process_id}", data)
          end

        res = JSON.parse(client.poll_on_code(res['asyncTask']['links']['poll'], options.merge(process: false))) if res.keys.first == 'asyncTask'

        client.create(Process, res, project: project)
      end

      # Builds a zip at a fresh temporary path, yields the open archive,
      # uploads it to the user's WebDAV and returns the local zip path.
      # NOTE(review): the local zip is not deleted here.
      def with_zip(opts = {})
        client = opts[:client]
        temp = Tempfile.new(['deploy-graph-archive', '.zip'])
        zip_filename = temp.path
        # The reserved empty file is removed so rubyzip can create the archive at that path.
        temp.close!
        Zip::File.open(zip_filename, Zip::File::CREATE) do |zipfile|
          yield zipfile
        end
        client.upload_to_user_webdav(zip_filename, opts)
        zip_filename
      end

      # Uploads +path+:
      # * a single .grf/.rb file is zipped on its own;
      # * any other file is assumed to be a ready zip and uploaded under a random name;
      # * a directory is zipped recursively, minus +files_to_exclude+.
      #
      # @return [String] local zip path, or the random remote name for a pre-built archive
      def zip_and_upload(path, files_to_exclude, opts = {})
        client = opts[:client]
        GoodData.logger.info('Creating package for upload')
        if !path.directory? && (path.extname == '.grf' || path.extname == '.rb')
          with_zip(opts) do |zipfile|
            zipfile.add(File.basename(path), path)
          end
        elsif !path.directory?
          # This branch expects a zipped file. The filename on WebDAV defaults to the
          # local filename, and that often clashed when deploys ran in parallel, so a
          # randomized name is used instead.
          randomized_filename = (0...16).map { rand(65..90).chr }.join
          client.upload_to_user_webdav(path, { filename: randomized_filename }.merge(opts))
          randomized_filename
        else
          # Dir[] already yields entries prefixed with +path+; joining them onto +path+
          # again doubled the prefix for relative paths, so nothing was ever excluded.
          # Compare fully expanded paths on both sides instead.
          excluded = Array(files_to_exclude).map { |pname| Pathname(pname).expand_path }
          with_zip(opts) do |zipfile|
            files_to_upload = Dir[File.join(path, '**', '**')].reject { |f| excluded.include?(Pathname(f).expand_path) }
            GoodData.logger.info("Uploading #{files_to_upload.count} files.")
            files_to_upload.each do |file|
              file_pathname = Pathname.new(file)
              file_relative_pathname = file_pathname.relative_path_from(Pathname.new(path))
              zipfile.add(file_relative_pathname, file)
            end
          end
        end
      end
    end

    # @param data [Hash] process JSON (expected to have a 'process' root key)
    def initialize(data)
      @data = data
    end

    # Deletes this process on the server.
    def delete
      client.delete(uri)
    end

    # Redeploy existing process.
    #
    # @param path [String] Path to ZIP archive or to a directory containing files that should be ZIPed
    # @option options [String] :files_to_exclude
    # @option options [String] :process_id (this process's id) ID of the process to redeploy
    # @option options [String] :type (this process's type) Type of process - GRAPH or RUBY
    # @option options [String] :name (this process's name) Readable name of the process
    # @option options [Boolean] :verbose (false) Switch on verbose mode for detailed logging
    def deploy(path, options = {})
      Process.deploy(path, { client: client, process_id: process_id, :project => project, :name => name, :type => type, :data_sources => data_sources }.merge(options))
    end

    # Downloads the process from S3 in a zipped form.
    #
    # @return [IO] The stream of data that represents a zipped deployed process.
    def download
      link = links['source']
      client.connection.refresh_token
      client.get(link, process: false) { |_, _, result| RestClient.get(result.to_hash['location'].first) }
    end

    def process
      data['process']
    end

    def name
      process['name']
    end

    # @return [Symbol] the process type, downcased (e.g. :graph)
    def type
      process['type'].downcase.to_sym
    end

    def links
      process['links']
    end

    def link
      links['self']
    end

    alias_method :uri, :link

    # @return [String] last segment of the self link
    def obj_id
      uri.split('/').last
    end

    alias_method :process_id, :obj_id

    def executions_link
      links['executions']
    end

    def graphs
      process['graphs']
    end

    def executables
      process['executables']
    end

    def path
      process['path']
    end

    def component
      process['component']
    end

    def data_sources
      process['dataSources']
    end

    # Determines whether the process is an ADDv2 component.
    # @return [Bool] True if the process is an ADDv2 component.
    def add_v2_component?
      process['component'] && process['component']['name'] == 'gdc-data-distribution'
    end

    # @return [Array] the project's schedules whose process_id is this process
    def schedules
      project.schedules.select { |schedule| schedule.process_id == obj_id }
    end

    # Creates a schedule with no cron expression and no executable.
    def create_manual_schedule(options = {})
      create_schedule(nil, nil, options)
    end

    def create_schedule(cron, executable, options = {})
      project.create_schedule(process_id, cron, executable, options.merge(client: client, project: project))
    end

    # Starts +executable+ and polls it to completion.
    #
    # The execution detail is always fetched afterwards. A detail with status
    # 'ERROR' raises a RuntimeError pointing at the log, replacing any polling
    # error in flight.
    #
    # @return [GoodData::ExecutionDetail]
    def execute(executable, options = {})
      result = start_execution(executable, options)
      begin
        client.poll_on_code(result['executionTask']['links']['poll'], options)
      ensure
        result = client.get(result['executionTask']['links']['detail'])
        fail "Runing process failed. You can look at a log here #{result['executionDetail']['logFileName']}" if result['executionDetail']['status'] == 'ERROR'
      end
      client.create(GoodData::ExecutionDetail, result, client: client, project: project)
    end

    # POSTs an execution request.
    #
    # +:params+ are sent as public parameters and +:hidden_params+ as hidden
    # parameters, each encoded by the GoodData::Helpers encoders.
    def start_execution(executable, options = {})
      params = options[:params] || {}
      hidden_params = options[:hidden_params] || {}
      client.post(executions_link, :execution => {
                    :graph => executable.to_s,
                    :params => GoodData::Helpers.encode_public_params(params),
                    :hiddenParams => GoodData::Helpers.encode_hidden_params(hidden_params)
                  })
    end

    def notification_rules
      NotificationRule.all(project: project, process: self, client: client)
    end

    def create_notification_rule(opts = {})
      NotificationRule.create(opts.merge(project: project, process: self, client: client))
    end
  end
end