Skip to content

Commit bcd5a31

Browse files
committed
WL12501: Connection compression
This worklog adds support for connection compression using the X Protocol. The supported compression algorithms are lz4_message and deflate_stream via third-party software python-lz4.
1 parent 89312ef commit bcd5a31

25 files changed

+1341
-659
lines changed

cpyint

lib/mysqlx/__init__.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@
6565
_SESS_OPTS = _SSL_OPTS + ["user", "password", "schema", "host", "port",
6666
"routers", "socket", "ssl-mode", "auth", "use-pure",
6767
"connect-timeout", "connection-attributes",
68-
"dns-srv"]
68+
"compression", "dns-srv"]
6969

7070
logging.getLogger(__name__).addHandler(logging.NullHandler())
7171

@@ -82,7 +82,7 @@
8282
TLS_VERSIONS = ["TLSv1", "TLSv1.1", "TLSv1.2", "TLSv1.3"]
8383

8484
TLS_V1_3_SUPPORTED = False
85-
if hasattr(ssl, "HAS_TLSv1_3") and ssl.HAS_TLSv1_3:
85+
if hasattr(ssl, "HAS_TLSv1_3") and ssl.HAS_TLSv1_3:
8686
TLS_V1_3_SUPPORTED = True
8787

8888

@@ -256,6 +256,15 @@ def _validate_settings(settings):
256256
except (AttributeError, ValueError):
257257
raise InterfaceError("Invalid Auth '{0}'".format(settings["auth"]))
258258

259+
if "compression" in settings:
260+
compression = settings["compression"].lower().strip()
261+
if compression not in ("preferred", "required", "disabled"):
262+
raise InterfaceError(
263+
"The connection property 'compression' acceptable values are: "
264+
"'preferred', 'required', or 'disabled'. The value '{0}' is "
265+
"not acceptable".format(settings["compression"]))
266+
settings["compression"] = compression
267+
259268
if "connection-attributes" in settings:
260269
validate_connection_attributes(settings)
261270

lib/mysqlx/connection.py

Lines changed: 95 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@
7878
from .crud import Schema
7979
from .constants import SSLMode, Auth
8080
from .helpers import escape, get_item_or_attr, iani_to_openssl_cs_name
81-
from .protocol import Protocol, MessageReaderWriter
81+
from .protocol import Protocol, MessageReader, MessageWriter, HAVE_LZ4
8282
from .result import Result, RowResult, SqlResult, DocResult
8383
from .statement import SqlStatement, AddStatement, quote_identifier
8484
from .protobuf import Protobuf
@@ -558,7 +558,6 @@ class Connection(object):
558558
def __init__(self, settings):
559559
self.settings = settings
560560
self.stream = SocketStream()
561-
self.reader_writer = None
562561
self.protocol = None
563562
self.keep_open = None
564563
self._user = settings.get("user")
@@ -617,10 +616,31 @@ def connect(self):
617616
router = self.router_manager.get_next_router()
618617
self.stream.connect(router.get_connection_params(),
619618
self._connect_timeout)
620-
self.reader_writer = MessageReaderWriter(self.stream)
621-
self.protocol = Protocol(self.reader_writer)
622-
self._handle_capabilities()
619+
reader = MessageReader(self.stream)
620+
writer = MessageWriter(self.stream)
621+
self.protocol = Protocol(reader, writer)
622+
623+
caps_data = self.protocol.get_capabilites().capabilities
624+
caps = {
625+
get_item_or_attr(cap, "name").lower():
626+
cap for cap in caps_data
627+
} if caps_data else {}
628+
629+
# Set TLS capabilities
630+
self._set_tls_capabilities(caps)
631+
632+
# Set connection attributes capabilities
633+
if "attributes" in self.settings:
634+
conn_attrs = self.settings["attributes"]
635+
self.protocol.set_capabilities(
636+
session_connect_attrs=conn_attrs)
637+
638+
# Set compression capabilities
639+
compression = self.settings.get("compression", "preferred")
640+
algorithm = None if compression == "disabled" \
641+
else self._set_compression_capabilities(caps, compression)
623642
self._authenticate()
643+
self.protocol.set_compression(algorithm)
624644
return
625645
except (socket.error, RuntimeError) as err:
626646
error = err
@@ -643,30 +663,31 @@ def connect(self):
643663
raise InterfaceError("Cannot connect to host: {0}".format(error))
644664
raise InterfaceError("Unable to connect to any of the target hosts", 4001)
645665

