Skip to content

Commit fc2c541

Browse files
committed
WL10975: Add Single document operations
These are commands that operate at a single document level, unlike the other CRUD commands that operate on all documents that match a filter. This feature adds 4 methods to `Collection`: 1. `Collection.replace_one` replaces a document identified with a document_id with the provided document. 2. `Collection.add_or_replace_one` does the same as replace_one, but this adds a new document if the document_id doesn't exist. 3. `Collection.get_one` returns a document matching a document_id. 4. `Collection.remove_one` removes a document matching a document_id. This patch also adds the upsert(val=True) method chainable to Table.insert() and Collection.add(). By default upsert is set to False. Setting this to True replaces the row matched with the provided key with the values provided. Tests have been added for regression.
1 parent d41a6f2 commit fc2c541

File tree

8 files changed

+215
-55
lines changed

8 files changed

+215
-55
lines changed

lib/mysqlx/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@
4545
CreateCollectionIndexStatement, CreateTableStatement,
4646
CreateViewStatement, AlterViewStatement, ColumnDef,
4747
GeneratedColumnDef, ForeignKeyDef, Expr,
48-
ReadStatement)
48+
ReadStatement, WriteStatement)
4949

5050
_SPLIT = re.compile(r',(?![^\(\)]*\))')
5151
_PRIORITY = re.compile(r'^\(address=(.+),priority=(\d+)\)$', re.VERBOSE)

lib/mysqlx/crud.py

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323

2424
"""Implementation of the CRUD database objects."""
2525

