Skip to content

Commit 6b9d0dc

Browse files
committed
WL#15523: Support Python DB API asynchronous execution
This worklog introduces the support for asynchronous execution in MySQL Connector/Python pure Python implementation following the Python Database API Specification. The asyncio implementation of MySQL Connector/Python allows for a non-blocking, asynchronous interaction with a MySQL server using a new package mysql.connector.aio. Usage example: from mysql.connector.aio import connect async with await connect(user="myuser", password="mypass") as cnx: async with await cnx.cursor() as cur: # Execute a non-blocking query await cur.execute("SELECT version()") # Retrieve the results of the query asynchronously results = await cur.fetchall() print(results) Change-Id: I35eea47fbeb3c172f4b98acfe5a891ed4476c198
1 parent 4e14e58 commit 6b9d0dc

27 files changed

+15768
-0
lines changed

CHANGES.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ v8.3.0
1515
- WL#15983: Stop using mysql_ssl_set api
1616
- WL#15982: Remove use of mysql_shutdown
1717
- WL#15942: Improve type hints and standardize byte type handling
18+
- WL#15523: Support Python DB API asynchronous execution
1819
- BUG#35912790: Binary strings are converted when using prepared statements
1920

2021
v8.2.0

lib/mysql/connector/aio/__init__.py

