|  | 
| 1 |  | -import mimetypes | 
| 2 |  | -import socket | 
| 3 | 1 | import unittest | 
|  | 2 | +import subprocess | 
|  | 3 | +import http.client | 
|  | 4 | +import os | 
|  | 5 | + | 
|  | 6 | + | 
|  | 7 | +class WebTestCase(unittest.TestCase): | 
|  | 8 | +    """tests for the echo server and client""" | 
|  | 9 | + | 
|  | 10 | +    def setUp(self): | 
|  | 11 | +        self.server_process = subprocess.Popen( | 
|  | 12 | +            [ | 
|  | 13 | +                "python", | 
|  | 14 | +                "http_server.py" | 
|  | 15 | +            ], | 
|  | 16 | +            stdout=subprocess.PIPE, | 
|  | 17 | +            stderr=subprocess.PIPE, | 
|  | 18 | +        ) | 
|  | 19 | + | 
|  | 20 | +    def tearDown(self): | 
|  | 21 | +        self.server_process.kill() | 
|  | 22 | +        self.server_process.communicate() | 
|  | 23 | + | 
|  | 24 | +    def get_response(self, url): | 
|  | 25 | +        """ | 
|  | 26 | +        Helper function to get a response from a given url, using http.client | 
|  | 27 | +        """ | 
| 4 | 28 | 
 | 
|  | 29 | +        conn = http.client.HTTPConnection('localhost:10000') | 
|  | 30 | +        conn.request('GET', url) | 
|  | 31 | + | 
|  | 32 | +        response = conn.getresponse() | 
|  | 33 | + | 
|  | 34 | +        conn.close() | 
| 5 | 35 | 
 | 
