Skip to content

Commit 62087f3

Browse files
committed
Merge branch 'wl13947'
2 parents a025322 + 36e4b4b commit 62087f3

File tree

13 files changed

+1025
-38
lines changed

13 files changed

+1025
-38
lines changed

cdk/include/mysql/cdk/data_source.h

Lines changed: 34 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,20 +197,36 @@ class Protocol_options
197197
REQUIRED
198198
};
199199

200+
enum compression_algorithm_t {
201+
NONE,
202+
DEFLATE_STREAM,
203+
LZ4_MESSAGE,
204+
ZSTD_STREAM
205+
};
206+
200207
virtual auth_method_t auth_method() const = 0;
201208
virtual compression_mode_t compression() const = 0;
202209

210+
using Compression_algorithms = std::vector<compression_algorithm_t>;
211+
virtual const Compression_algorithms& compression_algorithms() const = 0;
212+
203213
};
204214

205215

206216
class Options
207217
: public ds::Options<Protocol_options>,
208218
public foundation::connection::Socket_base::Options
209219
{
220+
public:
221+
222+
typedef std::vector<compression_algorithm_t> Compression_algorithms;
223+
210224
protected:
211225

212226
auth_method_t m_auth_method = DEFAULT;
213-
compression_mode_t m_compression = DISABLED;
227+
compression_mode_t m_compression = PREFERRED;
228+
bool m_has_compression_alg = false;
229+
Compression_algorithms m_compression_algorithms;
214230

215231
public:
216232

@@ -241,6 +257,23 @@ class Options
241257
return m_compression;
242258
}
243259

260+
void add_compression_alg(compression_algorithm_t val)
261+
{
262+
m_has_compression_alg = true;
263+
m_compression_algorithms.push_back(val);
264+
}
265+
266+
const Compression_algorithms& compression_algorithms() const
267+
{
268+
if (m_has_compression_alg)
269+
return m_compression_algorithms;
270+
271+
static Compression_algorithms default_compression_algorithms =
272+
{ ZSTD_STREAM, LZ4_MESSAGE, DEFLATE_STREAM };
273+
274+
return default_compression_algorithms ;
275+
}
276+
244277
};
245278

246279

cdk/include/mysql/cdk/mysqlx/session.h

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,8 @@ class Session
253253

254254
typedef ds::Options<ds::mysqlx::Protocol_options> Options;
255255
using compression_mode_t = ds::mysqlx::Protocol_options::compression_mode_t;
256+
using compression_algorithm_t =
257+
ds::mysqlx::Protocol_options::compression_algorithm_t;
256258

257259
template <class C>
258260
Session(C &conn, const Options &options)
@@ -267,7 +269,7 @@ class Session
267269

