Skip to content

Commit dde41ce

Browse files
author
Bogdan Degtyariov
committed
CDK: Added support for row locking in Find queries
1 parent 9d3252e commit dde41ce

File tree

16 files changed

+392
-36
lines changed

16 files changed

+392
-36
lines changed

cdk/core/session.cc

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929

3030
namespace cdk {
3131

32-
3332
/*
3433
A class that creates a session from given data source.
3534
@@ -150,7 +149,6 @@ Session_builder::operator() (
150149
}
151150

152151
m_database = options.database();
153-
154152
return true;
155153
}
156154

cdk/include/mysql/cdk/api/query.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -129,6 +129,11 @@ class Projection
129129
: public Expr_list< Projection_expr<EXPR> >
130130
{};
131131

132+
struct Lock_mode
133+
{
134+
enum value { NONE, SHARED, EXCLUSIVE };
135+
};
136+
132137

133138
/*
134139
View specifications.

cdk/include/mysql/cdk/common.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -521,7 +521,7 @@ typedef cdk::api::Limit<row_count_t> Limit;
521521
typedef cdk::api::Order_by<Expression> Order_by;
522522
typedef cdk::api::Sort_direction Sort_direction;
523523
typedef cdk::api::Doc_base<Value_processor> Param_source;
524-
524+
typedef cdk::api::Lock_mode::value Lock_mode_value;
525525

526526
using cdk::api::View_security;
527527
using cdk::api::View_algorithm;

cdk/include/mysql/cdk/mysqlx/session.h

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -382,6 +382,8 @@ class Cursor;
382382
class SessionAuthInterface;
383383

384384

385+
typedef protocol::mysqlx::api::Protocol_fields Protocol_fields;
386+
385387
class Session
386388
: public api::Diagnostics
387389
, public Async_op
@@ -414,6 +416,7 @@ class Session
414416
unsigned long m_id;
415417
bool m_expired;
416418
string m_cur_schema;
419+
uint64_t m_proto_fields = UINT64_MAX;
417420

418421
struct
419422
{
@@ -464,6 +467,7 @@ class Session
464467
{
465468
m_stmt_stats.clear();
466469
authenticate(options);
470+
check_protocol_fields();
467471
}
468472

469473
virtual ~Session();
@@ -478,6 +482,13 @@ class Session
478482
option_t is_valid();
479483
option_t check_valid();
480484

485+
/*
486+
Check that xplugin is supporting certain new fields in the protocol
487+
such as row locking, etc. The function sets binary flags in
488+
m_proto_fields member variable
489+
*/
490+
void check_protocol_fields();
491+
481492
/*
482493
Clear diagnostic information that accumulated for the session.
483494
Diagnostics interface methods such as Diagnostics::error_count()
@@ -526,7 +537,8 @@ class Session
526537
const Expr_list *group_by = NULL,
527538
const Expression *having = NULL,
528539
const Limit *lim = NULL,
529-
const Param_source *param = NULL);
540+
const Param_source *param = NULL,
541+
const Lock_mode_value lock_mode = Lock_mode_value::NONE);
530542
Reply_init &coll_update(const api::Table_ref&,
531543
const Expression*,
532544
const Update_spec&,
@@ -547,7 +559,8 @@ class Session
547559
const Expr_list *group_by = NULL,
548560
const Expression *having = NULL,
549561
const Limit *lim = NULL,
550-
const Param_source *param = NULL);
562+
const Param_source *param = NULL,
563+
const Lock_mode_value lock_mode = Lock_mode_value::NONE);
551564
Reply_init &table_insert(const Table_ref&,
552565
Row_source&,
553566
const api::Columns *cols,

cdk/include/mysql/cdk/protocol/mysqlx.h

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,6 @@ struct notice_scope
318318
};
319319

320320

321-
322321
/*
323322
A class to store SQL state values.
324323
*/
@@ -413,6 +412,7 @@ typedef cdk::api::Order_by<Expression> Order_by;
413412
typedef cdk::api::Sort_direction Sort_direction;
414413
typedef cdk::api::Projection<Expression> Projection;
415414
typedef cdk::api::Columns Columns;
415+
typedef cdk::api::Lock_mode::value Lock_mode_value;
416416

417417
typedef cdk::api::View_options View_options;
418418

@@ -433,6 +433,16 @@ struct Expectations:
433433
enum { NO_ERROR = 1, FIELD_EXISTS = 2 };
434434
};
435435