| 6 |  | -CRLF = '\r\n' | 
| 7 |  | -CRLF_BYTES = CRLF.encode('utf8') | 
| 8 |  | -KNOWN_TYPES = set( | 
| 9 |  | -    map(lambda x: x.encode('utf8'), mimetypes.types_map.values()) | 
| 10 |  | -) | 
| 11 |  | - | 
| 12 |  | - | 
| 13 |  | -def extract_response_code(response): | 
| 14 |  | -    return response.split(CRLF_BYTES, 1)[0].split(b' ', 1)[1].strip() | 
| 15 |  | - | 
| 16 |  | - | 
| 17 |  | -def extract_response_protocol(response): | 
| 18 |  | -    return response.split(CRLF_BYTES, 1)[0].split(b' ', 1)[0].strip() | 
| 19 |  | - | 
| 20 |  | - | 
| 21 |  | -def extract_headers(response): | 
| 22 |  | -    return response.split(CRLF_BYTES*2, 1)[0].split(CRLF_BYTES)[1:] | 
| 23 |  | - | 
| 24 |  | - | 
| 25 |  | -class ResponseOkTestCase(unittest.TestCase): | 
| 26 |  | -    """unit tests for the response_ok method in our server | 
| 27 |  | -    Becase this is a unit test case, it does not require the server to be | 
| 28 |  | -    running. | 
| 29 |  | -    """ | 
| 30 |  | - | 
| 31 |  | -    def call_function_under_test(self): | 
| 32 |  | -        """call the `response_ok` function from our http_server module""" | 
| 33 |  | -        from http_server import response_ok | 
| 34 |  | -        return response_ok() | 
| 35 |  | - | 
| 36 |  | -    def test_response_code(self): | 
| 37 |  | -        ok = self.call_function_under_test() | 
| 38 |  | -        expected = "200 OK" | 
| 39 |  | -        actual = extract_response_code(ok) | 
| 40 |  | -        self.assertEqual(expected.encode('utf8'), actual) | 
| 41 |  | - | 
| 42 |  | -    def test_response_protocol(self): | 
| 43 |  | -        ok = self.call_function_under_test() | 
| 44 |  | -        expected = 'HTTP/1.1' | 
| 45 |  | -        actual = extract_response_protocol(ok) | 
| 46 |  | -        self.assertEqual(expected.encode('utf8'), actual) | 
| 47 |  | - | 
| 48 |  | -    def test_response_has_content_type_header(self): | 
| 49 |  | -        ok = self.call_function_under_test() | 
| 50 |  | -        headers = extract_headers(ok) | 
| 51 |  | -        expected_name = 'content-type'.encode('utf8') | 
| 52 |  | -        has_header = False | 
| 53 |  | -        for header in headers: | 
| 54 |  | -            name, value = header.split(b':') | 
| 55 |  | -            actual_name = name.strip().lower() | 
| 56 |  | -            if actual_name == expected_name: | 
| 57 |  | -                has_header = True | 
| 58 |  | -                break | 
| 59 |  | -        self.assertTrue(has_header) | 
| 60 |  | - | 
| 61 |  | -    def test_response_has_legitimate_content_type(self): | 
| 62 |  | -        ok = self.call_function_under_test() | 
| 63 |  | -        headers = extract_headers(ok) | 
| 64 |  | -        expected_name = 'content-type'.encode('utf8') | 
| 65 |  | -        for header in headers: | 
| 66 |  | -            name, value = header.split(b':') | 
| 67 |  | -            actual_name = name.strip().lower() | 
| 68 |  | -            if actual_name == expected_name: | 
| 69 |  | -                self.assertTrue(value.strip() in KNOWN_TYPES) | 
| 70 |  | -                return | 
| 71 |  | -        self.fail('no content type header found') | 
| 72 |  | - | 
| 73 |  | - | 
| 74 |  | -class ResponseMethodNotAllowedTestCase(unittest.TestCase): | 
| 75 |  | -    """unit tests for the response_method_not_allowed function""" | 
| 76 |  | - | 
| 77 |  | -    def call_function_under_test(self): | 
| 78 |  | -        """call the `response_method_not_allowed` function""" | 
| 79 |  | -        from http_server import response_method_not_allowed | 
| 80 |  | -        return response_method_not_allowed() | 
| 81 |  | - | 
| 82 |  | -    def test_response_code(self): | 
| 83 |  | -        resp = self.call_function_under_test() | 
| 84 |  | -        expected = "405 Method Not Allowed" | 
| 85 |  | -        actual = extract_response_code(resp) | 
| 86 |  | -        self.assertEqual(expected.encode('utf8'), actual) | 
| 87 |  | - | 
| 88 |  | -    def test_response_method(self): | 
| 89 |  | -        resp = self.call_function_under_test() | 
| 90 |  | -        expected = 'HTTP/1.1' | 
| 91 |  | -        actual = extract_response_protocol(resp) | 
| 92 |  | -        self.assertEqual(expected.encode('utf8'), actual) | 
| 93 |  | - | 
| 94 |  | - | 
| 95 |  | -class ParseRequestTestCase(unittest.TestCase): | 
| 96 |  | -    """unit tests for the parse_request method""" | 
| 97 |  | - | 
| 98 |  | -    def call_function_under_test(self, request): | 
| 99 |  | -        """call the `parse_request` function""" | 
| 100 |  | -        from http_server import parse_request | 
| 101 |  | -        return parse_request(request) | 
| 102 |  | - | 
| 103 |  | -    def test_get_method(self): | 
| 104 |  | -        """verify that GET HTTP requests do not raise an error""" | 
| 105 |  | -        request = "GET / HTTP/1.1\r\nHost: example.com\r\n\r\n" | 
| 106 |  | -        try: | 
| 107 |  | -            self.call_function_under_test(request) | 
| 108 |  | -        except (NotImplementedError, Exception) as e: | 
| 109 |  | -            self.fail('GET method raises an error {0}'.format(str(e))) | 
| 110 |  | - | 
| 111 |  | -    def test_bad_http_methods(self): | 
| 112 |  | -        """verify that non-GET HTTP methods raise a NotImplementedError""" | 
| 113 |  | -        methods = ['POST', 'PUT', 'DELETE', 'HEAD'] | 
| 114 |  | -        request_template = "{0} / HTTP/1.1\r\nHost: example.com\r\n\r\n" | 
| 115 |  | -        for method in methods: | 
| 116 |  | -            request = request_template.format(method) | 
| 117 |  | -            self.assertRaises( | 
| 118 |  | -                NotImplementedError, self.call_function_under_test, request | 
| 119 |  | -            ) | 
| 120 |  | - | 
| 121 |  | - | 
| 122 |  | -class HTTPServerFunctionalTestCase(unittest.TestCase): | 
| 123 |  | -    """functional tests of the HTTP Server | 
| 124 |  | -    This test case interacts with the http server, and as such requires it to | 
| 125 |  | -    be running in order for the tests to pass | 
| 126 |  | -    """ | 
| 127 |  | - | 
| 128 |  | -    def send_message(self, message): | 
| 129 |  | -        """Attempt to send a message using the client and the test buffer | 
| 130 |  | -        In case of a socket error, fail and report the problem | 
| 131 |  | -        """ | 
| 132 |  | -        from simple_client import client | 
| 133 |  | -        response = '' | 
| 134 |  | -        try: | 
| 135 |  | -            response = client(message) | 
| 136 |  | -        except socket.error as e: | 
| 137 |  | -            if e.errno == 61: | 
| 138 |  | -                msg = "Error: {0}, is the server running?" | 
| 139 |  | -                self.fail(msg.format(e.strerror)) | 
| 140 |  | -            else: | 
| 141 |  | -                self.fail("Unexpected Error: {0}".format(str(e))) | 
| 142 | 36 |         return response | 
| 143 | 37 | 
 | 