268270
if (options.compression() != compression_mode_t::DISABLED)
269271
{
270-
compression = negotiate_compression();
272+
compression = negotiate_compression(options.compression_algorithms());
271273

272274
if (compression == Compression_type::NONE &&
273275
options.compression() == compression_mode_t::REQUIRED)
@@ -286,16 +288,13 @@ class Session
286288

287289
/*
288290
Get the most suitable compression type supported by the server.
289-
Function prioritizes compression types as follows:
290-
LZ4
291-
DEFLATE
292291
293-
If the higher priority type is not available the function will
294-
check a lower priority type.
292+
Vector order is used for priorization, so first element has the higher
293+
priority.
295294
296295
NONE is returned if the server does not support compression
297296
*/
298-
Compression_type::value negotiate_compression();
297+
Compression_type::value negotiate_compression(const std::vector<compression_algorithm_t>& algorithms);
299298

300299
virtual ~Session();
301300

cdk/mysqlx/session.cc

Lines changed: 53 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ POP_SYS_WARNINGS_CDK
4343
namespace cdk {
4444
namespace mysqlx {
4545

46-
Compression_type::value Session::negotiate_compression()
46+
Compression_type::value Session::negotiate_compression(
47+
const std::vector<cdk::mysqlx::Session::compression_algorithm_t>& algorithms)
4748
{
4849
Compression_type::value compression = Compression_type::NONE;
4950

@@ -84,27 +85,59 @@ Compression_type::value Session::negotiate_compression()
8485
} cap_prc;
8586

8687
/*
87-
The compression types must be attempted with increaing
88+
The compression types must be attempted with increasing
8889
priority. The last successful type will be applied.
8990
*/
90-
compress_caps.m_algorithm = "deflate_stream";
91-
m_protocol.snd_CapabilitiesSet(compress_caps).wait();
92-
compress_caps.m_algorithm = "lz4_message";
93-
m_protocol.snd_CapabilitiesSet(compress_caps).wait();
94-
compress_caps.m_algorithm = "zstd_stream";
95-
m_protocol.snd_CapabilitiesSet(compress_caps).wait();
96-
97-
m_protocol.rcv_Reply(cap_prc).wait();
98-
if (cap_prc.m_compression_ok)
99-
compression = Compression_type::DEFLATE;
100-
101-
m_protocol.rcv_Reply(cap_prc).wait();
102-
if (cap_prc.m_compression_ok)
103-
compression = Compression_type::LZ4;
104-
105-
m_protocol.rcv_Reply(cap_prc).wait();
106-
if (cap_prc.m_compression_ok)
107-
compression = Compression_type::ZSTD;
91+
92+
for(auto alg = algorithms.rbegin(); alg != algorithms.rend(); ++alg)
93+
{
94+
switch (*alg) {
95+
case cdk::mysqlx::Session::compression_algorithm_t::DEFLATE_STREAM:
96+
compress_caps.m_algorithm = "deflate_stream";
97+
m_protocol.snd_CapabilitiesSet(compress_caps).wait();
98+
break;
99+
case cdk::mysqlx::Session::compression_algorithm_t::LZ4_MESSAGE:
100+
compress_caps.m_algorithm = "lz4_message";
101+
m_protocol.snd_CapabilitiesSet(compress_caps).wait();
102+
break;
103+
case cdk::mysqlx::Session::compression_algorithm_t::ZSTD_STREAM:
104+
compress_caps.m_algorithm = "zstd_stream";
105+
m_protocol.snd_CapabilitiesSet(compress_caps).wait();
106+
break;
107+
case cdk::mysqlx::Session::compression_algorithm_t::NONE:
108+
break;
109+
default:
110+
//Add algorithm name here
111+
assert(false);
112+
}
113+
}
114+
115+
116+
for(auto alg = algorithms.rbegin(); alg != algorithms.rend(); ++alg)
117+
{
118+
switch (*alg) {
119+
case cdk::mysqlx::Session::compression_algorithm_t::DEFLATE_STREAM:
120+
m_protocol.rcv_Reply(cap_prc).wait();
121+
if (cap_prc.m_compression_ok)
122+
compression = Compression_type::DEFLATE;
123+
break;
124+
case cdk::mysqlx::Session::compression_algorithm_t::LZ4_MESSAGE:
125+
m_protocol.rcv_Reply(cap_prc).wait();
126+
if (cap_prc.m_compression_ok)
127+
compression = Compression_type::LZ4;
128+
break;
129+
case cdk::mysqlx::Session::compression_algorithm_t::ZSTD_STREAM:
130+
m_protocol.rcv_Reply(cap_prc).wait();
131+
if (cap_prc.m_compression_ok)
132+
compression = Compression_type::ZSTD;
133+
break;
134+
case cdk::mysqlx::Session::compression_algorithm_t::NONE:
135+
break;
136+
default:
137+
//New algorithm adder....
138+
assert(false);
139+
}
140+
}
108141

109142
return compression;
110143
}

common/session.cc

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,29 @@ TCPIP_options::compression_mode_t get_compression(unsigned m)
241241
return CDK_type(0); // quiet compiler warnings
242242
}
243243

244+
TCPIP_options::compression_algorithm_t get_compression_algorithm(std::string alg)
245+
{
246+
std::string algorithm_name = to_upper(alg);
247+
248+
using CDK_type = TCPIP_options::compression_algorithm_t;
249+
250+
static std::map<std::string,CDK_type> alg_map= {
251+
{"DEFLATE_STREAM",CDK_type::DEFLATE_STREAM},
252+
{"DEFLATE",CDK_type::DEFLATE_STREAM}, //ALIAS
253+
{"LZ4_MESSAGE",CDK_type::LZ4_MESSAGE},
254+
{"LZ4",CDK_type::LZ4_MESSAGE}, //ALIAS
255+
{"ZSTD_STREAM",CDK_type::ZSTD_STREAM},
256+
{"ZSTD",CDK_type::ZSTD_STREAM}, //ALIAS
257+
};
258+
259+
auto it = alg_map.find(algorithm_name);
260+
261+
if(it == alg_map.end())
262+
return CDK_type::NONE;
263+
264+
return it->second;
265+
}
266+
244267
TLS_options::SSL_MODE get_ssl_mode(unsigned m)
245268
{
246269
using DevAPI_type = Settings_impl::SSL_mode;
@@ -395,6 +418,28 @@ void prepare_options(
395418
(unsigned)settings.get(Option::COMPRESSION).get_uint()));
396419
}
397420

