Skip to content

Commit ad659c0

Browse files
committed
Add raw interface for use with proxies
1 parent 1802d9b commit ad659c0

File tree

7 files changed

+147
-32
lines changed

7 files changed

+147
-32
lines changed

src/connection.cpp

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,11 @@ void Connection::on_read(const char* buf, size_t size) {
254254
host_->address_string().c_str());
255255

256256
if (response->stream() < 0) {
257+
if (!response->decode_response_body(false)) {
258+
LOG_ERROR("Error decoding event response body");
259+
defunct();
260+
continue;
261+
}
257262
if (response->opcode() == CQL_OPCODE_EVENT) {
258263
listener_->on_event(response->response_body());
259264
} else {
@@ -266,6 +271,12 @@ void Connection::on_read(const char* buf, size_t size) {
266271
RequestCallback::Ptr callback;
267272

268273
if (stream_manager_.get(response->stream(), callback)) {
274+
if (!response->decode_response_body(callback->request()->is_raw())) {
275+
LOG_ERROR("Error decoding response body");
276+
defunct();
277+
continue;
278+
}
279+
269280
switch (callback->state()) {
270281
case RequestCallback::REQUEST_STATE_READING:
271282
pending_reads_.remove(callback.get());

src/future.cpp

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,14 +57,28 @@ const CassResult* cass_future_get_result(CassFuture* future) {
5757
}
5858

5959
Response::Ptr response(static_cast<ResponseFuture*>(future->from())->response());
60-
if (!response || response->opcode() == CQL_OPCODE_ERROR) {
60+
if (!response || response->opcode() == CQL_OPCODE_ERROR || response->is_raw()) {
6161
return NULL;
6262
}
6363

6464
response->inc_ref();
6565
return CassResult::to(static_cast<ResultResponse*>(response.get()));
6666
}
6767

68+
const CassRawResult* cass_future_get_raw_result(CassFuture* future) {
69+
if (future->type() != Future::FUTURE_TYPE_RESPONSE) {
70+
return NULL;
71+
}
72+
73+
Response::Ptr response(static_cast<ResponseFuture*>(future->from())->response());
74+
if (!response || response->opcode() == CQL_OPCODE_ERROR || !response->is_raw()) {
75+
return NULL;
76+
}
77+
78+
response->inc_ref();
79+
return CassRawResult::to(static_cast<RawResponse*>(response.get()));
80+
}
81+
6882
const CassPrepared* cass_future_get_prepared(CassFuture* future) {
6983
if (future->type() != Future::FUTURE_TYPE_RESPONSE) {
7084
return NULL;

src/request.hpp

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,8 @@ class Request : public RefCounted<Request> {
187187
void set_host(const Address& host) { host_.reset(new Address(host)); }
188188
const Address* host() const { return host_.get(); }
189189

190+
virtual bool is_raw() const { return false; }
191+
190192
virtual int encode(ProtocolVersion version, RequestCallback* callback, BufferVec* bufs) const = 0;
191193

192194
private:
@@ -212,6 +214,24 @@ class RoutableRequest : public Request {
212214
virtual bool get_routing_key(String* routing_key) const = 0;
213215
};
214216

217+
class RawRequest : public Request {
218+
public:
219+
RawRequest(uint8_t opcode, const char* buf, size_t buf_size)
220+
: Request(opcode)
221+
, buf_(buf, buf_size) { }
222+
223+
virtual bool is_raw() const { return true; }
224+
225+
virtual int encode(ProtocolVersion version, RequestCallback* callback, BufferVec* bufs) const {
226+
bufs->push_back(buf_);
227+
return static_cast<int>(buf_.size());
228+
}
229+
230+
private:
231+
Buffer buf_;
232+
};
233+
234+
215235
}}} // namespace datastax::internal::core
216236

217237
EXTERNAL_TYPE(datastax::internal::core::CustomPayload, CassCustomPayload)

src/request_handler.cpp

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -462,6 +462,11 @@ void RequestExecution::on_set(ResponseMessage* response) {
462462
current_host_->decrement_inflight_requests();
463463
Connection* connection = connection_;
464464

465+
if (response->response_body()->is_raw()) {
466+
set_response(response->response_body());
467+
return;
468+
}
469+
465470
switch (response->opcode()) {
466471
case CQL_OPCODE_RESULT:
467472
on_result_response(connection, response);

src/response.cpp

Lines changed: 55 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,18 @@
2828

2929
using namespace datastax::internal::core;
3030

31+
extern "C" {
32+
33+
void cass_raw_result_free(const CassRawResult* result) { result->dec_ref(); }
34+
35+
cass_uint8_t cass_raw_result_opcode(const CassRawResult* result) { return result->opcode(); }
36+
37+
const char* cass_raw_result_frame(const CassRawResult* result) { return result->data(); }
38+
39+
size_t cass_raw_result_frame_length(const CassRawResult* result) { return result->length(); }
40+
41+
}
42+
3143
/**
3244
* A dummy invalid protocol error response that's used to handle responses
3345
* encoded with deprecated protocol versions.
@@ -58,9 +70,7 @@ bool Response::decode_custom_payload(Decoder& decoder) {
5870
bool Response::decode_warnings(Decoder& decoder) { return decoder.decode_warnings(warnings_); }
5971

6072
bool ResponseMessage::allocate_body(int8_t opcode) {
61-
response_body_.reset();
6273
switch (opcode) {
63-
6474
case CQL_OPCODE_ERROR:
6575
response_body_.reset(new ErrorResponse());
6676
return true;
@@ -144,13 +154,11 @@ ssize_t ResponseMessage::decode(const char* input, size_t size) {
144154
// If a deprecated version of the protocol is encountered then we fake
145155
// an invalid protocol error.
146156
if (version_ < CASS_PROTOCOL_VERSION_V3) {
147-
response_body_.reset(new InvalidProtocolErrorResponse());
148-
} else if (!allocate_body(opcode_) || !response_body_) {
149-
return -1;
157+
invalid_protocol_error_ = true;
158+
} else {
159+
buffer_ = RefBuffer::Ptr(RefBuffer::create(length_));
160+
body_buffer_pos_ = buffer_->data();
150161
}
151-
152-
response_body_->set_buffer(length_);
153-
body_buffer_pos_ = response_body_->data();
154162
} else {
155163
// We haven't received all the data for the header. We consume the
156164
// entire buffer.
@@ -171,25 +179,9 @@ ssize_t ResponseMessage::decode(const char* input, size_t size) {
171179
memcpy(body_buffer_pos_, input_pos, needed);
172180
body_buffer_pos_ += needed;
173181
input_pos += needed;
174-
assert(body_buffer_pos_ == response_body_->data() + length_);
175-
Decoder decoder(response_body_->data(), length_, ProtocolVersion(version_));
182+
assert(body_buffer_pos_ == buffer_->data() + length_);
176183

177-
if (flags_ & CASS_FLAG_TRACING) {
178-
if (!response_body_->decode_trace_id(decoder)) return -1;
179-
}
180-
181-
if (flags_ & CASS_FLAG_WARNING) {
182-
if (!response_body_->decode_warnings(decoder)) return -1;
183-
}
184-
185-
if (flags_ & CASS_FLAG_CUSTOM_PAYLOAD) {
186-
if (!response_body_->decode_custom_payload(decoder)) return -1;
187-
}
188-
189-
if (!response_body_->decode(decoder)) {
190-
is_body_error_ = true;
191-
return -1;
192-
}
184+
response_decoder_ = Decoder(buffer_->data(), length_, ProtocolVersion(version_));
193185

194186
is_body_ready_ = true;
195187
} else {
@@ -202,3 +194,40 @@ ssize_t ResponseMessage::decode(const char* input, size_t size) {
202194

203195
return input_pos - input;
204196
}
197+
198+
bool ResponseMessage::decode_response_body(bool is_raw) {
199+
if (invalid_protocol_error_) {
200+
response_body_.reset(new InvalidProtocolErrorResponse());
201+
return true;
202+
}
203+
204+
if (is_raw) {
205+
response_body_.reset(new RawResponse(opcode_, length_));
206+
response_body_->set_buffer(buffer_);
207+
return true;
208+
}
209+
210+
if (!allocate_body(opcode_)) {
211+
return false;
212+
}
213+
214+
response_body_->set_buffer(buffer_);
215+
216+
if (flags_ & CASS_FLAG_TRACING) {
217+
if (!response_body_->decode_trace_id(response_decoder_)) return false;
218+
}
219+
220+
if (flags_ & CASS_FLAG_WARNING) {
221+
if (!response_body_->decode_warnings(response_decoder_)) return false;
222+
}
223+
224+
if (flags_ & CASS_FLAG_CUSTOM_PAYLOAD) {
225+
if (!response_body_->decode_custom_payload(response_decoder_)) return false;
226+
}
227+
228+
if (!response_body_->decode(response_decoder_)) {
229+
return false;
230+
}
231+
232+
return true;
233+
}

src/response.hpp

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,7 @@ class Response : public RefCounted<Response> {
4646

4747
const RefBuffer::Ptr& buffer() const { return buffer_; }
4848

49-
void set_buffer(size_t size) { buffer_ = RefBuffer::Ptr(RefBuffer::create(size)); }
49+
void set_buffer(const RefBuffer::Ptr& buffer) { buffer_ = buffer; }
5050

5151
bool has_tracing_id() const;
5252

@@ -62,6 +62,8 @@ class Response : public RefCounted<Response> {
6262

6363
bool decode_warnings(Decoder& decoder);
6464

65+
virtual bool is_raw() const { return false; }
66+
6567
virtual bool decode(Decoder& decoder) = 0;
6668

6769
private:
@@ -75,6 +77,24 @@ class Response : public RefCounted<Response> {
7577
DISALLOW_COPY_AND_ASSIGN(Response);
7678
};
7779

80+
class RawResponse : public Response {
81+
public:
82+
RawResponse(uint8_t opcode, size_t length)
83+
: Response(opcode)
84+
, length_(length) { }
85+
86+
size_t length() const { return length_; }
87+
88+
virtual bool is_raw() const { return true; }
89+
90+
virtual bool decode(Decoder& decoder) {
91+
return true; // Ignore decoding the body
92+
}
93+
94+
private:
95+
size_t length_;
96+
};
97+
7898
class ResponseMessage : public Allocated {
7999
public:
80100
ResponseMessage()
@@ -88,8 +108,8 @@ class ResponseMessage : public Allocated {
88108
, is_header_received_(false)
89109
, header_buffer_pos_(header_buffer_)
90110
, is_body_ready_(false)
91-
, is_body_error_(false)
92-
, body_buffer_pos_(NULL) {}
111+
, body_buffer_pos_(NULL)
112+
, invalid_protocol_error_(false) { }
93113

94114
uint8_t flags() const { return flags_; }
95115

@@ -103,6 +123,8 @@ class ResponseMessage : public Allocated {
103123

104124
ssize_t decode(const char* input, size_t size);
105125

126+
bool decode_response_body(bool is_raw);
127+
106128
private:
107129
bool allocate_body(int8_t opcode);
108130

@@ -120,14 +142,22 @@ class ResponseMessage : public Allocated {
120142
char* header_buffer_pos_;
121143

122144
bool is_body_ready_;
123-
bool is_body_error_;
124-
Response::Ptr response_body_;
125145
char* body_buffer_pos_;
146+
RefBuffer::Ptr buffer_;
147+
bool invalid_protocol_error_;
148+
Response::Ptr response_body_;
149+
150+
Decoder response_decoder_;
126151

127152
private:
128153
DISALLOW_COPY_AND_ASSIGN(ResponseMessage);
129154
};
130155

131156
}}} // namespace datastax::internal::core
132157

158+
159+
typedef struct CassRawResult_ CassRawResult;
160+
161+
EXTERNAL_TYPE(datastax::internal::core::RawResponse, CassRawResult)
162+
133163
#endif

src/session.cpp

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,12 @@ CassFuture* cass_session_execute_batch(CassSession* session, const CassBatch* ba
9999
return CassFuture::to(future.get());
100100
}
101101

102+
CassFuture* cass_session_execute_raw(CassSession* session, cass_uint8_t opcode, cass_uint8_t flags, const char* frame, size_t frame_size) {
103+
Future::Ptr future(session->execute(Request::ConstPtr(new RawRequest(opcode, frame, frame_size))));
104+
future->inc_ref();
105+
return CassFuture::to(future.get());
106+
}
107+
102108
const CassSchemaMeta* cass_session_get_schema_meta(const CassSession* session) {
103109
return CassSchemaMeta::to(new Metadata::SchemaSnapshot(session->cluster()->schema_snapshot()));
104110
}

0 commit comments

Comments
 (0)