diff --git a/pymongo/asynchronous/pool.py b/pymongo/asynchronous/pool.py index f4d5b174fa..9a39883fc2 100644 --- a/pymongo/asynchronous/pool.py +++ b/pymongo/asynchronous/pool.py @@ -131,6 +131,7 @@ class AsyncConnection: :param pool: a Pool instance :param address: the server's (host, port) :param id: the id of this socket in it's pool + :param is_sdam: SDAM connections do not call hello on creation """ def __init__( @@ -139,11 +140,13 @@ def __init__( pool: Pool, address: tuple[str, int], id: int, + is_sdam: bool, ): self.pool_ref = weakref.ref(pool) self.conn = conn self.address = address self.id = id + self.is_sdam = is_sdam self.closed = False self.last_checkin_time = time.monotonic() self.performed_handshake = False @@ -711,13 +714,13 @@ def __init__( self, address: _Address, options: PoolOptions, - handshake: bool = True, + is_sdam: bool = False, client_id: Optional[ObjectId] = None, ): """ :param address: a (hostname, port) tuple :param options: a PoolOptions instance - :param handshake: whether to call hello for each new AsyncConnection + :param is_sdam: whether to call hello for each new AsyncConnection """ if options.pause_enabled: self.state = PoolState.PAUSED @@ -746,14 +749,14 @@ def __init__( self.pid = os.getpid() self.address = address self.opts = options - self.handshake = handshake + self.is_sdam = is_sdam # Don't publish events or logs in Monitor pools. self.enabled_for_cmap = ( - self.handshake + not self.is_sdam and self.opts._event_listeners is not None and self.opts._event_listeners.enabled_for_cmap ) - self.enabled_for_logging = self.handshake + self.enabled_for_logging = not self.is_sdam # The first portion of the wait queue. # Enforces: maxPoolSize @@ -1058,14 +1061,14 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A raise - conn = AsyncConnection(networking_interface, self, self.address, conn_id) # type: ignore[arg-type] + conn = AsyncConnection(networking_interface, self, self.address, conn_id, self.is_sdam) # type: ignore[arg-type] async with self.lock: self.active_contexts.add(conn.cancel_context) self.active_contexts.discard(tmp_context) if tmp_context.cancelled: conn.cancel_context.cancel() try: - if self.handshake: + if not self.is_sdam: await conn.hello() self.is_writable = conn.is_writable if handler: diff --git a/pymongo/asynchronous/topology.py b/pymongo/asynchronous/topology.py index 99b30fed1e..052f91afee 100644 --- a/pymongo/asynchronous/topology.py +++ b/pymongo/asynchronous/topology.py @@ -985,7 +985,7 @@ def _create_pool_for_monitor(self, address: _Address) -> Pool: ) return self._settings.pool_class( - address, monitor_pool_options, handshake=False, client_id=self._topology_id + address, monitor_pool_options, is_sdam=True, client_id=self._topology_id ) def _error_message(self, selector: Callable[[Selection], Selection]) -> str: diff --git a/pymongo/network_layer.py b/pymongo/network_layer.py index 3fa180bf7a..6f1bb9a357 100644 --- a/pymongo/network_layer.py +++ b/pymongo/network_layer.py @@ -357,7 +357,12 @@ def receive_data(conn: Connection, length: int, deadline: Optional[float]) -> me except socket.timeout: if conn.cancel_context.cancelled: raise _OperationCancelled("operation cancelled") from None - if _PYPY: + if ( + _PYPY + or not conn.is_sdam + and deadline is not None + and deadline - time.monotonic() < 0 + ): # We reached the true deadline. raise continue diff --git a/pymongo/synchronous/pool.py b/pymongo/synchronous/pool.py index 44aec31a86..505f58c60f 100644 --- a/pymongo/synchronous/pool.py +++ b/pymongo/synchronous/pool.py @@ -131,6 +131,7 @@ class Connection: :param pool: a Pool instance :param address: the server's (host, port) :param id: the id of this socket in it's pool + :param is_sdam: SDAM connections do not call hello on creation """ def __init__( @@ -139,11 +140,13 @@ def __init__( pool: Pool, address: tuple[str, int], id: int, + is_sdam: bool, ): self.pool_ref = weakref.ref(pool) self.conn = conn self.address = address self.id = id + self.is_sdam = is_sdam self.closed = False self.last_checkin_time = time.monotonic() self.performed_handshake = False @@ -709,13 +712,13 @@ def __init__( self, address: _Address, options: PoolOptions, - handshake: bool = True, + is_sdam: bool = False, client_id: Optional[ObjectId] = None, ): """ :param address: a (hostname, port) tuple :param options: a PoolOptions instance - :param handshake: whether to call hello for each new Connection + :param is_sdam: whether to call hello for each new Connection """ if options.pause_enabled: self.state = PoolState.PAUSED @@ -744,14 +747,14 @@ def __init__( self.pid = os.getpid() self.address = address self.opts = options - self.handshake = handshake + self.is_sdam = is_sdam # Don't publish events or logs in Monitor pools. self.enabled_for_cmap = ( - self.handshake + not self.is_sdam and self.opts._event_listeners is not None and self.opts._event_listeners.enabled_for_cmap ) - self.enabled_for_logging = self.handshake + self.enabled_for_logging = not self.is_sdam # The first portion of the wait queue. # Enforces: maxPoolSize @@ -1054,14 +1057,14 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect raise - conn = Connection(networking_interface, self, self.address, conn_id) # type: ignore[arg-type] + conn = Connection(networking_interface, self, self.address, conn_id, self.is_sdam) # type: ignore[arg-type] with self.lock: self.active_contexts.add(conn.cancel_context) self.active_contexts.discard(tmp_context) if tmp_context.cancelled: conn.cancel_context.cancel() try: - if self.handshake: + if not self.is_sdam: conn.hello() self.is_writable = conn.is_writable if handler: diff --git a/pymongo/synchronous/topology.py b/pymongo/synchronous/topology.py index 10d41def6e..28370d4adc 100644 --- a/pymongo/synchronous/topology.py +++ b/pymongo/synchronous/topology.py @@ -983,7 +983,7 @@ def _create_pool_for_monitor(self, address: _Address) -> Pool: ) return self._settings.pool_class( - address, monitor_pool_options, handshake=False, client_id=self._topology_id + address, monitor_pool_options, is_sdam=True, client_id=self._topology_id ) def _error_message(self, selector: Callable[[Selection], Selection]) -> str: diff --git a/test/asynchronous/utils.py b/test/asynchronous/utils.py index f653c575e9..ca80d1f6dd 100644 --- a/test/asynchronous/utils.py +++ b/test/asynchronous/utils.py @@ -159,6 +159,7 @@ def __init__(self): self.cancel_context = _CancellationContext() self.more_to_come = False self.id = random.randint(0, 100) + self.is_sdam = False self.server_connection_id = random.randint(0, 100) def close_conn(self, reason): @@ -172,7 +173,7 @@ def __aexit__(self, exc_type, exc_val, exc_tb): class AsyncMockPool: - def __init__(self, address, options, handshake=True, client_id=None): + def __init__(self, address, options, is_sdam=False, client_id=None): self.gen = _PoolGeneration() self._lock = _async_create_lock() self.opts = options diff --git a/test/test_topology.py b/test/test_topology.py index 22e94739ee..141b2d7f21 100644 --- a/test/test_topology.py +++ b/test/test_topology.py @@ -121,7 +121,7 @@ def test_timeout_configuration(self): self.assertEqual(1, monitor._pool.opts.socket_timeout) # The monitor, not its pool, is responsible for calling hello. - self.assertFalse(monitor._pool.handshake) + self.assertTrue(monitor._pool.is_sdam) class TestSingleServerTopology(TopologyTest): diff --git a/test/utils.py b/test/utils.py index 3027ed7517..25d95d1d3c 100644 --- a/test/utils.py +++ b/test/utils.py @@ -157,6 +157,7 @@ def __init__(self): self.cancel_context = _CancellationContext() self.more_to_come = False self.id = random.randint(0, 100) + self.is_sdam = False self.server_connection_id = random.randint(0, 100) def close_conn(self, reason): @@ -170,7 +171,7 @@ def __exit__(self, exc_type, exc_val, exc_tb): class MockPool: - def __init__(self, address, options, handshake=True, client_id=None): + def __init__(self, address, options, is_sdam=False, client_id=None): self.gen = _PoolGeneration() self._lock = _create_lock() self.opts = options