| 144 |  | -    def test_get_request(self): | 
| 145 |  | -        message = CRLF.join(['GET / HTTP/1.1', 'Host: example.com', '']) | 
| 146 |  | -        expected = '200 OK' | 
| 147 |  | -        actual = self.send_message(message) | 
| 148 |  | -        self.assertTrue( | 
| 149 |  | -            expected in actual, '"{0}" not in "{1}"'.format(expected, actual) | 
| 150 |  | -        ) | 
|  | 38 | +    def test_get_sample_text_content(self): | 
|  | 39 | +        """ | 
|  | 40 | +        A call to /sample.txt returns the correct body | 
|  | 41 | +        """ | 
|  | 42 | +        file = 'sample.txt' | 
|  | 43 | + | 
|  | 44 | +        local_path = os.path.join('webroot', *file.split('/')) | 
|  | 45 | +        web_path = '/' + file | 
|  | 46 | +        error_comment = "Error encountered while visiting " + web_path | 
|  | 47 | + | 
|  | 48 | +        response = self.get_response(web_path) | 
|  | 49 | + | 
|  | 50 | +        self.assertEqual(response.getcode(), 200, error_comment) | 
|  | 51 | + | 
|  | 52 | +        with open(local_path, 'rb') as f: | 
|  | 53 | +            self.assertEqual(f.read(), response.read(), error_comment) | 
|  | 54 | + | 
|  | 55 | +    def test_get_sample_text_mime_type(self): | 
|  | 56 | +        """ | 
|  | 57 | +        A call to /sample.txt returns the correct mimetype | 
|  | 58 | +        """ | 
|  | 59 | +        file = 'sample.txt' | 
|  | 60 | + | 
|  | 61 | +        web_path = '/' + file | 
|  | 62 | +        error_comment = "Error encountered while visiting " + web_path | 
|  | 63 | + | 
|  | 64 | +        response = self.get_response(web_path) | 
|  | 65 | + | 
|  | 66 | +        self.assertEqual(response.getcode(), 200, error_comment) | 
|  | 67 | +        self.assertEqual(response.getheader('Content-Type'), 'text/plain', error_comment) | 
|  | 68 | + | 
|  | 69 | +    def test_get_sample_scene_balls_jpeg(self): | 
|  | 70 | +        """ | 
|  | 71 | +        A call to /images/Sample_Scene_Balls.jpg returns the correct body | 
|  | 72 | +        """ | 
|  | 73 | +        file = 'images/Sample_Scene_Balls.jpg' | 
|  | 74 | + | 
|  | 75 | +        local_path = os.path.join('webroot', *file.split('/')) | 
|  | 76 | +        web_path = '/' + file | 
|  | 77 | +        error_comment = "Error encountered while visiting " + web_path | 
|  | 78 | + | 
|  | 79 | +        response = self.get_response(web_path) | 
|  | 80 | + | 
|  | 81 | +        self.assertEqual(response.getcode(), 200, error_comment) | 
|  | 82 | + | 
|  | 83 | +        with open(local_path, 'rb') as f: | 
|  | 84 | +            self.assertEqual(f.read(), response.read(), error_comment) | 
|  | 85 | + | 
|  | 86 | +    def test_get_sample_scene_balls_jpeg_mime_type(self): | 
|  | 87 | +        """ | 
|  | 88 | +        A call to /images/Sample_Scene_Balls.jpg returns the correct mimetype | 
|  | 89 | +        """ | 
|  | 90 | +        file = 'images/Sample_Scene_Balls.jpg' | 
|  | 91 | + | 
|  | 92 | +        web_path = '/' + file | 
|  | 93 | +        error_comment = "Error encountered while visiting " + web_path | 
|  | 94 | + | 
|  | 95 | +        response = self.get_response(web_path) | 
|  | 96 | + | 
|  | 97 | +        self.assertEqual(response.getcode(), 200, error_comment) | 
|  | 98 | +        self.assertEqual(response.getheader('Content-Type'), 'image/jpeg', error_comment) | 
|  | 99 | + | 
|  | 100 | +    def test_get_sample_1_png(self): | 
|  | 101 | +        """ | 
|  | 102 | +        A call to /images/sample_1.png returns the correct body | 
|  | 103 | +        """ | 
|  | 104 | +        file = 'images/sample_1.png' | 
|  | 105 | + | 
|  | 106 | +        local_path = os.path.join('webroot', *file.split('/')) | 
|  | 107 | +        web_path = '/' + file | 
|  | 108 | +        error_comment = "Error encountered while visiting " + web_path | 
|  | 109 | + | 
|  | 110 | +        response = self.get_response(web_path) | 
|  | 111 | + | 
|  | 112 | +        self.assertEqual(response.getcode(), 200, error_comment) | 
|  | 113 | + | 
|  | 114 | +        with open(local_path, 'rb') as f: | 
|  | 115 | +            self.assertEqual(f.read(), response.read(), error_comment) | 
|  | 116 | + | 
|  | 117 | +    def test_get_sample_1_png_mime_type(self): | 
|  | 118 | +        """ | 
|  | 119 | +        A call to /images/sample_1.png returns the correct mimetype | 
|  | 120 | +        """ | 
|  | 121 | +        file = 'images/sample_1.png' | 
|  | 122 | + | 
|  | 123 | +        web_path = '/' + file | 
|  | 124 | +        error_comment = "Error encountered while visiting " + web_path | 
|  | 125 | + | 
|  | 126 | +        response = self.get_response(web_path) | 
|  | 127 | + | 
|  | 128 | +        self.assertEqual(response.getcode(), 200, error_comment) | 
|  | 129 | +        self.assertEqual(response.getheader('Content-Type'), 'image/png', error_comment) | 
|  | 130 | + | 
|  | 131 | +    def test_get_404(self): | 
|  | 132 | +        """ | 
|  | 133 | +        A call to /asdf.txt (a file which does not exist in webroot) yields a 404 error | 
|  | 134 | +        """ | 
|  | 135 | +        file = 'asdf.txt' | 
|  | 136 | + | 
|  | 137 | +        web_path = '/' + file | 
|  | 138 | +        error_comment = "Error encountered while visiting " + web_path | 
|  | 139 | + | 
|  | 140 | +        response = self.get_response(web_path) | 
|  | 141 | + | 
|  | 142 | +        self.assertEqual(response.getcode(), 404, error_comment) | 
|  | 143 | + | 
|  | 144 | +    def test_images_index(self): | 
|  | 145 | +        """ | 
|  | 146 | +        A call to /images/ yields a list of files in the images directory | 
|  | 147 | +        """ | 
|  | 148 | + | 
|  | 149 | +        directory = 'images' | 
|  | 150 | +        local_path = os.path.join('webroot', directory) | 
|  | 151 | +        web_path = '/' + directory | 
|  | 152 | +        error_comment = "Error encountered while visiting " + web_path | 
|  | 153 | + | 
|  | 154 | +        response = self.get_response(web_path) | 
|  | 155 | +        body = response.read().decode() | 
|  | 156 | + | 
|  | 157 | +        for path in os.listdir(local_path): | 
|  | 158 | +            self.assertIn(path, body, error_comment) | 
|  | 159 | + | 
|  | 160 | +    def test_root_index(self): | 
|  | 161 | +        """ | 
|  | 162 | +        A call to / yields a list of files in the images directory | 
|  | 163 | +        """ | 
|  | 164 | + | 
|  | 165 | +        directory = '' | 
|  | 166 | +        local_path = os.path.join('webroot', directory) | 
|  | 167 | +        web_path = '/' + directory | 
|  | 168 | +        error_comment = "Error encountered while visiting " + web_path | 
|  | 169 | + | 
|  | 170 | +        response = self.get_response(web_path) | 
|  | 171 | +        body = response.read().decode() | 
|  | 172 | + | 
|  | 173 | +        for path in os.listdir(local_path): | 
|  | 174 | +            self.assertIn(path, body, error_comment) | 
| 151 | 175 | 
 | 
| 152 |  | -    def test_post_request(self): | 
| 153 |  | -        message = CRLF.join(['POST / HTTP/1.1', 'Host: example.com', '']) | 
| 154 |  | -        expected = '405 Method Not Allowed' | 
| 155 |  | -        actual = self.send_message(message) | 
| 156 |  | -        self.assertTrue( | 
| 157 |  | -            expected in actual, '"{0}" not in "{1}"'.format(expected, actual) | 
| 158 |  | -        ) | 
| 159 | 176 | 
 | 
| 160 | 177 | 
 | 
| 161 | 178 | if __name__ == '__main__': | 
|  | 
0 commit comments