#!/usr/bin/env python # # Copyright 2010 Google Inc. # # Permission is hereby granted, free of charge, to any person obtaining a # copy of this software and associated documentation files (the # "Software"), to deal in the Software without restriction, including # without limitation the rights to use, copy, modify, merge, publish, dis- # tribute, sublicense, and/or sell copies of the Software, and to permit # persons to whom the Software is furnished to do so, subject to the fol- # lowing conditions: # # The above copyright notice and this permission notice shall be included # in all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS # OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABIL- # ITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT # SHALL THE AUTHOR BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, # WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS # IN THE SOFTWARE. """ Tests of resumable downloads. """ import errno import os import re import boto from boto.s3.resumable_download_handler import get_cur_file_size from boto.s3.resumable_download_handler import ResumableDownloadHandler from boto.exception import ResumableTransferDisposition from boto.exception import ResumableDownloadException from cb_test_harness import CallbackTestHarness from tests.integration.gs.testcase import GSTestCase SMALL_KEY_SIZE = 2 * 1024 # 2 KB. LARGE_KEY_SIZE = 500 * 1024 # 500 KB. 
class ResumableDownloadTests(GSTestCase):
    """Resumable download test suite."""

    def make_small_key(self):
        """
        Creates a key holding SMALL_KEY_SIZE random bytes.

        Returns a (data, key) tuple: the random bytes and the key object
        produced by self._MakeKey for them.
        """
        small_src_key_as_string = os.urandom(SMALL_KEY_SIZE)
        small_src_key = self._MakeKey(data=small_src_key_as_string)
        return small_src_key_as_string, small_src_key

    def make_tracker_file(self, tmpdir=None):
        """
        Returns the path 'tracker' inside tmpdir; the file itself is not
        created here. A new temp dir is made when tmpdir is not given.
        """
        if not tmpdir:
            tmpdir = self._MakeTempDir()
        return os.path.join(tmpdir, 'tracker')

    def make_dst_fp(self, tmpdir=None):
        """
        Opens 'dstfile' inside tmpdir for writing and returns the file
        object. A new temp dir is made when tmpdir is not given. The caller
        owns the returned file object.
        """
        if not tmpdir:
            tmpdir = self._MakeTempDir()
        dst_file = os.path.join(tmpdir, 'dstfile')
        # Binary mode: the downloaded payload is random bytes, so text mode
        # could translate newlines (changing the file size) or reject bytes.
        return open(dst_file, 'wb')

    def test_non_resumable_download(self):
        """
        Tests that non-resumable downloads work
        """
        dst_fp = self.make_dst_fp()
        small_src_key_as_string, small_src_key = self.make_small_key()
        small_src_key.get_contents_to_file(dst_fp)
        self.assertEqual(SMALL_KEY_SIZE,
                         get_cur_file_size(dst_fp))
        self.assertEqual(small_src_key_as_string,
                         small_src_key.get_contents_as_string())

    def test_download_without_persistent_tracker(self):
        """
        Tests a single resumable download, with no tracker persistence
        """
        res_download_handler = ResumableDownloadHandler()
        dst_fp = self.make_dst_fp()
        small_src_key_as_string, small_src_key = self.make_small_key()
        small_src_key.get_contents_to_file(
            dst_fp, res_download_handler=res_download_handler)
        self.assertEqual(SMALL_KEY_SIZE,
                         get_cur_file_size(dst_fp))
        self.assertEqual(small_src_key_as_string,
                         small_src_key.get_contents_as_string())

    def test_failed_download_with_persistent_tracker(self):
        """
        Tests that failed resumable download leaves a correct tracker file
        """
        harness = CallbackTestHarness()
        tmpdir = self._MakeTempDir()
        tracker_file_name = self.make_tracker_file(tmpdir)
        dst_fp = self.make_dst_fp(tmpdir)
        res_download_handler = ResumableDownloadHandler(
            tracker_file_name=tracker_file_name, num_retries=0)
        _, small_src_key = self.make_small_key()
        try:
            small_src_key.get_contents_to_file(
                dst_fp, cb=harness.call,
                res_download_handler=res_download_handler)
            self.fail('Did not get expected ResumableDownloadException')
        except ResumableDownloadException as e:
            # We'll get a ResumableDownloadException at this point because
            # of CallbackTestHarness (above). Check that the tracker file was
            # created correctly.
            self.assertEqual(e.disposition,
                             ResumableTransferDisposition.ABORT_CUR_PROCESS)
            self.assertTrue(os.path.exists(tracker_file_name))
            # The first line of the tracker file must look like an etag.
            with open(tracker_file_name) as f:
                etag_line = f.readline()
            m = re.search(ResumableDownloadHandler.ETAG_REGEX, etag_line)
            self.assertTrue(m)

    def test_retryable_exception_recovery(self):
        """
        Tests handling of a retryable exception
        """
        # Test one of the RETRYABLE_EXCEPTIONS.
        exception = ResumableDownloadHandler.RETRYABLE_EXCEPTIONS[0]
        harness = CallbackTestHarness(exception=exception)
        res_download_handler = ResumableDownloadHandler(num_retries=1)
        dst_fp = self.make_dst_fp()
        small_src_key_as_string, small_src_key = self.make_small_key()
        small_src_key.get_contents_to_file(
            dst_fp, cb=harness.call,
            res_download_handler=res_download_handler)
        # Ensure downloaded object has correct content.
        self.assertEqual(SMALL_KEY_SIZE,
                         get_cur_file_size(dst_fp))
        self.assertEqual(small_src_key_as_string,
                         small_src_key.get_contents_as_string())

    def test_broken_pipe_recovery(self):
        """
        Tests handling of a Broken Pipe (which interacts with an httplib bug)
        """
        exception = IOError(errno.EPIPE, "Broken pipe")
        harness = CallbackTestHarness(exception=exception)
        res_download_handler = ResumableDownloadHandler(num_retries=1)
        dst_fp = self.make_dst_fp()
        small_src_key_as_string, small_src_key = self.make_small_key()
        small_src_key.get_contents_to_file(
            dst_fp, cb=harness.call,
            res_download_handler=res_download_handler)
        # Ensure downloaded object has correct content.
        self.assertEqual(SMALL_KEY_SIZE,
                         get_cur_file_size(dst_fp))
        self.assertEqual(small_src_key_as_string,
                         small_src_key.get_contents_as_string())

    def test_non_retryable_exception_handling(self):
        """
        Tests resumable download that fails with a non-retryable exception
        """
        harness = CallbackTestHarness(
            exception=OSError(errno.EACCES, 'Permission denied'))
        res_download_handler = ResumableDownloadHandler(num_retries=1)
        dst_fp = self.make_dst_fp()
        _, small_src_key = self.make_small_key()
        try:
            small_src_key.get_contents_to_file(
                dst_fp, cb=harness.call,
                res_download_handler=res_download_handler)
            self.fail('Did not get expected OSError')
        except OSError as e:
            # Ensure the error was re-raised.
            self.assertEqual(e.errno, errno.EACCES)

    def test_failed_and_restarted_download_with_persistent_tracker(self):
        """
        Tests resumable download that fails once and then completes,
        with tracker file
        """
        harness = CallbackTestHarness()
        tmpdir = self._MakeTempDir()
        tracker_file_name = self.make_tracker_file(tmpdir)
        dst_fp = self.make_dst_fp(tmpdir)
        small_src_key_as_string, small_src_key = self.make_small_key()
        res_download_handler = ResumableDownloadHandler(
            tracker_file_name=tracker_file_name, num_retries=1)
        small_src_key.get_contents_to_file(
            dst_fp, cb=harness.call,
            res_download_handler=res_download_handler)
        # Ensure downloaded object has correct content.
        self.assertEqual(SMALL_KEY_SIZE,
                         get_cur_file_size(dst_fp))
        self.assertEqual(small_src_key_as_string,
                         small_src_key.get_contents_as_string())
        # Ensure tracker file deleted.
        self.assertFalse(os.path.exists(tracker_file_name))

    def test_multiple_in_process_failures_then_succeed(self):
        """
        Tests resumable download that fails twice in one process,
        then completes
        """
        # Inject two failures through the progress callback; without a
        # harness this test would never exercise the retry path at all.
        harness = CallbackTestHarness(num_times_to_fail=2)
        res_download_handler = ResumableDownloadHandler(num_retries=3)
        dst_fp = self.make_dst_fp()
        small_src_key_as_string, small_src_key = self.make_small_key()
        small_src_key.get_contents_to_file(
            dst_fp, cb=harness.call,
            res_download_handler=res_download_handler)
        # Ensure downloaded object has correct content.
        self.assertEqual(SMALL_KEY_SIZE,
                         get_cur_file_size(dst_fp))
        self.assertEqual(small_src_key_as_string,
                         small_src_key.get_contents_as_string())

    def test_multiple_in_process_failures_then_succeed_with_tracker_file(self):
        """
        Tests resumable download that fails completely in one process,
        then when restarted completes, using a tracker file
        """
        # Set up test harness that causes more failures than a single
        # ResumableDownloadHandler instance will handle, writing enough data
        # before the first failure that some of it survives that process run.
        # Floor division keeps the byte threshold an integer.
        harness = CallbackTestHarness(
            fail_after_n_bytes=LARGE_KEY_SIZE // 2, num_times_to_fail=2)
        larger_src_key_as_string = os.urandom(LARGE_KEY_SIZE)
        larger_src_key = self._MakeKey(data=larger_src_key_as_string)
        tmpdir = self._MakeTempDir()
        tracker_file_name = self.make_tracker_file(tmpdir)
        dst_fp = self.make_dst_fp(tmpdir)
        res_download_handler = ResumableDownloadHandler(
            tracker_file_name=tracker_file_name, num_retries=0)
        try:
            larger_src_key.get_contents_to_file(
                dst_fp, cb=harness.call,
                res_download_handler=res_download_handler)
            self.fail('Did not get expected ResumableDownloadException')
        except ResumableDownloadException as e:
            self.assertEqual(e.disposition,
                             ResumableTransferDisposition.ABORT_CUR_PROCESS)
            # Ensure a tracker file survived.
            self.assertTrue(os.path.exists(tracker_file_name))
        # Try it one more time; this time should succeed.
        larger_src_key.get_contents_to_file(
            dst_fp, cb=harness.call,
            res_download_handler=res_download_handler)
        self.assertEqual(LARGE_KEY_SIZE,
                         get_cur_file_size(dst_fp))
        self.assertEqual(larger_src_key_as_string,
                         larger_src_key.get_contents_as_string())
        self.assertFalse(os.path.exists(tracker_file_name))
        # Ensure some of the file was downloaded both before and after failure.
        self.assertTrue(
            len(harness.transferred_seq_before_first_failure) > 1 and
            len(harness.transferred_seq_after_first_failure) > 1)

    def test_download_with_inital_partial_download_before_failure(self):
        """
        Tests resumable download that successfully downloads some content
        before it fails, then restarts and completes
        """
        # Set up harness to fail download after several hundred KB so download
        # server will have saved something before we retry.
        # Floor division keeps the byte threshold an integer.
        harness = CallbackTestHarness(
            fail_after_n_bytes=LARGE_KEY_SIZE // 2)
        larger_src_key_as_string = os.urandom(LARGE_KEY_SIZE)
        larger_src_key = self._MakeKey(data=larger_src_key_as_string)
        res_download_handler = ResumableDownloadHandler(num_retries=1)
        dst_fp = self.make_dst_fp()
        larger_src_key.get_contents_to_file(
            dst_fp, cb=harness.call,
            res_download_handler=res_download_handler)
        # Ensure downloaded object has correct content.
        self.assertEqual(LARGE_KEY_SIZE,
                         get_cur_file_size(dst_fp))
        self.assertEqual(larger_src_key_as_string,
                         larger_src_key.get_contents_as_string())
        # Ensure some of the file was downloaded both before and after failure.
        self.assertTrue(
            len(harness.transferred_seq_before_first_failure) > 1 and
            len(harness.transferred_seq_after_first_failure) > 1)

    def test_zero_length_object_download(self):
        """
        Tests downloading a zero-length object (exercises boundary conditions).
        """
        res_download_handler = ResumableDownloadHandler()
        dst_fp = self.make_dst_fp()
        k = self._MakeKey()
        k.get_contents_to_file(dst_fp,
                               res_download_handler=res_download_handler)
        self.assertEqual(0, get_cur_file_size(dst_fp))

    def test_download_with_invalid_tracker_etag(self):
        """
        Tests resumable download with a tracker file containing an invalid etag
        """
        tmp_dir = self._MakeTempDir()
        dst_fp = self.make_dst_fp(tmp_dir)
        small_src_key_as_string, small_src_key = self.make_small_key()
        invalid_etag_tracker_file_name = os.path.join(tmp_dir,
                                                      'invalid_etag_tracker')
        with open(invalid_etag_tracker_file_name, 'w') as f:
            f.write('3.14159\n')
        res_download_handler = ResumableDownloadHandler(
            tracker_file_name=invalid_etag_tracker_file_name)
        # An error should be printed about the invalid tracker, but then it
        # should run the update successfully.
        small_src_key.get_contents_to_file(
            dst_fp, res_download_handler=res_download_handler)
        self.assertEqual(SMALL_KEY_SIZE,
                         get_cur_file_size(dst_fp))
        self.assertEqual(small_src_key_as_string,
                         small_src_key.get_contents_as_string())

    def test_download_with_inconsistent_etag_in_tracker(self):
        """
        Tests resumable download with an inconsistent etag in tracker file
        """
        tmp_dir = self._MakeTempDir()
        dst_fp = self.make_dst_fp(tmp_dir)
        small_src_key_as_string, small_src_key = self.make_small_key()
        inconsistent_etag_tracker_file_name = os.path.join(
            tmp_dir, 'inconsistent_etag_tracker')
        # Write the key's real etag reversed, so the tracker holds a value
        # of the same length and alphabet as the real etag.
        good_etag = small_src_key.etag.strip('"\'')
        with open(inconsistent_etag_tracker_file_name, 'w') as f:
            f.write('%s\n' % good_etag[::-1])
        res_download_handler = ResumableDownloadHandler(
            tracker_file_name=inconsistent_etag_tracker_file_name)
        # An error should be printed about the expired tracker, but then it
        # should run the update successfully.
        small_src_key.get_contents_to_file(
            dst_fp, res_download_handler=res_download_handler)
        self.assertEqual(SMALL_KEY_SIZE,
                         get_cur_file_size(dst_fp))
        self.assertEqual(small_src_key_as_string,
                         small_src_key.get_contents_as_string())

    def test_download_with_unwritable_tracker_file(self):
        """
        Tests resumable download with an unwritable tracker file
        """
        # Make dir where tracker_file lives temporarily unwritable.
        tmp_dir = self._MakeTempDir()
        tracker_file_name = os.path.join(tmp_dir, 'tracker')
        save_mod = os.stat(tmp_dir).st_mode
        try:
            os.chmod(tmp_dir, 0)
            # NOTE(review): if the constructor does not raise, this test
            # passes without asserting anything -- confirm whether a
            # self.fail() (or an actual download attempt) is wanted here.
            ResumableDownloadHandler(tracker_file_name=tracker_file_name)
        except ResumableDownloadException as e:
            self.assertEqual(e.disposition, ResumableTransferDisposition.ABORT)
            self.assertNotEqual(
                e.message.find('Couldn\'t write URI tracker file'), -1)
        finally:
            # Restore original protection of dir where tracker_file lives.
            os.chmod(tmp_dir, save_mod)