|
5 | 5 | import unittest
|
6 | 6 | import functools
|
7 | 7 | import contextlib
|
| 8 | +import os.path |
8 | 9 | from test import support
|
9 | 10 | from nntplib import NNTP, GroupInfo
|
10 | 11 | import nntplib
|
|
13 | 14 | import ssl
|
14 | 15 | except ImportError:
|
15 | 16 | ssl = None
|
| 17 | +try: |
| 18 | + import threading |
| 19 | +except ImportError: |
| 20 | + threading = None |
16 | 21 |
|
17 | 22 | TIMEOUT = 30
|
| 23 | +certfile = os.path.join(os.path.dirname(__file__), 'keycert3.pem') |
18 | 24 |
|
19 | 25 | # TODO:
|
20 | 26 | # - test the `file` arg to more commands
|
@@ -202,24 +208,6 @@ def _check_caps(caps):
|
202 | 208 | resp, caps = self.server.capabilities()
|
203 | 209 | _check_caps(caps)
|
204 | 210 |
|
205 |
| - @unittest.skipUnless(ssl, 'requires SSL support') |
206 |
| - def test_starttls(self): |
207 |
| - file = self.server.file |
208 |
| - sock = self.server.sock |
209 |
| - try: |
210 |
| - self.server.starttls() |
211 |
| - except nntplib.NNTPPermanentError: |
212 |
| - self.skipTest("STARTTLS not supported by server.") |
213 |
| - else: |
214 |
| - # Check that the socket and internal pseudo-file really were |
215 |
| - # changed. |
216 |
| - self.assertNotEqual(file, self.server.file) |
217 |
| - self.assertNotEqual(sock, self.server.sock) |
218 |
| - # Check that the new socket really is an SSL one |
219 |
| - self.assertIsInstance(self.server.sock, ssl.SSLSocket) |
220 |
| - # Check that trying starttls when it's already active fails. |
221 |
| - self.assertRaises(ValueError, self.server.starttls) |
222 |
| - |
223 | 211 | def test_zlogin(self):
|
224 | 212 | # This test must be the penultimate because further commands will be
|
225 | 213 | # refused.
|
@@ -1520,6 +1508,64 @@ class MockSslTests(MockSocketTests):
|
1520 | 1508 | def nntp_class(*pos, **kw):
|
1521 | 1509 | return nntplib.NNTP_SSL(*pos, ssl_context=bypass_context, **kw)
|
1522 | 1510 |
|
| 1511 | +@unittest.skipUnless(threading, 'requires multithreading') |
| 1512 | +class LocalServerTests(unittest.TestCase): |
| 1513 | + def setUp(self): |
| 1514 | + sock = socket.socket() |
| 1515 | + port = support.bind_port(sock) |
| 1516 | + sock.listen() |
| 1517 | + self.background = threading.Thread( |
| 1518 | + target=self.run_server, args=(sock,)) |
| 1519 | + self.background.start() |
| 1520 | + self.addCleanup(self.background.join) |
| 1521 | + |
| 1522 | + self.nntp = NNTP(support.HOST, port, usenetrc=False).__enter__() |
| 1523 | + self.addCleanup(self.nntp.__exit__, None, None, None) |
| 1524 | + |
| 1525 | + def run_server(self, sock): |
| 1526 | + # Could be generalized to handle more commands in separate methods |
| 1527 | + with sock: |
| 1528 | + [client, _] = sock.accept() |
| 1529 | + with contextlib.ExitStack() as cleanup: |
| 1530 | + cleanup.enter_context(client) |
| 1531 | + reader = cleanup.enter_context(client.makefile('rb')) |
| 1532 | + client.sendall(b'200 Server ready\r\n') |
| 1533 | + while True: |
| 1534 | + cmd = reader.readline() |
| 1535 | + if cmd == b'CAPABILITIES\r\n': |
| 1536 | + client.sendall( |
| 1537 | + b'101 Capability list:\r\n' |
| 1538 | + b'VERSION 2\r\n' |
| 1539 | + b'STARTTLS\r\n' |
| 1540 | + b'.\r\n' |
| 1541 | + ) |
| 1542 | + elif cmd == b'STARTTLS\r\n': |
| 1543 | + reader.close() |
| 1544 | + client.sendall(b'382 Begin TLS negotiation now\r\n') |
| 1545 | + client = ssl.wrap_socket( |
| 1546 | + client, server_side=True, certfile=certfile) |
| 1547 | + cleanup.enter_context(client) |
| 1548 | + reader = cleanup.enter_context(client.makefile('rb')) |
| 1549 | + elif cmd == b'QUIT\r\n': |
| 1550 | + client.sendall(b'205 Bye!\r\n') |
| 1551 | + break |
| 1552 | + else: |
| 1553 | + raise ValueError('Unexpected command {!r}'.format(cmd)) |
| 1554 | + |
| 1555 | + @unittest.skipUnless(ssl, 'requires SSL support') |
| 1556 | + def test_starttls(self): |
| 1557 | + file = self.nntp.file |
| 1558 | + sock = self.nntp.sock |
| 1559 | + self.nntp.starttls() |
| 1560 | + # Check that the socket and internal pseudo-file really were |
| 1561 | + # changed. |
| 1562 | + self.assertNotEqual(file, self.nntp.file) |
| 1563 | + self.assertNotEqual(sock, self.nntp.sock) |
| 1564 | + # Check that the new socket really is an SSL one |
| 1565 | + self.assertIsInstance(self.nntp.sock, ssl.SSLSocket) |
| 1566 | + # Check that trying starttls when it's already active fails. |
| 1567 | + self.assertRaises(ValueError, self.nntp.starttls) |
| 1568 | + |
1523 | 1569 |
|
1524 | 1570 | if __name__ == "__main__":
|
1525 | 1571 | unittest.main()
|
0 commit comments