646-
def _handle_capabilities(self):
647-
"""Handle capabilities.
666+
def _set_tls_capabilities(self, caps):
667+
"""Sets the TLS capabilities.
668+
669+
Args:
670+
caps (dict): Dictionary with the server capabilities.
648671
649672
Raises:
650673
:class:`mysqlx.OperationalError`: If SSL is not enabled at the
651674
server.
652675
:class:`mysqlx.RuntimeError`: If support for SSL is not available
653676
in Python.
677+
678+
.. versionadded:: 8.0.21
654679
"""
655680
if self.settings.get("ssl-mode") == SSLMode.DISABLED:
656681
return
682+
657683
if self.stream.is_socket():
658684
if self.settings.get("ssl-mode"):
659685
_LOGGER.warning("SSL not required when using Unix socket.")
660686
return
661687

662-
try:
663-
data = self.protocol.get_capabilites().capabilities
664-
if not (get_item_or_attr(data[0], "name").lower() == "tls"
665-
if data else False):
666-
self.close_connection()
667-
raise OperationalError("SSL not enabled at server")
668-
except (AttributeError, KeyError):
669-
pass
688+
if "tls" not in caps:
689+
self.close_connection()
690+
raise OperationalError("SSL not enabled at server")
670691

671692
is_ol7 = False
672693
if platform.system() == "Linux":
@@ -694,6 +715,65 @@ def _handle_capabilities(self):
694715
conn_attrs = self.settings["attributes"]
695716
self.protocol.set_capabilities(session_connect_attrs=conn_attrs)
696717

718+
def _set_compression_capabilities(self, caps, compression):
719+
"""Sets the compression capabilities.
720+
721+
If compression is available, negociates client and server algorithms.
722+
Using the following priority:
723+
724+
1) lz4_message
725+
2) deflate_stream
726+
727+
Args:
728+
caps (dict): Dictionary with the server capabilities.
729+
compression (str): The compression connection setting.
730+
731+
Returns:
732+
str: The compression algorithm.
733+
734+
.. versionadded:: 8.0.21
735+
"""
736+
compression_data = caps.get("compression")
737+
if compression_data is None:
738+
msg = "Compression requested but the server does not support it"
739+
if compression == "required":
740+
raise NotSupportedError(msg)
741+
else:
742+
_LOGGER.warning(msg)
743+
return None
744+
745+
compression_dict = {}
746+
if isinstance(compression_data, dict): # C extension is being used
747+
for fld in compression_data["value"]["obj"]["fld"]:
748+
compression_dict[fld["key"]] = [
749+
value["scalar"]["v_string"]["value"].decode("utf-8")
750+
for value in fld["value"]["array"]["value"]
751+
]
752+
else:
753+
for fld in compression_data.value.obj.fld:
754+
compression_dict[fld.key] = [
755+
value.scalar.v_string.value.decode("utf-8")
756+
for value in fld.value.array.value
757+
]
758+
759+
server_algorithms = compression_dict.get("algorithm", [])
760+
if HAVE_LZ4 and "lz4_message" in server_algorithms:
761+
algorithm = "lz4_message"
762+
else:
763+
algorithm = "deflate_stream"
764+
765+
if algorithm not in server_algorithms:
766+
msg = ("Compression requested but the compression algorithm "
767+
"negotiation failed")
768+
if compression == "required":
769+
raise InterfaceError(msg)
770+
else:
771+
_LOGGER.warning(msg)
772+
return None
773+
774+
self.protocol.set_capabilities(compression={"algorithm": algorithm})
775+
return algorithm
776+
697777
def _authenticate(self):
698778
"""Authenticate with the MySQL server."""
699779
auth = self.settings.get("auth")

lib/mysqlx/protobuf/__init__.py

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@
5656
"Mysqlx.Sql.StmtExecuteOk"),
5757
("Mysqlx.ServerMessages.Type.RESULTSET_FETCH_DONE_MORE_OUT_PARAMS",
5858
"Mysqlx.Resultset.FetchDoneMoreOutParams"),
59+
("Mysqlx.ServerMessages.Type.COMPRESSION",
60+
"Mysqlx.Connection.Compression"),
5961
)
6062

6163
PROTOBUF_REPEATED_TYPES = [list]
@@ -74,6 +76,7 @@
7476
try:
7577
from . import mysqlx_connection_pb2
7678
from . import mysqlx_crud_pb2
79+
from . import mysqlx_cursor_pb2
7780
from . import mysqlx_datatypes_pb2
7881
from . import mysqlx_expect_pb2
7982
from . import mysqlx_expr_pb2
@@ -168,6 +171,8 @@
168171
mysqlx_connection_pb2.DESCRIPTOR.serialized_pb))
169172
_DESCRIPTOR_DB.Add(descriptor_pb2.FileDescriptorProto.FromString(
170173
mysqlx_crud_pb2.DESCRIPTOR.serialized_pb))
174+
_DESCRIPTOR_DB.Add(descriptor_pb2.FileDescriptorProto.FromString(
175+
mysqlx_cursor_pb2.DESCRIPTOR.serialized_pb))
171176
_DESCRIPTOR_DB.Add(descriptor_pb2.FileDescriptorProto.FromString(
172177
mysqlx_datatypes_pb2.DESCRIPTOR.serialized_pb))
173178
_DESCRIPTOR_DB.Add(descriptor_pb2.FileDescriptorProto.FromString(
@@ -213,6 +218,10 @@ def enum_value(key):
213218
def serialize_message(msg):
214219
return msg.SerializeToString()
215220

221+
@staticmethod
222+
def serialize_partial_message(msg):
223+
return msg.SerializePartialToString()
224+
216225
@staticmethod
217226
def parse_message(msg_type_name, payload):
218227
msg = _mysqlxpb_pure.new_message(msg_type_name)
@@ -346,10 +355,21 @@ def serialize_to_string(self):
346355
"""Serializes a message to a string.
347356
348357
Returns:
349-
string: A string representing a message containing parsed data.
358+
str: A string representing a message containing parsed data.
350359
"""
351360
return Protobuf.mysqlxpb.serialize_message(self._msg)
352361

362+
def serialize_partial_to_string(self):
363+
"""Serializes the protocol message to a binary string.
364+
365+
This method is similar to serialize_to_string but doesn't check if the
366+
message is initialized.
367+
368+
Returns:
369+
str: A string representation of the partial message.
370+
"""
371+
return Protobuf.mysqlxpb.serialize_partial_message(self._msg)
372+
353373
@property
354374
def type(self):
355375
"""string: Message type name."""
@@ -366,9 +386,26 @@ def parse(msg_type_name, payload):
366386
367387
Returns:
368388
dict: The dictionary representing a message containing parsed data.
389+
390+
.. versionadded:: 8.0.21
369391
"""
370392
return Protobuf.mysqlxpb.parse_message(msg_type_name, payload)
371393

394+
@staticmethod
395+
def byte_size(msg):
396+
"""Returns the size of the message in bytes.
397+
398+
Args:
399+
msg (mysqlx.protobuf.Message): MySQL X Protobuf Message.
400+
401+
Returns:
402+
int: Size of the message in bytes.
403+
404+
.. versionadded:: 8.0.21
405+
"""
406+
return msg.ByteSize() if Protobuf.use_pure \
407+
else len(encode_to_bytes(msg.serialize_to_string()))
408+
372409
@staticmethod
373410
def parse_from_server(msg_type, payload):
374411
"""Creates a new server-side message, initialized with parsed data.

0 commit comments

Comments
 (0)