Skip to content

Commit 65ba13a

Browse files
Bogdan Degtyariovsilvakid
Bogdan Degtyariov
authored andcommitted
WL#13701 DevAPI: Improve session pool logic
1 parent 6f9cfb2 commit 65ba13a

File tree

6 files changed

+963
-43
lines changed

6 files changed

+963
-43
lines changed

cdk/core/session.cc

Lines changed: 24 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ struct Session_builder
5656
bool m_throw_errors = false;
5757
scoped_ptr<Error> m_error;
5858
unsigned m_attempts = 0;
59+
size_t m_id = 0;
5960

6061
Session_builder(bool throw_errors = false)
6162
: m_throw_errors(throw_errors)
@@ -76,11 +77,15 @@ struct Session_builder
7677
3. If a bail-out error was detected, throws that error.
7778
*/
7879

79-
bool operator() (const ds::TCPIP &ds, const ds::TCPIP::Options &options);
80+
bool operator() (size_t, const ds::TCPIP &ds,
81+
const ds::TCPIP::Options &options);
82+
8083
#ifndef WIN32
81-
bool operator() (const ds::Unix_socket&ds, const ds::Unix_socket::Options &options);
84+
bool operator() (size_t, const ds::Unix_socket&ds,
85+
const ds::Unix_socket::Options &options);
8286
#endif
83-
bool operator() (const ds::TCPIP_old &ds, const ds::TCPIP_old::Options &options);
87+
bool operator() (size_t, const ds::TCPIP_old &ds,
88+
const ds::TCPIP_old::Options &options);
8489

8590
/*
8691
Make a connection attempt using the given connection object. Returns true if
@@ -152,6 +157,7 @@ bool Session_builder::connect(Conn &connection)
152157

153158
bool
154159
Session_builder::operator() (
160+
size_t id,
155161
const ds::TCPIP &ds,
156162
const ds::TCPIP::Options &options
157163
)
@@ -213,12 +219,14 @@ Session_builder::operator() (
213219
}
214220

215221
m_database = options.database();
222+
m_id = id;
216223
return true;
217224
}
218225

219226
#ifndef WIN32
220227
bool
221228
Session_builder::operator() (
229+
size_t id,
222230
const ds::Unix_socket &ds,
223231
const ds::Unix_socket::Options &options
224232
)
@@ -235,7 +243,7 @@ Session_builder::operator() (
235243
m_conn.reset(connection.release());
236244

237245
m_database = options.database();
238-
246+
m_id = id;
239247
return true;
240248
}
241249

@@ -244,6 +252,7 @@ Session_builder::operator() (
244252

245253
bool
246254
Session_builder::operator() (
255+
size_t,
247256
const ds::TCPIP_old&,
248257
const ds::TCPIP_old::Options&
249258
)
@@ -252,7 +261,6 @@ Session_builder::operator() (
252261
//return false;
253262
}
254263

255-
256264
#ifdef WITH_SSL
257265

258266
Session_builder::TLS*
@@ -366,30 +374,33 @@ Session::Session(ds::TCPIP &ds, const ds::TCPIP::Options &options)
366374
{
367375
Session_builder sb(true); // throw errors if detected
368376

369-
sb(ds, options);
377+
sb(0, ds, options);
370378

371379
assert(sb.m_sess);
372380

373381
m_session = sb.m_sess;
374382
m_connection = sb.m_conn.release();
383+
m_id = sb.m_id;
375384
}
376385

377386

378387
struct ds::Multi_source::Access
379388
{
380389
template <class Visitor>
381-
static void visit(Multi_source &ds, Visitor &visitor)
382-
{ ds.visit(visitor); }
390+
static void visit(Multi_source &ds, Visitor &visitor,
391+
Multi_source::ep_filter_t ep_filter)
392+
{ ds.visit(visitor, ep_filter); }
383393
};
384394

385395

386-
Session::Session(ds::Multi_source &ds)
396+
397+
Session::Session(ds::Multi_source &ds, ds::Multi_source::ep_filter_t ep_filter)
387398
: m_session(NULL)
388399
, m_connection(NULL)
389400
{
390401
Session_builder sb;
391402

392-
ds::Multi_source::Access::visit(ds, sb);
403+
ds::Multi_source::Access::visit(ds, sb, ep_filter);
393404

394405
if (!sb.m_sess)
395406
{
@@ -406,6 +417,7 @@ Session::Session(ds::Multi_source &ds)
406417
m_session = sb.m_sess;
407418
m_database = sb.m_database;
408419
m_connection = sb.m_conn.release();
420+
m_id = sb.m_id;
409421
}
410422

411423

@@ -416,12 +428,13 @@ Session::Session(ds::Unix_socket &ds, const ds::Unix_socket::Options &options)
416428
{
417429
Session_builder sb(true); // throw errors if detected
418430

419-
sb(ds, options);
431+
sb(0, ds, options);
420432

421433
assert(sb.m_sess);
422434

423435
m_session = sb.m_sess;
424436
m_connection = sb.m_conn.release();
437+
m_id = sb.m_id;
425438
}
426439
#endif //#ifndef WIN32
427440

cdk/include/mysql/cdk/data_source.h

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
PUSH_SYS_WARNINGS_CDK
3737
#include <functional>
3838
#include <algorithm>
39+
#include <iterator>
3940
#include <set>
4041
#include <random>
4142
#include "api/expression.h"
@@ -451,8 +452,10 @@ namespace ds {
451452

452453
struct Prio
453454
{
455+
size_t id;
454456
unsigned short prio;
455457
uint16_t weight;
458+
456459
operator unsigned short() const
457460
{
458461
return prio;
@@ -482,7 +485,10 @@ namespace ds {
482485
);
483486
}
484487

485-
m_ds_list.emplace(Prio{ m_counter++, weight }, DS_pair<DS_t, DS_opt>{ ds, opt });
488+
m_ds_list.emplace(
489+
Prio{ m_ds_list.size() + 1, m_counter++, weight },
490+
DS_pair<DS_t, DS_opt>{ ds, opt }
491+
);
486492
}
487493

488494
// Add data source with priority.
@@ -500,44 +506,54 @@ namespace ds {
500506
);
501507
}
502508

503-
m_ds_list.emplace(Prio{ prio, weight }, DS_pair<DS_t, DS_opt>{ ds, opt });
509+
m_ds_list.emplace(Prio{ m_ds_list.size() + 1, prio, weight },
510+
DS_pair<DS_t, DS_opt>{ ds, opt });
504511
}
505512

506513
private:
507514

508515
template <typename Visitor>
509516
struct Variant_visitor
510517
{
518+
511519
Visitor *vis = nullptr;
520+
size_t id = 0;
512521
bool stop_processing = false;
513522

514523
template <class DS_t, class DS_opt>
515-
void operator () (const DS_pair<DS_t, DS_opt> &ds_pair)
524+
void operator () (
525+
const DS_pair<DS_t, DS_opt> &ds_pair
526+
)
516527
{
517528
assert(vis);
518-
stop_processing = (bool)(*vis)(ds_pair.first, ds_pair.second);
529+
stop_processing = (bool)(*vis)(id, ds_pair.first, ds_pair.second);
519530
}
520531
};
521532

522533
public:
523534

535+
using ep_filter_t = std::function<bool(size_t)>;
536+
524537
/*
525-
Call visitor(ds,opts) for each data source ds with options
526-
opts in the list. Do it in decreasing priority order, choosing
527-
randomly among data sources with the same priority.
538+
Call visitor(id, ds,opts) for each data source ds with options
539+
opts in the list, where id is the data source identifier.
540+
Do it in decreasing priority order, choosing
541+
randomly among data sources with the same priority. If `ep_filter`
542+
is given, ignore data sources for which this function returns true.
543+
528544
If visitor(...) call returns true, stop the process.
529545
*/
530546

