# encoding: utf-8
require 'junkie/helper'
require 'yaml'

module Junkie
  module Pyload

    # Hands episodes over to Pyload and periodically polls Pyload's queue
    # (via a watchdog timer) until the resulting packages are complete.
    #
    # Only one episode is accepted at a time: `add_episode` flips the
    # observer into a "busy" state and the watchdog flips it back once a
    # complete package has been removed from the queue.
    class Observer
      include Log, Helper, Config

      DEFAULT_CONFIG = {
        watchdog_refresh: 10, # interval the watchdog_timer is fired
      }

      # @param [Hash] config currently unused; the effective configuration is
      #   obtained through `Config.get_config`
      def initialize(config={})
        @config = Config.get_config(self)

        @api = Junkie::Pyload::Api.new

        @ready_for_new_links = true
        @watchdog = nil
        @active_downloads = {}
      end

      # is a method that is called once from inside the reactor and allows us
      # to register custom actions into the reactor
      def setup
        in_fiber do
          @api.login
          cleanup
        end
      end

      # Adds new episode to Pyload which downloads the episode and extracts it
      #
      # @param [Junkie::Episode] episode which should be downloaded
      #
      # @raise [InvalidStateError] if the observer is not ready for new links
      #
      # @note should only be called if `is_ready?` returns true
      def add_episode(episode)
        raise InvalidStateError, "Observer is not ready" unless is_ready?
        @ready_for_new_links = false

        episode.status = :downloading

        package = "%s@%s" % [ episode.id, episode.series ]

        pid = @api.call(:addPackage, {name: package, links: [episode.link]})

        @active_downloads[pid] = episode

        log.info("Added #{episode} to Pyload, Pid=#{pid}")

        # start watchdog timer if it does not already run
        return unless @watchdog.nil?

        @watchdog = EM::PeriodicTimer.new(@config[:watchdog_refresh]) do
          monitor_progress
        end
      end

      # checks if the Observer is ready to add new episodes to Pyload
      #
      # @return [Boolean] true if new links can be added
      def is_ready?
        @ready_for_new_links
      end

      private

      # This method is called from the watchdog timer periodically
      #
      # It monitors the download process and reacts depending on the results
      def monitor_progress
        log.debug("Watchdog timer has been fired")

        in_fiber { inspect_queue }
      end

      # Performs a single watchdog pass over Pyload's queue. Exactly one of
      # the following happens per pass:
      #
      # * the queue is empty -> the watchdog timer is cancelled
      # * there are failed links -> they are restarted
      # * there are complete packages -> they are deleted and the observer
      #   becomes ready for new links again
      #
      # Must be called from inside a fiber, as it talks to the Pyload api.
      def inspect_queue
        queue_data = @api.call(:getQueueData)

        if queue_data.empty?
          log.info("Empty Pyload queue, I cancel the watchdog timer")
          unschedule_watchdog
          return
        end

        # extract package IDs and map them to current downloads
        update_package_ids(queue_data)

        if has_queue_any_failed_links?(queue_data)
          log.info("There are failed links in the Queue, will fix this")
          @api.call(:restartFailed)
          return
        end

        # look for complete downloads
        pids = get_complete_downloads(queue_data)
        return if pids.empty?

        @api.call(:deletePackages, {pids: pids})
        log.info("Complete packages are removed from Pyload Queue")

        @ready_for_new_links = true
      end

      # Searches for failed links in the pyload queue
      #
      # @param [Array] queue_data returned from :getQueueData api method
      #
      # @return [Boolean] true if there are any failed links, false otherwise
      def has_queue_any_failed_links?(queue_data)
        queue_data.any? do |package|
          package['links'].any? do |link|
            Pyload::Api::FILE_STATUS[link['status']] == :failed
          end
        end
      end

      # Looks for complete downloads and returns their pids
      #
      # @param [Array] queue_data returned from :getQueueData api method
      #
      # @return [Array] list of pids where the download is complete
      def get_complete_downloads(queue_data)
        complete = queue_data.select { |package| package_complete?(package) }
        complete.map { |package| package['pid'] }
      end

      # Decides whether a single package of the queue is completely done.
      #
      # @param [Hash] package one entry of the :getQueueData result
      #
      # @return [Boolean] true if the package has links, at least one of them
      #   is done, none is being extracted and all bytes have been loaded
      def package_complete?(package)
        links = package['links']

        return false if links.empty?
        return false if package['linksdone'] == 0

        # When extracting is in progress the status of the first link is set
        # to :extracting
        extracting = links.any? do |link|
          Pyload::Api::FILE_STATUS[link['status']] == :extracting
        end
        return false if extracting

        # equal, non-zero sizes mean that everything has been downloaded
        sizetotal = package['sizetotal'].to_i
        sizetotal > 0 && sizetotal == package['sizedone'].to_i
      end

      # extract package IDs and map them to current downloads
      #
      # @param [Array] queue_data returned from :getQueueData api method
      #
      # @raise [InvalidStateError] if a package of the queue can neither be
      #   found under its pid nor under its predecessor pid
      def update_package_ids(queue_data)
        queue_data.each do |package|
          pid = package['pid']
          next if @active_downloads.key?(pid)

          previous_pid = pid - 1
          unless @active_downloads.key?(previous_pid)
            raise InvalidStateError,
              "PackageID #{pid} can't be mapped to active Download"
          end

          # the package is known under the preceding id, so move the episode
          # over to the id Pyload reports now
          log.info("Package ID has been changed, I will correct this")
          @active_downloads[pid] = @active_downloads.delete(previous_pid)
        end
      end

      # cancels the watchdog timer
      #
      # The reference is reset afterwards so that `add_episode` creates a new
      # timer for the next episode.
      def unschedule_watchdog
        return if @watchdog.nil?

        @watchdog.cancel
        @watchdog = nil
      end

      # Stops all downloads, removes every package from Pyload's queue and
      # unpauses the server.
      def cleanup
        log.debug("Made a cleanup of Pyload's Queue")
        @api.call(:stopAllDownloads)

        packages = @api.call(:getQueue).map { |e| e['pid'] }

        @api.call(:deletePackages, {pids: packages})
        log.debug("The following packages have been removed: #{packages}")

        @api.call(:unpauseServer)
      end
    end
  end
end