lib/junkie/pyload/observer.rb in junkie-0.0.6 vs lib/junkie/pyload/observer.rb in junkie-0.0.7
- old
+ new
@@ -5,10 +5,12 @@
module Junkie
module Pyload
class Observer
include Log, Helper, Config
+ attr_accessor :api
+
DEFAULT_CONFIG = {
watchdog_refresh: 10, # interval the watchdog_timer is fired
}
def initialize(channels)
@@ -23,11 +25,10 @@
@active_episode = nil
@ready_for_new_links = true
@watchdog_enabled = false
@skipped_timer_at_first_complete_detection = false
- @should_send_it_on_channel = false
@channels[:episodes].subscribe do |episode|
next unless episode.status == :found
log.info("Got episode from Channel: #{episode}")
@@ -43,28 +44,17 @@
@api.login
cleanup
}
EM.add_periodic_timer(@config[:watchdog_refresh]) do
-
- # temporary fix for the SystemStackError
- if @should_send_it_on_channel
- log.info("Sending complete episode out on the channel")
- @channels[:episodes].push(@active_episode)
- @active_episode = nil
-
- @should_send_it_on_channel = false
- end
-
monitor_progress if @watchdog_enabled
- in_fiber {
- add_next_episode_to_pyload
- }
+ in_fiber { add_next_episode_to_pyload }
end
end
+ private
# Add next episode to Pyload which downloads the episode and extracts it
#
# @note should only be called if `is_ready?` returns true
def add_next_episode_to_pyload
@@ -97,88 +87,139 @@
# @returns [Boolean] true if new links can be added
def is_ready?
@ready_for_new_links
end
- private
- # This method is called from the watchdog timer periodically
+ # Utility method that fetches the QueueData, validates it and returns
+ # the JSON Object as a real Ruby Object
#
- # It monitors the download process and reacts depending on the results
+ # @raise [Junkie::InvalidResponse] if the response is faulty
+ # @return [Array] the JSON response as an object
+ def get_queue_data
+ data = @api.call(:getQueueData)
+
+ if data && data.is_a?(Array)
+ return data
+ end
+ raise Junkie::InvalidResponse,
+ "'#{data}' is not a valid response from Pyload"
+ end
+
+
+ #########################################################################
+ # This method is called from the watchdog timer periodically #
+ # #
+ # It monitors the download process and reacts depending on the results #
def monitor_progress
log.debug("Watchdog timer has been fired")
in_fiber {
catch(:break) {
- queue_data = @api.call(:getQueueData)
+ queue_data = get_queue_data
- if queue_data.empty? and @active_episode.nil?
- log.info("Empty Pyload queue, I cancel the watchdog timer")
- @watchdog_enabled = false
- throw :break
- end
+ # check for empty queue and disable watchdog if needed
+ check_for_empty_queue(queue_data)
# extract package IDs and map them to current downloads
update_package_ids(queue_data)
- if has_queue_any_failed_links?(queue_data)
- log.info("There are failed links in the Queue, will fix this")
- @api.call(:restartFailed)
- throw :break
- end
+ check_for_failed_links(queue_data)
- # look for complete downloads
- pids = get_complete_downloads(queue_data)
+ check_for_complete_packages(queue_data)
+ }
+ }
+ rescue Junkie::InvalidResponse => e
+ log.error("Pyload returns InvalidResponse (#{e}), I will ignore this")
+ end
- if not pids.empty?
- if @skipped_timer_at_first_complete_detection
- # post process complete active download and send it out on the
- # channel
- if pids.include? @active_episode.pid
- log.info("'#{@active_episode}' is extracted completely")
- @active_episode.pid = nil
- @active_episode.status = :extracted
+ # Checks if the Pyload Queue is empty and disables the watchdog if
+ # it is really real
+ #
+ # @param [Array] queue_data returned from :getQueueData api method
+ def check_for_empty_queue(queue_data)
+ if queue_data.empty?
- @should_send_it_on_channel = true
- end
+ unless @active_episode.nil?
+ log.info("Empty queue detected but a download is set as active")
+ return
+ end
- # remove all complete packages
- @api.call(:deletePackages, {pids: pids})
- log.info("Complete packages are removed from Pyload Queue")
+ log.info("Empty Pyload queue detected, will double check")
- @skipped_timer_at_first_complete_detection = false
- @ready_for_new_links = true
- else
- # If a package is complete, we are sometimes so fast to remove
- # it, before the extracting can be started, so we will skip it
- # the first time
- log.info("Complete package detected, skip the first time")
- @skipped_timer_at_first_complete_detection = true
- end
- end
- }
- }
+ # check twice if the queue is definitely empty
+ check_data = get_queue_data
+ if check_data.empty?
+ log.info("Double check is successful, I will disable the watchdog")
+ @watchdog_enabled = false
+ throw :break
+ else
+ log.info("Double check failed, Queue is not empty")
+ end
+ end
end
-
- # Searches for failed links in the pyload queue
+ # Searches for failed links in the pyload queue and reacts on this
+ # through issuing a restartFailed call
#
# @param [Array] queue_data returned from :getQueueData api method
- #
- # @return [Boolean] true if there are any failed links, false otherwise
- def has_queue_any_failed_links?(queue_data)
+ def check_for_failed_links(queue_data)
+ failed = 0
queue_data.each do |package|
package['links'].each do |link|
status = Pyload::Api::FILE_STATUS[link['status']]
- return true if status == :failed
+ (failed += 1) if status == :failed
end
end
- false
+ if failed > 0
+ log.info("There are failed links in the Queue, will fix this")
+ @api.call(:restartFailed)
+ throw :break
+ end
end
+ # Looks for complete packages and does some post-processing
+ #
+ # @param [Array] queue_data returned from :getQueueData api method
+ def check_for_complete_packages(queue_data)
+ pids = get_complete_downloads(queue_data)
+
+ throw :break if pids.empty?
+
+ if @skipped_timer_at_first_complete_detection
+
+ # post process complete active download and send it out on the
+ # channel
+ if pids.include? @active_episode.pid
+ log.info("'#{@active_episode}' has been extracted completely")
+ @active_episode.pid = nil
+ @active_episode.status = :extracted
+
+ log.info("Sending complete episode out on the channel")
+ @channels[:episodes].push(@active_episode)
+ @active_episode = nil
+ end
+
+ # remove all complete packages
+ @api.call(:deletePackages, { pids: pids })
+ log.info("Complete packages are removed from Pyload Queue #{ pids }")
+
+ @skipped_timer_at_first_complete_detection = false
+ @ready_for_new_links = true
+
+ else
+ # If a package is complete, we are sometimes so fast to remove
+ # it, before the extracting can be started, so we will skip it
+ # the first time
+ log.info("Complete package detected, skip the first time")
+ @skipped_timer_at_first_complete_detection = true
+ end
+ end
+
+
# Looks for complete downloads and returns their pids
#
# @param [Array] queue_data returned from :getQueueData api method
#
# @return [Array] list of pids there the download is complete
@@ -187,49 +228,48 @@
queue_data.each do |package|
next unless package['links'].size > 0
next if package['linksdone'] == 0
- # When extracting is in progress the status of the first link is set
- # to :extracting
- extracting = package['links'].select do |link|
- Pyload::Api::FILE_STATUS[link['status']] == :extracting
+ invalid_links = package['links'].reject do |e|
+ [:done, :skipped].include?(
+ Junkie::Pyload::Api::FILE_STATUS[e['status']])
end
- next unless extracting.empty?
+ next unless invalid_links.empty?
sizetotal = package['sizetotal'].to_i
sizedone = package['sizedone'].to_i
- if sizetotal > 0 && sizedone > 0
-
- if sizetotal == sizedone
+ if sizetotal > 0 && sizedone > 0 && sizetotal == sizedone
pids << package['pid']
- end
end
end
pids
end
+
# extract package IDs and map them to current downloads
#
# @param [Array] queue_data returned from :getQueueData api method
def update_package_ids(queue_data)
- queue_data.each do |package|
- pid = package['pid']
- next if @active_episode.pid == pid
- if @active_episode.pid == pid-1
- log.info("Package ID has been changed, I will correct this")
+ if queue_data.size == 1
+ pid = queue_data[0]['pid']
+ if @active_episode && @active_episode.pid != pid
+ log.info("Package ID has been changed, New PID is #{pid}")
@active_episode.pid = pid
- next
end
+ elsif queue_data.size > 1
raise InvalidStateError,
- "PackageID #{pid} can't be mapped to active Download"
+ "There is more than 1 Package in the Queue"
end
end
+
+ # Removes existing packages from the Pyload Queue and creates a valid
+ # state to begin communication
def cleanup
log.debug("Made a cleanup of Pyload's Queue")
@api.call(:stopAllDownloads)
packages = @api.call(:getQueue).map { |e| e['pid'] }