531547
template <class Visitor>
532-
void visit(Visitor &visitor)
548+
void visit(Visitor &visitor, ep_filter_t ep_filter = nullptr)
533549
{
534550
Variant_visitor<Visitor> variant_visitor;
535551
variant_visitor.vis = &visitor;
536552

537553
std::random_device rnd;
538554
bool stop_processing = false;
539555
std::vector<uint16_t> weights;
540-
std::set<DS_variant*> same_prio;
556+
std::set<std::pair<size_t,DS_variant&>> same_prio;
541557

542558
for (auto it = m_ds_list.begin(); !stop_processing;)
543559
{
@@ -555,7 +571,10 @@ namespace ds {
555571

556572
for (auto it1 = same_range.first; it1 != same_range.second; ++it1)
557573
{
558-
same_prio.insert(&(it1->second));
574+
if (ep_filter && ep_filter(it1->first.id))
575+
continue;
576+
577+
same_prio.emplace(it1->first.id, it1->second);
559578
weights.push_back(it1->first.weight);
560579
total_weight += it1->first.weight;
561580
}
@@ -601,7 +620,8 @@ namespace ds {
601620
std::advance(el, pos);
602621
}
603622

604-
(*el)->visit(variant_visitor);
623+
variant_visitor.id = el->first;
624+
el->second.visit(variant_visitor);
605625
stop_processing = variant_visitor.stop_processing;
606626

607627
if (stop_processing)

cdk/include/mysql/cdk/session.h

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ class Session
5656
mysqlx::Session *m_session;
5757
const mysqlx::string *m_database;
5858
api::Connection *m_connection;
59+
size_t m_id = 0;
5960

6061
typedef Reply::Initializer Reply_init;
6162

@@ -68,7 +69,7 @@ class Session
6869
Session(ds::TCPIP &ds,
6970
const ds::TCPIP::Options &options = ds::TCPIP::Options());
7071

71-
Session(ds::Multi_source&);
72+
Session(ds::Multi_source&, ds::Multi_source::ep_filter_t = nullptr);
7273

7374
#ifndef _WIN32
7475
Session(ds::Unix_socket &ds,
@@ -99,6 +100,16 @@ class Session
99100
m_connection->close();
100101
}
101102

103+
/*
104+
If this session was created from a ds::Multi_source, returns id of the single
105+
data source inside the multi-source that was used to create this session.
106+
Otherwise returns 0.
107+
*/
108+
109+
size_t id() {
110+
return m_id;
111+
}
112+
102113
/*
103114
Transactions
104115
------------

0 commit comments

Comments
 (0)