Skip to content

Commit 13b27eb

Browse files
author
Bogdan Degtyariov
committed
Added CDK support for UPSERT command
1 parent dde41ce commit 13b27eb

File tree

9 files changed

+156
-12
lines changed

9 files changed

+156
-12
lines changed

cdk/core/tests/session_crud-t.cc

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -807,7 +807,6 @@ class Path
807807

808808
// =====================================================================
809809

810-
811810
CRUD_TEST_BEGIN(find)
812811
{
813812
Session sess(this);
@@ -1824,3 +1823,36 @@ CRUD_TEST_BEGIN(views)
18241823
cout <<"Done!" <<endl;
18251824
}
18261825
CRUD_TEST_END
1826+
1827+
1828+
CRUD_TEST_BEGIN(upsert)
1829+
{
1830+
Session sess(this);
1831+
SKIP_IF_SERVER_VERSION_LESS(sess, 8, 0, 3)
1832+
1833+
Doc_list doc_list;
1834+
doc_list.add(1, "coo", 10);
1835+
doc_list.add(2, "roo", 20);
1836+
doc_list.add(3, "moo", 30);
1837+
1838+
Reply r = sess.coll_add(coll, doc_list, &doc_list.params());
1839+
r.wait();
1840+
1841+
Doc_list upsert_list;
1842+
upsert_list.add(1, "zoo", 40);
1843+
r = sess.coll_add(coll, upsert_list,
1844+
&upsert_list.params(), true);
1845+
r.wait();
1846+
EXPECT_EQ(0, r.entry_count());
1847+
1848+
Doc_list no_upsert_list;
1849+
no_upsert_list.add(1, "noo", 50);
1850+
r = sess.coll_add(coll, no_upsert_list,
1851+
&no_upsert_list.params(), false);
1852+
r.wait();
1853+
EXPECT_EQ(1, r.entry_count());
1854+
1855+
cout << "Done!" << endl;
1856+
1857+
}
1858+
CRUD_TEST_END

cdk/core/tests/test.h

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,10 +61,88 @@ class Core_test
6161
Session(Core_test *fixture)
6262
: cdk::Session(fixture->get_ds(), fixture->get_opts())
6363
{}
64+
65+
bool is_server_version_less(int test_upper_version,
66+
int test_lower_version,
67+
int test_release_version)
68+
{
69+
Reply reply(sql("SELECT VERSION()"));
70+
reply.wait();
71+
Cursor cursor(reply);
72+
73+
struct : cdk::Row_processor
74+
{
75+
cdk::string version;
76+
77+
virtual bool row_begin(row_count_t row)
78+
{ return true; }
79+
80+
virtual void row_end(row_count_t row)
81+
{}
82+
83+
virtual void field_null(col_count_t pos)
84+
{}
85+
86+
virtual size_t field_begin(col_count_t pos, size_t)
87+
{ return SIZE_MAX; }
88+
89+
size_t field_data(col_count_t pos, bytes data)
90+
{
91+
cdk::foundation::Codec<cdk::foundation::Type::STRING> codec;
92+
// Trim trailing \0
93+
bytes d1(data.begin(), data.end() - 1);
94+
codec.from_bytes(d1, version);
95+
return 0;
96+
}
97+
98+
virtual void field_end(col_count_t /*pos*/)
99+
{}
100+
101+
virtual void end_of_data()
102+
{}
103+
}
104+
prc;
105+
106+
cursor.get_rows(prc);
107+
cursor.wait();
108+
109+
std::stringstream version;
110+
version << std::string(prc.version);
111+
112+
int upper_version, minor_version, release_version;
113+
char sep;
114+
version >> upper_version;
115+
version >> sep;
116+
version >> minor_version;
117+
version >> sep;
118+
version >> release_version;
119+
120+
if ((upper_version < test_upper_version) ||
121+
(upper_version == test_upper_version &&
122+
minor_version << test_lower_version) ||
123+
(upper_version == test_upper_version &&
124+
minor_version == test_lower_version &&
125+
release_version < test_release_version))
126+
{
127+
return true;
128+
}
129+
return false;
130+
}
131+
64132
};
65133

66134
};
67135

136+
#define SKIP_IF_SERVER_VERSION_LESS(CDK_SESS, x,y,z)\
137+
if (CDK_SESS.is_server_version_less(x, y, z)) \
138+
{\
139+
std::cerr <<"SKIPPED: " << \
140+
"Server version not supported (" \
141+
<< x << "." << y <<"." << ")" << z <<std::endl; \
142+
return; \
143+
}
144+
145+
68146
template <class X>
69147
struct Helper
70148
{

cdk/include/mysql/cdk/mysqlx/session.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -522,7 +522,8 @@ class Session
522522

523523
Reply_init &coll_add(const Table_ref&,
524524
Doc_source&,
525-
const Param_source *param = NULL);
525+
const Param_source *param = NULL,
526+
bool upsert = false);
526527

527528
Reply_init &coll_remove(const Table_ref&,
528529
const Expression *expr = NULL,

cdk/include/mysql/cdk/protocol/mysqlx.h

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -440,7 +440,7 @@ struct Protocol_fields
440440
Enum values will be used as binary flags,
441441
so they must be as 2^N
442442
*/
443-
enum value { ROW_LOCKING = 1 /*, NEXT = 2, NEXT_NEXT = 4*/ };
443+
enum value { ROW_LOCKING = 1 , UPSERT = 2 };
444444
};
445445

