lib/junkie/pyload/observer.rb in junkie-0.0.6 vs lib/junkie/pyload/observer.rb in junkie-0.0.7

- old
+ new

@@ -5,10 +5,12 @@ module Junkie module Pyload class Observer include Log, Helper, Config + attr_accessor :api + DEFAULT_CONFIG = { watchdog_refresh: 10, # interval the watchdog_timer is fired } def initialize(channels) @@ -23,11 +25,10 @@ @active_episode = nil @ready_for_new_links = true @watchdog_enabled = false @skipped_timer_at_first_complete_detection = false - @should_send_it_on_channel = false @channels[:episodes].subscribe do |episode| next unless episode.status == :found log.info("Got episode from Channel: #{episode}") @@ -43,28 +44,17 @@ @api.login cleanup } EM.add_periodic_timer(@config[:watchdog_refresh]) do - - # temporary fix for the SystemStackError - if @should_send_it_on_channel - log.info("Sending complete episode out on the channel") - @channels[:episodes].push(@active_episode) - @active_episode = nil - - @should_send_it_on_channel = false - end - monitor_progress if @watchdog_enabled - in_fiber { - add_next_episode_to_pyload - } + in_fiber { add_next_episode_to_pyload } end end + private # Add next episode to Pyload which downloads the episode and extracts it # # @note should only be called if `is_ready?` returns true def add_next_episode_to_pyload @@ -97,88 +87,139 @@ # @returns [Boolean] true if new links can be added def is_ready? @ready_for_new_links end - private - # This method is called from the watchdog timer periodically + # Utility method that fetches the QueueData, validates it and returns + # the JSON Object as a real Ruby Object # - # It monitors the download process and reacts depending on the results + # @raise [Junkie::InvalidResponse] if the response is faulty + # @return [Array] the JSON response as an object + def get_queue_data + data = @api.call(:getQueueData) + + if data && data.is_a?(Array) + return data + end + raise Junkie::InvalidResponse, + "'#{data}' is not a valid response from Pyload" + end + + + ######################################################################### + # This method is called from the watchdog timer periodically # + # # + # It monitors the download process and reacts depending on the results # def monitor_progress log.debug("Watchdog timer has been fired") in_fiber { catch(:break) { - queue_data = @api.call(:getQueueData) + queue_data = get_queue_data - if queue_data.empty? and @active_episode.nil? - log.info("Empty Pyload queue, I cancel the watchdog timer") - @watchdog_enabled = false - throw :break - end + # check for empty queue and disable watchdog if needed + check_for_empty_queue(queue_data) # extract package IDs and map them to current downloads update_package_ids(queue_data) - if has_queue_any_failed_links?(queue_data) - log.info("There are failed links in the Queue, will fix this") - @api.call(:restartFailed) - throw :break - end + check_for_failed_links(queue_data) - # look for complete downloads - pids = get_complete_downloads(queue_data) + check_for_complete_packages(queue_data) + } + } + rescue Junkie::InvalidResponse => e + log.error("Pyload returns InvalidResponse (#{e}), I will ignore this") + end - if not pids.empty? - if @skipped_timer_at_first_complete_detection - # post process complete active download and send it out on the - # channel - if pids.include? @active_episode.pid - log.info("'#{@active_episode}' is extracted completely") - @active_episode.pid = nil - @active_episode.status = :extracted + # Checks if the Pyload Queue is empty and disables the watchdog if + # it is really real + # + # @param [Array] queue_data returned from :getQueueData api method + def check_for_empty_queue(queue_data) + if queue_data.empty? - @should_send_it_on_channel = true - end + unless @active_episode.nil? + log.info("Empty queue detected but a download is set as active") + return + end - # remove all complete packages - @api.call(:deletePackages, {pids: pids}) - log.info("Complete packages are removed from Pyload Queue") + log.info("Empty Pyload queue detected, will double check") - @skipped_timer_at_first_complete_detection = false - @ready_for_new_links = true - else - # If a package is complete, we are sometimes so fast to remove - # it, before the extracting can be started, so we will skip it - # the first time - log.info("Complete package detected, skip the first time") - @skipped_timer_at_first_complete_detection = true - end - end - } - } + # check twice if the queue is definitely empty + check_data = get_queue_data + if check_data.empty? + log.info("Double check is successful, I will disable the watchdog") + @watchdog_enabled = false + throw :break + else + log.info("Double check failed, Queue is not empty") + end + end end - - # Searches for failed links in the pyload queue + # Searches for failed links in the pyload queue and reacts on this + # through issuing a restartFailed call # # @param [Array] queue_data returned from :getQueueData api method - # - # @return [Boolean] true if there are any failed links, false otherwise - def has_queue_any_failed_links?(queue_data) + def check_for_failed_links(queue_data) + failed = 0 queue_data.each do |package| package['links'].each do |link| status = Pyload::Api::FILE_STATUS[link['status']] - return true if status == :failed + (failed += 1) if status == :failed end end - false + if failed > 0 + log.info("There are failed links in the Queue, will fix this") + @api.call(:restartFailed) + throw :break + end end + # Looks for complete packages and does some post-processing + # + # @param [Array] queue_data returned from :getQueueData api method + def check_for_complete_packages(queue_data) + pids = get_complete_downloads(queue_data) + + throw :break if pids.empty? + + if @skipped_timer_at_first_complete_detection + + # post process complete active download and send it out on the + # channel + if pids.include? @active_episode.pid + log.info("'#{@active_episode}' has been extracted completely") + @active_episode.pid = nil + @active_episode.status = :extracted + + log.info("Sending complete episode out on the channel") + @channels[:episodes].push(@active_episode) + @active_episode = nil + end + + # remove all complete packages + @api.call(:deletePackages, { pids: pids }) + log.info("Complete packages are removed from Pyload Queue #{ pids }") + + @skipped_timer_at_first_complete_detection = false + @ready_for_new_links = true + + else + # If a package is complete, we are sometimes so fast to remove + # it, before the extracting can be started, so we will skip it + # the first time + log.info("Complete package detected, skip the first time") + @skipped_timer_at_first_complete_detection = true + end + end + + # Looks for complete downloads and returns their pids # # @param [Array] queue_data returned from :getQueueData api method # # @return [Array] list of pids there the download is complete @@ -187,49 +228,48 @@ queue_data.each do |package| next unless package['links'].size > 0 next if package['linksdone'] == 0 - # When extracting is in progress the status of the first link is set - # to :extracting - extracting = package['links'].select do |link| - Pyload::Api::FILE_STATUS[link['status']] == :extracting + invalid_links = package['links'].reject do |e| + [:done, :skipped].include?( + Junkie::Pyload::Api::FILE_STATUS[e['status']]) end - next unless extracting.empty? + next unless invalid_links.empty? sizetotal = package['sizetotal'].to_i sizedone = package['sizedone'].to_i - if sizetotal > 0 && sizedone > 0 - - if sizetotal == sizedone + if sizetotal > 0 && sizedone > 0 && sizetotal == sizedone pids << package['pid'] - end end end pids end + # extract package IDs and map them to current downloads # # @param [Array] queue_data returned from :getQueueData api method def update_package_ids(queue_data) - queue_data.each do |package| - pid = package['pid'] - next if @active_episode.pid == pid - if @active_episode.pid == pid-1 - log.info("Package ID has been changed, I will correct this") + if queue_data.size == 1 + pid = queue_data[0]['pid'] + if @active_episode && @active_episode.pid != pid + log.info("Package ID has been changed, New PID is #{pid}") @active_episode.pid = pid - next end + elsif queue_data.size > 1 raise InvalidStateError, - "PackageID #{pid} can't be mapped to active Download" + "There is more than 1 Package in the Queue" end end + + # Removes existing packages from the Pyload Queue and creates a valid + # state to begin communication def cleanup log.debug("Made a cleanup of Pyload's Queue") @api.call(:stopAllDownloads) packages = @api.call(:getQueue).map { |e| e['pid'] }