# encoding: utf-8
require 'junkie/helper'
require 'yaml'

module Junkie
  module Pyload

    # Observes the Pyload queue and shepherds episodes through the
    # download/extract lifecycle.
    #
    # Episodes with status +:found+ arrive on +channels[:episodes]+; the
    # observer pushes them into Pyload one at a time, watches the queue via a
    # periodic watchdog timer, and re-emits each episode on the same channel
    # with status +:extracted+ once Pyload has finished downloading and
    # unpacking it.
    class Observer
      include Log, Helper, Config

      DEFAULT_CONFIG = {
        watchdog_refresh: 10, # interval (seconds) the watchdog timer is fired
      }.freeze

      # @param channels [Hash] must contain an EM channel under +:episodes+
      #   that supports +subscribe+ and +push+
      def initialize(channels)
        @config = Config.get_config(self)
        @api = Junkie::Pyload::Api.new
        @channels = channels
        @found_episodes = []
        @active_episode = nil
        @ready_for_new_links = true
        @watchdog_enabled = false
        @skipped_timer_at_first_complete_detection = false
        @should_send_it_on_channel = false

        # collect every freshly found episode; they are fed into Pyload one by
        # one from the periodic timer
        @channels[:episodes].subscribe do |episode|
          next unless episode.status == :found
          log.info("Got episode from Channel: #{episode}")
          @found_episodes.push(episode)
        end
      end

      # is a method that is called once from inside the reactor and allows us
      # to register custom actions into the reactor
      def setup
        in_fiber {
          @api.login
          cleanup
        }

        EM.add_periodic_timer(@config[:watchdog_refresh]) do
          # temporary fix for the SystemStackError: emit the completed episode
          # from the timer context instead of from within monitor_progress
          if @should_send_it_on_channel
            log.info("Sending complete episode out on the channel")
            @channels[:episodes].push(@active_episode)
            @active_episode = nil
            @should_send_it_on_channel = false
          end

          monitor_progress if @watchdog_enabled
          in_fiber { add_next_episode_to_pyload }
        end
      end

      # Add next episode to Pyload which downloads the episode and extracts it
      #
      # @note should only be called if `is_ready?` returns true
      def add_next_episode_to_pyload
        # detect if we can add a new episode
        return unless is_ready?
        return if @found_episodes.empty?

        @active_episode = @found_episodes.shift
        episode = @active_episode
        @ready_for_new_links = false
        episode.status = :downloading

        # package name encodes id and series so it can be recognized later
        package = "%s@%s" % [ episode.id, episode.series ]
        pid = @api.call(:addPackage, { name: package, links: [episode.link] })
        episode.pid = pid
        log.info("Added #{episode} to Pyload, Pid=#{pid}")

        @watchdog_enabled = true
      end

      # checks if the Observer is ready to add new episodes to Pyload
      #
      # @return [Boolean] true if new links can be added
      def is_ready?
        @ready_for_new_links
      end

      private

      # This method is called from the watchdog timer periodically
      #
      # It monitors the download process and reacts depending on the results
      def monitor_progress
        log.debug("Watchdog timer has been fired")

        in_fiber {
          catch(:break) {
            queue_data = @api.call(:getQueueData)
            if queue_data.empty?
              log.info("Empty Pyload queue, I cancel the watchdog timer")
              @watchdog_enabled = false
              throw :break
            end

            # extract package IDs and map them to current downloads
            update_package_ids(queue_data)

            if has_queue_any_failed_links?(queue_data)
              log.info("There are failed links in the Queue, will fix this")
              @api.call(:restartFailed)
              throw :break
            end

            # look for complete downloads
            pids = get_complete_downloads(queue_data)

            unless pids.empty?
              if @skipped_timer_at_first_complete_detection
                # post process complete active download and send it out on the
                # channel (the actual push happens in the timer, see setup)
                if pids.include? @active_episode.pid
                  log.info("'#{@active_episode}' is extracted completely")
                  @active_episode.pid = nil
                  @active_episode.status = :extracted
                  @should_send_it_on_channel = true
                end

                # remove all complete packages
                @api.call(:deletePackages, {pids: pids})
                log.info("Complete packages are removed from Pyload Queue")

                @skipped_timer_at_first_complete_detection = false
                @ready_for_new_links = true
              else
                # If a package is complete, we are sometimes so fast to remove
                # it, before the extracting can be started, so we will skip it
                # the first time
                log.info("Complete package detected, skip the first time")
                @skipped_timer_at_first_complete_detection = true
              end
            end
          }
        }
      end

      # Searches for failed links in the pyload queue
      #
      # @param [Array] queue_data returned from :getQueueData api method
      #
      # @return [Boolean] true if there are any failed links, false otherwise
      def has_queue_any_failed_links?(queue_data)
        queue_data.any? do |package|
          package['links'].any? do |link|
            Pyload::Api::FILE_STATUS[link['status']] == :failed
          end
        end
      end

      # Looks for complete downloads and returns their pids
      #
      # @param [Array] queue_data returned from :getQueueData api method
      #
      # @return [Array] list of pids there the download is complete
      def get_complete_downloads(queue_data)
        pids = []
        queue_data.each do |package|
          next unless package['links'].size > 0
          next if package['linksdone'] == 0

          # When extracting is in progress the status of the first link is set
          # to :extracting
          extracting = package['links'].any? do |link|
            Pyload::Api::FILE_STATUS[link['status']] == :extracting
          end
          next if extracting

          sizetotal = package['sizetotal'].to_i
          sizedone = package['sizedone'].to_i

          # only treat the package as complete when the size is known and
          # fully downloaded (sizetotal > 0 implies sizedone > 0 on equality)
          pids << package['pid'] if sizetotal > 0 && sizetotal == sizedone
        end
        pids
      end

      # extract package IDs and map them to current downloads
      #
      # Pyload occasionally renumbers a package (pid incremented by one);
      # this detects that and corrects the pid stored on the active episode.
      #
      # @param [Array] queue_data returned from :getQueueData api method
      #
      # @raise [InvalidStateError] if a queued package cannot be mapped to
      #   the currently active download
      def update_package_ids(queue_data)
        queue_data.each do |package|
          pid = package['pid']

          # guard: without an active episode nothing can be mapped (previously
          # this crashed with NoMethodError on nil)
          if @active_episode.nil?
            raise InvalidStateError,
              "PackageID #{pid} can't be mapped to active Download"
          end

          next if @active_episode.pid == pid

          if @active_episode.pid == pid-1
            log.info("Package ID has been changed, I will correct this")
            @active_episode.pid = pid
            next
          end

          raise InvalidStateError,
            "PackageID #{pid} can't be mapped to active Download"
        end
      end

      # Wipes the Pyload queue on startup: stops all downloads, removes every
      # queued package and unpauses the server again.
      def cleanup
        log.debug("Made a cleanup of Pyload's Queue")
        @api.call(:stopAllDownloads)

        packages = @api.call(:getQueue).map { |e| e['pid'] }
        @api.call(:deletePackages, {pids: packages})
        log.debug("The following packages have been removed: #{packages}")
        @api.call(:unpauseServer)
      end
    end
  end
end