Skip to content

Commit a18fb56

Browse files
authored
Add API to get/set coordinator node (#489)
1 parent e18dd4a commit a18fb56

File tree

8 files changed

+92
-1
lines changed

8 files changed

+92
-1
lines changed

include/cassandra.h

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -227,6 +227,14 @@ typedef struct CassResult_ CassResult;
227227
*/
228228
typedef struct CassErrorResult_ CassErrorResult;
229229

230+
231+
/**
232+
* An object that represents a cluster node.
233+
*
234+
* @struct CassNode
235+
*/
236+
typedef struct CassNode_ CassNode;
237+
230238
/**
231239
* An object used to iterate over a group of rows, columns or collection values.
232240
*
@@ -5002,6 +5010,21 @@ cass_future_custom_payload_item(CassFuture* future,
50025010
const cass_byte_t** value,
50035011
size_t* value_size);
50045012

5013+
/**
5014+
* Gets the node that acted as coordinator for this query. If the future is not
5015+
* ready this method will wait for the future to be set.
5016+
*
5017+
* @param future
5018+
* @return The coordinator node that handled the query. The lifetime of this
5019+
* object is the same as the result object it came from. NULL can be returned
5020+
* if the future is not a response future or if an error occurs before a
5021+
* coordinator responds.
5022+
*
5023+
* @see cass_statement_set_node()
5024+
*/
5025+
CASS_EXPORT const CassNode*
5026+
cass_future_coordinator(CassFuture* future);
5027+
50055028
/***********************************************************************************
50065029
*
50075030
* Statement
@@ -5395,6 +5418,21 @@ cass_statement_set_host_inet(CassStatement* statement,
53955418
const CassInet* host,
53965419
int port);
53975420

5421+
/**
5422+
* Same as cass_statement_set_host(), but using the `CassNode` type. This can
5423+
* be used to re-query the same coordinator when used with the result of
5424+
* `cass_future_coordinator()`
5425+
*
5426+
* @param statement
5427+
* @param address
5428+
* @return CASS_OK if successful, otherwise an error occurred.
5429+
*
5430+
* @see cass_future_coordinator()
5431+
*/
5432+
CASS_EXPORT CassError
5433+
cass_statement_set_node(CassStatement* statement,
5434+
const CassNode* node);
5435+
53985436
/**
53995437
* Binds null to a query or bound statement at the specified index.
54005438
*

src/address.hpp

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
#include "allocated.hpp"
2121
#include "callback.hpp"
2222
#include "dense_hash_set.hpp"
23+
#include "external.hpp"
2324
#include "string.hpp"
2425
#include "vector.hpp"
2526

@@ -168,4 +169,6 @@ inline std::ostream& operator<<(std::ostream& os, const datastax::internal::core
168169

169170
} // namespace std
170171

172+
EXTERNAL_TYPE(datastax::internal::core::Address, CassNode)
173+
171174
#endif

src/future.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -163,6 +163,14 @@ CassError cass_future_custom_payload_item(CassFuture* future, size_t index, cons
163163
return CASS_OK;
164164
}
165165

166+
const CassNode* cass_future_coordinator(CassFuture* future) {
167+
if (future->type() != Future::FUTURE_TYPE_RESPONSE) {
168+
return NULL;
169+
}
170+
const Address& node = static_cast<ResponseFuture*>(future->from())->address();
171+
return node.is_valid() ? CassNode::to(&node) : NULL;
172+
}
173+
166174
} // extern "C"
167175

168176
bool Future::set_callback(Future::Callback callback, void* data) {

src/request_handler.hpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ class ResponseFuture : public Future {
119119
return false;
120120
}
121121

122-
Address address() {
122+
const Address& address() {
123123
ScopedMutex lock(&mutex_);
124124
internal_wait(lock);
125125
return address_;

src/statement.cpp

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -171,6 +171,14 @@ CassError cass_statement_set_host_inet(CassStatement* statement, const CassInet*
171171
return CASS_OK;
172172
}
173173

174+
CassError cass_statement_set_node(CassStatement* statement, const CassNode* node) {
175+
if (node == NULL) {
176+
return CASS_ERROR_LIB_BAD_PARAMS;
177+
}
178+
statement->set_host(*node->from());
179+
return CASS_OK;
180+
}
181+
174182
#define CASS_STATEMENT_BIND(Name, Params, Value) \
175183
CassError cass_statement_bind_##Name(CassStatement* statement, size_t index Params) { \
176184
return statement->set(index, Value); \

tests/src/integration/objects/result.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -220,6 +220,8 @@ class Result : public Object<const CassResult, cass_result_free> {
220220
return std::string(token, token_length);
221221
}
222222

223+
const CassNode* coordinator() { return cass_future_coordinator(future_.get()); }
224+
223225
private:
224226
/**
225227
* Future wrapped object

tests/src/integration/objects/statement.hpp

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -260,6 +260,13 @@ class Statement : public Object<CassStatement, cass_statement_free> {
260260
ASSERT_EQ(CASS_OK, cass_statement_set_host_inet(get(), host, port));
261261
}
262262

263+
/**
264+
* Set node to run statement on use `CassNode` type.
265+
*
266+
* @param node
267+
*/
268+
void set_node(const CassNode* node) { ASSERT_EQ(CASS_OK, cass_statement_set_node(get(), node)); }
269+
263270
/**
264271
* Set the paging size for the statement.
265272
*

tests/src/integration/tests/test_statement.cpp

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,31 @@ CASSANDRA_INTEGRATION_TEST_F(StatementTests, SetHostInet) {
6868
}
6969
}
7070

71+
/**
72+
* Set node on a statement and verify that query goes to the correct node.
73+
*
74+
* @test_category configuration
75+
* @expected_result The local "rpc_address" matches a second query to the same
76+
* coordinator.
77+
*/
78+
CASSANDRA_INTEGRATION_TEST_F(StatementTests, SetNode) {
79+
CHECK_FAILURE;
80+
81+
Statement statement("SELECT rpc_address FROM system.local");
82+
Result result1 = session_.execute(statement);
83+
Inet rpc_address1 = result1.first_row().column_by_name<Inet>("rpc_address");
84+
const CassNode* node = result1.coordinator();
85+
ASSERT_TRUE(node != NULL);
86+
87+
statement.set_node(node);
88+
89+
for (int i = 0; i < 4; ++i) {
90+
Result result2 = session_.execute(statement);
91+
Inet rpc_address2 = result1.first_row().column_by_name<Inet>("rpc_address");
92+
ASSERT_EQ(rpc_address1, rpc_address2);
93+
}
94+
}
95+
7196
/**
7297
* Set a host on a statement that has an invalid port.
7398
*

0 commit comments

Comments
 (0)