Sha256: 8713a186f26aca71ddc20c71fe03b480b12f5153b69418f88b9fb0553d8b81a6
Contents?: true
Size: 1.39 KB
Versions: 2
Compression:
Stored size: 1.39 KB
Contents
class QueryExecution
  @queue = :query_exec

  NUM_SAMPLE_ROWS = 100

  def self.perform(result_id, role)
    result = Result.find(result_id)
    csv_service = CsvService.new(result_id)

    unless Role.configured_connections.include?(role)
      raise "Role '#{role}' does not have connection credentials configured."
    end

    body = result.compiled_body
    result.mark_running!

    # Publish a sample of the first rows as soon as they are available.
    sample_callback = ->(sample) { result.mark_processing_from_sample(sample) }

    connection = RedshiftConnectionPool.instance.get(role)
    connection.reconnect_on_failure do
      query_stream = PgStream::Stream.new(connection.pg_connection, body)
      result.headers = query_stream.headers
      result.save!

      rrrc = result.redis_result_row_count

      # Stream the rows once, fanning each row out to the CSV writer,
      # the sample skimmer, and the row-count publisher.
      stream_processor = PgStream::Processor.new(query_stream)
      stream_processor.register(ResultCsvGenerator.new(result_id, result.headers).callbacks)
      stream_processor.register(SampleSkimmer.new(NUM_SAMPLE_ROWS, &sample_callback).callbacks)
      stream_processor.register(CountPublisher.new(rrrc).callbacks)

      row_count = stream_processor.execute
      result.mark_complete_with_count(row_count)
    end
  rescue *RedshiftPG::USER_ERROR_CLASSES => e
    csv_service.clear_tmp_file
    result.mark_failed!(e.message)
  rescue => e
    if result && csv_service
      csv_service.clear_tmp_file
      result.mark_failed!(e.message)
    end
    raise
  end
end
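The `@queue = :query_exec` class instance variable follows the Resque worker convention, so this job is normally enqueued for a background worker rather than calling `perform` directly. A minimal sketch of how it might be enqueued, assuming Resque is configured and that `result_id` and `role` refer to an existing Result and a role with configured connection credentials:

# Illustrative only: push the job onto the :query_exec queue; a Resque
# worker will later invoke QueryExecution.perform(result_id, role).
Resque.enqueue(QueryExecution, result_id, role)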
Version data entries
2 entries across 2 versions & 1 rubygem
Version | Path
---|---
aleph_analytics-0.3.0 | app/models/query_execution.rb
aleph_analytics-0.2.0 | app/models/query_execution.rb