446446
} // api namespace
@@ -566,12 +566,19 @@ class Protocol
566566
567567
@param args defines values of named parameters, if any are used in the
568568
expressions of the row source object
569+
570+
@param upsert Can be set true only in the document mode -- in that case
571+
an upsert variant of the Insert command is sent. If inserted document has
572+
the same id as an existing document in the collection, the upsert variant
573+
replaces the document in the collection with the new one. Without upsert
574+
flag such situation leads to error.
569575
*/
570576

571577
Op& snd_Insert(Data_model dm, api::Db_obj &obj,
572578
const api::Columns *columns,
573579
Row_source &data,
574-
const api::Args_map *args = NULL);
580+
const api::Args_map *args = NULL,
581+
bool upsert = false);
575582

576583
/**
577584
Send CRUD Update command.

cdk/include/mysql/cdk/session.h

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -225,15 +225,26 @@ class Session
225225
Documents to be inserted are given by a Doc_source object which is
226226
a sequence of expressions, each expression describing a single document.
227227
Note that a document can be represented as a JSON blob or as a structured
228-
document expression.
228+
document expression. In the latter case this expression can contain named
229+
parameters -- the values for these parameters are given by the `param`
230+
argument describing a key-value dictionary.
231+
232+
If `upsert` flag is set and a document being added has the same id as
233+
a document already present in the collection, the existing document is
234+
replaced by the new one. Otherwise, if `upsert` flag is false (the default)
235+
an error is reported if a document being added conflicts with an exisiting
236+
document in the collection.
229237
230238
Note: Server requires that inserted documents contain "_id" field with
231239
unique document id.
232240
*/
233241

234-
Reply_init coll_add(const api::Object_ref &coll, Doc_source &docs, const Param_source *param)
242+
Reply_init coll_add(const api::Object_ref &coll,
243+
Doc_source &docs,
244+
const Param_source *param,
245+
bool upsert = false)
235246
{
236-
return m_session->coll_add(coll, docs, param);
247+
return m_session->coll_add(coll, docs, param, upsert);
237248
}
238249

239250
/**

cdk/mysqlx/delayed_op.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,7 @@ class SndInsertDocs
189189

190190
cdk::Doc_source &m_docs;
191191
const Param_source *m_param;
192+
bool m_upsert;
192193

193194
Proto_op* start()
194195
{
@@ -201,17 +202,20 @@ class SndInsertDocs
201202
*this,
202203
NULL,
203204
*this,
204-
&param_conv);
205+
&param_conv,
206+
m_upsert);
205207
}
206208

207209
public:
208210

209211
SndInsertDocs(Protocol& protocol, const api::Table_ref &coll,
210212
cdk::Doc_source &docs,
211-
const Param_source *param)
213+
const Param_source *param,
214+
bool upsert = false)
212215
: Crud_op_base(protocol, coll)
213216
, m_docs(docs)
214217
, m_param(param)
218+
, m_upsert(upsert)
215219
{}
216220

217221
private:

cdk/mysqlx/session.cc

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,10 @@ struct Proto_field_checker : public cdk::protocol::mysqlx::api::Expectations
8484
// Find(17) locking(12)
8585
m_data = bytes("17.12");
8686
break;
87+
case Protocol_fields::UPSERT:
88+
// Insert(18) upsert(6)
89+
m_data = bytes("18.6");
90+
break;
8791
default:
8892
return 0;
8993
}
@@ -373,6 +377,7 @@ void Session::check_protocol_fields()
373377
m_proto_fields = 0;
374378
/* More fields checks will be added here */
375379
m_proto_fields |= field_checker.is_supported(Protocol_fields::ROW_LOCKING);
380+
m_proto_fields |= field_checker.is_supported(Protocol_fields::UPSERT);
376381
}
377382
}
378383

@@ -467,10 +472,11 @@ void Session::rollback()
467472

468473
Reply_init& Session::coll_add(const Table_ref &coll,
469474
Doc_source &docs,
470-
const Param_source *param)
475+
const Param_source *param,
476+
bool upsert)
471477
{
472478
return set_command(
473-
new SndInsertDocs(m_protocol, coll, docs, param)
479+
new SndInsertDocs(m_protocol, coll, docs, param, upsert)
474480
);
475481
}
476482

cdk/protocol/mysqlx/crud.cc

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -555,7 +555,8 @@ Protocol::snd_Insert(
555555
api::Db_obj &db_obj,
556556
const api::Columns *columns,
557557
Row_source &rs,
558-
const api::Args_map *args)
558+
const api::Args_map *args,
559+
bool upsert)
559560
{
560561
Mysqlx::Crud::Insert insert;
561562

@@ -584,6 +585,8 @@ Protocol::snd_Insert(
584585
rs.process(row_builder);
585586
}
586587

588+
insert.set_upsert(upsert);
589+
587590
return get_impl().snd_start(insert, msg_type::cli_CrudInsert);
588591
}
589592

devapi/tests/crud-t.cc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2209,6 +2209,8 @@ TEST_F(Crud, single_document)
22092209
{
22102210
SKIP_IF_NO_XPLUGIN;
22112211

2212+
SKIP_IF_SERVER_VERSION_LESS(8, 0, 2);
2213+
22122214
cout << "Creating session..." << endl;
22132215

22142216
Session sess(this);

0 commit comments

Comments
 (0)