Sha256: d86335f9fdeacdfa9ccc91dd70ae2c296290e3e58ff019582d150d9cd6f3bd16

Contents?: true

Size: 1.25 KB

Versions: 8

Compression:

Stored size: 1.25 KB

Contents

class QueryExecution
  @queue = :query_exec
  NUM_SAMPLE_ROWS = 100

  def self.perform(result_id, role)
    result = Result.find(result_id)

    unless Role.configured_connections.include?(role)
      raise "Role '#{role}' does not have connection credentials configured."
    end

    body = result.compiled_body
    result.mark_running!
    sample_callback = ->(sample) { result.mark_processing_from_sample(sample) }

    connection = RedshiftConnectionPool.instance.get(role)

    connection.reconnect_on_failure do
      query_stream = PgStream::Stream.new(connection.pg_connection, body)
      result.headers = query_stream.headers
      result.save!

      rrrc = result.redis_result_row_count

      stream_processor = PgStream::Processor.new(query_stream)
      stream_processor.register(ResultCsvGenerator.new(result_id, result.headers).callbacks)
      stream_processor.register(SampleSkimmer.new(NUM_SAMPLE_ROWS, &sample_callback).callbacks)
      stream_processor.register(CountPublisher.new(rrrc).callbacks)

      row_count = stream_processor.execute
      result.mark_complete_with_count(row_count)
    end
  rescue *RedshiftPG::USER_ERROR_CLASSES => e
    result.mark_failed!(e.message)
  rescue => e
    result.mark_failed!(e.message) if result
    raise
  end
end

Version data entries

8 entries across 8 versions & 1 rubygems

Version Path
aleph_analytics-0.1.0 app/models/query_execution.rb
aleph_analytics-0.0.6 app/models/query_execution.rb
aleph_analytics-0.0.5 app/models/query_execution.rb
aleph_analytics-0.0.4 app/models/query_execution.rb
aleph_analytics-0.0.3 app/models/query_execution.rb
aleph_analytics-0.0.2 app/models/query_execution.rb
aleph_analytics-0.0.1.alpha app/models/query_execution.rb
aleph_analytics-0.0.0.alpha app/models/query_execution.rb