module Nucleus
  module Adapters
    module V1
      class CloudFoundryV2 < Stub
        # Log handling of the Cloud Foundry v2 adapter.
        #
        # Two kinds of logs are exposed:
        # * log *files* that are listed in and downloaded from the 'logs' directory of instance 0 of the application
        #   (the file 'staging_task.log' is exposed as the 'build' log), and
        # * log *streams* that are fetched from the loggregator ({LOGGREGATOR_TYPES} plus the pseudo log 'all').
        module Logs
          # Log types that are served by the loggregator instead of a log file
          LOGGREGATOR_TYPES = [Enums::ApplicationLogfileType::API,
                               Enums::ApplicationLogfileType::APPLICATION,
                               Enums::ApplicationLogfileType::REQUEST,
                               Enums::ApplicationLogfileType::SYSTEM].freeze
          # Carriage return (newline in Mac OS) + line feed (newline in Unix) == CRLF (newline in Windows)
          CRLF = "\r\n".freeze
          # Whitespace that may appear on the blank line between the headers and the body of a multipart chunk
          WSP = ' '.freeze

          # @see Stub#logs
          def logs(application_name_or_id)
            app_guid = app_guid(application_name_or_id)
            # retrieve app for timestamps only :/
            app_created = get("/v2/apps/#{app_guid}").body[:metadata][:created_at]
            # TODO: right now, we always assume the log has recently been updated
            # NOTE(review): the 'updated_at' value was truncated in the reviewed source, the current UTC time in
            # ISO 8601 format is a reconstruction based on the TODO above -- confirm against the VCS history.
            updated_at = Time.now.utc.iso8601
            logs = []

            begin
              # the raw response is a CRLF separated listing, the file name is everything before the last blank
              download_file(app_guid, 'logs').split(CRLF).each do |logfile_line|
                filename = logfile_line.rpartition(' ').first.strip
                if filename == 'staging_task.log'
                  filename = 'build'
                  log_type = Enums::ApplicationLogfileType::BUILD
                else
                  log_type = Enums::ApplicationLogfileType::OTHER
                end
                logs.push(id: filename, name: filename, type: log_type,
                          created_at: app_created, updated_at: updated_at)
              end
            rescue Errors::AdapterError
              log.debug('no logs directory found for cf application')
            end

            # add the default logtypes that are provided by the loggregator
            LOGGREGATOR_TYPES.each do |type|
              logs.push(id: type, name: type, type: type, created_at: app_created, updated_at: updated_at)
            end
            # TODO: 'all' is probably not perfect, since the build log wont be included
            logs.push(id: 'all', name: 'all', type: Enums::ApplicationLogfileType::OTHER,
                      created_at: app_created, updated_at: updated_at)
            logs
          end

          # @see Stub#log?
          def log?(application_name_or_id, log_id)
            app_guid = app_guid(application_name_or_id)
            # test file existence
            log_id = 'staging_task.log' if log_id.to_sym == Enums::ApplicationLogfileType::BUILD
            # checks also if application is even valid
            response = get("/v2/apps/#{app_guid}/instances/0/files/logs/#{log_id}",
                           follow_redirects: false, expects: [200, 302, 400])
            # compare the status code, the response object itself never equals an Integer
            return true if response.status == 200 || log_stream?(log_id)
            return false if response.status == 400
            # if 302 (only remaining option), followup and download the log file
            download_file(app_guid, "logs/#{log_id}")
            # no error, file exists
            true
          rescue Errors::AdapterResourceNotFoundError, Errors::UnknownAdapterCallError,
                 Excon::Errors::NotFound, Excon::Errors::BadRequest
            false
          end

          # @see Stub#tail
          def tail(application_name_or_id, log_id, stream)
            app_guid = app_guid(application_name_or_id)
            return tail_stream(app_guid, log_id, stream) if log_stream?(log_id)
            tail_file(app_guid, log_id, stream)
          end

          # @see Stub#log_entries
          def log_entries(application_name_or_id, log_id)
            app_guid = app_guid(application_name_or_id)
            # first check if this log is a file or must be fetched from the loggregator
            if log_stream?(log_id)
              # fetch recent data from loggregator and return an array of log entries
              recent_decoded = recent_log_messages(app_guid, loggregator_filter(log_id, app_guid))
              recent_decoded.map { |log_msg| construct_log_entry(log_msg) }
            elsif log_id.to_sym == Enums::ApplicationLogfileType::BUILD
              # handle special staging log
              build_log_entries(app_guid)
            else
              download_logfile_entries(app_guid, log_id)
            end
          end

          private

          # Entries of the staging ('build') log.
          # @param app_guid guid of the application
          # @return [Array<String>] lines of the staging log, empty if there is no staging log
          def build_log_entries(app_guid)
            download_logfile_entries(app_guid, 'staging_task.log')
          rescue Errors::AdapterResourceNotFoundError
            # if there was no build yet, return no entries instead of the 404 error
            []
          end

          # Translate a log id into the loggregator source names that belong to it.
          # @param log_id id of the log, must respond to +to_sym+
          # @param app_guid guid of the application, only used for the error message
          # @raise [Errors::AdapterResourceNotFoundError] if the log id does not identify a loggregator log
          # @return [Array<String>, nil] source names to keep, nil if messages shall not be filtered
          def loggregator_filter(log_id, app_guid = nil)
            case log_id.to_sym
            when Enums::ApplicationLogfileType::API then ['API']
            when Enums::ApplicationLogfileType::APPLICATION then ['APP']
            when Enums::ApplicationLogfileType::REQUEST then ['RTR']
            when Enums::ApplicationLogfileType::SYSTEM then %w(STG LGR DEA)
            when :all then nil # no filter, show all
            else
              # invalid log requests --> 404
              raise Errors::AdapterResourceNotFoundError,
                    "Invalid log file '#{log_id}', not available for application '#{app_guid}'"
            end
          end

          # Format a decoded loggregator message as a log line.
          # @param decoded_message [Message] decoded loggregator message
          # @return [String] the log line, e.g. '2015-03-22T15:28:55.83+0100 [RTR/0] OUT message...'
          def construct_log_entry(decoded_message)
            # NOTE(review): the start of the timestamp expression was truncated in the reviewed source;
            # 'Time.at(timestamp / 1e9)' is a reconstruction (assumes nanoseconds since epoch) -- TODO confirm.
            "#{Time.at(decoded_message.timestamp / 1_000_000_000.0).iso8601} "\
            "[#{decoded_message.source_name}/#{decoded_message.source_id}] "\
            "#{decoded_message.message_type == 1 ? 'OUT' : 'ERR'} #{decoded_message.message}"
          end

          # Download a log file and split it into its lines.
          # @param app_guid guid of the application
          # @param log_id name of the file within the 'logs' directory
          # @param headers_to_use request headers, see {#download_file}
          # @return [Array<String>] the lines of the log file
          def download_logfile_entries(app_guid, log_id, headers_to_use = nil)
            download_file(app_guid, "logs/#{log_id}", headers_to_use).split("\n")
          end

          # Download a file (or directory listing) of instance 0 of the application.
          # @param app_guid guid of the application
          # @param file_path path of the file, relative to the files root of the instance
          # @param headers_to_use request headers, the current request headers are used if omitted
          # @raise [Errors::AdapterResourceNotFoundError] if the file is not available
          # @return the body of the response that delivered the file
          def download_file(app_guid, file_path, headers_to_use = nil)
            expected_statuses = [200, 302, 400, 404]
            # Hack, do not create fresh headers (which would fail) when in a deferred action
            headers_to_use ||= headers
            # log list consists of 2 parts, loggregator and files
            log_files = get("/v2/apps/#{app_guid}/instances/0/files/#{file_path}",
                            follow_redirects: false, expects: expected_statuses, headers: headers_to_use)
            if log_files.status == 400 || log_files.status == 404
              raise Errors::AdapterResourceNotFoundError,
                    "Invalid log file: '#{file_path}' not available for application '#{app_guid}'"
            end
            return log_files.body if log_files.status == 200

            # status must be 302, follow to the Location
            # NOTE(review): pattern and replacement of this substitution were truncated in the reviewed source,
            # both host names are a reconstruction -- verify against the VCS history before merging.
            # Non-mutating gsub: the response header value must not be changed in place (it may be frozen).
            download_location = log_files.headers[:Location].gsub(/objectstorage\.service\.networklayer\.com/,
                                                                  'objectstorage.softlayer.net')
            # NOTE(review): this disables peer verification for ALL subsequent Excon connections of the process,
            # not only for this download. Kept as is, since other requests may depend on it.
            Excon.defaults[:ssl_verify_peer] = false unless @check_certificates
            connection_params = { ssl_verify_peer: @check_certificates }
            # NOTE(review): the constructor call was truncated in the reviewed source, reconstructed as Excon.new
            connection = Excon.new(download_location, connection_params)
            downloaded_logfile_response = connection.request(method: :get, expects: expected_statuses)
            if downloaded_logfile_response.status == 404
              raise Errors::AdapterResourceNotFoundError,
                    "Invalid log file: '#{file_path}' not available for application '#{app_guid}'"
            end
            downloaded_logfile_response.body
          end

          # Fetch the recent log messages of the application from the loggregator.
          # @param app_guid guid of the application
          # @param filter [Array<String>, nil] source names to keep, nil to keep all messages
          # @return [Array<Message>] the decoded (and filtered) messages
          def recent_log_messages(app_guid, filter = nil)
            loggregator_recent_uri = "https://#{loggregator_endpoint}:443/recent?app=#{app_guid}"
            # current log state before tailing, multipart message of protobuf objects
            current_log_response = get(loggregator_recent_uri)
            current_log_boundary = /boundary=(\w+)/.match(current_log_response.headers['Content-Type'])[1]
            boundary_regexp = /--#{Regexp.quote(current_log_boundary)}(--)?#{CRLF}/

            parts = current_log_response.body.split(boundary_regexp).map do |chunk|
              # everything in front of the first blank line
              leading_part = chunk.split(/#{CRLF}#{WSP}*#{CRLF}/m, 2)[0]
              next unless leading_part
              # the serialized message is taken from the second CRLF separated segment, if there is one
              segments = leading_part.split(/\r\n/)
              segments.length > 1 ? segments[1] : nil
            end.compact

            # decode log messages
            decoded_messages = parts.map { |proto_message| Message.decode(proto_message) }.compact
            return decoded_messages unless filter
            # return filtered messages
            decoded_messages.select { |msg| filter.include?(msg.source_name) }
          end

          # @param log_id id of the log, must respond to +to_sym+
          # @return [Boolean] true if the log is served by the loggregator, false if it is a log file
          def log_stream?(log_id)
            LOGGREGATOR_TYPES.include?(log_id.to_sym) || log_id.to_sym == :all
          end

          # Derive the loggregator host from the API endpoint URL:
          # an optional scheme is dropped and the leading 'api' is replaced with 'loggregator'.
          # @return [String] host of the loggregator
          def loggregator_endpoint
            @endpoint_url.gsub(%r{^(\w*://)?(api)([-\.\w]+)$}i, 'loggregator\3')
          end

          # Tail a log file by polling it every 3 seconds and pushing the new lines to the stream.
          # @param app_guid guid of the application
          # @param log_id id of the log file
          # @param stream stream to push the lines to, must respond to +send_message+ and +close+
          # @return listener that stops the polling
          def tail_file(app_guid, log_id, stream)
            log.debug 'Tailing CF log file'
            log_id = 'staging_task.log' if log_id.to_sym == Enums::ApplicationLogfileType::BUILD

            # cache headers as they are bound to a request and could be lost with the next tick
            headers_to_use = headers
            latest_pushed_line = -1

            # update every 3 seconds
            @tail_file_timer = EM.add_periodic_timer(3) do
              log.debug('Poll updated file tail...')
              begin
                latest_pushed_line = push_file_tail(app_guid, log_id, stream, latest_pushed_line, headers_to_use)
              rescue Errors::AdapterResourceNotFoundError
                log.debug('Logfile not found, finished tailing')
                # file lost, close stream
                @tail_file_timer.cancel if @tail_file_timer
                stream.close
              end
            end
            # listener to stop polling
            # NOTE(review): the receiver of this call was truncated in the reviewed source, 'TailStopper.new' is a
            # reconstruction (assumes a project class wrapping an object and its stop method) -- TODO confirm.
            TailStopper.new(@tail_file_timer, :cancel)
          end

          # Download the log file and push all lines that have not been pushed yet.
          # @param app_guid guid of the application
          # @param log_id name of the log file
          # @param stream stream to push the lines to
          # @param pushed_line_idx [Integer] index of the last line that was already pushed, -1 if none was pushed
          # @param headers_to_use request headers, see {#download_file}
          # @return [Integer] index of the last line that has been pushed so far
          def push_file_tail(app_guid, log_id, stream, pushed_line_idx, headers_to_use)
            log.debug('Fetching file for tail response...')
            entries = download_logfile_entries(app_guid, log_id, headers_to_use)
            # The line with index 'pushed_line_idx' was already pushed, hence the file had more than
            # 'pushed_line_idx' lines. File was shortened, close stream since we do not know where to continue.
            if entries.length <= pushed_line_idx
              log.debug('File was modified and shortened, stop tailing the file...')
              # stop polling, otherwise the next tick would push to the closed stream
              @tail_file_timer.cancel if @tail_file_timer
              stream.close
              # keep returning an index, the caller stores the return value as the latest pushed line
              return pushed_line_idx
            end

            entries.drop(pushed_line_idx + 1).each { |entry| stream.send_message(entry) }
            entries.length - 1
          end

          # Tail a loggregator log: push the recent messages, then forward the messages received via websocket.
          # @param app_guid guid of the application
          # @param log_id id of the loggregator log
          # @param stream stream to push the lines to, must respond to +send_message+ and +close+
          # @raise [Errors::AdapterResourceNotFoundError] if the log id does not identify a loggregator log
          # @return listener that closes the websocket
          def tail_stream(app_guid, log_id, stream)
            filter = loggregator_filter(log_id, app_guid)
            # push current state
            recent_log_messages(app_guid, filter).each { |entry| stream.send_message(construct_log_entry(entry)) }

            # Now register websocket to receive the latest updates
            # NOTE(review): the client class was truncated in the reviewed source, 'Faye::WebSocket::Client' is a
            # reconstruction that matches the (url, protocols, options) arguments and the event API used below.
            ws = Faye::WebSocket::Client.new("wss://#{loggregator_endpoint}:443/tail/?app=#{app_guid}", nil,
                                             headers: headers.slice('Authorization'))

            ws.on :message do |event|
              log.debug "CF loggregator message received: #{event}"
              begin
                # NOTE(review): the receiver of pack('C*') was truncated in the reviewed source, 'event.data'
                # (assumes the binary frame is delivered as an array of bytes) is a reconstruction -- TODO confirm.
                msg = Message.decode(event.data.pack('C*'))
                # notify stream to print new log line if msg type matches the applied filter
                stream.send_message(construct_log_entry(msg)) if filter.nil? || filter.include?(msg.source_name)
              rescue StandardError => e
                log.error "Cloud Foundry log message de-serialization failed: #{e}"
              end
            end

            ws.on :close do |event|
              log.debug "Closing CF loggregator websocket: code=#{event.code}, reason=#{event.reason}"
              ws = nil
              # notify stream that no more update are to arrive and stream shall be closed
              stream.close
            end
            # return listener to stop websocket
            # NOTE(review): reconstructed call, see the note on TailStopper in #tail_file
            TailStopper.new(ws, :close)
          end

          # Message class definition, matching the Protocol Buffer definition of the Cloud Foundry loggregator.
          class Message < ::Protobuf::Message
            # Output channel of a message, the value 1 (OUT) is rendered as 'OUT', everything else as 'ERR'
            class MessageType < ::Protobuf::Enum
              define :OUT, 1
              define :ERR, 2
            end

            required :bytes, :message, 1
            required Logs::Message::MessageType, :message_type, 2
            required :sint64, :timestamp, 3
            required :string, :app_id, 4
            optional :string, :source_id, 6
            repeated :string, :drain_urls, 7
            optional :string, :source_name, 8
          end

          # Envelope that wraps a log message together with its routing key and signature
          class Envelope < ::Protobuf::Message
            required :string, :routing_key, 1
            required :bytes, :signature, 2
            required Logs::Message, :log_message, 3
          end
        end
      end
    end
  end
end