lib/junkie/pyload/observer.rb in junkie-0.0.3 vs lib/junkie/pyload/observer.rb in junkie-0.0.4

- old
+ new

@@ -9,57 +9,91 @@ DEFAULT_CONFIG = { watchdog_refresh: 10, # interval the watchdog_timer is fired } - def initialize(config={}) + def initialize(channels) @config = Config.get_config(self) @api = Junkie::Pyload::Api.new + @channels = channels + + @found_episodes = Array.new + + @active_episode = nil + @ready_for_new_links = true @watchdog_enabled = false - @active_downloads = Hash.new - @skipped_timer_at_first_complete_detection = false + @should_send_it_on_channel = false + + @channels[:episodes].subscribe do |episode| + next unless episode.status == :found + + log.info("Got episode from Channel: #{episode}") + @found_episodes.push(episode) + end end + # is a method that is called once from inside the reactor and allows us # to register custom actions into the reactor def setup in_fiber { @api.login cleanup } EM.add_periodic_timer(@config[:watchdog_refresh]) do + + # temporary fix for the SystemStackError + if @should_send_it_on_channel + log.info("Sending complete episode out on the channel") + @channels[:episodes].push(@active_episode) + @active_episode = nil + + @should_send_it_on_channel = false + end + monitor_progress if @watchdog_enabled + + in_fiber { + add_next_episode_to_pyload + } end end - # Adds new episode to Pyload which downloads the episode and extracts it + + # Add next episode to Pyload which downloads the episode and extracts it # - # @param [Junkie::Episode] episode which should be downloaded - # # @note should only be called if `is_ready?` returns true - def add_episode(episode) - raise InvalidStateError, "Observer is not ready" unless is_ready? - @ready_for_new_links = false + def add_next_episode_to_pyload + # detect if we cann add a new episode + return unless is_ready? + return if @found_episodes.empty? + + @active_episode = @found_episodes.shift + episode = @active_episode + + @ready_for_new_links = false episode.status = :downloading package = "%s@%s" % [ episode.id, episode.series ] - pid = @api.call(:addPackage, {name: package, links: [episode.link]}) + pid = @api.call(:addPackage, + { name: package, links: [episode.link] }) - @active_downloads[pid] = episode + episode.pid = pid log.info("Added #{episode} to Pyload, Pid=#{pid}") (@watchdog_enabled = true) unless @watchdog_enabled end + # checks if the Observer is ready to add new episodes to Pyload # # @returns [Boolean] true if new links can be added def is_ready? @ready_for_new_links @@ -70,11 +104,10 @@ # This method is called from the watchdog timer periodically # # It monitors the download process and reacts depending on the results def monitor_progress log.debug("Watchdog timer has been fired") - in_fiber { catch(:break) { queue_data = @api.call(:getQueueData) if queue_data.empty? @@ -95,14 +128,26 @@ # look for complete downloads pids = get_complete_downloads(queue_data) if not pids.empty? if @skipped_timer_at_first_complete_detection + + # post process complete active download and send it out on the + # channel + if pids.include? @active_episode.pid + log.info("'#{@active_episode}' is extracted completely") + @active_episode.pid = nil + @active_episode.status = :extracted + + @should_send_it_on_channel = true + end + + # remove all complete packages @api.call(:deletePackages, {pids: pids}) log.info("Complete packages are removed from Pyload Queue") - @skipped_timer_at_first_complete_detection = false + @skipped_timer_at_first_complete_detection = false @ready_for_new_links = true else # If a package is complete, we are sometimes so fast to remove # it, before the extracting can be started, so we will skip it # the first time @@ -112,10 +157,11 @@ end } } end + # Searches for failed links in the pyload queue # # @param [Array] queue_data returned from :getQueueData api method # # @return [Boolean] true if there are any failed links, false otherwise @@ -128,10 +174,11 @@ end false end + # Looks for complete downloads and returns their pids # # @param [Array] queue_data returned from :getQueueData api method # # @return [Array] list of pids there the download is complete @@ -167,15 +214,14 @@ # # @param [Array] queue_data returned from :getQueueData api method def update_package_ids(queue_data) queue_data.each do |package| pid = package['pid'] - next if @active_downloads.has_key? pid + next if @active_episode.pid == pid - if @active_downloads.has_key? pid-1 + if @active_episode.pid == pid-1 log.info("Package ID has been changed, I will correct this") - @active_downloads[pid] = @active_downloads[pid-1] - @active_downloads.delete(pid-1) + @active_episode.pid = pid next end raise InvalidStateError, "PackageID #{pid} can't be mapped to active Download"