Skip to content

PYTHON-5369 - Re-raise socket.timeout errors if the deadline has alre… #2326

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: master
Choose a base branch
from
17 changes: 10 additions & 7 deletions pymongo/asynchronous/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ class AsyncConnection:
:param pool: a Pool instance
:param address: the server's (host, port)
:param id: the id of this socket in it's pool
:param is_sdam: SDAM connections do not call hello on creation
"""

def __init__(
Expand All @@ -139,11 +140,13 @@ def __init__(
pool: Pool,
address: tuple[str, int],
id: int,
is_sdam: bool,
):
self.pool_ref = weakref.ref(pool)
self.conn = conn
self.address = address
self.id = id
self.is_sdam = is_sdam
self.closed = False
self.last_checkin_time = time.monotonic()
self.performed_handshake = False
Expand Down Expand Up @@ -711,13 +714,13 @@ def __init__(
self,
address: _Address,
options: PoolOptions,
handshake: bool = True,
is_sdam: bool = False,
client_id: Optional[ObjectId] = None,
):
"""
:param address: a (hostname, port) tuple
:param options: a PoolOptions instance
:param handshake: whether to call hello for each new AsyncConnection
:param is_sdam: whether to call hello for each new AsyncConnection
"""
if options.pause_enabled:
self.state = PoolState.PAUSED
Expand Down Expand Up @@ -746,14 +749,14 @@ def __init__(
self.pid = os.getpid()
self.address = address
self.opts = options
self.handshake = handshake
self.is_sdam = is_sdam
# Don't publish events or logs in Monitor pools.
self.enabled_for_cmap = (
self.handshake
not self.is_sdam
and self.opts._event_listeners is not None
and self.opts._event_listeners.enabled_for_cmap
)
self.enabled_for_logging = self.handshake
self.enabled_for_logging = not self.is_sdam

# The first portion of the wait queue.
# Enforces: maxPoolSize
Expand Down Expand Up @@ -1058,14 +1061,14 @@ async def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> A

raise

conn = AsyncConnection(networking_interface, self, self.address, conn_id) # type: ignore[arg-type]
conn = AsyncConnection(networking_interface, self, self.address, conn_id, self.is_sdam) # type: ignore[arg-type]
async with self.lock:
self.active_contexts.add(conn.cancel_context)
self.active_contexts.discard(tmp_context)
if tmp_context.cancelled:
conn.cancel_context.cancel()
try:
if self.handshake:
if not self.is_sdam:
await conn.hello()
self.is_writable = conn.is_writable
if handler:
Expand Down
2 changes: 1 addition & 1 deletion pymongo/asynchronous/topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -985,7 +985,7 @@ def _create_pool_for_monitor(self, address: _Address) -> Pool:
)

return self._settings.pool_class(
address, monitor_pool_options, handshake=False, client_id=self._topology_id
address, monitor_pool_options, is_sdam=True, client_id=self._topology_id
)

def _error_message(self, selector: Callable[[Selection], Selection]) -> str:
Expand Down
7 changes: 6 additions & 1 deletion pymongo/network_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -357,7 +357,12 @@ def receive_data(conn: Connection, length: int, deadline: Optional[float]) -> me
except socket.timeout:
if conn.cancel_context.cancelled:
raise _OperationCancelled("operation cancelled") from None
if _PYPY:
if (
_PYPY
or not conn.is_sdam
and deadline is not None
and deadline - time.monotonic() < 0
):
Copy link
Member

@ShaneHarvey ShaneHarvey May 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we want sigstop/sigcont to work the same regardless of what env variables are defined. What if we take an alternative approach here and replace not _is_faas() with not conn.is_sdam? IE only do the extra non-blocking read on SDAM connections.

Will that still fix the flaky tests?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are the flaky tests still fixed after the is_sdam change?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

# We reached the true deadline.
raise
continue
Expand Down
17 changes: 10 additions & 7 deletions pymongo/synchronous/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,7 @@ class Connection:
:param pool: a Pool instance
:param address: the server's (host, port)
:param id: the id of this socket in it's pool
:param is_sdam: SDAM connections do not call hello on creation
"""