421+
if (settings.has_option(Option::COMPRESSION_ALGORITHMS))
422+
{
423+
bool has_algs = false;
424+
for (const auto &opt_val : settings)
425+
{
426+
switch(opt_val.first)
427+
{
428+
case Option::COMPRESSION_ALGORITHMS:
429+
has_algs = true;
430+
opts.add_compression_alg(
431+
get_compression_algorithm(
432+
opt_val.second.get_string())
433+
);
434+
}
435+
}
436+
if(!has_algs)
437+
{
438+
//Inform that option was used but nothing was set
439+
opts.add_compression_alg(TCPIP_options::NONE);
440+
}
441+
}
442+
398443
// DNS+SRV
399444

400445
if(settings.has_option(Option::DNS_SRV))

common/settings.h

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -313,6 +313,11 @@ class Settings_impl::Setter
313313
m_data.m_tls_vers = true;
314314
break;
315315

316+
case Session_option_impl::COMPRESSION_ALGORITHMS:
317+
m_multi = !m_data.m_compression_algorithms;
318+
m_data.m_compression_algorithms = true;
319+
break;
320+
316321
default:
317322
{
318323
std::stringstream err_msg;
@@ -1076,6 +1081,23 @@ Settings_impl::Setter::set_option<
10761081
add_option((int)Settings_impl::Session_option_impl::TLS_VERSIONS, val);
10771082
}
10781083

1084+
template<>
1085+
inline void
1086+
Settings_impl::Setter::set_option<
1087+
Settings_impl::Session_option_impl::COMPRESSION_ALGORITHMS
1088+
>(const std::string &val)
1089+
{
1090+
m_data.m_compression_algorithms = true; // record that the option was set
1091+
1092+
// If in multi mode, the value is a single list element, otherwise
1093+
// the value can be a comma separated list
1094+
1095+
if (!m_multi)
1096+
set_comma_separated((int)Settings_impl::Session_option_impl::COMPRESSION_ALGORITHMS, val);
1097+
else
1098+
add_option((int)Settings_impl::Session_option_impl::COMPRESSION_ALGORITHMS, val);
1099+
}
1100+
10791101

10801102
// Generic add_option() method.
10811103

@@ -1098,6 +1120,7 @@ void Settings_impl::Setter::add_option(int opt, const T &val)
10981120

10991121
case Session_option_impl::TLS_CIPHERSUITES:
11001122
case Session_option_impl::TLS_VERSIONS:
1123+
case Session_option_impl::COMPRESSION_ALGORITHMS:
11011124
if (m_multi)
11021125
{
11031126
options.emplace_back(opt, val);
@@ -1237,6 +1260,9 @@ void Settings_impl::Setter::null()
12371260
case Session_option_impl::USER:
12381261
throw_error("Option ... can not be unset");
12391262
break;
1263+
case Session_option_impl::COMPRESSION_ALGORITHMS:
1264+
//It has compression algorithms, so don't use default
1265+
m_data.m_compression_algorithms = true;
12401266
case Session_option_impl::LAST:
12411267
break;
12421268
default:
@@ -1373,7 +1399,7 @@ void Settings_impl::Setter::key_val(const std::string &key, const std::string &v
13731399
}
13741400
break;
13751401
default:
1376-
key_val(get_uri_option(key))->scalar()->str(val);
1402+
key_val(option)->scalar()->str(val);
13771403
}
13781404
}
13791405
catch (const std::out_of_range&)
@@ -1427,6 +1453,7 @@ void Settings_impl::Setter::key_val(const std::string &key,
14271453

14281454
case Settings_impl::Session_option_impl::TLS_CIPHERSUITES:
14291455
case Settings_impl::Session_option_impl::TLS_VERSIONS:
1456+
case Settings_impl::Session_option_impl::COMPRESSION_ALGORITHMS:
14301457
{
14311458
auto *prc = key_val(option)->arr();
14321459
if (!prc)

0 commit comments

Comments
 (0)