Skip to content

Commit 4350f83

Browse files
committed
Merge branch 'wl10981-single-crud'
# Conflicts: # cdk/include/mysql/cdk/protocol/mysqlx.h # cdk/mysqlx/session.cc # devapi/tests/crud-t.cc # xapi/tests/xapi_crud-t.cc
2 parents d97a53c + 18567cd commit 4350f83

File tree

14 files changed

+400
-16
lines changed

14 files changed

+400
-16
lines changed

cdk/core/tests/session_crud-t.cc

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -807,7 +807,6 @@ class Path
807807

808808
// =====================================================================
809809

810-
811810
CRUD_TEST_BEGIN(find)
812811
{
813812
Session sess(this);
@@ -1824,3 +1823,36 @@ CRUD_TEST_BEGIN(views)
18241823
cout <<"Done!" <<endl;
18251824
}
18261825
CRUD_TEST_END
1826+
1827+
1828+
CRUD_TEST_BEGIN(upsert)
1829+
{
1830+
Session sess(this);
1831+
SKIP_IF_SERVER_VERSION_LESS(sess, 8, 0, 3)
1832+
1833+
Doc_list doc_list;
1834+
doc_list.add(1, "coo", 10);
1835+
doc_list.add(2, "roo", 20);
1836+
doc_list.add(3, "moo", 30);
1837+
1838+
Reply r = sess.coll_add(coll, doc_list, &doc_list.params());
1839+
r.wait();
1840+
1841+
Doc_list upsert_list;
1842+
upsert_list.add(1, "zoo", 40);
1843+
r = sess.coll_add(coll, upsert_list,
1844+
&upsert_list.params(), true);
1845+
r.wait();
1846+
EXPECT_EQ(0, r.entry_count());
1847+
1848+
Doc_list no_upsert_list;
1849+
no_upsert_list.add(1, "noo", 50);
1850+
r = sess.coll_add(coll, no_upsert_list,
1851+
&no_upsert_list.params(), false);
1852+
r.wait();
1853+
EXPECT_EQ(1, r.entry_count());
1854+
1855+
cout << "Done!" << endl;
1856+
1857+
}
1858+
CRUD_TEST_END

cdk/core/tests/test.h

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,88 @@ class Core_test
6161
Session(Core_test *fixture)
6262
: cdk::Session(fixture->get_ds(), fixture->get_opts())
6363
{}
64+
65+
bool is_server_version_less(int test_upper_version,
66+
int test_lower_version,
67+
int test_release_version)
68+
{
69+
Reply reply(sql("SELECT VERSION()"));
70+
reply.wait();
71+
Cursor cursor(reply);
72+
73+
struct : cdk::Row_processor
74+
{
75+
cdk::string version;
76+
77+
virtual bool row_begin(row_count_t row)
78+
{ return true; }
79+
80+
virtual void row_end(row_count_t row)
81+
{}
82+
83+
virtual void field_null(col_count_t pos)
84+
{}
85+
86+
virtual size_t field_begin(col_count_t pos, size_t)
87+
{ return SIZE_MAX; }
88+
89+
size_t field_data(col_count_t pos, bytes data)
90+
{
91+
cdk::foundation::Codec<cdk::foundation::Type::STRING> codec;
92+
// Trim trailing \0
93+
bytes d1(data.begin(), data.end() - 1);
94+
codec.from_bytes(d1, version);
95+
return 0;
96+
}
97+
98+
virtual void field_end(col_count_t /*pos*/)
99+
{}
100+
101+
virtual void end_of_data()
102+
{}
103+
}
104+
prc;
105+
106+
cursor.get_rows(prc);
107+
cursor.wait();
108+
109+
std::stringstream version;
110+
version << std::string(prc.version);
111+
112+
int upper_version, minor_version, release_version;
113+
char sep;
114+
version >> upper_version;
115+
version >> sep;
116+
version >> minor_version;
117+
version >> sep;
118+
version >> release_version;
119+
120+
if ((upper_version < test_upper_version) ||
121+
(upper_version == test_upper_version &&
122+
minor_version << test_lower_version) ||
123+
(upper_version == test_upper_version &&
124+
minor_version == test_lower_version &&
125+
release_version < test_release_version))
126+
{
127+
return true;
128+
}
129+
return false;
130+
}
131+
64132
};
65133

66134
};
67135