def __init__(
Expand All @@ -139,11 +140,13 @@ def __init__(
pool: Pool,
address: tuple[str, int],
id: int,
is_sdam: bool,
):
self.pool_ref = weakref.ref(pool)
self.conn = conn
self.address = address
self.id = id
self.is_sdam = is_sdam
self.closed = False
self.last_checkin_time = time.monotonic()
self.performed_handshake = False
Expand Down Expand Up @@ -709,13 +712,13 @@ def __init__(
self,
address: _Address,
options: PoolOptions,
handshake: bool = True,
is_sdam: bool = False,
client_id: Optional[ObjectId] = None,
):
"""
:param address: a (hostname, port) tuple
:param options: a PoolOptions instance
:param handshake: whether to call hello for each new Connection
:param is_sdam: whether to call hello for each new Connection
"""
if options.pause_enabled:
self.state = PoolState.PAUSED
Expand Down Expand Up @@ -744,14 +747,14 @@ def __init__(
self.pid = os.getpid()
self.address = address
self.opts = options
self.handshake = handshake
self.is_sdam = is_sdam
# Don't publish events or logs in Monitor pools.
self.enabled_for_cmap = (
self.handshake
not self.is_sdam
and self.opts._event_listeners is not None
and self.opts._event_listeners.enabled_for_cmap
)
self.enabled_for_logging = self.handshake
self.enabled_for_logging = not self.is_sdam

# The first portion of the wait queue.
# Enforces: maxPoolSize
Expand Down Expand Up @@ -1054,14 +1057,14 @@ def connect(self, handler: Optional[_MongoClientErrorHandler] = None) -> Connect

raise

conn = Connection(networking_interface, self, self.address, conn_id) # type: ignore[arg-type]
conn = Connection(networking_interface, self, self.address, conn_id, self.is_sdam) # type: ignore[arg-type]
with self.lock:
self.active_contexts.add(conn.cancel_context)
self.active_contexts.discard(tmp_context)
if tmp_context.cancelled:
conn.cancel_context.cancel()
try:
if self.handshake:
if not self.is_sdam:
conn.hello()
self.is_writable = conn.is_writable
if handler:
Expand Down
2 changes: 1 addition & 1 deletion pymongo/synchronous/topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -983,7 +983,7 @@ def _create_pool_for_monitor(self, address: _Address) -> Pool:
)

return self._settings.pool_class(
address, monitor_pool_options, handshake=False, client_id=self._topology_id
address, monitor_pool_options, is_sdam=True, client_id=self._topology_id
)

def _error_message(self, selector: Callable[[Selection], Selection]) -> str:
Expand Down
3 changes: 2 additions & 1 deletion test/asynchronous/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,7 @@ def __init__(self):
self.cancel_context = _CancellationContext()
self.more_to_come = False
self.id = random.randint(0, 100)
self.is_sdam = False
self.server_connection_id = random.randint(0, 100)

def close_conn(self, reason):
Expand All @@ -172,7 +173,7 @@ def __aexit__(self, exc_type, exc_val, exc_tb):


class AsyncMockPool:
def __init__(self, address, options, handshake=True, client_id=None):
def __init__(self, address, options, is_sdam=False, client_id=None):
self.gen = _PoolGeneration()
self._lock = _async_create_lock()
self.opts = options
Expand Down
2 changes: 1 addition & 1 deletion test/test_topology.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def test_timeout_configuration(self):
self.assertEqual(1, monitor._pool.opts.socket_timeout)

# The monitor, not its pool, is responsible for calling hello.
self.assertFalse(monitor._pool.handshake)
self.assertTrue(monitor._pool.is_sdam)


class TestSingleServerTopology(TopologyTest):
Expand Down
3 changes: 2 additions & 1 deletion test/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,7 @@ def __init__(self):
self.cancel_context = _CancellationContext()
self.more_to_come = False
self.id = random.randint(0, 100)
self.is_sdam = False
self.server_connection_id = random.randint(0, 100)

def close_conn(self, reason):
Expand All @@ -170,7 +171,7 @@ def __exit__(self, exc_type, exc_val, exc_tb):


class MockPool:
def __init__(self, address, options, handshake=True, client_id=None):
def __init__(self, address, options, is_sdam=False, client_id=None):
self.gen = _PoolGeneration()
self._lock = _create_lock()
self.opts = options
Expand Down
Loading