Skip to content

Commit 2035753

Browse files
committed
WL10974: Add Row locking methods to find and select operations
This patch add two methods `lock_shared` and `lock_exclusive` to the Colleciton.find() and Table.select() command chains. Multiple calls to these methods are allowed, but only the last call is considered as final locking mode. Tests have been added for regression.
1 parent 9b498ba commit 2035753

File tree

5 files changed

+240
-57
lines changed

5 files changed

+240
-57
lines changed

lib/mysqlx/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,8 @@
4242
DeleteStatement, UpdateStatement,
4343
CreateCollectionIndexStatement, CreateTableStatement,
4444
CreateViewStatement, AlterViewStatement, ColumnDef,
45-
GeneratedColumnDef, ForeignKeyDef, Expr)
45+
GeneratedColumnDef, ForeignKeyDef, Expr,
46+
ReadStatement)
4647

4748
_SPLIT = re.compile(r',(?![^\(\)]*\))')
4849
_PRIORITY = re.compile(r'^\(address=(.+),priority=(\d+)\)$', re.VERBOSE)

lib/mysqlx/protocol.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,14 @@ def send_find(self, stmt):
167167
if stmt._has_projection:
168168
msg["projection"] = stmt._projection_expr
169169
self._apply_filter(msg, stmt)
170+
171+
if stmt._lock_exclusive:
172+
msg["locking"] = \
173+
mysqlxpb_enum("Mysqlx.Crud.Find.RowLock.EXCLUSIVE_LOCK")
174+
elif stmt._lock_shared:
175+
msg["locking"] = \
176+
mysqlxpb_enum("Mysqlx.Crud.Find.RowLock.SHARED_LOCK")
177+
170178
self._writer.write_message(
171179
mysqlxpb_enum("Mysqlx.ClientMessages.Type.CRUD_FIND"), msg)
172180

lib/mysqlx/statement.py

Lines changed: 56 additions & 56 deletions
Original file line numberDiff line numberDiff line change
@@ -477,30 +477,37 @@ def execute(self):
477477
return self._connection.update(self)
478478

479479

480-
class FindStatement(FilterableStatement):
481-
"""A statement document selection on a Collection.
480+
class ReadStatement(FilterableStatement):
481+
"""Provide base functionality for Read operations
482482
483483
Args:
484-
collection (mysqlx.Collection): The Collection object.
485-
condition (Optional[str]): An optional expression to identify the
486-
documents to be retrieved. If not specified
487-
all the documents will be included on the
488-
result unless a limit is set.
484+
target (object): The target database object, it can be
485+
:class:`mysqlx.Collection` or :class:`mysqlx.Table`.
486+
doc_based (Optional[bool]): `True` if it is document based
487+
(default: `True`).
488+
condition (Optional[str]): Sets the search condition to filter
489+
documents or records.
489490
"""
490-
def __init__(self, collection, condition=None):
491-
super(FindStatement, self).__init__(collection, True, condition)
492-
493-
def fields(self, *fields):
494-
"""Sets a document field filter.
491+
def __init__(self, target, doc_based=True, condition=None):
492+
super(ReadStatement, self).__init__(target, doc_based, condition)
493+
self._lock_exclusive = False
494+
self._lock_shared = False
495495

496-
Args:
497-
*fields: The string expressions identifying the fields to be
498-
extracted.
496+
def lock_shared(self):
497+
"""Execute a read operation with SHARED LOCK. Only one lock can be
498+
active at a time.
499+
"""
500+
self._lock_exclusive = False
501+
self._lock_shared = True
502+
return self
499503

500-
Returns:
501-
mysqlx.FindStatement: FindStatement object.
504+
def lock_exclusive(self):
505+
"""Execute a read operation with EXCLUSIVE LOCK. Only one lock can be
506+
active at a time.
502507
"""
503-
return self._projection(*fields)
508+
self._lock_exclusive = True
509+
self._lock_shared = False
510+
return self
504511