136+
#define SKIP_IF_SERVER_VERSION_LESS(CDK_SESS, x,y,z)\
137+
if (CDK_SESS.is_server_version_less(x, y, z)) \
138+
{\
139+
std::cerr <<"SKIPPED: " << \
140+
"Server version not supported (" \
141+
<< x << "." << y <<"." << ")" << z <<std::endl; \
142+
return; \
143+
}
144+
145+
68146
template <class X>
69147
struct Helper
70148
{

cdk/include/mysql/cdk/mysqlx/session.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -522,7 +522,8 @@ class Session
522522

523523
Reply_init &coll_add(const Table_ref&,
524524
Doc_source&,
525-
const Param_source *param = NULL);
525+
const Param_source *param = NULL,
526+
bool upsert = false);
526527

527528
Reply_init &coll_remove(const Table_ref&,
528529
const Expression *expr = NULL,

cdk/include/mysql/cdk/protocol/mysqlx.h

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -440,7 +440,7 @@ struct Protocol_fields
440440
Enum values will be used as binary flags,
441441
so they must be as 2^N
442442
*/
443-
enum value { ROW_LOCKING = 1 /*, NEXT = 2, NEXT_NEXT = 4*/ };
443+
enum value { ROW_LOCKING = 1 , UPSERT = 2 };
444444
};
445445

446446
} // api namespace
@@ -566,12 +566,19 @@ class Protocol
566566
567567
@param args defines values of named parameters, if any are used in the
568568
expressions of the row source object
569+
570+
@param upsert Can be set true only in the document mode -- in that case
571+
an upsert variant of the Insert command is sent. If inserted document has
572+
the same id as an existing document in the collection, the upsert variant
573+
replaces the document in the collection with the new one. Without upsert
574+
flag such situation leads to error.
569575
*/
570576

571577
Op& snd_Insert(Data_model dm, api::Db_obj &obj,
572578
const api::Columns *columns,
573579
Row_source &data,
574-
const api::Args_map *args = NULL);
580+
const api::Args_map *args = NULL,
581+
bool upsert = false);
575582

576583
/**
577584
Send CRUD Update command.

cdk/include/mysql/cdk/session.h

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -225,15 +225,26 @@ class Session
225225
Documents to be inserted are given by a Doc_source object which is
226226
a sequence of expressions, each expression describing a single document.
227227
Note that a document can be represented as a JSON blob or as a structured
228-
document expression.
228+
document expression. In the latter case this expression can contain named
229+
parameters -- the values for these parameters are given by the `param`
230+
argument describing a key-value dictionary.
231+
232+
If `upsert` flag is set and a document being added has the same id as
233+
a document already present in the collection, the existing document is
234+
replaced by the new one. Otherwise, if `upsert` flag is false (the default)
235+
an error is reported if a document being added conflicts with an exisiting
236+
document in the collection.
229237
230238
Note: Server requires that inserted documents contain "_id" field with
231239
unique document id.
232240
*/
233241

234-
Reply_init coll_add(const api::Object_ref &coll, Doc_source &docs, const Param_source *param)
242+
Reply_init coll_add(const api::Object_ref &coll,
243+
Doc_source &docs,
244+
const Param_source *param,
245+
bool upsert = false)
235246
{
236-
return m_session->coll_add(coll, docs, param);
247+
return m_session->coll_add(coll, docs, param, upsert);
237248
}
238249

239250
/**

cdk/mysqlx/delayed_op.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ class SndInsertDocs
189189

190190
cdk::Doc_source &m_docs;
191191
const Param_source *m_param;
192+
bool m_upsert;
192193

193194
Proto_op* start()
194195
{
@@ -201,17 +202,20 @@ class SndInsertDocs
201202
*this,
202203
NULL,
203204
*this,
204-
&param_conv);
205+
&param_conv,
206+
m_upsert);
205207
}
206208

207209
public:
208210

209211
SndInsertDocs(Protocol& protocol, const api::Table_ref &coll,
210212
cdk::Doc_source &docs,
211-
const Param_source *param)
213+
const Param_source *param,
214+
bool upsert = false)
212215
: Crud_op_base(protocol, coll)
213216
, m_docs(docs)
214217
, m_param(param)
218+
, m_upsert(upsert)
215219
{}
216220

217221
private:

cdk/mysqlx/session.cc

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,10 @@ struct Proto_field_checker : public cdk::protocol::mysqlx::api::Expectations
8484
// Find(17) locking(12)
8585
m_data = bytes("17.12");
8686
break;
87+
case Protocol_fields::UPSERT:
88+
// Insert(18) upsert(6)
89+
m_data = bytes("18.6");
90+
break;
8791
default:
8892
return 0;
8993
}
@@ -373,6 +377,7 @@ void Session::check_protocol_fields()
373377
m_proto_fields = 0;
374378
/* More fields checks will be added here */
375379
m_proto_fields |= field_checker.is_supported(Protocol_fields::ROW_LOCKING);
380+
m_proto_fields |= field_checker.is_supported(Protocol_fields::UPSERT);
376381
}
377382
}
378383

