Skip to content

Commit dbb6145

Browse files
committed
WL13497: DevAPI: Update multi-host support
1 parent c89926d commit dbb6145

File tree

4 files changed

+124
-120
lines changed

4 files changed

+124
-120
lines changed

cdk/core/session.cc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,9 +40,10 @@ using std::unique_ptr;
4040
/*
4141
A class that creates a session from given data source.
4242
43-
Instances of this calss are callable objects which can be used as visitors
43+
Instances of this class are callable objects which can be used as visitors
4444
for ds::Multi_source implementing in this case the failover logic.
4545
*/
46+
4647
struct Session_builder
4748
{
4849
using TLS = cdk::connection::TLS;
@@ -398,7 +399,7 @@ Session::Session(ds::Multi_source &ds)
398399
throw_error(
399400
1 == sb.m_attempts ?
400401
"Could not connect to the given data source" :
401-
"Could not connect ot any of the given data sources"
402+
"Could not connect to any of the given data sources"
402403
);
403404
}
404405

cdk/include/mysql/cdk/data_source.h

Lines changed: 69 additions & 108 deletions
Original file line numberDiff line numberDiff line change
@@ -353,14 +353,15 @@ namespace ds {
353353
#endif //_WIN32
354354
typedef mysql::TCPIP TCPIP_old;
355355

356+
356357
template <typename DS_t, typename DS_opt>
357358
struct DS_pair : public std::pair<DS_t, DS_opt>
358359
{
359360
DS_pair(const DS_pair&) = default;
360361
#ifdef HAVE_MOVE_CTORS
361362
DS_pair(DS_pair&&) = default;
362363
#endif
363-
DS_pair(DS_t &ds, DS_opt &opt) : std::pair<DS_t, DS_opt>(ds, opt)
364+
DS_pair(const DS_t &ds, const DS_opt &opt) : std::pair<DS_t, DS_opt>(ds, opt)
364365
{}
365366
};
366367

@@ -369,12 +370,12 @@ namespace ds {
369370
A data source which encapsulates several other data sources (all of which
370371
are assumed to hold the same data).
371372
372-
When adding data sources to a multi source, a priority and a weight can
373-
be specified. When a visitor is visiting the multi source, the data sources
374-
are presented to the visitor in decreasing priority order. If several data
375-
sources have the same priority, they are presented in random order, taking
376-
into account specified weights. If no priorities were specified, then data
377-
sources are presented in the order in which they were added.
373+
When adding data sources to a multi source, a priority can be specified.
374+
When a visitor is visiting the multi source, the data sources it contains
375+
are presented to the visitor in increasing priority order. If several data
376+
sources have the same priority, they are presented in random order. If
377+
no priorities were specified, then data sources are presented in the order
378+
in which they were added.
378379
379380
If priorities are specified, they must be specified for all data sources
380381
that are added to the multi source.
@@ -394,8 +395,8 @@ namespace ds {
394395
>
395396
DS_variant;
396397

397-
bool m_is_prioritized;
398-
unsigned short m_counter;
398+
bool m_is_prioritized = false;
399+
unsigned short m_counter = 0;
399400

400401
struct Prio
401402
{
@@ -412,79 +413,62 @@ namespace ds {
412413
}
413414
};
414415

415-
typedef std::multimap<Prio, DS_variant, std::greater<Prio>> DS_list;
416+
typedef std::multimap<Prio, DS_variant, std::less<Prio>> DS_list;
416417
DS_list m_ds_list;
417418
uint32_t m_total_weight = 0;
418419

419420
public:
420421

421-
Multi_source() : m_is_prioritized(false), m_counter(65535)
422-
{
423-
std::srand((unsigned int)time(NULL));
424-
}
422+
// Add data source without explicit priority.
425423

426424
template <class DS_t, class DS_opt>
427-
void add(const DS_t &ds, const DS_opt &opt,
428-
unsigned short prio, uint16_t weight = 1)
425+
void add(const DS_t& ds, const DS_opt& opt, uint16_t weight = 1)
429426
{
430-
if (m_ds_list.size() == 0)
427+
if (m_is_prioritized)
431428
{
432-
m_is_prioritized = (prio > 0);
429+
throw_error(
430+
"Adding un-prioritized items to prioritized list is not allowed"
431+
);
433432
}
434-
else
435-
{
436-
if (m_is_prioritized && prio == 0)
437-
throw Error(cdkerrc::generic_error,
438-
"Adding un-prioritized items to prioritized list is not allowed");
439433

440-
if (!m_is_prioritized && prio > 0)
441-
throw Error(cdkerrc::generic_error,
442-
"Adding prioritized items to un-prioritized list is not allowed");
443-
}
434+
m_ds_list.emplace(Prio{ m_counter++, weight }, DS_pair<DS_t, DS_opt>{ ds, opt });
435+
}
444436

445-
/*
446-
The internal placement of priorities will be as this:
447-
if list is a no-priority one the map has to retain the order of
448-
elements at the time of the placement. Therefore, it will count-down
449-
from max(unsigned short)
450-
*/
451-
DS_pair<DS_t, DS_opt> pair(const_cast<DS_t&>(ds),
452-
const_cast<DS_opt&>(opt));
453-
if (m_is_prioritized)
454-
m_ds_list.emplace(Prio({prio,weight}), pair);
455-
else
437+
// Add data source with priority.
438+
439+
template <class DS_t, class DS_opt>
440+
void add_prio(const DS_t &ds, const DS_opt &opt, unsigned short prio, uint16_t weight = 1)
441+
{
442+
if (m_ds_list.size() == 0)
443+
m_is_prioritized = true;
444+
445+
if (!m_is_prioritized)
456446
{
457-
/*
458-
When list is not prioritized the map should keep the order of elements.
459-
This is achieved by decrementing the counter every time a new element
460-
goes into the list.
461-
*/
462-
m_ds_list.emplace(Prio({m_counter--,weight}), pair);
447+
throw_error(
448+
"Adding prioritized items to un-prioritized list is not allowed"
449+
);
463450
}
464451

465-
m_total_weight += weight;
452+
m_ds_list.emplace(Prio{ prio, weight }, DS_pair<DS_t, DS_opt>{ ds, opt });
466453
}
467454

468-
private:
455+
private:
469456

470457
template <typename Visitor>
471458
struct Variant_visitor
472459
{
473-
Visitor *vis;
474-
bool stop_processing;
475-
476-
Variant_visitor() : stop_processing(false)
477-
{ }
460+
Visitor *vis = nullptr;
461+
bool stop_processing = false;
478462

479463
template <class DS_t, class DS_opt>
480464
void operator () (const DS_pair<DS_t, DS_opt> &ds_pair)
481465
{
466+
assert(vis);
482467
stop_processing = (bool)(*vis)(ds_pair.first, ds_pair.second);
483468
}
484469
};
485470

486-
487-
public:
471+
public:
488472

489473
/*
490474
Call visitor(ds,opts) for each data source ds with options
@@ -496,80 +480,59 @@ namespace ds {
496480
template <class Visitor>
497481
void visit(Visitor &visitor)
498482
{
483+
Variant_visitor<Visitor> variant_visitor;
484+
variant_visitor.vis = &visitor;
485+
486+
std::random_device rnd;
499487
bool stop_processing = false;
500-
std::vector<DS_variant*> same_prio;
501488
std::vector<uint16_t> weights;
502-
std::random_device generator;
489+
std::set<DS_variant*> same_prio;
503490

504491
for (auto it = m_ds_list.begin(); !stop_processing;)
505492
{
506-
DS_variant *item = NULL;
493+
if (it == m_ds_list.end())
494+
break;
507495

508-
if (m_is_prioritized)
509-
{
510-
if (same_prio.empty())
511-
{
512-
if (it == m_ds_list.end())
513-
break;
496+
assert(same_prio.empty());
514497

515-
// Get items with the same priority and store them in same_prio set
498+
{
499+
// Get items with the same priority and store them in same_prio set
516500

517-
auto same_range = m_ds_list.equal_range(it->first);
518-
it = same_range.second;
501+
auto same_range = m_ds_list.equal_range(it->first);
502+
it = same_range.second; // move it to the first element after the range
519503

520-
for (auto it1 = same_range.first; it1 != same_range.second; ++it1)
521-
{
522-
//If weight is not specified, we need to set all weight values
523-
//with same, os that discrete_distribution works as expected
524-
weights.push_back(it1->first.weight);
525-
same_prio.push_back(&(it1->second));
526-
}
504+
for (auto it1 = same_range.first; it1 != same_range.second; ++it1)
505+
{
506+
same_prio.insert(&(it1->second));
507+
weights.push_back(it1->first.weight);
527508
}
509+
}
528510

529-
std::discrete_distribution<int> distribution(
530-
weights.begin(), weights.end()
531-
);
532-
511+
for (size_t size = same_prio.size(); size > 0; size = same_prio.size())
512+
{
533513
auto el = same_prio.begin();
514+
size_t pos = 0;
534515

535-
int pos = 0;
536-
537-
if (same_prio.size() > 1)
516+
if (size > 1)
538517
{
539-
pos = distribution(generator);
518+
std::discrete_distribution<int> distr(
519+
weights.begin(), weights.end()
520+
);
521+
pos = distr(rnd);
540522
std::advance(el, pos);
541523
}
542524

543-
item = *el;
544-
same_prio.erase(same_prio.begin()+pos);
545-
weights.erase(weights.begin()+pos);
525+
(*el)->visit(variant_visitor);
526+
stop_processing = variant_visitor.stop_processing;
546527

547-
} // if (m_is_prioritized)
548-
else
549-
{
550-
if (it == m_ds_list.end())
528+
if (stop_processing)
551529
break;
552530

553-
// Just get the next item from the list if no priority is given
554-
item = &(it->second);
555-
++it;
531+
same_prio.erase(el);
532+
weights.erase(weights.begin() + pos);
556533
}
557534

558-
// Give values to the visitor
559-
Variant_visitor<Visitor> variant_visitor;
560-
variant_visitor.vis = &visitor;
561-
/*
562-
Cannot use lambda because auto type for lambdas is only
563-
supported in C++14
564-
*/
565-
item->visit(variant_visitor);
566-
stop_processing = variant_visitor.stop_processing;
567-
568-
/* Exit if visit reported true or if we advanced to the end of the list */
569-
if (stop_processing || it == m_ds_list.end())
570-
break;
571-
572-
} // for
535+
} // for m_ds_llist
573536
}
574537