436+
437+
struct Protocol_fields
438+
{
439+
/*
440+
Enum values will be used as binary flags,
441+
so they must be as 2^N
442+
*/
443+
enum value { ROW_LOCKING = 1 /*, NEXT = 2, NEXT_NEXT = 4*/ };
444+
};
445+
436446
} // api namespace
437447

438448

@@ -466,10 +476,12 @@ struct Find_spec : public Select_spec
466476

467477
typedef api::Projection Projection;
468478
typedef api::Expr_list Expr_list;
479+
typedef api::Lock_mode_value Lock_mode_value;
469480

470481
virtual const Projection* project() const = 0;
471482
virtual const Expr_list* group_by() const = 0;
472483
virtual const Expression* having() const = 0;
484+
virtual Lock_mode_value locking() const = 0;
473485
};
474486

475487

cdk/include/mysql/cdk/session.h

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -282,10 +282,12 @@ class Session
282282
const Expr_list *group_by = NULL,
283283
const Expression *having = NULL,
284284
const Limit *lim = NULL,
285-
const Param_source *param = NULL)
285+
const Param_source *param = NULL,
286+
const Lock_mode_value lock_mode = Lock_mode_value::NONE
287+
)
286288
{
287289
return m_session->coll_find(coll, view, expr, proj, order_by,
288-
group_by, having, lim, param);
290+
group_by, having, lim, param, lock_mode);
289291
}
290292

291293
/**
@@ -334,10 +336,11 @@ class Session
334336
const Expr_list *group_by = NULL,
335337
const Expression *having = NULL,
336338
const Limit* lim = NULL,
337-
const Param_source *param = NULL)
339+
const Param_source *param = NULL,
340+
const Lock_mode_value lock_mode = Lock_mode_value::NONE)
338341
{
339342
return m_session->table_select(tab, view, expr, proj, order_by,
340-
group_by, having, lim, param);
343+
group_by, having, lim, param, lock_mode);
341344
}
342345

343346
/**

cdk/mysqlx/delayed_op.h

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -567,6 +567,7 @@ class SndFind
567567
Projection_converter m_proj_conv;
568568
Expr_list_converter m_group_by_conv;
569569
Expr_converter m_having_conv;
570+
Lock_mode_value m_lock_mode;
570571

571572
Proto_op* start()
572573
{
@@ -583,11 +584,13 @@ class SndFind
583584
const cdk::Expr_list *group_by = NULL,
584585
const cdk::Expression *having = NULL,
585586
const cdk::Limit *lim = NULL,
586-
const cdk::Param_source *param = NULL
587+
const cdk::Param_source *param = NULL,
588+
const Lock_mode_value locking = Lock_mode_value::NONE
587589
)
588590
: Select_op_base(protocol, coll, expr, order_by, lim, param)
589591
, m_proj_conv(proj)
590592
, m_group_by_conv(group_by), m_having_conv(having)
593+
, m_lock_mode(locking)
591594
{}
592595

593596
private:
@@ -607,6 +610,11 @@ class SndFind
607610
return m_having_conv.get();
608611
}
609612

613+
Lock_mode_value locking() const
614+
{
615+
return m_lock_mode;
616+
}
617+
610618
friend class SndViewCrud<DM>;
611619
};
612620

cdk/mysqlx/session.cc

Lines changed: 97 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,76 @@ POP_SYS_WARNINGS
3636
namespace cdk {
3737
namespace mysqlx {
3838

39+
/*
40+
A structure to check if xplugin we are connecting supports a
41+
specific field
42+
*/
43+
struct Proto_field_checker : public cdk::protocol::mysqlx::api::Expectations
44+
{
45+
cdk::bytes m_data;
46+
cdk::protocol::mysqlx::Protocol &m_proto;
47+
48+
Proto_field_checker(cdk::protocol::mysqlx::Protocol &proto) :
49+
m_proto(proto)
50+
{}
51+
52+
struct Check_reply_prc : cdk::protocol::mysqlx::Reply_processor
53+
{
54+
unsigned int m_code = 0;
55+
56+
void error(unsigned int code, short int,
57+
cdk::protocol::mysqlx::sql_state_t, const string &)
58+
{
59+
m_code = code;
60+
}
61+
62+
void ok(string)
63+
{
64+
m_code = 0;
65+
}
66+
};
67+
68+
void process(Processor &prc) const
69+
{
70+
prc.list_begin();
71+
prc.list_el()->set(FIELD_EXISTS, m_data);
72+
prc.list_end();
73+
}
74+
75+
/*
76+
This method sets the expectation and returns
77+
the field flag if it is supported, otherwise 0 is returned.
78+
*/
79+
uint64_t is_supported(Protocol_fields::value v)
80+
{
81+
switch (v)
82+
{
83+
case Protocol_fields::ROW_LOCKING:
84+
// Find(17) locking(12)
85+
m_data = bytes("17.12");
86+
break;
87+
default:
88+
return 0;
89+
}
90+
m_proto.snd_Expect_Open(*this, false).wait();
91+
92+
Check_reply_prc prc;
93+
m_proto.rcv_Reply(prc).wait();
94+
uint64_t ret = prc.m_code == 0 ? (uint64_t)v : 0;
95+
96+
if (prc.m_code == 0 || prc.m_code == 5168)
97+
{
98+
/*
99+
The expectation block needs to be closed if no error
100+
or expectation failed error (5168)
101+
*/
102+
m_proto.snd_Expect_Close().wait();
103+
m_proto.rcv_Reply(prc).wait();
104+
}
105+
return ret;
106+
}
107+
};
108+
39109