@@ -467,10 +472,11 @@ void Session::rollback()
467472

468473
Reply_init& Session::coll_add(const Table_ref &coll,
469474
Doc_source &docs,
470-
const Param_source *param)
475+
const Param_source *param,
476+
bool upsert)
471477
{
472478
return set_command(
473-
new SndInsertDocs(m_protocol, coll, docs, param)
479+
new SndInsertDocs(m_protocol, coll, docs, param, upsert)
474480
);
475481
}
476482

cdk/protocol/mysqlx/crud.cc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -555,7 +555,8 @@ Protocol::snd_Insert(
555555
api::Db_obj &db_obj,
556556
const api::Columns *columns,
557557
Row_source &rs,
558-
const api::Args_map *args)
558+
const api::Args_map *args,
559+
bool upsert)
559560
{
560561
Mysqlx::Crud::Insert insert;
561562

@@ -584,6 +585,8 @@ Protocol::snd_Insert(
584585
rs.process(row_builder);
585586
}
586587

588+
insert.set_upsert(upsert);
589+
587590
return get_impl().snd_start(insert, msg_type::cli_CrudInsert);
588591
}
589592

devapi/collection_crud.cc

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -85,13 +85,15 @@ class Op_collection_add
8585
std::vector<mysqlx::GUID> m_id_list;
8686
bool m_generated_id;
8787
unsigned m_pos;
88+
bool m_upsert = false;
8889

8990

90-
Op_collection_add(Collection &coll)
91+
Op_collection_add(Collection &coll, bool upsert = false)
9192
: Op_base(coll)
9293
, m_coll(coll)
9394
, m_generated_id(true)
9495
, m_pos(0)
96+
, m_upsert(upsert)
9597
{}
9698

9799
Executable_impl* clone() const override
@@ -116,7 +118,8 @@ class Op_collection_add
116118
// Issue coll_add statement where documents are described by list
117119
// of expressions defined by this instance.
118120

119-
return new cdk::Reply(get_cdk_session().coll_add(m_coll, *this, NULL));
121+
return new cdk::Reply(get_cdk_session()
122+
.coll_add(m_coll, *this, NULL, m_upsert));
120123
}
121124

122125

@@ -214,6 +217,7 @@ class Op_collection_add
214217

215218

216219
friend mysqlx::CollectionAdd;
220+
friend mysqlx::internal::CollectionReplace;
217221
};
218222

219223

@@ -546,6 +550,14 @@ class Op_collection_modify
546550
add_where(expr);
547551
}
548552

553+
Op_collection_modify(const Op_collection_modify &other)
554+
: Op_select(other)
555+
, m_coll(other.m_coll)
556+
, m_update(other.m_update)
557+
{
558+
m_update_it = m_update.end();
559+
}
560+
549561
Executable_impl* clone() const override
550562
{
551563
return new Op_collection_modify(*this);
@@ -640,6 +652,7 @@ class Op_collection_modify
640652
}
641653

642654
friend mysqlx::CollectionModify;
655+
friend mysqlx::internal::CollectionReplace;
643656
};
644657

645658

@@ -653,3 +666,50 @@ CollectionModify::CollectionModify(
653666
}
654667
CATCH_AND_WRAP
655668
}
669+
670+
internal::CollectionReplace::CollectionReplace(Collection &coll,
671+
const mysqlx::string &id,
672+
mysqlx::internal::ExprValue &&val,
673+
bool upsert)
674+
{
675+
try
676+
{
677+
if (upsert)
678+
{
679+
Op_collection_add *add_impl = new Op_collection_add(coll, upsert);
680+
reset(add_impl);
681+
string str = val;
682+
str.erase(str.rfind('}'));
683+
684+
std::stringstream sstream;
685+
// If ':' is present in the string the document is not empty
686+
if (str.find(':'))
687+
sstream << ", ";
688+
// It is safe to append the _id at the end
689+
sstream << "\"_id\": \"" << id << "\"}";
690+
str.append((string)sstream.str());
691+
add_impl->add_json(str);
692+
}
693+
else
694+
{
695+
reset(new Op_collection_modify(coll, "_id = :id"));
696+
697+
if (val.isExpression() || val.getType() != Value::STRING)
698+
{
699+
get_impl()->add_operation(Op_collection_modify::SET,
700+
"$",
701+
std::move(val));
702+
}
703+
else
704+
{
705+
get_impl()->add_operation(Op_collection_modify::SET,
706+
"$",
707+
internal::expr(std::move(val)));
708+
}
709+
710+
get_impl()->add_param("id", id);
711+
}
712+
}
713+
CATCH_AND_WRAP
714+
}
715+

0 commit comments

Comments
 (0)