505512
def group_by(self, *fields):
506513
"""Sets a grouping criteria for the resultset.
@@ -509,7 +516,7 @@ def group_by(self, *fields):
509516
*fields: The string expressions identifying the grouping criteria.
510517
511518
Returns:
512-
mysqlx.FindStatement: FindStatement object.
519+
mysqlx.ReadStatement: ReadStatement object.
513520
"""
514521
self._group_by(*fields)
515522
return self
@@ -523,7 +530,7 @@ def having(self, condition):
523530
the grouping criteria.
524531
525532
Returns:
526-
mysqlx.FindStatement: FindStatement object.
533+
mysqlx.ReadStatement: ReadStatement object.
527534
"""
528535
self._having(condition)
529536
return self
@@ -532,12 +539,38 @@ def execute(self):
532539
"""Execute the statement.
533540
534541
Returns:
535-
mysqlx.DocResult: DocResult object.
542+
mysqlx.Result: Result object.
536543
"""
537544
return self._connection.find(self)
538545

539546

540-
class SelectStatement(FilterableStatement):
547+
class FindStatement(ReadStatement):
548+
"""A statement document selection on a Collection.
549+
550+
Args:
551+
collection (mysqlx.Collection): The Collection object.
552+
condition (Optional[str]): An optional expression to identify the
553+
documents to be retrieved. If not specified
554+
all the documents will be included on the
555+
result unless a limit is set.
556+
"""
557+
def __init__(self, collection, condition=None):
558+
super(FindStatement, self).__init__(collection, True, condition)
559+
560+
def fields(self, *fields):
561+
"""Sets a document field filter.
562+
563+
Args:
564+
*fields: The string expressions identifying the fields to be
565+
extracted.
566+
567+
Returns:
568+
mysqlx.FindStatement: FindStatement object.
569+
"""
570+
return self._projection(*fields)
571+
572+
573+
class SelectStatement(ReadStatement):
541574
"""A statement for record retrieval operations on a Table.
542575
543576
Args:
@@ -560,40 +593,6 @@ def order_by(self, *clauses):
560593
self.sort(*clauses)
561594
return self
562595

563-
def group_by(self, *fields):
564-
"""Sets a grouping criteria for the resultset.
565-
566-
Args:
567-
*fields: The fields identifying the grouping criteria.
568-
569-
Returns:
570-
mysqlx.SelectStatement: SelectStatement object.
571-
"""
572-
self._group_by(*fields)
573-
return self
574-
575-
def having(self, condition):
576-
"""Sets a condition for records to be considered in agregate function
577-
operations.
578-
579-
Args:
580-
condition (str): A condition on the agregate functions used on the
581-
grouping criteria.
582-
583-
Returns:
584-
mysqlx.SelectStatement: SelectStatement object.
585-
"""
586-
self._having(condition)
587-
return self
588-
589-
def execute(self):
590-
"""Execute the statement.
591-
592-
Returns:
593-
mysqlx.RowResult: RowResult object.
594-
"""
595-
return self._connection.find(self)
596-
597596
def get_sql(self):
598597
where = " WHERE {0}".format(self._where) if self._has_where else ""
599598
group_by = " GROUP BY {0}".format(self._grouping_str) if \
@@ -611,6 +610,7 @@ def get_sql(self):
611610

612611
return stmt
613612

