# encoding: utf-8
require 'junkie/helper'
require 'yaml'

module Junkie
  module Pyload

    # Feeds decrypted episodes into Pyload one at a time and watches the
    # Pyload queue until the package is complete.
    #
    # Episodes arrive on `channels[:episodes]` (only those with status
    # `:decrypted` are queued), finished episodes are pushed back onto the
    # same channel with status `:extracted`, and status information is
    # published on `channels[:info]`.
    class Observer
      include Log, Helper, Config

      attr_accessor :api

      DEFAULT_CONFIG = {
        watchdog_refresh: 10, # interval the watchdog_timer is fired
        dont_add_episodes_to_queue: false,
      }

      # @param [Hash] channels channel objects, looked up by the keys
      #   `:episodes` and `:info`
      def initialize(channels)
        @config = Config.get_config(self)
        @api = Junkie::Pyload::Api.new
        @channels = channels

        @found_episodes = []
        @active_episode = nil

        @ready_for_new_links = true
        @watchdog_enabled = false
        @skipped_timer_at_first_complete_detection = false
        @should_stat_queued_episodes = false

        @channels[:episodes].subscribe do |episode|
          next unless episode.status == :decrypted

          log.info("Got episode from Channel: #{episode}")
          @found_episodes.push(episode)

          # Ask the periodic timer to republish the pending-episodes info;
          # without this the flag is never raised and the published count
          # would stay stale while episodes queue up.
          @should_stat_queued_episodes = true
        end
      end

      # Called once from inside the reactor; allows us to register custom
      # actions into the reactor.
      #
      # Logs in, cleans the Pyload queue, publishes the initial info values
      # and installs the periodic watchdog timer.
      def setup
        in_fiber {
          @api.login
          cleanup
          stat_queued_episodes
          stat_current_download
        }

        EM.add_periodic_timer(@config[:watchdog_refresh]) do
          monitor_progress if @watchdog_enabled

          if @should_stat_queued_episodes
            stat_queued_episodes
            @should_stat_queued_episodes = false
          end

          unless @config[:dont_add_episodes_to_queue]
            in_fiber { add_next_episode_to_pyload }
          end
        end
      end

      private

      # Adds the next queued episode to Pyload, which downloads and extracts
      # it. Does nothing while a previous package is still being processed
      # or when no episode is pending.
      def add_next_episode_to_pyload
        # detect if we can add a new episode
        return unless is_ready?
        return if @found_episodes.empty?

        @active_episode = @found_episodes.shift
        episode = @active_episode
        stat_queued_episodes

        # block further additions until the package has been removed again
        @ready_for_new_links = false
        episode.status = :downloading

        package = "%s@%s" % [ episode.id, episode.series ]
        pid = @api.call(:addPackage, {name: package, links: episode.links })
        episode.pid = pid

        log.info("Added #{episode} to Pyload, Pid=#{pid}")

        @watchdog_enabled = true
      end

      # Sends the current number of queued episodes out on the info channel.
      def stat_queued_episodes
        @channels[:info].push({
          key: "Pending Episodes",
          desc: "Number of episodes queued for download",
          value: @found_episodes.size,
          additional: @found_episodes.map { |e| e.to_s }
        })
      end

      # Sends the info about the current active download out on the info
      # channel.
      #
      # @param [Integer, nil] complete percentage done, or nil if unknown
      def stat_current_download(complete=nil)
        @channels[:info].push({
          key: "Current active download",
          desc: @active_episode ? @active_episode.to_s : "No active download",
          value: complete ? "#{complete} %" : "-",
        })
      end

      # Checks if the Observer is ready to add new episodes to Pyload.
      #
      # @return [Boolean] true if new links can be added
      def is_ready?
        @ready_for_new_links
      end

      # Utility method that fetches the QueueData, validates it and returns
      # the JSON Object as a real Ruby Object.
      #
      # @raise [Junkie::InvalidResponse] if the response is not an Array
      # @return [Array] the JSON response as an object
      def get_queue_data
        data = @api.call(:getQueueData)
        return data if data.is_a?(Array)

        raise Junkie::InvalidResponse,
          "'#{data}' is not a valid response from Pyload"
      end

      #########################################################################
      # This method is called from the watchdog timer periodically            #
      #                                                                       #
      # It monitors the download process and reacts depending on the results  #
      #
      # The individual checks abort the current run via `throw :break`.
      # Errors are logged and ignored so that a single bad tick does not stop
      # the watchdog.
      def monitor_progress
        log.debug("Watchdog timer has been fired")

        in_fiber {
          catch(:break) do
            begin
              queue_data = get_queue_data

              # check for empty queue and disable watchdog if needed
              check_for_empty_queue(queue_data)

              stat_current_download(get_complete_percentage(queue_data))

              # extract package IDs and map them to current downloads
              update_package_ids(queue_data)

              check_for_failed_links(queue_data)
              check_for_complete_packages(queue_data)

            rescue Junkie::InvalidResponse => e
              log.error(
                "Pyload returns InvalidResponse (#{e}), I will ignore this")

            # StandardError instead of Exception, so Interrupt, SystemExit and
            # friends are not swallowed here.
            # NOTE(review): assumes InvalidStateError derives from
            # StandardError - confirm against its definition.
            rescue StandardError => e
              log.error(
                "an unexpected error is occured (#{e}), I will ignore this")
            end
          end
        }
      end

      # Checks if the Pyload Queue is empty and disables the watchdog once a
      # second request confirms it. Nothing is disabled while an episode is
      # still marked as active.
      #
      # @param [Array] queue_data returned from :getQueueData api method
      def check_for_empty_queue(queue_data)
        return unless queue_data.empty?

        unless @active_episode.nil?
          log.info("Empty queue detected but a download is set as active")
          return
        end

        log.info("Empty Pyload queue detected, will double check")

        # check twice if the queue is definitely empty
        check_data = get_queue_data

        if check_data.empty?
          log.info("Double check is successful, I will disable the watchdog")
          @watchdog_enabled = false
          throw :break
        else
          log.info("Double check failed, Queue is not empty")
        end
      end

      # Searches for failed links in the Pyload queue and reacts by issuing
      # a restartFailed call, then aborts the current watchdog run.
      #
      # @param [Array] queue_data returned from :getQueueData api method
      def check_for_failed_links(queue_data)
        has_failed = queue_data.any? do |package|
          package['links'].any? do |link|
            Pyload::Api::FILE_STATUS[link['status']] == :failed
          end
        end
        return unless has_failed

        log.info("There are failed links in the Queue, will fix this")
        @api.call(:restartFailed)
        throw :break
      end

      # Looks for complete packages and does some post-processing.
      #
      # The first detection is only remembered; the packages are processed
      # and deleted on the following detection.
      #
      # @param [Array] queue_data returned from :getQueueData api method
      def check_for_complete_packages(queue_data)
        pids = get_complete_downloads(queue_data)
        throw :break if pids.empty?

        unless @skipped_timer_at_first_complete_detection
          # If a package is complete, we are sometimes so fast to remove
          # it, before the extracting can be started, so we will skip it
          # the first time
          log.info("Complete package detected, skip the first time")
          @skipped_timer_at_first_complete_detection = true
          return
        end

        # Post-process the complete active download and send it out on the
        # channel. The nil guard matters: a complete package without an
        # active episode would otherwise raise on every tick and the
        # package would never be removed.
        if @active_episode && pids.include?(@active_episode.pid)
          log.info("'#{@active_episode}' has been extracted completely")
          @active_episode.pid = nil
          @active_episode.status = :extracted

          log.info("Sending complete episode out on the channel")
          @channels[:episodes].push(@active_episode)

          @active_episode = nil
          stat_current_download
        end

        # remove all complete packages
        @api.call(:deletePackages, { pids: pids })
        log.info("Complete packages are removed from Pyload Queue #{ pids }")

        @skipped_timer_at_first_complete_detection = false
        @ready_for_new_links = true
      end

      # Gets the percentage of completeness of the first package that
      # reports a non-zero total size.
      #
      # @param [Array] queue_data returned from :getQueueData api method
      #
      # @return [Integer, nil] percentage of current download or nil
      def get_complete_percentage(queue_data)
        queue_data.each do |package|
          begin
            sizetotal = package['sizetotal'].to_i
            sizedone = package['sizedone'].to_i

            # no size known (yet): try the next package instead of dividing
            # by zero
            next if sizetotal.zero?

            return (sizedone * 100) / sizetotal
          rescue StandardError
            # malformed package entry: best effort, try the next one
            next
          end
        end

        nil
      end

      # Looks for complete downloads and returns their pids.
      #
      # A package counts as complete when it has links, `linksdone` is not
      # 0, every link is `:done` or `:skipped`, and `sizedone` equals a
      # positive `sizetotal`.
      #
      # @param [Array] queue_data returned from :getQueueData api method
      #
      # @return [Array] list of pids where the download is complete
      def get_complete_downloads(queue_data)
        pids = []

        queue_data.each do |package|
          next unless package['links'].size > 0
          next if package['linksdone'] == 0

          all_finished = package['links'].all? do |link|
            [:done, :skipped].include?(
              Junkie::Pyload::Api::FILE_STATUS[link['status']])
          end
          next unless all_finished

          sizetotal = package['sizetotal'].to_i
          sizedone = package['sizedone'].to_i

          if sizetotal > 0 && sizedone > 0 && sizetotal == sizedone
            pids << package['pid']
          end
        end

        pids
      end

      # Extracts the package ID from the queue and maps it to the current
      # download.
      #
      # @param [Array] queue_data returned from :getQueueData api method
      #
      # @raise [InvalidStateError] if the queue holds more than one package
      def update_package_ids(queue_data)
        if queue_data.size == 1
          pid = queue_data[0]['pid']
          if @active_episode && @active_episode.pid != pid
            log.info("Package ID has been changed, New PID is #{pid}")
            @active_episode.pid = pid
          end

        elsif queue_data.size > 1
          raise InvalidStateError,
            "There is more than 1 Package in the Queue"
        end
      end

      # Removes existing packages from the Pyload Queue and creates a valid
      # state to begin communication: stops all downloads, deletes every
      # queued package and unpauses the server.
      def cleanup
        log.debug("Made a cleanup of Pyload's Queue")
        @api.call(:stopAllDownloads)

        packages = @api.call(:getQueue).map { |e| e['pid'] }

        @api.call(:deletePackages, {pids: packages})
        log.debug("The following packages have been removed: #{packages}")

        @api.call(:unpauseServer)
      end
    end
  end
end