lib/berkshelf/api/cache_manager.rb in berkshelf-api-0.2.0 vs lib/berkshelf/api/cache_manager.rb in berkshelf-api-1.0.0

- old
+ new

@@ -8,16 +8,20 @@ end include Berkshelf::API::GenericServer include Berkshelf::API::Logging + extend Forwardable + def_delegators :@cache, :warmed?, :set_warmed, :clear + SAVE_INTERVAL = 30.0 server_name :cache_manager finalizer :finalize_callback - exclusive :add, :clear, :remove, :save + exclusive :merge, :add, :remove + # @return [DependencyCache] attr_reader :cache def initialize log.info "Cache Manager starting..." @cache = DependencyCache.new @@ -28,20 +32,83 @@ # @param [RemoteCookbook] cookbook # @param [Ridley::Chef::Cookbook::Metadata] metadata # # @return [Hash] def add(cookbook, metadata) - @cache.add(cookbook, metadata) + log.debug "#{self} adding (#{cookbook.name}, #{cookbook.version})" + cache.add(cookbook, metadata) end - # Clear any items added to the cache + # Remove the cached item matching the given name and version # - # @return [Hash] - def clear - @cache.clear + # @param [#to_s] name + # @param [#to_s] version + # + # @return [DependencyCache] + def remove(name, version) + log.debug "#{self} removing (#{name}, #{version})" + cache.remove(name, version) end + # Loops through a list of workers and merges their cookbook sets into the cache + # + # @param [Array<CacheBuilder::Worker::Base>] workers + # The workers for this cache + # + # @return [Boolean] + def process_workers(workers) + # If the cache has been warmed already, we want to spawn + # workers for all the endpoints concurrently. However, if the + # cache is cold we want to run sequentially, so higher priority + # endpoints can work before lower priority, avoiding duplicate + # downloads. + # We don't want crashing workers to crash the CacheManager. + # Crashes are logged so just ignore the exceptions + if warmed? + Array(workers).flatten.collect do |worker| + self.future(:process_worker, worker) + end.each do |f| + f.value rescue nil + end + else + Array(workers).flatten.each do |worker| + process_worker(worker) rescue nil + end + end + self.set_warmed + end + + # @param [CacheBuilder::Worker::Base] worker + def process_worker(worker) + log.info "processing #{worker}" + remote_cookbooks = worker.cookbooks + log.info "found #{remote_cookbooks.size} cookbooks from #{worker}" + created_cookbooks, deleted_cookbooks = diff(remote_cookbooks, worker.priority) + log.debug "#{created_cookbooks.size} cookbooks to be added to the cache from #{worker}" + log.debug "#{deleted_cookbooks.size} cookbooks to be removed from the cache from #{worker}" + + # Process metadata in chunks - Ridley cookbook resource uses a + # task_class TaskThread, which means each future gets its own + # thread. If we have many (>2000) cookbooks we can easily + # exhaust the available threads on the system. + created_cookbooks_with_metadata = [] + until created_cookbooks.empty? + work = created_cookbooks.slice!(0,500) + log.info "processing metadata for #{work.size} cookbooks with #{created_cookbooks.size} remaining on #{worker}" + work.map! do |remote| + [ remote, worker.future(:metadata, remote) ] + end.map! do |remote, metadata| + [remote, metadata.value] + end + created_cookbooks_with_metadata += work + end + + log.info "about to merge cookbooks" + merge(created_cookbooks_with_metadata, deleted_cookbooks) + log.info "#{self} cache updated." + end + # Check if the cache knows about the given cookbook version # # @param [#to_s] name # @param [#to_s] version # @@ -49,46 +116,55 @@ def has_cookbook?(name, version) @cache.has_cookbook?(name, version) end def load_save + log.info "Loading save from #{self.class.cache_file}" @cache = DependencyCache.from_file(self.class.cache_file) + log.info "Cache contains #{@cache.cookbooks.size} items" end - # Remove the cached item matching the given name and version - # - # @param [#to_s] name - # @param [#to_s] version - # - # @return [DependencyCache] - def remove(name, version) - @cache.remove(name, version) - end + private - def save - log.info "Saving the cache to: #{self.class.cache_file}" - cache.save(self.class.cache_file) - log.info "Cache saved!" - end + def merge(created_cookbooks, deleted_cookbooks) + log.info "#{self} adding (#{created_cookbooks.length}) items..." + created_cookbooks.each do |remote_with_metadata| + remote, metadata = remote_with_metadata + add(remote, metadata) + end - # @param [Array<RemoteCookbook>] cookbooks - # An array of RemoteCookbooks representing all the cookbooks on the indexed site - # - # @return [Array<Array<RemoteCookbook>, Array<RemoteCookbook>>] - # A tuple of Arrays of RemoteCookbooks - # The first array contains items not in the cache - # The second array contains items in the cache, but not in the cookbooks parameter - def diff(cookbooks) - known_cookbooks = cache.cookbooks - created_cookbooks = cookbooks - known_cookbooks - deleted_cookbooks = known_cookbooks - cookbooks - [ created_cookbooks, deleted_cookbooks ] - end + log.info "#{self} removing (#{deleted_cookbooks.length}) items..." + deleted_cookbooks.each { |remote| remove(remote.name, remote.version) } - private + log.info "#{self} cache updated." + save + end + def save + if warmed? + log.info "Saving the cache to: #{self.class.cache_file}" + cache.save(self.class.cache_file) + log.info "Cache saved!" + end + end + + # @param [Array<RemoteCookbook>] cookbooks + # An array of RemoteCookbooks representing all the cookbooks on the indexed site + # @param [Integer] worker_priority + # The priority/ID of the endpoint that is running + # @return [Array(Array<RemoteCookbook>, Array<RemoteCookbook>)] + # A tuple of Arrays of RemoteCookbooks + # The first array contains items not in the cache + # The second array contains items in the cache, but not in the cookbooks parameter + def diff(cookbooks, worker_priority) + known_cookbooks = cache.cookbooks.select { |c| c.priority <= worker_priority } + created_cookbooks = cookbooks - known_cookbooks + deleted_cookbooks = (known_cookbooks - cookbooks).select { |c| c.priority == worker_priority } + [ created_cookbooks, deleted_cookbooks ] + end + def finalize_callback log.info "Cache Manager shutting down..." - self.save + save end end end