613+
614614
class InsertStatement(Statement):
615615
"""A statement for insert operations on Table.
616616

src/mysqlxpb/mysqlx/protocol/mysqlx_crud.proto

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,14 @@ message UpdateOperation {
122122
// :param order: sort-order in which the rows/document shall be returned in
123123
// :param grouping: column expression list for aggregation (GROUP BY)
124124
// :param grouping_criteria: filter criteria for aggregated groups
125+
// :param locking: perform row locking on matches
125126
// :Returns: :protobuf:msg:`Mysqlx.Resultset::`
126127
message Find {
128+
enum RowLock {
129+
SHARED_LOCK = 1; // Lock matching rows against updates
130+
EXCLUSIVE_LOCK = 2; // Lock matching rows so no other transaction can read or write to it
131+
};
132+
127133
required Collection collection = 2;
128134

129135
optional DataModel data_model = 3;
@@ -134,6 +140,7 @@ message Find {
134140
repeated Order order = 7;
135141
repeated Mysqlx.Expr.Expr grouping = 8;
136142
optional Mysqlx.Expr.Expr grouping_criteria = 9;
143+
optional RowLock locking = 12;
137144
};
138145

139146
// Insert documents/rows into a collection/table

tests/test_mysqlx_crud.py

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,8 @@
2727

2828
import logging
2929
import unittest
30+
import threading
31+
import time
3032

3133
import tests
3234
import mysqlx
@@ -443,6 +445,171 @@ def test_exists_in_database(self):
443445
self.assertTrue(collection.exists_in_database())
444446
self.schema.drop_collection(collection_name)
445447

448+
@unittest.skipIf(tests.MYSQL_VERSION < (8, 0, 3), "Row locks unavailable.")
449+
def test_lock_shared(self):
450+
collection_name = "collection_test"
451+
collection = self.schema.create_collection(collection_name)
452+
result = collection.add({"name": "Fred", "age": 21}).execute()
453+
454+
pause = threading.Event()
455+
locking = threading.Event()
456+
waiting = threading.Event()
457+
458+
errors = []
459+
460+
def client_a(pause, locking, waiting):
461+
sess1 = mysqlx.get_session(self.connect_kwargs)
462+
schema = sess1.get_schema(self.schema_name)
463+
collection = schema.get_collection(collection_name)
464+
465+
sess1.start_transaction()
466+
result = collection.find("name = 'Fred'").lock_shared().execute()
467+
locking.set()
468+
time.sleep(2)
469+
locking.clear()
470+
if waiting.is_set():
471+
errors.append("S-S lock test failure.")
472+
sess1.commit()
473+
return
474+
sess1.commit()
475+
476+
pause.set()
477+
478+
sess1.start_transaction()
479+
result = collection.find("name = 'Fred'").lock_shared().execute()
480+
locking.set()
481+
time.sleep(2)
482+
locking.clear()
483+
if not waiting.is_set():
484+
errors.append("S-X lock test failure.")
485+
sess1.commit()
486+
return
487+
sess1.commit()
488+
489+
def client_b(pause, locking, waiting):
490+
sess1 = mysqlx.get_session(self.connect_kwargs)
491+
schema = sess1.get_schema(self.schema_name)
492+
collection = schema.get_collection(collection_name)
493+
494+
if not locking.wait(2):
495+
return
496+
sess1.start_transaction()
497+
498+
waiting.set()
499+
result = collection.find("name = 'Fred'").lock_shared().execute()
500+
waiting.clear()
501+
502+
sess1.commit()
503+
504+
if not pause.wait(2):
505+
return
506+
507+
if not locking.wait(2):
508+
return
509+
sess1.start_transaction()
510+
waiting.set()
511+
result = collection.find("name = 'Fred'").lock_exclusive().execute()
512+
waiting.clear()
513+
sess1.commit()
514+
515+
client1 = threading.Thread(target=client_a,
516+
args=(pause, locking, waiting,))
517+
client2 = threading.Thread(target=client_b,
518+
args=(pause, locking, waiting,))
519+
520+
client1.start()
521+
client2.start()
522+
523+
client1.join()
524+
client2.join()
525+
526+
self.schema.drop_collection(collection_name)
527+
if errors:
528+
self.fail(errors[0])
529+
530+
@unittest.skipIf(tests.MYSQL_VERSION < (8, 0, 3), "Row locks unavailable.")
531+
def test_lock_exclusive(self):
532+
collection_name = "collection_test"
533+
collection = self.schema.create_collection(collection_name)
534+
result = collection.add({"name": "Fred", "age": 21}).execute()
535+
event = threading.Event()
536+
537+
pause = threading.Event()
538+
locking = threading.Event()
539+
waiting = threading.Event()
540+
541+
errors = []
542+
543+
def client_a(pause, locking, waiting):
544+
sess1 = mysqlx.get_session(self.connect_kwargs)
545+
schema = sess1.get_schema(self.schema_name)
546+
collection = schema.get_collection(collection_name)
547+
548+
sess1.start_transaction()
549+
result = collection.find("name = 'Fred'").lock_exclusive().execute()
550+
locking.set()
551+
time.sleep(2)
552+
locking.clear()
553+
if not waiting.is_set():
554+
sess1.commit()
555+
errors.append("X-X lock test failure.")
556+
return
557+
sess1.commit()
558+
559+
pause.set()
560+
561+
sess1.start_transaction()
562+
result = collection.find("name = 'Fred'").lock_exclusive().execute()
563+
locking.set()
564+
time.sleep(2)
565+
locking.clear()
566+
if not waiting.is_set():
567+
errors.append("X-S lock test failure.")
568+
sess1.commit()
569+
return
570+
sess1.commit()
571+
572+
def client_b(pause, locking, waiting):
573+
sess1 = mysqlx.get_session(self.connect_kwargs)
574+
schema = sess1.get_schema(self.schema_name)
575+
collection = schema.get_collection(collection_name)
576+
577+
if not locking.wait(2):
578+
return
579+
sess1.start_transaction()
580+
581+
waiting.set()
582+
result = collection.find("name = 'Fred'").lock_exclusive().execute()
583+
waiting.clear()
584+
585+
sess1.commit()
586+
587+
if not pause.wait(2):
588+
return
589+
590+
if not locking.wait(2):
591+
return
592+
sess1.start_transaction()
593+
waiting.set()
594+
result = collection.find("name = 'Fred'").lock_shared().execute()
595+
waiting.clear()
596+
sess1.commit()
597+
598+
client1 = threading.Thread(target=client_a,
599+
args=(pause, locking, waiting,))
600+
client2 = threading.Thread(target=client_b,
601+
args=(pause, locking, waiting,))
602+
603+
client1.start()
604+
client2.start()
605+
606+
client1.join()
607+
client2.join()
608+
609+
self.schema.drop_collection(collection_name)
610+
if errors:
611+
self.fail(errors[0])
612+
446613
def test_add(self):
447614
collection_name = "collection_test"
448615
collection = self.schema.create_collection(collection_name)

0 commit comments

Comments
 (0)