575538
void clear()
@@ -647,9 +610,7 @@ namespace ds {
647610
Options::TLS_options tls(m_opts.get_tls());
648611
tls.set_host_name(el.name);
649612
opt1.set_tls(tls);
650-
//Prio is negative because for URI prio, less is better, but for
651-
//SRV record, more is better
652-
src.add(ds::TCPIP(el.name, el.port), opt1, -el.prio, el.weight);
613+
src.add_prio(ds::TCPIP(el.name, el.port), opt1, el.prio, el.weight);
653614
}
654615

655616
return src;

common/session.cc

Lines changed: 20 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -401,15 +401,15 @@ void Settings_impl::get_data_source(cdk::ds::Multi_source &src)
401401
int prio = m_data.m_user_priorities ? -1 : 100;
402402

403403
/*
404-
Look for a priority after host/socket setting. If prio >= 0 then implicit
405-
priorities are used and in that case only sanity checks are done.
406-
Otherwise we expect that priority is explicitly given in the settings and
407-
throw error if this is not the case.
404+
Look for a priority after host/socket setting. If explicit priorities
405+
are used, then we expect the priority setting to be present and we throw
406+
error if this is not the case. Otherwise the given defalut priority is
407+
not changed and only sanity checks are done.
408408
*/
409409

410410
auto check_prio = [this](iterator &it, int &prio) {
411411

412-
if (0 > prio)
412+
if (m_data.m_user_priorities)
413413
{
414414
if (it == end() || Session_option_impl::PRIORITY != it->first)
415415
throw_error("No priority specified for host ...");
@@ -420,6 +420,12 @@ void Settings_impl::get_data_source(cdk::ds::Multi_source &src)
420420

421421
assert(0 <= prio && prio <= 100);
422422

423+
/*
424+
Convert from decreasing priorities to increasing priorities used
425+
by cdk::Multi_source.
426+
*/
427+
prio = 100 - prio;
428+
423429
/*
424430
If there are more options, there should be no PRIORITY option
425431
at this point.
@@ -475,7 +481,7 @@ void Settings_impl::get_data_source(cdk::ds::Multi_source &src)
475481
}
476482
#endif
477483

478-
src.add(cdk::ds::TCPIP(host, port), opts, (unsigned short)prio);
484+
src.add_prio(cdk::ds::TCPIP(host, port), opts, (unsigned short)prio);
479485
};
480486

481487
/*
@@ -497,13 +503,17 @@ void Settings_impl::get_data_source(cdk::ds::Multi_source &src)
497503

498504
check_prio(it, prio);
499505

500-
src.add(cdk::ds::Unix_socket(socket_path),
506+
src.add_prio(cdk::ds::Unix_socket(socket_path),
501507
(cdk::ds::Unix_socket::Options&)opts,
502508
(unsigned short)prio);
503509

504510
};
505511
#endif
506512

513+
514+
// default prioirty of 1 is used if priorities are not explicitly specified
515+
static const int default_prio = 1;
516+
507517
/*
508518
Go through options and look for ones which define connections.
509519
*/
@@ -513,18 +523,18 @@ void Settings_impl::get_data_source(cdk::ds::Multi_source &src)
513523
switch (it->first)
514524
{
515525
case Session_option_impl::HOST:
516-
add_host(it, prio--); break;
526+
add_host(it, default_prio); break;
517527

518528
case Session_option_impl::SOCKET:
519-
add_socket(it, prio--); break;
529+
add_socket(it, default_prio); break;
520530

521531
/*
522532
Note: if m_host_cnt > 0 then a HOST setting must be before PORT setting,
523533
so the case above should cover that HOST/PORT pair.
524534
*/
525535
case Session_option_impl::PORT:
526536
assert(0 == m_data.m_host_cnt);
527-
add_host(it, prio--);
537+
add_host(it, default_prio);
528538
break;
529539

530540
default:

0 commit comments

Comments
 (0)