module Blazer class QueriesController < BaseController before_action :set_query, only: [:show, :edit, :update, :destroy, :refresh] before_action :set_data_source, only: [:tables, :docs, :schema, :cancel] def home set_queries(1000) if params[:filter] @dashboards = [] # TODO show my dashboards else @dashboards = Blazer::Dashboard.order(:name) @dashboards = @dashboards.includes(:creator) if Blazer.user_class end @dashboards = @dashboards.map do |d| { id: d.id, name: d.name, creator: blazer_user && d.try(:creator) == blazer_user ? "You" : d.try(:creator).try(Blazer.user_name), to_param: d.to_param, dashboard: true } end end def index set_queries render json: @queries end def new @query = Blazer::Query.new( data_source: params[:data_source], name: params[:name] ) if params[:fork_query_id] @query.statement ||= Blazer::Query.find(params[:fork_query_id]).try(:statement) end if params[:upload_id] upload = Blazer::Upload.find(params[:upload_id]) upload_settings = Blazer.settings["uploads"] @query.data_source ||= upload_settings["data_source"] @query.statement ||= "SELECT * FROM #{upload.table_name} LIMIT 10" end end def create @query = Blazer::Query.new(query_params) @query.creator = blazer_user if @query.respond_to?(:creator) @query.status = "active" if @query.respond_to?(:status) if @query.save redirect_to query_path(@query, variable_params(@query)) else render_errors @query end end def show @statement = @query.statement.dup process_vars(@statement, @query.data_source) @smart_vars = {} @sql_errors = [] data_source = Blazer.data_sources[@query.data_source] @bind_vars.each do |var| smart_var, error = parse_smart_variables(var, data_source) @smart_vars[var] = smart_var if smart_var @sql_errors << error if error end @query.update!(status: "active") if @query.respond_to?(:status) && @query.status.in?(["archived", nil]) Blazer.transform_statement.call(data_source, @statement) if Blazer.transform_statement add_cohort_analysis_vars if @query.cohort_analysis? end def edit end def run @statement = params[:statement] # before process_vars @cohort_analysis = Query.new(statement: @statement).cohort_analysis? data_source = params[:data_source] process_vars(@statement, data_source) @only_chart = params[:only_chart] @run_id = blazer_params[:run_id] @query = Query.find_by(id: params[:query_id]) if params[:query_id] data_source = @query.data_source if @query && @query.data_source @data_source = Blazer.data_sources[data_source] run_cohort_analysis if @cohort_analysis # ensure viewable if !(@query || Query.new(data_source: @data_source.id)).viewable?(blazer_user) render_forbidden elsif @run_id @timestamp = blazer_params[:timestamp].to_i @result = @data_source.run_results(@run_id) @success = !@result.nil? if @success @data_source.delete_results(@run_id) @columns = @result.columns @rows = @result.rows @error = @result.error @just_cached = !@result.error && @result.cached_at.present? @cached_at = nil params[:data_source] = nil render_run elsif Time.now > Time.at(@timestamp + (@data_source.timeout || 600).to_i + 5) # query lost Rails.logger.info "[blazer lost query] #{@run_id}" @error = "We lost your query :(" @rows = [] @columns = [] render_run else continue_run end elsif @success @run_id = blazer_run_id options = {user: blazer_user, query: @query, refresh_cache: params[:check], run_id: @run_id, async: Blazer.async} if Blazer.async && request.format.symbol != :csv Blazer::RunStatementJob.perform_later(@data_source.id, @statement, options) wait_start = Time.now loop do sleep(0.1) @result = @data_source.run_results(@run_id) break if @result || Time.now - wait_start > 3 end else @result = Blazer::RunStatement.new.perform(@data_source, @statement, options) end if @result @data_source.delete_results(@run_id) if @run_id @columns = @result.columns @rows = @result.rows @error = @result.error @cached_at = @result.cached_at @just_cached = @result.just_cached @forecast = @query && @result.forecastable? && params[:forecast] if @forecast @result.forecast @forecast_error = @result.forecast_error @forecast = @forecast_error.nil? end render_run else @timestamp = Time.now.to_i continue_run end else render layout: false end end def refresh data_source = Blazer.data_sources[@query.data_source] @statement = @query.statement.dup process_vars(@statement, @query.data_source) Blazer.transform_statement.call(data_source, @statement) if Blazer.transform_statement @statement = cohort_analysis_statement(data_source, @statement) if @query.cohort_analysis? data_source.clear_cache(@statement) redirect_to query_path(@query, variable_params(@query)) end def update if params[:commit] == "Fork" @query = Blazer::Query.new @query.creator = blazer_user if @query.respond_to?(:creator) end @query.status = "active" if @query.respond_to?(:status) unless @query.editable?(blazer_user) @query.errors.add(:base, "Sorry, permission denied") end if @query.errors.empty? && @query.update(query_params) redirect_to query_path(@query, variable_params(@query)) else render_errors @query end end def destroy @query.destroy if @query.editable?(blazer_user) redirect_to root_path end def tables render json: @data_source.tables end def docs @smart_variables = @data_source.smart_variables @linked_columns = @data_source.linked_columns @smart_columns = @data_source.smart_columns end def schema @schema = @data_source.schema end def cancel @data_source.cancel(blazer_run_id) head :ok end private def set_data_source @data_source = Blazer.data_sources[params[:data_source]] unless Query.new(data_source: @data_source.id).editable?(blazer_user) render_forbidden end end def continue_run render json: {run_id: @run_id, timestamp: @timestamp}, status: :accepted end def render_run @checks = @query ? @query.checks.order(:id) : [] @first_row = @rows.first || [] @column_types = [] if @rows.any? @columns.each_with_index do |_, i| @column_types << ( case @first_row[i] when Integer "int" when Float, BigDecimal "float" else "string-ins" end ) end end @min_width_types = @columns.each_with_index.select { |c, i| @first_row[i].is_a?(Time) || @first_row[i].is_a?(String) || @data_source.smart_columns[c] }.map(&:last) @boom = @result.boom if @result @linked_columns = @data_source.linked_columns @markers = [] [["latitude", "longitude"], ["lat", "lon"], ["lat", "lng"]].each do |keys| lat_index = @columns.index(keys.first) lon_index = @columns.index(keys.last) if lat_index && lon_index @markers = @rows.select do |r| r[lat_index] && r[lon_index] end.map do |r| { # Mapbox.js does sanitization with https://github.com/mapbox/sanitize-caja # but we should do it here as well title: r.each_with_index.map { |v, i| i == lat_index || i == lon_index ? nil : "#{ERB::Util.html_escape(@columns[i])}: #{ERB::Util.html_escape(v)}" }.compact.join("
").truncate(140), latitude: r[lat_index], longitude: r[lon_index] } end end end render_cohort_analysis if @cohort_analysis && !@error respond_to do |format| format.html do render layout: false end format.csv do send_data csv_data(@columns, @rows, @data_source), type: "text/csv; charset=utf-8; header=present", disposition: "attachment; filename=\"#{@query.try(:name).try(:parameterize).presence || 'query'}.csv\"" end end end def set_queries(limit = nil) @queries = Blazer::Query.named.select(:id, :name, :creator_id, :statement) @queries = @queries.includes(:creator) if Blazer.user_class if blazer_user && params[:filter] == "mine" @queries = @queries.where(creator_id: blazer_user.id).reorder(updated_at: :desc) elsif blazer_user && params[:filter] == "viewed" && Blazer.audit @queries = queries_by_ids(Blazer::Audit.where(user_id: blazer_user.id).order(created_at: :desc).limit(500).pluck(:query_id).uniq) else @queries = @queries.limit(limit) if limit @queries = @queries.active.order(:name) end @queries = @queries.to_a @more = limit && @queries.size >= limit @queries = @queries.select { |q| !q.name.to_s.start_with?("#") || q.try(:creator).try(:id) == blazer_user.try(:id) } @queries = @queries.map do |q| { id: q.id, name: q.name, creator: blazer_user && q.try(:creator) == blazer_user ? "You" : q.try(:creator).try(Blazer.user_name), vars: q.variables.join(", "), to_param: q.to_param } end end def queries_by_ids(favorite_query_ids) queries = Blazer::Query.active.named.where(id: favorite_query_ids) queries = queries.includes(:creator) if Blazer.user_class queries = queries.index_by(&:id) favorite_query_ids.map { |query_id| queries[query_id] }.compact end def set_query @query = Blazer::Query.find(params[:id].to_s.split("-").first) unless @query.viewable?(blazer_user) render_forbidden end end def render_forbidden render plain: "Access denied", status: :forbidden end def query_params params.require(:query).permit(:name, :description, :statement, :data_source) end def blazer_params params[:blazer] || {} end def csv_data(columns, rows, data_source) CSV.generate do |csv| csv << columns rows.each do |row| csv << row.each_with_index.map { |v, i| v.is_a?(Time) ? blazer_time_value(data_source, columns[i], v) : v } end end end def blazer_time_value(data_source, k, v) data_source.local_time_suffix.any? { |s| k.ends_with?(s) } ? v.to_s.sub(" UTC", "") : v.in_time_zone(Blazer.time_zone) end helper_method :blazer_time_value def blazer_run_id params[:run_id].to_s.gsub(/[^a-z0-9\-]/i, "") end def run_cohort_analysis unless @data_source.supports_cohort_analysis? @cohort_error = "This data source does not support cohort analysis" end @show_cohort_rows = !params[:query_id] || @cohort_error unless @show_cohort_rows @statement = cohort_analysis_statement(@data_source, @statement) end end def cohort_analysis_statement(data_source, statement) @cohort_period = params["cohort_period"] || "week" @cohort_period = "week" unless ["day", "week", "month"].include?(@cohort_period) # for now @conversion_period = @cohort_period @cohort_days = case @cohort_period when "day" 1 when "week" 7 when "month" 30 end data_source.cohort_analysis_statement(statement, period: @cohort_period, days: @cohort_days) end def render_cohort_analysis if @show_cohort_rows @cohort_analysis = false @row_limit = 1000 # check results unless @cohort_error # check names expected_columns = ["user_id", "conversion_time"] missing_columns = expected_columns - @result.columns if missing_columns.any? @cohort_error = "Expected user_id and conversion_time columns" end # check types (user_id can be any type) unless @cohort_error column_types = @result.columns.zip(@result.column_types).to_h if !column_types["cohort_time"].in?(["time", nil]) @cohort_error = "cohort_time must be time column" elsif !column_types["conversion_time"].in?(["time", nil]) @cohort_error = "conversion_time must be time column" end end end else @today = Blazer.time_zone.today @min_cohort_date, @max_cohort_date = @result.rows.map { |r| r[0] }.minmax @buckets = {} @rows.each do |r| @buckets[[r[0], r[1]]] = r[2] end @cohort_dates = [] current_date = @max_cohort_date while current_date && current_date >= @min_cohort_date @cohort_dates << current_date current_date = case @cohort_period when "day" current_date - 1 when "week" current_date - 7 else current_date.prev_month end end num_cols = @cohort_dates.size @columns = ["Cohort", "Users"] + num_cols.times.map { |i| "#{@conversion_period.titleize} #{i + 1}" } rows = [] date_format = @cohort_period == "month" ? "%b %Y" : "%b %-e, %Y" @cohort_dates.each do |date| row = [date.strftime(date_format), @buckets[[date, 0]] || 0] num_cols.times do |i| if @today >= date + (@cohort_days * i) row << (@buckets[[date, i + 1]] || 0) end end rows << row end @rows = rows end end end end