Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 2 additions & 20 deletions docs/wasm_filter.md
Original file line number Diff line number Diff line change
Expand Up @@ -975,7 +975,7 @@ class CallHandler : public GrpcCallHandler<google::protobuf::Empty> {
/* override onSuccess code */
}
/*
more callbacks such as onFailure, onCreateInitialMetadata
more callbacks such as onFailure
*/
};
```
Expand Down Expand Up @@ -1011,15 +1011,6 @@ Called when the async gRPC request fails. No further callbacks will be
invoked. *status* is returned grpc status. *error\_message* is the gRPC
status message or empty string if not present.

#### onCreateInitialMetadata

``` {.sourceCode .cpp}
void onCreateInitialMetadata()
```

Called when populating the headers to send with initial metadata. TODO:
how to add metadata?

#### cancel

``` {.sourceCode .cpp}
Expand All @@ -1043,7 +1034,7 @@ class StreamHandler : public GrpcStreamHandler<google::protobuf::Struct, google:
/* override onReceive code */
}
/*
more callbacks such as onCreateInitialMetadat, onReceiveTrailingMetadata, onReceive, onRemoteClose
more callbacks such as onReceiveTrailingMetadata, onReceive, onRemoteClose
*/
};
```
Expand Down Expand Up @@ -1090,15 +1081,6 @@ Close the stream locally and remotely (as needed). No further methods
may be invoked on the handler object and no further callbacks will be
invoked.

#### onCreateInitialMetadata

``` {.sourceCode .cpp}
void onCreateInitialMetadata()
```

Called when populating the headers to send with initial metadata. TODO:
how to add initial metadata?

#### onReceiveInitialMetadata

``` {.sourceCode .cpp}
Expand Down
57 changes: 27 additions & 30 deletions proxy_wasm_api.h
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,6 @@ class GrpcCallHandlerBase {

void cancel();

virtual void onCreateInitialMetadata(uint32_t /* headers */) {}
virtual void onSuccess(size_t body_size) = 0;
virtual void onFailure(GrpcStatus status) = 0;

Expand Down Expand Up @@ -229,7 +228,6 @@ class GrpcStreamHandlerBase {
// callbacks.
void reset();

virtual void onCreateInitialMetadata(uint32_t /* headers */) {}
virtual void onReceiveInitialMetadata(uint32_t /* headers */) {}
virtual void onReceiveTrailingMetadata(uint32_t /* trailers */) {}
virtual void onReceive(size_t body_size) = 0;
Expand Down Expand Up @@ -324,7 +322,6 @@ class RootContext : public ContextBase {
// Low level HTTP/gRPC interface.
virtual void onHttpCallResponse(uint32_t token, uint32_t headers, size_t body_size,
uint32_t trailers);
virtual void onGrpcCreateInitialMetadata(uint32_t token, uint32_t headers);
virtual void onGrpcReceiveInitialMetadata(uint32_t token, uint32_t headers);
virtual void onGrpcReceiveTrailingMetadata(uint32_t token, uint32_t trailers);
virtual void onGrpcReceive(uint32_t token, size_t body_size);
Expand All @@ -338,9 +335,11 @@ class RootContext : public ContextBase {
// NB: the message is the response if status == OK and an error message
// otherwise. Returns false on setup error.
WasmResult grpcSimpleCall(StringView service, StringView service_name, StringView method_name,
const HeaderStringPairs &initial_metadata,
const google::protobuf::MessageLite &request,
uint32_t timeout_milliseconds, GrpcSimpleCallCallback callback);
WasmResult grpcSimpleCall(StringView service, StringView service_name, StringView method_name,
const HeaderStringPairs &initial_metadata,
const google::protobuf::MessageLite &request,
uint32_t timeout_milliseconds,
std::function<void(size_t body_size)> success_callback,
Expand All @@ -352,16 +351,18 @@ class RootContext : public ContextBase {
failure_callback(status);
}
};
return grpcSimpleCall(service, service_name, method_name, request, timeout_milliseconds,
callback);
return grpcSimpleCall(service, service_name, method_name, initial_metadata, request,
timeout_milliseconds, callback);
}
// Returns false on setup error.
WasmResult grpcCallHandler(StringView service, StringView service_name, StringView method_name,
const HeaderStringPairs &initial_metadata,
const google::protobuf::MessageLite &request,
uint32_t timeout_milliseconds,
std::unique_ptr<GrpcCallHandlerBase> handler);
// Returns false on setup error.
WasmResult grpcStreamHandler(StringView service, StringView service_name, StringView method_name,
const HeaderStringPairs &initial_metadata,
std::unique_ptr<GrpcStreamHandlerBase> handler);

private:
Expand Down Expand Up @@ -1174,19 +1175,28 @@ inline Histogram<Tags...> *Histogram<Tags...>::New(StringView name,
}

inline WasmResult grpcCall(StringView service, StringView service_name, StringView method_name,
const HeaderStringPairs &initial_metadata,
const google::protobuf::MessageLite &request,
uint32_t timeout_milliseconds, uint32_t *token_ptr) {
void *metadata_ptr = nullptr;
size_t metadata_size = 0;
MakeHeaderStringPairsBuffer(initial_metadata, &metadata_ptr, &metadata_size);
std::string serialized_request;
request.SerializeToString(&serialized_request);
return proxy_grpc_call(service.data(), service.size(), service_name.data(), service_name.size(),
method_name.data(), method_name.size(), serialized_request.data(),
serialized_request.size(), timeout_milliseconds, token_ptr);
method_name.data(), method_name.size(), metadata_ptr, metadata_size,
serialized_request.data(), serialized_request.size(), timeout_milliseconds,
token_ptr);
}

inline WasmResult grpcStream(StringView service, StringView service_name, StringView method_name,
uint32_t *token_ptr) {
const HeaderStringPairs &initial_metadata, uint32_t *token_ptr) {
void *metadata_ptr = nullptr;
size_t metadata_size = 0;
MakeHeaderStringPairsBuffer(initial_metadata, &metadata_ptr, &metadata_size);
return proxy_grpc_stream(service.data(), service.size(), service_name.data(), service_name.size(),
method_name.data(), method_name.size(), token_ptr);
method_name.data(), method_name.size(), metadata_ptr, metadata_size,
token_ptr);
}

inline WasmResult grpcCancel(uint32_t token) { return proxy_grpc_cancel(token); }
Expand Down Expand Up @@ -1221,12 +1231,13 @@ inline void RootContext::onHttpCallResponse(uint32_t token, uint32_t headers, si

inline WasmResult RootContext::grpcSimpleCall(StringView service, StringView service_name,
StringView method_name,
const HeaderStringPairs &initial_metadata,
const google::protobuf::MessageLite &request,
uint32_t timeout_milliseconds,
Context::GrpcSimpleCallCallback callback) {
uint32_t token = 0;
WasmResult result =
grpcCall(service, service_name, method_name, request, timeout_milliseconds, &token);
WasmResult result = grpcCall(service, service_name, method_name, initial_metadata, request,
timeout_milliseconds, &token);
if (result == WasmResult::Ok) {
asRoot()->simple_grpc_calls_[token] = std::move(callback);
}
Expand Down Expand Up @@ -1263,23 +1274,6 @@ inline void GrpcStreamHandlerBase::send(StringView message, bool end_of_stream)
}
}

inline void RootContext::onGrpcCreateInitialMetadata(uint32_t token, uint32_t headers) {
{
auto it = grpc_calls_.find(token);
if (it != grpc_calls_.end()) {
it->second->onCreateInitialMetadata(headers);
return;
}
}
{
auto it = grpc_streams_.find(token);
if (it != grpc_streams_.end()) {
it->second->onCreateInitialMetadata(headers);
return;
}
}
}

inline void RootContext::onGrpcReceiveInitialMetadata(uint32_t token, uint32_t headers) {
{
auto it = grpc_streams_.find(token);
Expand Down Expand Up @@ -1370,11 +1364,13 @@ inline void RootContext::onGrpcClose(uint32_t token, GrpcStatus status) {

inline WasmResult RootContext::grpcCallHandler(StringView service, StringView service_name,
StringView method_name,
const HeaderStringPairs &initial_metadata,
const google::protobuf::MessageLite &request,
uint32_t timeout_milliseconds,
std::unique_ptr<GrpcCallHandlerBase> handler) {
uint32_t token = 0;
auto result = grpcCall(service, service_name, method_name, request, timeout_milliseconds, &token);
auto result = grpcCall(service, service_name, method_name, initial_metadata, request,
timeout_milliseconds, &token);
if (result == WasmResult::Ok) {
handler->token_ = token;
handler->context_ = this;
Expand All @@ -1385,9 +1381,10 @@ inline WasmResult RootContext::grpcCallHandler(StringView service, StringView se

inline WasmResult RootContext::grpcStreamHandler(StringView service, StringView service_name,
StringView method_name,
const HeaderStringPairs &initial_metadata,
std::unique_ptr<GrpcStreamHandlerBase> handler) {
uint32_t token = 0;
auto result = grpcStream(service, service_name, method_name, &token);
auto result = grpcStream(service, service_name, method_name, initial_metadata, &token);
if (result == WasmResult::Ok) {
handler->token_ = token;
handler->context_ = this;
Expand Down
2 changes: 2 additions & 0 deletions proxy_wasm_externs.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,13 @@ extern "C" WasmResult proxy_http_call(const char *uri_ptr, size_t uri_size, void
extern "C" WasmResult proxy_grpc_call(const char *service_ptr, size_t service_size,
const char *service_name_ptr, size_t service_name_size,
const char *method_name_ptr, size_t method_name_size,
size_t initial_metadata_pairs_size, const char *request_ptr,
const char *request_ptr, size_t request_size,
uint32_t timeout_milliseconds, uint32_t *token_ptr);
extern "C" WasmResult proxy_grpc_stream(const char *service_ptr, size_t service_size,
const char *service_name_ptr, size_t service_name_size,
const char *method_name_ptr, size_t method_name_size,
size_t initial_metadata_pairs_size, const char *request_ptr,
uint32_t *token_ptr);
extern "C" WasmResult proxy_grpc_cancel(uint32_t token);
extern "C" WasmResult proxy_grpc_close(uint32_t token);
Expand Down
5 changes: 0 additions & 5 deletions proxy_wasm_intrinsics.cc
Original file line number Diff line number Diff line change
Expand Up @@ -243,11 +243,6 @@ extern "C" PROXY_WASM_KEEPALIVE void proxy_on_http_call_response(uint32_t contex
->onHttpCallResponse(token, headers, static_cast<size_t>(body_size), trailers);
}

extern "C" PROXY_WASM_KEEPALIVE void
proxy_on_grpc_create_initial_metadata(uint32_t context_id, uint32_t token, uint32_t headers) {
getRootContext(context_id)->onGrpcCreateInitialMetadata(token, headers);
}

extern "C" PROXY_WASM_KEEPALIVE void
proxy_on_grpc_receive_initial_metadata(uint32_t context_id, uint32_t token, uint32_t headers) {
getRootContext(context_id)->onGrpcReceiveInitialMetadata(token, headers);
Expand Down