Lines changed: 255 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,255 @@
1+
# Copyright (c) 2023, Oracle and/or its affiliates.
2+
#
3+
# This program is free software; you can redistribute it and/or modify
4+
# it under the terms of the GNU General Public License, version 2.0, as
5+
# published by the Free Software Foundation.
6+
#
7+
# This program is also distributed with certain software (including
8+
# but not limited to OpenSSL) that is licensed under separate terms,
9+
# as designated in a particular file or component or in included license
10+
# documentation. The authors of MySQL hereby grant you an
11+
# additional permission to link the program and your derivative works
12+
# with the separately licensed software that they have included with
13+
# MySQL.
14+
#
15+
# Without limiting anything contained in the foregoing, this file,
16+
# which is part of MySQL Connector/Python, is also subject to the
17+
# Universal FOSS Exception, version 1.0, a copy of which can be found at
18+
# http://oss.oracle.com/licenses/universal-foss-exception.
19+
#
20+
# This program is distributed in the hope that it will be useful, but
21+
# WITHOUT ANY WARRANTY; without even the implied warranty of
22+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
23+
# See the GNU General Public License, version 2.0, for more details.
24+
#
25+
# You should have received a copy of the GNU General Public License
26+
# along with this program; if not, write to the Free Software Foundation, Inc.,
27+
# 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
28+
29+
"""MySQL Connector/Python - MySQL driver written in Python."""
30+
31+
__all__ = ["CMySQLConnection", "MySQLConnection", "connect"]
32+
33+
import random
34+
35+
from typing import Any
36+
37+
from ..constants import DEFAULT_CONFIGURATION
38+
from ..errors import Error, InterfaceError, ProgrammingError
39+
from ..pooling import ERROR_NO_CEXT
40+
from .abstracts import MySQLConnectionAbstract
41+
from .connection import MySQLConnection
42+
43+
try:
44+
import dns.exception
45+
import dns.resolver
46+
except ImportError:
47+
HAVE_DNSPYTHON = False
48+
else:
49+
HAVE_DNSPYTHON = True
50+
51+
52+
try:
53+
from .connection_cext import CMySQLConnection
54+
except ImportError:
55+
CMySQLConnection = None
56+
57+
58+
async def connect(*args: Any, **kwargs: Any) -> MySQLConnectionAbstract:
59+
"""Creates or gets a MySQL connection object.
60+
61+
In its simpliest form, `connect()` will open a connection to a
62+
MySQL server and return a `MySQLConnectionAbstract` subclass
63+
object such as `MySQLConnection` or `CMySQLConnection`.
64+
65+
When any connection pooling arguments are given, for example `pool_name`
66+
or `pool_size`, a pool is created or a previously one is used to return
67+
a `PooledMySQLConnection`.
68+
69+
Args:
70+
*args: N/A.
71+
**kwargs: For a complete list of possible arguments, see [1]. If no arguments
72+
are given, it uses the already configured or default values.
73+
74+
Returns:
75+
A `MySQLConnectionAbstract` subclass instance (such as `MySQLConnection` or
76+
a `CMySQLConnection`) instance.
77+
78+
Examples:
79+
A connection with the MySQL server can be established using either the
80+
`mysql.connector.connect()` method or a `MySQLConnectionAbstract` subclass:
81+
```
82+
>>> from mysql.connector.aio import MySQLConnection, HAVE_CEXT
83+
>>>
84+
>>> cnx1 = await mysql.connector.aio.connect(user='joe', database='test')
85+
>>> cnx2 = MySQLConnection(user='joe', database='test')
86+
>>> await cnx2.connect()
87+
>>>
88+
>>> cnx3 = None
89+
>>> if HAVE_CEXT:
90+
>>> from mysql.connector.aio import CMySQLConnection
91+
>>> cnx3 = CMySQLConnection(user='joe', database='test')
92+
```
93+
94+
References:
95+
[1]: https://dev.mysql.com/doc/connector-python/en/connector-python-connectargs.html
96+
"""
97+
# DNS SRV
98+
dns_srv = kwargs.pop("dns_srv") if "dns_srv" in kwargs else False
99+
100+
if not isinstance(dns_srv, bool):
101+
raise InterfaceError("The value of 'dns-srv' must be a boolean")
102+
103+
if dns_srv:
104+
if not HAVE_DNSPYTHON:
105+
raise InterfaceError(
106+
"MySQL host configuration requested DNS "
107+
"SRV. This requires the Python dnspython "
108+
"module. Please refer to documentation"
109+
)
110+
if "unix_socket" in kwargs:
111+
raise InterfaceError(
112+
"Using Unix domain sockets with DNS SRV lookup is not allowed"
113+
)
114+
if "port" in kwargs:
115+
raise InterfaceError(
116+
"Specifying a port number with DNS SRV lookup is not allowed"
117+
)
118+
if "failover" in kwargs:
119+
raise InterfaceError(
120+
"Specifying multiple hostnames with DNS SRV look up is not allowed"
121+
)
122+
if "host" not in kwargs:
123+
kwargs["host"] = DEFAULT_CONFIGURATION["host"]
124+
125+
try:
126+
srv_records = dns.resolver.query(kwargs["host"], "SRV")
127+
except dns.exception.DNSException:
128+
raise InterfaceError(
129+
f"Unable to locate any hosts for '{kwargs['host']}'"
130+
) from None
131+
132+
failover = []
133+
for srv in srv_records:
134+
failover.append(
135+
{
136+
"host": srv.target.to_text(omit_final_dot=True),
137+
"port": srv.port,
138+
"priority": srv.priority,
139+
"weight": srv.weight,
140+
}
141+
)
142+
143+
failover.sort(key=lambda x: (x["priority"], -x["weight"]))
144+
kwargs["failover"] = [
145+
{"host": srv["host"], "port": srv["port"]} for srv in failover
146+
]
147+
148+
# Failover
149+
if "failover" in kwargs:
150+
return await _get_failover_connection(**kwargs)
151+
152+
# Use C Extension by default
153+
use_pure = kwargs.get("use_pure", False)
154+
if "use_pure" in kwargs:
155+
del kwargs["use_pure"] # Remove 'use_pure' from kwargs
156+
if not use_pure and CMySQLConnection is None:
157+
raise ImportError(ERROR_NO_CEXT)
158+
159+
if CMySQLConnection and not use_pure:
160+
cnx = CMySQLConnection(*args, **kwargs)
161+
else:
162+
cnx = MySQLConnection(*args, **kwargs)
163+
await cnx.connect()
164+
return cnx
165+
166+
167+
async def _get_failover_connection(**kwargs: Any) -> MySQLConnectionAbstract:
168+
"""Return a MySQL connection and try to failover if needed.
169+
170+
An InterfaceError is raise when no MySQL is available. ValueError is
171+
raised when the failover server configuration contains an illegal
172+
connection argument. Supported arguments are user, password, host, port,
173+
unix_socket and database. ValueError is also raised when the failover
174+
argument was not provided.
175+
176+
Returns MySQLConnection instance.
177+
"""
178+
config = kwargs.copy()
179+
try:
180+
failover = config["failover"]
181+
except KeyError:
182+
raise ValueError("failover argument not provided") from None
183+
del config["failover"]
184+
185+
support_cnx_args = set(
186+
[
187+
"user",
188+
"password",
189+
"host",
190+
"port",
191+
"unix_socket",
192+
"database",
193+
"pool_name",
194+
"pool_size",
195+
"priority",
196+
]
197+
)
198+
199+
# First check if we can add all use the configuration
200+
priority_count = 0
201+
for server in failover:
202+
diff = set(server.keys()) - support_cnx_args
203+
if diff:
204+
arg = "s" if len(diff) > 1 else ""
205+
lst = ", ".join(diff)
206+
raise ValueError(
207+
f"Unsupported connection argument {arg} in failover: {lst}"
208+
)
209+
if hasattr(server, "priority"):
210+
priority_count += 1
211+
212+
server["priority"] = server.get("priority", 100)
213+
if server["priority"] < 0 or server["priority"] > 100:
214+
raise InterfaceError(
215+
"Priority value should be in the range of 0 to 100, "
216+
f"got : {server['priority']}"
217+
)
218+
if not isinstance(server["priority"], int):
219+
raise InterfaceError(
220+
"Priority value should be an integer in the range of 0 to "
221+
f"100, got : {server['priority']}"
222+
)
223+
224+
if 0 < priority_count < len(failover):
225+
raise ProgrammingError(
226+
"You must either assign no priority to any "
227+
"of the routers or give a priority for "
228+
"every router"
229+
)
230+
231+
server_directory = {}
232+
server_priority_list = []
233+
for server in sorted(failover, key=lambda x: x["priority"], reverse=True):
234+
if server["priority"] not in server_directory:
235+
server_directory[server["priority"]] = [server]
236+
server_priority_list.append(server["priority"])
237+
else:
238+
server_directory[server["priority"]].append(server)
239+
240+
for priority in server_priority_list:
241+
failover_list = server_directory[priority]
242+
for _ in range(len(failover_list)):
243+
last = len(failover_list) - 1
244+
index = random.randint(0, last)
245+
server = failover_list.pop(index)
246+
new_config = config.copy()
247+
new_config.update(server)
248+
new_config.pop("priority", None)
249+
try:
250+
return await connect(**new_config)
251+
except Error:
252+
# If we failed to connect, we try the next server
253+
pass
254+
255+
raise InterfaceError("Unable to connect to any of the target hosts")

0 commit comments

Comments
 (0)