86
86
OperationalError ,
87
87
ProgrammingError ,
88
88
)
89
+ from .opentelemetry .constants import (
90
+ CONNECTION_SPAN_NAME ,
91
+ OPTION_CNX_SPAN ,
92
+ OPTION_CNX_TRACER ,
93
+ OTEL_ENABLED ,
94
+ )
95
+
96
+ if OTEL_ENABLED :
97
+ from .opentelemetry .instrumentation import (
98
+ end_span ,
99
+ record_exception_event ,
100
+ set_connection_span_attrs ,
101
+ trace ,
102
+ )
103
+
89
104
from .optionfiles import read_option_files
90
105
from .types import (
91
106
ConnAttrsType ,
92
107
DescriptionType ,
93
108
HandShakeType ,
94
- QueryAttrType ,
95
109
StrOrBytes ,
96
110
SupportedMysqlBinaryProtocolTypes ,
97
111
WarningType ,
@@ -142,6 +156,11 @@ class MySQLConnectionAbstract(ABC):
142
156
143
157
def __init__ (self ) -> None :
144
158
"""Initialize"""
159
+ # opentelemetry related
160
+ self ._tracer : Any = None
161
+ self ._span : Any = None
162
+ self .otel_context_propagation : bool = True
163
+
145
164
self ._client_flags : int = ClientFlag .get_default ()
146
165
self ._charset_id : int = 45
147
166
self ._sql_mode : Optional [str ] = None
@@ -187,7 +206,7 @@ def __init__(self) -> None:
187
206
]
188
207
189
208
self ._prepared_statements : Any = None
190
- self ._query_attrs : QueryAttrType = []
209
+ self ._query_attrs : Dict [ str , Any ] = {}
191
210
192
211
self ._ssl_active : bool = False
193
212
self ._auth_plugin : Optional [str ] = None
@@ -233,19 +252,32 @@ def have_next_result(self) -> bool:
233
252
return self ._have_next_result
234
253
235
254
@property
236
- def query_attrs (self ) -> QueryAttrType :
255
+ def query_attrs (self ) -> List [ Tuple [ str , Any ]] :
237
256
"""Return query attributes list."""
238
- return self ._query_attrs
257
+ return list ( self ._query_attrs . items ())
239
258
240
259
def query_attrs_append (
241
260
self , value : Tuple [str , SupportedMysqlBinaryProtocolTypes ]
242
261
) -> None :
243
- """Add element to the query attributes list."""
244
- self ._query_attrs .append (value )
262
+ """Add element to the query attributes list.
263
+
264
+ If an element in the query attributes list already matches
265
+ the attribute name provided, the new element will NOT be added.
266
+ """
267
+ attr_name , attr_value = value
268
+ if attr_name not in self ._query_attrs :
269
+ self ._query_attrs [attr_name ] = attr_value
270
+
271
+ def query_attrs_remove (self , name : str ) -> Any :
272
+ """Remove element by name from the query attributes list.
273
+
274
+ If no match, `None` is returned; else the corresponding value is returned.
275
+ """
276
+ return self ._query_attrs .pop (name , None )
245
277
246
278
def query_attrs_clear (self ) -> None :
247
279
"""Clear query attributes list."""
248
- del self ._query_attrs [:]
280
+ self ._query_attrs = {}
249
281
250
282
def _validate_tls_ciphersuites (self ) -> None :
251
283
"""Validates the tls_ciphersuites option."""
@@ -470,6 +502,10 @@ def config(self, **kwargs: Any) -> None:
470
502
471
503
Raises on errors.
472
504
"""
505
+ # opentelemetry related
506
+ self ._span = kwargs .pop (OPTION_CNX_SPAN , None )
507
+ self ._tracer = kwargs .pop (OPTION_CNX_TRACER , None )
508
+
473
509
config = kwargs .copy ()
474
510
if "dsn" in config :
475
511
raise NotSupportedError ("Data source name is not supported" )
@@ -1198,22 +1234,40 @@ def reconnect(self, attempts: int = 1, delay: int = 0) -> None:
1198
1234
Raises InterfaceError on errors.
1199
1235
"""
1200
1236
counter = 0
1201
- while counter != attempts :
1202
- counter = counter + 1
1203
- try :
1204
- self .disconnect ()
1205
- self .connect ()
1206
- if self .is_connected ():
1207
- break
1208
- except (Error , IOError ) as err :
1209
- if counter == attempts :
1210
- msg = (
1211
- f"Can not reconnect to MySQL after { attempts } "
1212
- f"attempt(s): { err } "
1213
- )
1214
- raise InterfaceError (msg ) from err
1215
- if delay > 0 :
1216
- sleep (delay )
1237
+ span = None
1238
+
1239
+ if self ._tracer :
1240
+ span = self ._tracer .start_span (
1241
+ name = CONNECTION_SPAN_NAME , kind = trace .SpanKind .CLIENT
1242
+ )
1243
+
1244
+ try :
1245
+ while counter != attempts :
1246
+ counter = counter + 1
1247
+ try :
1248
+ self .disconnect ()
1249
+ self .connect ()
1250
+ if self .is_connected ():
1251
+ break
1252
+ except (Error , IOError ) as err :
1253
+ if counter == attempts :
1254
+ msg = (
1255
+ f"Can not reconnect to MySQL after { attempts } "
1256
+ f"attempt(s): { err } "
1257
+ )
1258
+ raise InterfaceError (msg ) from err
1259
+ if delay > 0 :
1260
+ sleep (delay )
1261
+ except InterfaceError as interface_err :
1262
+ if OTEL_ENABLED :
1263
+ set_connection_span_attrs (self , span )
1264
+ record_exception_event (span , interface_err )
1265
+ end_span (span )
1266
+ raise
1267
+
1268
+ self ._span = span
1269
+ if OTEL_ENABLED :
1270
+ set_connection_span_attrs (self , self ._span )
1217
1271
1218
1272
@abstractmethod
1219
1273
def is_connected (self ) -> Any :
@@ -1553,7 +1607,7 @@ def close(self) -> Any:
1553
1607
@abstractmethod
1554
1608
def execute (
1555
1609
self ,
1556
- operation : Any ,
1610
+ operation : str ,
1557
1611
params : Union [Sequence [Any ], Dict [str , Any ]] = (),
1558
1612
multi : bool = False ,
1559
1613
) -> Any :
@@ -1577,7 +1631,7 @@ def execute(
1577
1631
1578
1632
@abstractmethod
1579
1633
def executemany (
1580
- self , operation : Any , seq_params : Sequence [Union [Sequence [Any ], Dict [str , Any ]]]
1634
+ self , operation : str , seq_params : Sequence [Union [Sequence [Any ], Dict [str , Any ]]]
1581
1635
) -> Any :
1582
1636
"""Execute the given operation multiple times
1583
1637
@@ -1710,7 +1764,7 @@ def fetchwarnings(self) -> Optional[List[WarningType]]:
1710
1764
"""Returns Warnings."""
1711
1765
return self ._warnings
1712
1766
1713
- def get_attributes (self ) -> Optional [List [Tuple [Any , Any ]]]:
1767
+ def get_attributes (self ) -> Optional [List [Tuple [str , Any ]]]:
1714
1768
"""Get the added query attributes so far."""
1715
1769
if hasattr (self , "_cnx" ):
1716
1770
return self ._cnx .query_attrs
@@ -1731,6 +1785,19 @@ def add_attribute(self, name: str, value: Any) -> None:
1731
1785
elif hasattr (self , "_connection" ):
1732
1786
self ._connection .query_attrs_append ((name , value ))
1733
1787
1788
+ def remove_attribute (self , name : str ) -> Any :
1789
+ """Remove a query attribute by name.
1790
+
1791
+ If no match, `None` is returned; else the corresponding value is returned.
1792
+ """
1793
+ if not isinstance (name , str ):
1794
+ raise ProgrammingError ("Parameter `name` must be a string type" )
1795
+ if hasattr (self , "_cnx" ):
1796
+ return self ._cnx .query_attrs_remove (name )
1797
+ if hasattr (self , "_connection" ):
1798
+ return self ._connection .query_attrs_remove (name )
1799
+ return None
1800
+
1734
1801
def clear_attributes (self ) -> None :
1735
1802
"""Remove all the query attributes."""
1736
1803
if hasattr (self , "_cnx" ):
0 commit comments