require 'addressable/uri'

module Scrivito

  # Tracks a batch of obj queries against the content service and accumulates
  # their results across continuation rounds.
  #
  # Protocol (as exercised by #handle_response): the service answers with
  # "objs" (full obj data, keyed by first element of "_id") and "results",
  # one entry per open query, each holding "refs" (obj id references) and an
  # optional "continuation_handle" when more data is pending for that query.
  class ContentServiceObjQueries
    def initialize(queries)
      @queries = queries
      # Work on a copy: @queries must keep its original order/content so
      # #results can map back to the caller's query order.
      @open_queries = queries.dup
      @results = {}
    end

    # At most 100 queries are sent per request; the rest stay queued.
    def open_queries
      @open_queries[0..99]
    end

    # Folds one service response into @results and drops queries that are
    # fully answered (no continuation handle).
    def handle_response(response)
      objs = {}
      response["objs"].each do |obj|
        objs[obj["_id"].first] = obj
      end

      queries_to_delete = []

      # NOTE: the original code shadowed the method parameter `response` with
      # the block parameter; renamed to `query_result` for clarity.
      response["results"].each_with_index do |query_result, i|
        query = @open_queries[i]

        if query_result["continuation_handle"]
          # Query not exhausted yet: remember the handle for the next round.
          query[:continuation_handle] = query_result["continuation_handle"]
        else
          queries_to_delete << i
        end

        # Results are keyed by the query object's identity, so identical
        # query hashes still get separate result buckets.
        result = (@results[query.__id__] ||= [])

        query_result["refs"].each do |obj_ref|
          id = obj_ref["id"]
          # TODO fetch missing ObjData from Service
          result << (objs[id] or raise "Data for Obj with id #{id} missing!")
        end
      end

      # Delete from the back so earlier indices stay valid.
      queries_to_delete.reverse_each { |i| @open_queries.delete_at(i) }
    end

    # Returns one result list per original query, in the original order.
    def results
      @queries.map { |query| @results[query.__id__] || [] }
    end

    def finished?
      open_queries.empty?
    end
  end

  # Facade for all CMS backend access: workspaces, objs, blobs and search.
  # Depending on `die_content_service` it talks either to the REST API
  # (CmsRestApi/Backend::*) or to the legacy content service (ContentService),
  # layering the CmsDataCache in front of both.
  class CmsBackend
    VALID_INDEX_NAMES = %w[id path ppath permalink].freeze

    class << self
      # Process-wide singleton instance.
      def instance
        @instance ||= new
      end
    end

    attr_accessor :die_content_service

    def initialize
      @query_counter = 0
      @caching = true
    end

    def begin_caching
      @caching = true
    end

    def end_caching
      CmsDataCache.cache.clear
      @caching = false
    end

    def clear_cache
      CmsDataCache.cache.clear
    end

    def caching?
      !!@caching
    end

    # Number of backend requests performed so far (queries + blob requests).
    def query_counter
      @query_counter
    end

    # For test purpose only.
    def reset_query_counter!
      @query_counter = 0
    end

    # Fetches workspace data by id, serving from cache when possible.
    # Returns a WorkspaceData/WorkspaceDataFromService or nil when the
    # workspace does not exist (REST API path only, on HTTP 404).
    def find_workspace_data_by_id(id)
      if die_content_service
        begin
          cached_workspace_state = CmsDataCache.read_workspace_state(id)
          cached_csid = cached_workspace_state.try(:first)
          cached_workspace_data_tag = cached_workspace_state.try(:second)

          # Ask only for changes since the cached content state id.
          changes = CmsRestApi.get("/workspaces/#{id}/changes", from: cached_csid)

          update_obj_cache(id, cached_csid, changes)
          workspace_data, workspace_data_tag = update_workspace_cache(
            id, cached_workspace_data_tag, changes["workspace"])

          current_csid = changes["current"]
          current_workspace_state = [current_csid, workspace_data_tag]
          if current_workspace_state != cached_workspace_state
            CmsDataCache.write_workspace_state(id, current_workspace_state)
          end

          return WorkspaceData.new(workspace_data.merge(
            "content_state_id" => current_csid))
        rescue Scrivito::ClientError => client_error
          if client_error.http_code == 404
            return nil
          else
            raise
          end
        end
      end

      workspace_data_from_cache = WorkspaceDataFromService.find_from_cache(id)
      from_content_state_id = workspace_data_from_cache.try(:content_state_id)
      request_params = {:workspace_id => id}
      request_params[:content_state_id] = from_content_state_id if from_content_state_id

      raw_data =
        if id == 'published' && workspace_data_from_cache
          begin
            # The published workspace can be served stale: use a short timeout
            # and fall back to the cached copy when the backend is down.
            ContentService.query('workspaces/query', request_params, timeout: 1)
          rescue CommunicationError => e
            warn_backend_not_available(id, from_content_state_id, e.message)
            return workspace_data_from_cache
          end
        else
          ContentService.query('workspaces/query', request_params)
        end

      # An absent 'workspace' key means "unchanged"/"not found"; returns nil.
      if raw_workspace_data = raw_data['workspace']
        workspace_data = WorkspaceDataFromService.new(raw_workspace_data)
        if from_content_state_id != workspace_data.content_state_id
          workspace_data.store_in_cache_and_create_content_state
        end
        workspace_data
      end
    end

    # Looks up obj data in the given revision by index ("id", "path", "ppath"
    # or "permalink") for each of the given keys. Returns one list of results
    # per key, in key order.
    def find_obj_data_by(revision, index, keys)
      index = index.to_s

      if die_content_service
        obj_datas =
          if index == "id"
            # Wrap each hit in a one-element list so the return shape matches
            # the query path below (list of result lists).
            Backend::ObjLoad.load(revision, keys).map do |obj_data|
              obj_data ? [obj_data] : []
            end
          else
            index_implementation = Backend::Index.by_name(index)
            Backend::ObjQuery.query(revision, index_implementation, keys)
          end
        return obj_datas
      end

      assert_valid_index_name(index)
      raw_data = find_raw_data_from_cache_or_database_by(revision, index, keys)
      raw_data.map do |raw_result|
        raw_result.each_with_object([]) do |raw_obj, result|
          result << ObjDataFromService.new(raw_obj)
        end
      end
    end

    # Returns blob data for the given access ("public_access"/"private_access")
    # and verb ("get"/"head"), serving from cache when possible.
    def find_blob_data(id, access, verb, transformation_definition)
      if blob_data = find_blob_data_from_cache(id, access, verb, transformation_definition)
        blob_data
      else
        id = normalize_blob_id(id)
        blob_datas = request_blob_datas_from_backend(id, transformation_definition)
        store_blob_datas_in_cache(id, transformation_definition, blob_datas)
        blob_datas[access][verb]
      end
    end

    def find_blob_data_from_cache(id, access, verb, transformation_definition)
      cache_key = blob_data_cache_key(
        normalize_blob_id(id), access, verb, transformation_definition)
      CmsDataCache.cache.read(cache_key)
    end

    # Returns blob metadata (content length/type, cache control), fetching it
    # from S3 via a HEAD request on cache miss.
    def find_blob_metadata(id, url)
      if blob_metadata = fetch_blob_metadata_from_cache(id)
        blob_metadata
      else
        blob_metadata = request_blob_metadata_from_s3(url)
        store_blob_metadata_in_cache(id, blob_metadata)
        blob_metadata
      end
    end

    # Runs an obj search in the given workspace, caching results per
    # serialized params.
    def search_objs(workspace, params)
      cache_index = 'search'
      cache_key = params.to_param

      if die_content_service
        cache = Backend::ObjDataCache.view_for_revision(workspace.revision)
        if hit = cache.read_index(cache_index, cache_key)
          return hit
        end
        result = request_search_result_from_backend(workspace, params)
        cache.write_index(cache_index, cache_key, result)
        return result
      end

      content_state = workspace.revision.content_state
      if result = fetch_search_result_from_cache(content_state, cache_index, cache_key)
        result
      else
        request_search_result_from_backend(workspace, params).tap do |result|
          store_search_result_in_cache(content_state, cache_index, cache_key, result)
        end
      end
    end

    private

    # Resolves the current workspace data from the changes payload, the tag
    # cache, or a full GET, and returns [workspace_data, workspace_data_tag].
    def update_workspace_cache(id, cached_data_tag, changed_workspace)
      if changed_workspace
        workspace_data = changed_workspace
      else
        cached_workspace_data = CmsDataCache.read_data_from_tag(cached_data_tag)
        if cached_workspace_data
          workspace_data = cached_workspace_data
          workspace_data_tag = cached_data_tag
        else
          # Neither changes nor cache have it: fetch the full workspace.
          workspace_data = CmsRestApi.get("/workspaces/#{id}")
        end
      end
      # Only write a new tag when we did not reuse the cached one.
      workspace_data_tag ||= CmsDataCache.write_data_to_tag(workspace_data)
      [workspace_data, workspace_data_tag]
    end

    # Advances the obj cache to the new content state using the changed obj
    # tags from the changes payload. "*" means "everything changed" and is
    # treated like an empty delta (no incremental update possible).
    def update_obj_cache(workspace_id, cached_csid, changes)
      objs = changes["objs"]
      if objs.present? && objs != "*"
        last_state = Backend::ContentStateNode.find(cached_csid)
        changes_index = Backend::ObjDataCache.changes_index_from(objs)
        successor = last_state.create_successor(changes["to"], changes_index)
        cache = Backend::ObjDataCache.view_for_workspace(workspace_id, successor)
        changes_index.each do |id, tag|
          cache.write_obj_tag(id, tag)
        end
      end
    end

    def fetch_search_result_from_cache(content_state, cache_index, cache_key)
      content_state.find_obj_data(cache_index, cache_key) if caching?
    end

    def request_search_result_from_backend(workspace, params)
      CmsRestApi.get("workspaces/#{workspace.id}/objs/search", params)
    end

    def store_search_result_in_cache(content_state, cache_index, cache_key, result)
      content_state.save_obj_data(cache_index, cache_key, result) if caching?
    end

    def request_blob_datas_from_backend(id, transformation_definition)
      @query_counter += 1
      if transformation_definition
        CmsRestApi.get("blobs/#{id}/transform", transformation: transformation_definition)
      else
        CmsRestApi.get("blobs/#{id}")
      end
    end

    # Caches every access/verb combination of a blob response, each with the
    # expiry the backend supplied in 'maxage'.
    def store_blob_datas_in_cache(id, transformation_definition, blob_datas)
      %w[public_access private_access].each do |access|
        %w[get head].each do |verb|
          blob_data = blob_datas[access][verb]
          cache_key = blob_data_cache_key(id, access, verb, transformation_definition)
          CmsDataCache.cache.write(cache_key, blob_data, blob_data['maxage'])
        end
      end
    end

    def blob_data_cache_key(id, access, verb, transformation_definition)
      cache_key = "blob_data/#{id}/#{access}/#{verb}"
      cache_key << "/#{transformation_definition.to_query}" if transformation_definition
      cache_key
    end

    # Percent-encodes everything outside the RFC 3986 unreserved set so the
    # id is safe to embed in URLs and cache keys.
    def normalize_blob_id(id)
      Addressable::URI.normalize_component(
        id, Addressable::URI::CharacterClasses::UNRESERVED)
    end

    def fetch_blob_metadata_from_cache(id)
      CmsDataCache.cache.read(blob_metadata_cache_key(id))
    end

    # HEADs the S3 url once, retrying a single time on network errors.
    # Raises ScrivitoError unless S3 answers 200.
    def request_blob_metadata_from_s3(url)
      uri = URI.parse(url)
      retried = false
      begin
        response = ConnectionManager.request(uri, Net::HTTP::Head.new(uri))
        @query_counter += 1
      rescue NetworkError
        raise if retried
        retried = true
        retry
      end
      raise ScrivitoError, "S3 responded with #{response.code}" unless response.code == '200'
      {
        content_length: response['content-length'],
        content_type: response['content-type'],
        cache_control: response['cache-control'],
      }
    end

    # Caches the metadata using the Cache-Control max-age as expiry.
    # Fix: the previous pattern /max-age=(.*),/ required a comma AFTER the
    # directive, so a plain "max-age=300" header was never matched and the
    # entry was cached without expiry; it could also over-capture across
    # multiple directives. /max-age=(\d+)/ handles both header shapes.
    def store_blob_metadata_in_cache(id, blob_metadata)
      max_age = blob_metadata.delete(:cache_control) =~ /max-age=(\d+)/ && $1
      max_age = max_age.to_i if max_age
      CmsDataCache.cache.write(blob_metadata_cache_key(id), blob_metadata, max_age)
    end

    def blob_metadata_cache_key(id)
      "blob_metadata/#{id}"
    end

    # Serves each key from cache when possible and batches all misses into a
    # single backend request, writing those back into the cache.
    def find_raw_data_from_cache_or_database_by(revision, index, keys)
      keys_from_database = []

      # load results from cache
      results_from_cache = keys.map do |key|
        find_raw_data_from_cache_by(revision, index, key).tap do |objs|
          keys_from_database << key unless objs
        end
      end

      # load cache misses from database and store them in cache
      results_from_database = find_raw_data_from_database_by(
        revision, index, keys_from_database)
      keys_from_database.each_with_index do |key, key_number|
        store_raw_data_list_in_cache(revision, index, key, results_from_database[key_number])
      end

      # combine the results: database results appear exactly in cache-miss
      # order, so shift consumes them in step with the nil cache slots.
      results_from_cache.map do |objs_from_cache|
        objs_from_cache || results_from_database.shift
      end
    end

    def find_raw_data_from_cache_by(revision, index, key)
      ContentStateCaching.find_obj_data(revision.content_state, index, key) if caching?
    end

    # Queries the content service for all keys, looping until every query
    # (including continuations) is answered. Instrumented for logging.
    def find_raw_data_from_database_by(revision, index, keys)
      return [] if keys.blank?
      instrumenter = ActiveSupport::Notifications.instrumenter
      instrumenter.instrument(
        "cms_load.scrivito",
        :name => "Obj Load",
        :index => index,
        :keys => keys
      ) do
        @query_counter += 1
        queries = ContentServiceObjQueries.new(
          keys.map { |key| {:type => index, :param => key} })
        queries.handle_response(request_content_service(queries, revision)) until queries.finished?
        queries.results
      end
    end

    def request_content_service(queries, revision)
      ContentService.query('objs/query', content_service_request_params(queries, revision))
    end

    def content_service_request_params(queries, revision)
      params = {
        queries: queries.open_queries,
        revision_id: revision.id,
        include_deleted: true
      }
      # A base revision doesn't have a directly corresponding workspace. Instead it uses its
      # derivative workspace as fallback to access the contents. Thus fallback workspace of a base
      # revision may not be used for backend requests.
      params[:workspace_id] = revision.workspace.id unless revision.base?
      params
    end

    UNIQUE_INDICES = [:id, :path, :permalink].freeze

    # Writes each obj into the cache under every unique index it carries, and
    # additionally stores the whole result list under the queried index when
    # that index is not unique.
    # NOTE(review): `index` arrives as a String while UNIQUE_INDICES holds
    # Symbols, so the include? guard below is always false — verify whether
    # that is intended before changing it.
    def store_raw_data_list_in_cache(revision, index, key, raw_data_list)
      raw_data_list.each do |values|
        UNIQUE_INDICES.each do |unique_index|
          unique_index_values = values["_#{unique_index}"]
          if unique_index_values.present?
            store_item_in_cache(revision, unique_index, unique_index_values.first, [values])
          end
        end
      end
      unless UNIQUE_INDICES.include?(index)
        store_item_in_cache(revision, index, key, raw_data_list)
      end
    end

    def store_item_in_cache(revision, index, key, item)
      ContentStateCaching.store_obj_data(revision.content_state, index, key, item)
    end

    def assert_valid_index_name(index)
      raise ArgumentError, "invalid index name '#{index}'" unless VALID_INDEX_NAMES.include?(index)
    end

    def warn_backend_not_available(workspace_id, content_state_id, error_message)
      message = <<-EOS
Couldn't connect to content service for workspace with id=#{workspace_id} and content_state_id=#{content_state_id}.
#{error_message}
Serving from cache.
      EOS
      Rails.logger.warn(message)
    end
  end
end