40110
class error_category_server : public foundation::error_category_base
41111
{
@@ -290,11 +360,23 @@ Session::~Session()
290360
option_t Session::is_valid()
291361
{
292362
wait();
293-
294363
return m_isvalid;
295364
}
296365

297366

367+
void Session::check_protocol_fields()
368+
{
369+
if (m_proto_fields == UINT64_MAX)
370+
{
371+
wait();
372+
Proto_field_checker field_checker(m_protocol);
373+
m_proto_fields = 0;
374+
/* More fields checks will be added here */
375+
m_proto_fields |= field_checker.is_supported(Protocol_fields::ROW_LOCKING);
376+
}
377+
}
378+
379+
298380
option_t Session::check_valid()
299381
{
300382
//TODO: contact server to check session
@@ -413,12 +495,17 @@ Reply_init& Session::coll_find(const Table_ref &coll,
413495
const Expr_list *group_by,
414496
const Expression *having,
415497
const Limit *lim,
416-
const Param_source *param)
498+
const Param_source *param,
499+
const Lock_mode_value lock_mode)
417500
{
501+
if (lock_mode != Lock_mode_value::NONE &&
502+
!(m_proto_fields & Protocol_fields::ROW_LOCKING))
503+
throw_error("Row locking is not supported by this version of the server");
504+
418505
SndFind<protocol::mysqlx::DOCUMENT> *find
419506
= new SndFind<protocol::mysqlx::DOCUMENT>(
420507
m_protocol, coll, expr, proj, order_by,
421-
group_by, having, lim, param
508+
group_by, having, lim, param, lock_mode
422509
);
423510

424511
if (view)
@@ -470,12 +557,17 @@ Reply_init& Session::table_select(const Table_ref &coll,
470557
const Expr_list *group_by,
471558
const Expression *having,
472559
const Limit *lim,
473-
const Param_source *param)
560+
const Param_source *param,
561+
const Lock_mode_value lock_mode)
474562
{
563+
if (lock_mode != Lock_mode_value::NONE &&
564+
!(m_proto_fields & Protocol_fields::ROW_LOCKING))
565+
throw_error("Row locking is not supported by this version of the server");
566+
475567
SndFind<protocol::mysqlx::TABLE> *find
476568
= new SndFind<protocol::mysqlx::TABLE>(
477569
m_protocol, coll, expr, proj, order_by,
478-
group_by, having, lim, param
570+
group_by, having, lim, param, lock_mode
479571
);
480572

481573
if (view)

cdk/protocol/mysqlx/crud.cc

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -432,7 +432,6 @@ struct Group_by_traits
432432
}
433433
};
434434

435-
436435
void set_find(Mysqlx::Crud::Find &msg,
437436
Data_model dm, const Find_spec &fs, const api::Args_map *args)
438437
{
@@ -467,6 +466,20 @@ void set_find(Mysqlx::Crud::Find &msg,
467466
expr_builder.reset(*msg.mutable_grouping_criteria());
468467
fs.having()->process(expr_builder);
469468
}
469+
470+
switch (fs.locking())
471+
{
472+
case api::Lock_mode_value::EXCLUSIVE:
473+
msg.set_locking(Mysqlx::Crud::Find_RowLock_EXCLUSIVE_LOCK);
474+
break;
475+
case api::Lock_mode_value::SHARED:
476+
msg.set_locking(Mysqlx::Crud::Find_RowLock_SHARED_LOCK);
477+
break;
478+
case api::Lock_mode_value::NONE:
479+
default: // do nothing
480+
break;
481+
}
482+
470483
}
471484

472485

0 commit comments

Comments
 (0)