lib/junkie/pyload/observer.rb in junkie-0.0.3 vs lib/junkie/pyload/observer.rb in junkie-0.0.4
- old
+ new
@@ -9,57 +9,91 @@
DEFAULT_CONFIG = {
watchdog_refresh: 10, # interval the watchdog_timer is fired
}
- def initialize(config={})
+ def initialize(channels)
@config = Config.get_config(self)
@api = Junkie::Pyload::Api.new
+ @channels = channels
+
+ @found_episodes = Array.new
+
+ @active_episode = nil
+
@ready_for_new_links = true
@watchdog_enabled = false
- @active_downloads = Hash.new
-
@skipped_timer_at_first_complete_detection = false
+ @should_send_it_on_channel = false
+
+ @channels[:episodes].subscribe do |episode|
+ next unless episode.status == :found
+
+ log.info("Got episode from Channel: #{episode}")
+ @found_episodes.push(episode)
+ end
end
+
# is a method that is called once from inside the reactor and allows us
# to register custom actions into the reactor
def setup
in_fiber {
@api.login
cleanup
}
EM.add_periodic_timer(@config[:watchdog_refresh]) do
+
+ # temporary fix for the SystemStackError
+ if @should_send_it_on_channel
+ log.info("Sending complete episode out on the channel")
+ @channels[:episodes].push(@active_episode)
+ @active_episode = nil
+
+ @should_send_it_on_channel = false
+ end
+
monitor_progress if @watchdog_enabled
+
+ in_fiber {
+ add_next_episode_to_pyload
+ }
end
end
- # Adds new episode to Pyload which downloads the episode and extracts it
+
+ # Add next episode to Pyload which downloads the episode and extracts it
#
- # @param [Junkie::Episode] episode which should be downloaded
- #
# @note should only be called if `is_ready?` returns true
- def add_episode(episode)
- raise InvalidStateError, "Observer is not ready" unless is_ready?
- @ready_for_new_links = false
+ def add_next_episode_to_pyload
+ # detect if we cann add a new episode
+ return unless is_ready?
+ return if @found_episodes.empty?
+
+ @active_episode = @found_episodes.shift
+ episode = @active_episode
+
+ @ready_for_new_links = false
episode.status = :downloading
package = "%s@%s" % [ episode.id, episode.series ]
- pid = @api.call(:addPackage, {name: package, links: [episode.link]})
+ pid = @api.call(:addPackage,
+ { name: package, links: [episode.link] })
- @active_downloads[pid] = episode
+ episode.pid = pid
log.info("Added #{episode} to Pyload, Pid=#{pid}")
(@watchdog_enabled = true) unless @watchdog_enabled
end
+
# checks if the Observer is ready to add new episodes to Pyload
#
# @returns [Boolean] true if new links can be added
def is_ready?
@ready_for_new_links
@@ -70,11 +104,10 @@
# This method is called from the watchdog timer periodically
#
# It monitors the download process and reacts depending on the results
def monitor_progress
log.debug("Watchdog timer has been fired")
-
in_fiber {
catch(:break) {
queue_data = @api.call(:getQueueData)
if queue_data.empty?
@@ -95,14 +128,26 @@
# look for complete downloads
pids = get_complete_downloads(queue_data)
if not pids.empty?
if @skipped_timer_at_first_complete_detection
+
+ # post process complete active download and send it out on the
+ # channel
+ if pids.include? @active_episode.pid
+ log.info("'#{@active_episode}' is extracted completely")
+ @active_episode.pid = nil
+ @active_episode.status = :extracted
+
+ @should_send_it_on_channel = true
+ end
+
+ # remove all complete packages
@api.call(:deletePackages, {pids: pids})
log.info("Complete packages are removed from Pyload Queue")
- @skipped_timer_at_first_complete_detection = false
+ @skipped_timer_at_first_complete_detection = false
@ready_for_new_links = true
else
# If a package is complete, we are sometimes so fast to remove
# it, before the extracting can be started, so we will skip it
# the first time
@@ -112,10 +157,11 @@
end
}
}
end
+
# Searches for failed links in the pyload queue
#
# @param [Array] queue_data returned from :getQueueData api method
#
# @return [Boolean] true if there are any failed links, false otherwise
@@ -128,10 +174,11 @@
end
false
end
+
# Looks for complete downloads and returns their pids
#
# @param [Array] queue_data returned from :getQueueData api method
#
# @return [Array] list of pids there the download is complete
@@ -167,15 +214,14 @@
#
# @param [Array] queue_data returned from :getQueueData api method
def update_package_ids(queue_data)
queue_data.each do |package|
pid = package['pid']
- next if @active_downloads.has_key? pid
+ next if @active_episode.pid == pid
- if @active_downloads.has_key? pid-1
+ if @active_episode.pid == pid-1
log.info("Package ID has been changed, I will correct this")
- @active_downloads[pid] = @active_downloads[pid-1]
- @active_downloads.delete(pid-1)
+ @active_episode.pid = pid
next
end
raise InvalidStateError,
"PackageID #{pid} can't be mapped to active Download"