26+
from .dbdoc import DbDoc
2627
from .errors import ProgrammingError
2728
from .statement import (FindStatement, AddStatement, RemoveStatement,
2829
ModifyStatement, SelectStatement, InsertStatement,
@@ -409,6 +410,46 @@ def drop_index(self, index_name):
409410
self._connection.execute_nonquery("xplugin", "drop_collection_index",
410411
False, self._schema.name, self._name, index_name)
411412

413+
def replace_one(self, doc_id, doc):
414+
"""Replaces the Document matching the document ID with a
415+
new document provided.
416+
417+
Args:
418+
doc_id (str): Document ID
419+
doc (DbDoc/dict): New Document
420+
"""
421+
return self.modify("_id = :id").set("$", doc) \
422+
.bind("id", doc_id).execute()
423+
424+
def add_or_replace_one(self, doc_id, doc):
425+
"""Upserts the Document matching the document ID with a
426+
new document provided.
427+
428+
Args:
429+
doc_id (str): Document ID
430+
doc (DbDoc/dict): New Document
431+
"""
432+
if not isinstance(doc, DbDoc):
433+
doc = DbDoc(doc)
434+
doc.ensure_id(doc_id)
435+
return self.add(doc).upsert(True).execute()
436+
437+
def get_one(self, doc_id):
438+
"""Returns a Document matching the Document ID.
439+
440+
Args:
441+
doc_id (str): Document ID
442+
"""
443+
return self.find("_id = :id").bind("id", doc_id).execute().fetch_one()
444+
445+
def remove_one(self, doc_id):
446+
"""Removes a Document matching the Document ID.
447+
448+
Args:
449+
doc_id (str): Document ID
450+
"""
451+
return self.remove("_id = :id").bind("id", doc_id).execute()
452+
412453

413454
class Table(DatabaseObject):
414455
"""Represents a database table on a schema.

lib/mysqlx/dbdoc.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import uuid
2828

2929
from .compat import STRING_TYPES
30+
from .errors import ProgrammingError
3031

3132

3233
class DbDoc(object):
@@ -47,14 +48,21 @@ def __init__(self, value):
4748
else:
4849
raise ValueError("Unable to handle type: {0}".format(type(value)))
4950

51+
def __setitem__(self, index, value):
52+
if index == "_id":
53+
raise ProgrammingError("Cannot modify _id")
54+
self.__dict__[index] = value
55+
5056
def __getitem__(self, index):
5157
return self.__dict__[index]
5258

5359
def keys(self):
5460
return self.__dict__.keys()
5561

56-
def ensure_id(self):
57-
if "_id" not in self.__dict__:
62+
def ensure_id(self, doc_id=None):
63+
if doc_id:
64+
self.__dict__["_id"] = doc_id
65+
elif "_id" not in self.__dict__:
5866
uuid1 = str(uuid.uuid1()).upper().split("-")
5967
uuid1.reverse()
6068
self.__dict__["_id"] = "".join(uuid1)

lib/mysqlx/expr.py

Lines changed: 47 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,9 @@
2121
# along with this program; if not, write to the Free Software
2222
# Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA
2323

24-
from .compat import STRING_TYPES, PY3
24+
from .compat import STRING_TYPES, BYTE_TYPES, UNICODE_TYPES, PY3
2525
from .helpers import get_item_or_attr
26+
from .dbdoc import DbDoc
2627
from .protobuf import Message, mysqlxpb_enum
2728

2829

@@ -269,6 +270,51 @@ def __str__(self):
269270

270271
# static protobuf helper functions
271272

273+
def build_expr(value):
274+
msg = Message("Mysqlx.Expr.Expr")
275+
if isinstance(value, (dict, DbDoc)):
276+
msg["type"] = mysqlxpb_enum("Mysqlx.Expr.Expr.Type.OBJECT")
277+
msg["object"] = build_object(value).get_message()
278+
elif isinstance(value, (list, tuple)):
279+
msg["type"] = mysqlxpb_enum("Mysqlx.Expr.Expr.Type.ARRAY")
280+
msg["array"] = build_array(value).get_message()
281+
else:
282+
msg["type"] = mysqlxpb_enum("Mysqlx.Expr.Expr.Type.LITERAL")
283+
msg["literal"] = build_scalar(value).get_message()
284+
return msg
285+
286+
def build_scalar(value):
287+
if isinstance(value, STRING_TYPES):
288+
return build_string_scalar(value)
289+
elif isinstance(value, BYTE_TYPES):
290+
return build_bytes_scalar(value)
291+
elif isinstance(value, bool):
292+
return build_bool_scalar(value)
293+
elif isinstance(value, int):
294+
return build_int_scalar(value)
295+
elif isinstance(value, float):
296+
return build_double_scalar(value)
297+
elif value is None:
298+
return build_null_scalar()
299+
raise ValueError("Unsupported data type: {0}.".format(type(value)))
300+
301+
def build_object(obj):
302+
if isinstance(obj, DbDoc):
303+
return build_object(obj.__dict__)
304+
305+
msg = Message("Mysqlx.Expr.Object")
306+
for key, value in obj.items():
307+
pair = Message("Mysqlx.Expr.Object.ObjectField")
308+
pair["key"] = key.encode() if isinstance(key, UNICODE_TYPES) else key
309+
pair["value"] = build_expr(value).get_message()
310+
msg["fld"].extend([pair.get_message()])
311+
return msg
312+
313+
def build_array(array):
314+
msg = Message("Mysqlx.Expr.Array")
315+
msg["value"].extend([build_expr(value).get_message() for value in array])
316+
return msg
317+
272318
def build_null_scalar():
273319
msg = Message("Mysqlx.Datatypes.Scalar")
274320
msg["type"] = mysqlxpb_enum("Mysqlx.Datatypes.Scalar.Type.V_NULL")

lib/mysqlx/protocol.py

Lines changed: 6 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,7 @@
2828
from .compat import STRING_TYPES, INT_TYPES
2929
from .dbdoc import DbDoc
3030
from .errors import InterfaceError, OperationalError, ProgrammingError
31-
from .expr import (ExprParser, build_null_scalar, build_string_scalar,
32-
build_bool_scalar, build_double_scalar, build_int_scalar)
31+
from .expr import ExprParser, build_expr, build_scalar, build_bool_scalar
3332
from .helpers import encode_to_bytes, get_item_or_attr
3433
from .result import ColumnMetaData
3534
from .protobuf import SERVER_MESSAGES, Message, mysqlxpb_enum
@@ -135,8 +134,7 @@ def get_binding_scalars(self, statement):
135134
raise ProgrammingError("Unable to find placeholder for "
136135
"parameter: {0}".format(name))
137136
pos = statement._binding_map[name]
138-
scalars[pos] = self.arg_object_to_scalar(binding["value"],
139-
not statement._doc_based)
137+
scalars[pos] = build_scalar(binding["value"]).get_message()
140138
return scalars
141139

142140
def _apply_filter(self, message, statement):
@@ -193,8 +191,7 @@ def send_update(self, stmt):
193191
operation["operation"] = update_op.update_type
194192
operation["source"] = update_op.source
195193
if update_op.value is not None:
196-
operation["value"] = self.arg_object_to_expr(
197-
update_op.value, not stmt._doc_based)
194+
operation["value"] = build_expr(update_op.value)
198195
msg["operation"].extend([operation.get_message()])
199196

200197
self._writer.write_message(
@@ -241,13 +238,12 @@ def send_insert(self, stmt):
241238
row = Message("Mysqlx.Crud.Insert.TypedRow")
242239
if isinstance(value, list):
243240
for val in value:
244-
obj = self.arg_object_to_expr(val, not stmt._doc_based)
245-
row["field"].extend([obj.get_message()])
241+
row["field"].extend([build_expr(val).get_message()])
246242
else:
247-
obj = self.arg_object_to_expr(value, not stmt._doc_based)
248-
row["field"].extend([obj.get_message()])
243+
row["field"].extend([build_expr(value).get_message()])
249244
msg["row"].extend([row.get_message()])
250245

246+
msg["upsert"] = stmt._upsert
251247
self._writer.write_message(
252248
mysqlxpb_enum("Mysqlx.ClientMessages.Type.CRUD_INSERT"), msg)
253249

@@ -338,39 +334,6 @@ def get_column_metadata(self, rs):
338334
columns.append(col)
339335
return columns
340336

341-
def arg_object_to_expr(self, value, allow_relational):
342-
literal = None
343-
if value is None:
344-
literal = build_null_scalar()
345-
if isinstance(value, bool):
346-
literal = build_bool_scalar(value)
347-
elif isinstance(value, INT_TYPES):
348-
literal = build_int_scalar(value)
349-
elif isinstance(value, (float)):
350-
literal = build_double_scalar(value)
351-
elif isinstance(value, STRING_TYPES):
352-
try:
353-
expression = ExprParser(value, allow_relational).expr()
354-
if expression.has_identifier():
355-
msg = Message("Mysqlx.Expr.Expr",
356-
literal=build_string_scalar(value))
357-
return msg
358-
return expression.serialize_to_string()
359-
except:
360-
literal = build_string_scalar(value)
361-
elif isinstance(value, DbDoc):
362-
literal = build_string_scalar(str(value))
363-
if literal is None:
364-
raise InterfaceError("Unsupported type: {0}".format(type(value)))
365-
366-
msg = Message("Mysqlx.Expr.Expr")
367-
msg["type"] = mysqlxpb_enum("Mysqlx.Expr.Expr.Type.LITERAL")
368-
msg["literal"] = literal
369-
return msg
370-
371-
def arg_object_to_scalar(self, value, allow_relational):
372-
return self.arg_object_to_expr(value, allow_relational).literal
373-
374337
def read_ok(self):
375338
msg = self._reader.read_message()
376339
if msg.type == "Mysqlx.Error":

lib/mysqlx/statement.py

Lines changed: 24 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -306,15 +306,34 @@ def execute(self):
306306
return SqlResult(self._connection)
307307

308308

309-
class AddStatement(Statement):
309+
class WriteStatement(Statement):
310+
"""Provide common write operation attributes
311+
"""
312+
def __init__(self, target, doc_based):
313+
super(WriteStatement, self).__init__(target, doc_based)
314+
self._values = []
315+
self._upsert = False
316+
317+
def upsert(self, val=True):
318+
"""Sets the upset flag to the boolean of the value provided.
319+
Setting of this flag allows updating of the matched rows/documents
320+
with the provided value.
321+
322+
Args:
323+
val (optional[bool]): Set or unset the upsert flag.
324+
"""
325+
self._upsert = val
326+
return self
327+
328+
329+
class AddStatement(WriteStatement):
310330
"""A statement for document addition on a collection.
311331
312332
Args:
313333
collection (mysqlx.Collection): The Collection object.
314334
"""
315335
def __init__(self, collection):
316-
super(AddStatement, self).__init__(target=collection)
317-
self._values = []
336+
super(AddStatement, self).__init__(collection, True)
318337
self._ids = []
319338

320339
def add(self, *values):
@@ -610,18 +629,16 @@ def get_sql(self):
610629

611630
return stmt
612631

613-
614-
class InsertStatement(Statement):
632+
class InsertStatement(WriteStatement):
615633
"""A statement for insert operations on Table.
616634
617635
Args:
618636
table (mysqlx.Table): The Table object.
619637
*fields: The fields to be inserted.
620638
"""
621639
def __init__(self, table, *fields):
622-
super(InsertStatement, self).__init__(target=table, doc_based=False)
640+
super(InsertStatement, self).__init__(table, False)
623641
self._fields = flexible_params(*fields)
624-
self._values = []
625642

626643
def values(self, *values):
627644
"""Set the values to be inserted.

src/mysqlxpb/mysqlx/protocol/mysqlx_crud.proto

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,7 @@ message Find {
150150
// :param projection: name of the columns to insert data into (empty if data_model is DOCUMENT)
151151
// :param row: set of rows to insert into the collection/table (a single expression with a JSON document literal or an OBJECT expression)
152152
// :param args: values for parameters used in row expressions
153+
// :param upsert: true if this should be treated as an Upsert (that is, update on duplicate key)
153154
// :Returns: :protobuf:msg:`Mysqlx.Resultset::`
154155
message Insert {
155156
required Collection collection = 1;
@@ -161,7 +162,8 @@ message Insert {
161162
repeated Mysqlx.Expr.Expr field = 1;
162163
};
163164
repeated TypedRow row = 4;
164-
repeated Mysqlx.Datatypes.Scalar args = 5;
165+
repeated Mysqlx.Datatypes.Scalar args = 5;
166+
optional bool upsert = 6 [default = false];
165167
};
166168

167169
// Update documents/rows in a collection/table

0 commit comments

Comments
 (0)