diff --git a/.bazelrc b/.bazelrc index 86e814d38..161b2866d 100644 --- a/.bazelrc +++ b/.bazelrc @@ -1,3 +1,4 @@ +build:compdb --build_tag_filters=-nocompdb # Pass CC, CXX and PATH from the environment. build --action_env=CC build --action_env=CXX diff --git a/.github/workflows/format.yml b/.github/workflows/format.yml index a09aa7a64..8989bf5a7 100644 --- a/.github/workflows/format.yml +++ b/.github/workflows/format.yml @@ -28,9 +28,6 @@ on: - 'envoy-release/**' - 'istio-release/**' - schedule: - - cron: '0 0 * * *' - concurrency: group: ${{ github.head_ref || github.run_id }}-${{ github.workflow }} diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml index 5d311c819..eeb3f4675 100644 --- a/.github/workflows/test.yml +++ b/.github/workflows/test.yml @@ -28,9 +28,6 @@ on: - 'envoy-release/**' - 'istio-release/**' - schedule: - - cron: '0 0 * * *' - concurrency: group: ${{ github.head_ref || github.run_id }}-${{ github.workflow }} diff --git a/bazel/dependencies.bzl b/bazel/dependencies.bzl index 49db6d7ba..b6a4cb576 100644 --- a/bazel/dependencies.bzl +++ b/bazel/dependencies.bzl @@ -39,7 +39,7 @@ def proxy_wasm_cpp_host_dependencies(): "wasm32-unknown-unknown", "wasm32-wasi", ], - version = "1.68.0", + version = "1.62.1", ) rust_repository_set( name = "rust_linux_s390x", @@ -48,7 +48,7 @@ def proxy_wasm_cpp_host_dependencies(): "wasm32-unknown-unknown", "wasm32-wasi", ], - version = "1.68.0", + version = "1.62.1", ) zig_register_toolchains( diff --git a/bazel/external/wamr.BUILD b/bazel/external/wamr.BUILD index 8c2955d57..dcf8d87ef 100644 --- a/bazel/external/wamr.BUILD +++ b/bazel/external/wamr.BUILD @@ -12,11 +12,24 @@ filegroup( cmake( name = "wamr_lib", generate_args = [ + # disable WASI "-DWAMR_BUILD_LIBC_WASI=0", - "-DWAMR_BUILD_MULTI_MODULE=0", + "-DWAMR_BUILD_LIBC_BUILTIN=0", + # MVP + "-DWAMR_BUILD_BULK_MEMORY=1", + "-DWAMR_BUILD_REF_TYPES=1", "-DWAMR_BUILD_TAIL_CALL=1", - "-DWAMR_DISABLE_HW_BOUND_CHECK=0", - "-DWAMR_DISABLE_STACK_HW_BOUND_CHECK=1", + # WAMR private features + "-DWAMR_BUILD_MULTI_MODULE=0", + # Some tests have indicated that the following three factors have + # a minimal impact on performance. + # - Get function names from name section + "-DWAMR_BUILD_CUSTOM_NAME_SECTION=1", + "-DWAMR_BUILD_LOAD_CUSTOM_SECTION=1", + # - Show Wasm call stack if met a trap + "-DWAMR_BUILD_DUMP_CALL_STACK=1", + # Cache module files + "-DWAMR_BUILD_WASM_CACHE=0", "-GNinja", ] + select({ "@proxy_wasm_cpp_host//bazel:engine_wamr_jit": [ @@ -26,6 +39,8 @@ cmake( "-DWAMR_BUILD_INTERP=0", "-DWAMR_BUILD_JIT=1", "-DWAMR_BUILD_SIMD=1", + # linux perf. only for jit and aot + # "-DWAMR_BUILD_LINUX_PERF=1", ], "//conditions:default": [ "-DWAMR_BUILD_AOT=0", diff --git a/bazel/repositories.bzl b/bazel/repositories.bzl index 60a48548c..a8f555e98 100644 --- a/bazel/repositories.bzl +++ b/bazel/repositories.bzl @@ -19,6 +19,13 @@ load("@bazel_tools//tools/build_defs/repo:utils.bzl", "maybe") def proxy_wasm_cpp_host_repositories(): # Bazel extensions. + maybe( + http_archive, + name = "bazel_compdb", + strip_prefix = "bazel-compilation-database-0.5.2", + url = "https://github.com/grailbio/bazel-compilation-database/archive/0.5.2.tar.gz", + ) + maybe( http_archive, name = "bazel_skylib", @@ -74,10 +81,10 @@ def proxy_wasm_cpp_host_repositories(): maybe( http_archive, name = "rules_rust", - sha256 = "e3fe2a255589d128c5e59e407ee57c832533f25ce14cc23605d368cf507ce08d", - strip_prefix = "rules_rust-0.24.1", + sha256 = "f3d443e9ad1eca99fbcade1c649adbd8200753cf22e47846b3105a43a550273b", + strip_prefix = "rules_rust-0.8.1", # NOTE: Update Rust version in bazel/dependencies.bzl. - url = "https://github.com/bazelbuild/rules_rust/archive/0.24.1.tar.gz", + url = "https://github.com/bazelbuild/rules_rust/archive/0.8.1.tar.gz", patches = ["@proxy_wasm_cpp_host//bazel/external:rules_rust.patch"], patch_args = ["-p1"], ) @@ -96,9 +103,9 @@ def proxy_wasm_cpp_host_repositories(): maybe( http_archive, name = "proxy_wasm_cpp_sdk", - sha256 = "89792fc1abca331f29f99870476a04146de5e82ff903bdffca90e6729c1f2470", - strip_prefix = "proxy-wasm-cpp-sdk-95bb82ce45c41d9100fd1ec15d2ffc67f7f3ceee", - urls = ["https://github.com/proxy-wasm/proxy-wasm-cpp-sdk/archive/95bb82ce45c41d9100fd1ec15d2ffc67f7f3ceee.tar.gz"], + sha256 = "cab5efa54c0cec8eb17c0a2f6ce72b9cd84ebba2b332e919187f963a5d7cfaa1", + strip_prefix = "proxy-wasm-cpp-sdk-47bb9cd141a151415ad6a597ed60c78bea2ce0b7", + urls = ["https://github.com/higress-group/proxy-wasm-cpp-sdk/archive/47bb9cd141a151415ad6a597ed60c78bea2ce0b7.tar.gz"], ) # Test dependencies. @@ -162,10 +169,10 @@ def proxy_wasm_cpp_host_repositories(): http_archive, name = "com_github_bytecodealliance_wasm_micro_runtime", build_file = "@proxy_wasm_cpp_host//bazel/external:wamr.BUILD", - # WAMR-1.2.1 - sha256 = "7548d4bbea8dbb9b005e83bd571f93a12fb3f0b5e87a8b0130f004dd92df4b0b", - strip_prefix = "wasm-micro-runtime-WAMR-1.2.1", - url = "https://github.com/bytecodealliance/wasm-micro-runtime/archive/refs/tags/WAMR-1.2.1.zip", + # WAMR-2.1.1 + sha256 = "a0824762abbcbb3dd6b7bb07530f198ece5d792a12a879bc2a99100590fdb151", + strip_prefix = "wasm-micro-runtime-WAMR-2.1.1", + url = "https://github.com/bytecodealliance/wasm-micro-runtime/archive/refs/tags/WAMR-2.1.1.zip", ) native.bind( diff --git a/include/proxy-wasm/context.h b/include/proxy-wasm/context.h index 12937041f..a91abee06 100644 --- a/include/proxy-wasm/context.h +++ b/include/proxy-wasm/context.h @@ -136,6 +136,7 @@ class ContextBase : public RootInterface, public StreamInterface, public HeaderInterface, public HttpCallInterface, + public RedisCallInterface, public GrpcCallInterface, public GrpcStreamInterface, public MetricsInterface, @@ -151,6 +152,7 @@ class ContextBase : public RootInterface, virtual ~ContextBase(); WasmBase *wasm() const { return wasm_; } + void clearWasm() { wasm_ = nullptr; } uint32_t id() const { return id_; } // The VM Context used for calling "malloc" has an id_ == 0. bool isVmContext() const { return id_ == 0; } @@ -214,6 +216,10 @@ class ContextBase : public RootInterface, // Async call response. void onHttpCallResponse(HttpCallToken token, uint32_t headers, uint32_t body_size, uint32_t trailers) override; + + // Redis + void onRedisCallResponse(RedisCallToken token, uint32_t status, uint32_t response_size) override; + // Grpc void onGrpcReceiveInitialMetadata(GrpcToken token, uint32_t elements) override; void onGrpcReceive(GrpcToken token, uint32_t response_size) override; @@ -264,6 +270,12 @@ class ContextBase : public RootInterface, unimplemented(); return nullptr; } + + WasmResult setBuffer(WasmBufferType /* type */, size_t /* start */, size_t /* length */, + std::string_view /* data */) override { + return unimplemented(); + } + bool endOfStream(WasmStreamType /* type */) override { unimplemented(); return true; @@ -276,6 +288,16 @@ class ContextBase : public RootInterface, return unimplemented(); } + // Redis + WasmResult redisInit(std::string_view /* target */, std::string_view /* username */, + std::string_view /* password */, int /* timeout_millisconds */) override { + return unimplemented(); + } + WasmResult redisCall(std::string_view /* target */, std::string_view /* query */, + uint32_t * /* token_ptr */) override { + return unimplemented(); + } + // gRPC WasmResult grpcCall(std::string_view /* grpc_service */, std::string_view /* service_name */, std::string_view /* method_name */, const Pairs & /* initial_metadata */, @@ -331,6 +353,22 @@ class ContextBase : public RootInterface, std::string_view /* details */) override { return unimplemented(); } + + // Inject Data + WasmResult injectEncodedDataToFilterChain(std::string_view /* body_text */, bool /* end_stream */) override { + return unimplemented(); + } + + // Get all enpoint metrics of upstream cluster + WasmResult getUpstreamHosts(StringPairs * /* result */) override { + return unimplemented(); + } + + // Override upstream host and bypass lb policy + WasmResult setUpstreamOverrideHost(std::string_view /* address */) override { + return unimplemented(); + } + void clearRouteCache() override { unimplemented(); } void failStream(WasmStreamType stream_type) override { closeStream(stream_type); } diff --git a/include/proxy-wasm/context_interface.h b/include/proxy-wasm/context_interface.h index 48fce76d9..5a765c755 100644 --- a/include/proxy-wasm/context_interface.h +++ b/include/proxy-wasm/context_interface.h @@ -31,12 +31,14 @@ namespace proxy_wasm { using Pairs = std::vector>; using PairsWithStringValues = std::vector>; +using StringPairs = std::vector>; using TimerToken = uint32_t; using HttpCallToken = uint32_t; using GrpcToken = uint32_t; using GrpcStatusCode = uint32_t; using SharedQueueDequeueToken = uint32_t; using SharedQueueEnqueueToken = uint32_t; +using RedisCallToken = uint32_t; // TODO: update SDK and use this. enum class ProxyAction : uint32_t { @@ -161,6 +163,12 @@ struct RootInterface : public RootGrpcInterface { */ virtual void onHttpCallResponse(HttpCallToken token, uint32_t headers, uint32_t body_size, uint32_t trailers) = 0; + /** + * Called on a Root Context when a response arrives for an outstanding redisCall(). + * @param token is the token returned by the corresponding redisCall(). + */ + virtual void onRedisCallResponse(RedisCallToken token, uint32_t status, + uint32_t response_size) = 0; /** * Called on a Root Context when an Inter-VM shared queue message has arrived. @@ -248,6 +256,14 @@ struct HttpInterface { // Call just before the Context is deleted. See RootInterface. virtual void onDelete() = 0; + + // Inject encoded data to filter chain + virtual WasmResult injectEncodedDataToFilterChain(std::string_view /* body_text */, bool /* end_stream */) = 0; + + // Get all enpoint metrics of upstream cluster + virtual WasmResult getUpstreamHosts(StringPairs * /* result */) = 0; + // Override upstream host and bypass lb policy + virtual WasmResult setUpstreamOverrideHost(std::string_view /* address */) = 0; }; /** @@ -328,6 +344,9 @@ struct StreamInterface { */ virtual const BufferInterface *getBuffer(WasmBufferType type) = 0; + virtual WasmResult setBuffer(WasmBufferType type, size_t start, size_t length, + std::string_view data) = 0; + /** * Provides the end-of-stream status of the given flow (if any) or false. End of stream occurs * when a connection is closed (or half-closed) either locally or remotely. @@ -416,6 +435,14 @@ struct HttpCallInterface { int timeout_milliseconds, HttpCallToken *token_ptr) = 0; }; +struct RedisCallInterface { + virtual ~RedisCallInterface() = default; + virtual WasmResult redisInit(std::string_view target, std::string_view username, + std::string_view password, int timeout_milliseconds) = 0; + virtual WasmResult redisCall(std::string_view target, std::string_view query, + RedisCallToken *token_ptr) = 0; +}; + struct GrpcCallInterface { virtual ~GrpcCallInterface() = default; /** diff --git a/include/proxy-wasm/exports.h b/include/proxy-wasm/exports.h index 376a4d3b6..db8092236 100644 --- a/include/proxy-wasm/exports.h +++ b/include/proxy-wasm/exports.h @@ -78,6 +78,9 @@ Word send_local_response(Word response_code, Word response_code_details_ptr, Word response_code_details_size, Word body_ptr, Word body_size, Word additional_response_header_pairs_ptr, Word additional_response_header_pairs_size, Word grpc_status); +Word inject_encoded_data_to_filter_chain(Word body_ptr, Word body_size, Word end_stream); +Word set_upstream_override_host(Word address_ptr, Word address_size); +Word get_upstream_hosts(Word ptr, Word size); Word clear_route_cache(); Word get_shared_data(Word key_ptr, Word key_size, Word value_ptr_ptr, Word value_size_ptr, Word cas_ptr); @@ -104,6 +107,10 @@ Word get_response_body_buffer_bytes(Word start, Word length, Word ptr_ptr, Word Word http_call(Word uri_ptr, Word uri_size, Word header_pairs_ptr, Word header_pairs_size, Word body_ptr, Word body_size, Word trailer_pairs_ptr, Word trailer_pairs_size, Word timeout_milliseconds, Word token_ptr); +Word redis_init(Word service_ptr, Word service_size, Word username_ptr, Word username_size, + Word passowrd_ptr, Word password_size, Word timeout_milliseconds); +Word redis_call(Word service_ptr, Word service_size, Word query_ptr, Word query_size, + Word token_ptr); Word define_metric(Word metric_type, Word name_ptr, Word name_size, Word metric_id_ptr); Word increment_metric(Word metric_id, int64_t offset); Word record_metric(Word metric_id, uint64_t value); @@ -133,21 +140,51 @@ Word wasi_unstable_path_open(Word fd, Word dir_flags, Word path, Word path_len, int64_t fs_rights_base, int64_t fg_rights_inheriting, Word fd_flags, Word nwritten_ptr); Word wasi_unstable_fd_prestat_get(Word fd, Word buf_ptr); +Word wasi_unstable_fd_filestat_get(Word fd, Word buf_ptr); Word wasi_unstable_fd_prestat_dir_name(Word fd, Word path_ptr, Word path_len); Word wasi_unstable_fd_write(Word fd, Word iovs, Word iovs_len, Word nwritten_ptr); Word wasi_unstable_fd_read(Word, Word, Word, Word); Word wasi_unstable_fd_seek(Word, int64_t, Word, Word); Word wasi_unstable_fd_close(Word); Word wasi_unstable_fd_fdstat_get(Word fd, Word statOut); +Word wasi_unstable_fd_fdstat_set_flags(Word fd, Word flags); Word wasi_unstable_environ_get(Word, Word); Word wasi_unstable_environ_sizes_get(Word count_ptr, Word buf_size_ptr); Word wasi_unstable_args_get(Word argc_ptr, Word argv_buf_size_ptr); Word wasi_unstable_args_sizes_get(Word argc_ptr, Word argv_buf_size_ptr); +Word wasi_unstable_sched_yield(); +Word wasi_unstable_poll_oneoff(Word in, Word out, Word nsubscriptions, Word nevents); void wasi_unstable_proc_exit(Word); Word wasi_unstable_clock_time_get(Word, uint64_t, Word); +Word wasi_unstable_clock_res_get(Word, Word); +Word wasi_unstable_fd_advise(Word, uint64_t, uint64_t, Word); +Word wasi_unstable_fd_allocate(Word, uint64_t, uint64_t); +Word wasi_unstable_fd_datasync(Word); +Word wasi_unstable_fd_fdstat_set_rights(Word, uint64_t, uint64_t); +Word wasi_unstable_fd_filestat_set_size(Word, uint64_t); +Word wasi_unstable_fd_filestat_set_times(Word, uint64_t, uint64_t, Word); +Word wasi_unstable_fd_pread(Word, Word, Word, uint64_t, Word); +Word wasi_unstable_fd_pwrite(Word, Word, Word, uint64_t, Word); +Word wasi_unstable_fd_readdir(Word, Word, Word, uint64_t, Word); +Word wasi_unstable_fd_renumber(Word, Word); +Word wasi_unstable_fd_sync(Word); +Word wasi_unstable_fd_tell(Word, Word); +Word wasi_unstable_path_create_directory(Word, Word, Word); +Word wasi_unstable_path_filestat_set_times(Word, Word, Word, Word, uint64_t, uint64_t, Word); +Word wasi_unstable_path_link(Word, Word, Word, Word, Word, Word); +Word wasi_unstable_path_readlink(Word, Word, Word, Word, Word, Word); +Word wasi_unstable_path_remove_directory(Word, Word, Word); +Word wasi_unstable_path_rename(Word, Word, Word, Word, Word, Word); +Word wasi_unstable_path_symlink(Word, Word, Word, Word); +Word wasi_unstable_path_unlink_file(Word, Word, Word); +Word wasi_unstable_sock_accept(Word, Word, Word); +Word wasi_unstable_sock_recv(Word, Word, Word, Word, Word, Word); +Word wasi_unstable_sock_send(Word, Word, Word, Word, Word); +Word wasi_unstable_sock_shutdown(Word, Word); Word wasi_unstable_random_get(Word, Word); Word pthread_equal(Word left, Word right); void emscripten_notify_memory_growth(Word); +Word wasi_unstable_path_filestat_get(Word fd, Word flags, Word path, Word path_len, Word buf); // Support for embedders, not exported to Wasm. @@ -163,16 +200,28 @@ void emscripten_notify_memory_growth(Word); _f(get_current_time_nanoseconds) _f(define_metric) \ _f(increment_metric) _f(record_metric) _f(get_metric) \ _f(set_effective_context) _f(done) \ - _f(call_foreign_function) + _f(call_foreign_function) _f(redis_init) \ + _f(redis_call) _f(get_upstream_hosts) \ + _f(inject_encoded_data_to_filter_chain) \ + _f(set_upstream_override_host) #define FOR_ALL_HOST_FUNCTIONS_ABI_SPECIFIC(_f) \ _f(get_configuration) _f(continue_request) _f(continue_response) _f(clear_route_cache) \ _f(continue_stream) _f(close_stream) _f(get_log_level) #define FOR_ALL_WASI_FUNCTIONS(_f) \ - _f(fd_write) _f(fd_read) _f(fd_seek) _f(fd_close) _f(fd_fdstat_get) _f(environ_get) \ - _f(environ_sizes_get) _f(args_get) _f(args_sizes_get) _f(clock_time_get) _f(random_get) \ - _f(proc_exit) _f(path_open) _f(fd_prestat_get) _f(fd_prestat_dir_name) + _f(fd_write) _f(fd_read) _f(fd_seek) _f(fd_close) _f(fd_fdstat_get) _f(fd_fdstat_set_flags) \ + _f(fd_fdstat_set_rights) _f(environ_get) _f(environ_sizes_get) _f(args_get) \ + _f(args_sizes_get) _f(clock_time_get) _f(clock_res_get) _f(fd_advise) _f(fd_allocate) \ + _f(fd_datasync) _f(fd_filestat_set_size) _f(fd_filestat_set_times) _f(fd_pread) \ + _f(fd_pwrite) _f(fd_readdir) _f(fd_renumber) _f(fd_sync) _f(fd_tell) \ + _f(path_create_directory) _f(path_filestat_set_times) _f(path_link) \ + _f(path_readlink) _f(path_remove_directory) _f(path_rename) \ + _f(path_symlink) _f(path_unlink_file) _f(sock_accept) _f(sock_recv) \ + _f(sock_send) _f(sock_shutdown) _f(random_get) _f(sched_yield) \ + _f(poll_oneoff) _f(proc_exit) _f(path_open) \ + _f(fd_prestat_get) _f(fd_prestat_dir_name) \ + _f(path_filestat_get) _f(fd_filestat_get) // Helpers to generate a stub to pass to VM, in place of a restricted proxy-wasm capability. #define _CREATE_PROXY_WASM_STUB(_fn) \ diff --git a/include/proxy-wasm/null_plugin.h b/include/proxy-wasm/null_plugin.h index b33187ce0..8d5d80436 100644 --- a/include/proxy-wasm/null_plugin.h +++ b/include/proxy-wasm/null_plugin.h @@ -92,6 +92,8 @@ class NullPlugin : public NullVmPlugin { void onHttpCallResponse(uint64_t context_id, uint64_t token, uint64_t headers, uint64_t body_size, uint64_t trailers); + void onRedisCallResponse(uint64_t context_id, uint64_t token, uint64_t status, uint64_t response); + void onGrpcReceive(uint64_t context_id, uint64_t token, size_t body_size); void onGrpcClose(uint64_t context_id, uint64_t token, uint64_t status_code); void onGrpcReceiveInitialMetadata(uint64_t context_id, uint64_t token, uint64_t headers); diff --git a/include/proxy-wasm/sdk.h b/include/proxy-wasm/sdk.h index 105854373..56d773d2f 100644 --- a/include/proxy-wasm/sdk.h +++ b/include/proxy-wasm/sdk.h @@ -44,6 +44,7 @@ using FilterMetadataStatus = internal::FilterMetadataStatus; using FilterTrailersStatus = internal::FilterTrailersStatus; using FilterDataStatus = internal::FilterDataStatus; using GrpcStatus = internal::GrpcStatus; +using RedisStatus = internal::RedisStatus; using MetricType = internal::MetricType; using CloseType = internal::CloseType; diff --git a/include/proxy-wasm/wasm.h b/include/proxy-wasm/wasm.h index 9fa2bda1f..95d7a5b78 100644 --- a/include/proxy-wasm/wasm.h +++ b/include/proxy-wasm/wasm.h @@ -17,6 +17,7 @@ #include +#include #include #include #include @@ -72,10 +73,22 @@ class WasmBase : public std::enable_shared_from_this { return it->second; return nullptr; } + void clearWasmInContext() { + for (auto &item : contexts_) { + if (item.second != nullptr) { + item.second->clearWasm(); + } + } + contexts_.clear(); + } uint32_t allocContextId(); bool isFailed() { return failed_ != FailState::Ok; } FailState fail_state() { return failed_; } + // Rebuild state management + bool shouldRebuild() const { return should_rebuild_; } + void setShouldRebuild(bool should_rebuild) { should_rebuild_ = should_rebuild; } + const std::string &vm_configuration() const; const std::string &moduleBytecode() const { return module_bytecode_; } @@ -174,6 +187,7 @@ class WasmBase : public std::enable_shared_from_this { HttpCall = 0, GrpcCall = 1, GrpcStream = 2, + RedisCall = 3, }; static const uint32_t kCalloutTypeMask = 0x3; // Enough to cover the 3 types. static const uint32_t kCalloutIncrement = 0x4; // Enough to cover the 3 types. @@ -186,6 +200,9 @@ class WasmBase : public std::enable_shared_from_this { bool isGrpcStreamId(uint32_t callout_id) { return (callout_id & kCalloutTypeMask) == static_cast(CalloutType::GrpcStream); } + bool isRedisCallId(uint32_t callout_id) { + return (callout_id & kCalloutTypeMask) == static_cast(CalloutType::RedisCall); + } uint32_t nextHttpCallId() { // TODO(PiotrSikora): re-add rollover protection (requires at least 1 billion callouts). return next_http_call_id_ += kCalloutIncrement; @@ -198,6 +215,10 @@ class WasmBase : public std::enable_shared_from_this { // TODO(PiotrSikora): re-add rollover protection (requires at least 1 billion callouts). return next_grpc_stream_id_ += kCalloutIncrement; } + uint32_t nextRedisCallId() { + // TODO(PiotrSikora): re-add rollover protection (requires at least 1 billion callouts). + return next_redis_call_id_ += kCalloutIncrement; + } protected: friend class ContextBase; @@ -244,6 +265,7 @@ class WasmBase : public std::enable_shared_from_this { WasmCallWord<2> on_request_headers_abi_01_; WasmCallWord<3> on_request_headers_abi_02_; + WasmCallWord<3> on_request_headers_abi_03_; WasmCallWord<3> on_request_body_; WasmCallWord<2> on_request_trailers_; WasmCallWord<2> on_request_metadata_; @@ -256,6 +278,8 @@ class WasmBase : public std::enable_shared_from_this { WasmCallVoid<5> on_http_call_response_; + WasmCallVoid<4> on_redis_call_response_; + WasmCallVoid<3> on_grpc_receive_; WasmCallVoid<3> on_grpc_close_; WasmCallVoid<3> on_grpc_create_initial_metadata_; @@ -275,15 +299,16 @@ class WasmBase : public std::enable_shared_from_this { _f(on_downstream_connection_close) _f(on_upstream_connection_close) _f(on_request_body) \ _f(on_request_trailers) _f(on_request_metadata) _f(on_response_body) \ _f(on_response_trailers) _f(on_response_metadata) _f(on_http_call_response) \ - _f(on_grpc_receive) _f(on_grpc_close) _f(on_grpc_receive_initial_metadata) \ - _f(on_grpc_receive_trailing_metadata) _f(on_queue_ready) _f(on_done) \ - _f(on_log) _f(on_delete) + _f(on_redis_call_response) _f(on_grpc_receive) _f(on_grpc_close) \ + _f(on_grpc_receive_initial_metadata) \ + _f(on_grpc_receive_trailing_metadata) _f(on_queue_ready) _f(on_done) \ + _f(on_log) _f(on_delete) // Capabilities which are allowed to be linked to the module. If this is empty, restriction // is not enforced. AllowedCapabilitiesMap allowed_capabilities_; - std::shared_ptr base_wasm_handle_; + std::weak_ptr base_wasm_handle_weak_; // Used by the base_wasm to enable non-clonable thread local Wasm(s) to be constructed. std::string module_bytecode_; @@ -296,6 +321,7 @@ class WasmBase : public std::enable_shared_from_this { std::string vm_configuration_; bool stop_iteration_ = false; FailState failed_ = FailState::Ok; // Wasm VM fatal error. + bool should_rebuild_ = false; // Wasm VM rebuild flag. // Plugin Stats/Metrics uint32_t next_counter_metric_id_ = static_cast(MetricType::Counter); @@ -306,6 +332,7 @@ class WasmBase : public std::enable_shared_from_this { uint32_t next_http_call_id_ = static_cast(CalloutType::HttpCall); uint32_t next_grpc_call_id_ = static_cast(CalloutType::GrpcCall); uint32_t next_grpc_stream_id_ = static_cast(CalloutType::GrpcStream); + uint32_t next_redis_call_id_ = static_cast(CalloutType::RedisCall); // Actions to be done after the call into the VM returns. std::deque> after_vm_call_actions_; @@ -334,8 +361,26 @@ class WasmHandleBase : public std::enable_shared_from_this { std::shared_ptr &wasm() { return wasm_base_; } + void setRecoverVmCallback(std::function()> &&f) { + recover_vm_callback_ = std::move(f); + } + + // Rebuild the wasm vm and generate a new wasm handle + bool rebuild(std::shared_ptr &new_handle) { + assert(new_handle == nullptr); + if (recover_vm_callback_ == nullptr) { + return true; + } + new_handle = recover_vm_callback_(); + if (!new_handle) { + return false; + } + return true; + } + protected: std::shared_ptr wasm_base_; + std::function()> recover_vm_callback_; std::unordered_map plugin_canary_cache_; }; @@ -359,15 +404,51 @@ class PluginHandleBase : public std::enable_shared_from_this { ~PluginHandleBase() { if (wasm_handle_) { wasm_handle_->wasm()->startShutdown(plugin_->key()); + wasm_handle_->wasm()->wasm_vm()->removeFailCallback(plugin_handle_key_); } } std::shared_ptr &plugin() { return plugin_; } std::shared_ptr &wasm() { return wasm_handle_->wasm(); } + std::shared_ptr &wasmHandle() { return wasm_handle_; } + + void setPluginHandleKey(std::string_view key) { plugin_handle_key_ = std::string(key); } + + void setRecoverPluginCallback( + std::function(std::shared_ptr &)> &&f) { + recover_plugin_callback_ = std::move(f); + } + + // Rebuild the wasm plugin and generate a new plugin handle + bool rebuild(std::shared_ptr &new_handle) { + assert(new_handle == nullptr); + if (recover_plugin_callback_ == nullptr) { + return true; + } + std::shared_ptr new_wasm_handle; + if (!wasm_handle_->rebuild(new_wasm_handle)) { + std::cerr << "wasmHandle rebuild failed" + << "\n"; + return false; + } + new_handle = recover_plugin_callback_(new_wasm_handle); + if (!new_handle) { + std::cerr << "pluginHandle rebuild failed" + << "\n"; + return false; + } + return true; + } protected: std::shared_ptr plugin_; std::shared_ptr wasm_handle_; + std::function(std::shared_ptr &)> + recover_plugin_callback_; + +private: + // key for the plugin handle, used to identify the key in fail callbacks + std::string plugin_handle_key_; }; using PluginHandleFactory = std::function( @@ -383,8 +464,9 @@ std::shared_ptr getOrCreateThreadLocalPlugin( void clearWasmCachesForTesting(); inline const std::string &WasmBase::vm_configuration() const { - if (base_wasm_handle_) - return base_wasm_handle_->wasm()->vm_configuration_; + auto base_wasm_handle = base_wasm_handle_weak_.lock(); + if (base_wasm_handle) + return base_wasm_handle->wasm()->vm_configuration_; return vm_configuration_; } @@ -392,15 +474,17 @@ inline void *WasmBase::allocMemory(uint64_t size, uint64_t *address) { if (!malloc_) { return nullptr; } - wasm_vm_->setRestrictedCallback( - true, {// logging (Proxy-Wasm) - "env.proxy_log", - // logging (stdout/stderr) - "wasi_unstable.fd_write", "wasi_snapshot_preview1.fd_write", - // time - "wasi_unstable.clock_time_get", "wasi_snapshot_preview1.clock_time_get"}); + // wasm_vm_->setRestrictedCallback( + // true, {// logging (Proxy-Wasm) + // "env.proxy_log", + // // logging (stdout/stderr) + // "wasi_unstable.fd_write", "wasi_snapshot_preview1.fd_write", + // // time + // "wasi_unstable.clock_time_get", "wasi_snapshot_preview1.clock_time_get", + // // go runtime gc sleep + // "wasi_unstable.poll_oneoff", "wasi_snapshot_preview1.poll_oneoff"}); Word a = malloc_(vm_context(), size); - wasm_vm_->setRestrictedCallback(false); + // wasm_vm_->setRestrictedCallback(false); if (!a.u64_) { return nullptr; } diff --git a/include/proxy-wasm/wasm_api_impl.h b/include/proxy-wasm/wasm_api_impl.h index 899af7e41..46b421238 100644 --- a/include/proxy-wasm/wasm_api_impl.h +++ b/include/proxy-wasm/wasm_api_impl.h @@ -102,6 +102,19 @@ proxy_send_local_response(uint32_t response_code, const char *response_code_deta WR(body_ptr), WS(body_size), WR(additional_response_header_pairs_ptr), WS(additional_response_header_pairs_size), WS(grpc_status))); } +inline WasmResult +proxy_inject_encoded_data_to_filter_chain(const char *body_ptr, size_t body_size, bool end_stream) { + return wordToWasmResult(exports::inject_encoded_data_to_filter_chain( + WR(body_ptr), WS(body_size), WS(end_stream))); +} +inline WasmResult +proxy_set_upstream_override_host(const char *address_ptr, size_t address_size) { + return wordToWasmResult(exports::set_upstream_override_host(WR(address_ptr), WS(address_size))); +} +inline WasmResult +proxy_get_upstream_hosts(const char **ptr, size_t *size) { + return wordToWasmResult(exports::get_upstream_hosts(WR(ptr), WR(size))); +} inline WasmResult proxy_clear_route_cache() { return wordToWasmResult(exports::clear_route_cache()); @@ -211,6 +224,23 @@ inline WasmResult proxy_http_call(const char *uri_ptr, size_t uri_size, void *he WR(trailer_pairs_ptr), WS(trailer_pairs_size), WS(timeout_milliseconds), WR(token_ptr))); } + +// Redis +// Returns token, used in callback onRedisCallResponse +inline WasmResult proxy_redis_call(const char *service_ptr, size_t service_size, + const char *query_ptr, size_t query_size, uint32_t *token_ptr) { + return wordToWasmResult(exports::redis_call(WR(service_ptr), WS(service_size), WR(query_ptr), + WS(query_size), WR(token_ptr))); +} +inline WasmResult proxy_redis_init(const char *service_ptr, size_t service_size, + const char *username_ptr, size_t username_size, + const char *password_ptr, size_t password_size, + uint32_t timeout_milliseconds) { + return wordToWasmResult(exports::redis_init(WR(service_ptr), WS(service_size), WR(username_ptr), + WS(username_size), WR(password_ptr), + WS(password_size), WS(timeout_milliseconds))); +} + // gRPC // Returns token, used in gRPC callbacks (onGrpc...) inline WasmResult proxy_grpc_call(const char *service_ptr, size_t service_size, diff --git a/include/proxy-wasm/wasm_vm.h b/include/proxy-wasm/wasm_vm.h index a573212e0..bc44e7f64 100644 --- a/include/proxy-wasm/wasm_vm.h +++ b/include/proxy-wasm/wasm_vm.h @@ -70,9 +70,9 @@ using WasmCallWord = std::function) _f(proxy_wasm::WasmCallVoid<1>) _f(proxy_wasm::WasmCallVoid<2>) \ - _f(proxy_wasm::WasmCallVoid<3>) _f(proxy_wasm::WasmCallVoid<5>) \ - _f(proxy_wasm::WasmCallWord<1>) _f(proxy_wasm::WasmCallWord<2>) \ - _f(proxy_wasm::WasmCallWord<3>) + _f(proxy_wasm::WasmCallVoid<3>) _f(proxy_wasm::WasmCallVoid<4>) \ + _f(proxy_wasm::WasmCallVoid<5>) _f(proxy_wasm::WasmCallWord<1>) \ + _f(proxy_wasm::WasmCallWord<2>) _f(proxy_wasm::WasmCallWord<3>) // These are templates and its helper for constructing signatures of functions callbacks from Wasm // VMs. @@ -114,23 +114,31 @@ using WasmCallback_WWmW = Word (*)(Word, uint64_t, Word); using WasmCallback_WWWWWWllWW = Word (*)(Word, Word, Word, Word, Word, int64_t, int64_t, Word, Word); using WasmCallback_dd = double (*)(double); +// Additional callback types for new WASI functions +using WasmCallback_WWWWmm = Word (*)(Word, Word, Word, Word, uint64_t, uint64_t); +using WasmCallback_WWWWmmW = Word (*)(Word, Word, Word, Word, uint64_t, uint64_t, Word); +using WasmCallback_WWmm = Word (*)(Word, uint64_t, uint64_t); +using WasmCallback_WWmmW = Word (*)(Word, uint64_t, uint64_t, Word); +using WasmCallback_WWWWmW = Word (*)(Word, Word, Word, uint64_t, Word); #define FOR_ALL_WASM_VM_IMPORTS(_f) \ _f(proxy_wasm::WasmCallbackVoid<0>) _f(proxy_wasm::WasmCallbackVoid<1>) \ _f(proxy_wasm::WasmCallbackVoid<2>) _f(proxy_wasm::WasmCallbackVoid<3>) \ _f(proxy_wasm::WasmCallbackVoid<4>) _f(proxy_wasm::WasmCallbackWord<0>) \ _f(proxy_wasm::WasmCallbackWord<1>) _f(proxy_wasm::WasmCallbackWord<2>) \ - _f(proxy_wasm::WasmCallbackWord<3>) _f(proxy_wasm::WasmCallbackWord<4>) \ - _f(proxy_wasm::WasmCallbackWord<5>) _f(proxy_wasm::WasmCallbackWord<6>) \ - _f(proxy_wasm::WasmCallbackWord<7>) _f(proxy_wasm::WasmCallbackWord<8>) \ - _f(proxy_wasm::WasmCallbackWord<9>) \ - _f(proxy_wasm::WasmCallbackWord<10>) \ - _f(proxy_wasm::WasmCallbackWord<12>) \ - _f(proxy_wasm::WasmCallback_WWl) \ - _f(proxy_wasm::WasmCallback_WWlWW) \ - _f(proxy_wasm::WasmCallback_WWm) \ - _f(proxy_wasm::WasmCallback_WWmW) \ - _f(proxy_wasm::WasmCallback_WWWWWWllWW) \ + _f(proxy_wasm::WasmCallbackWord<3>) _f(proxy_wasm::WasmCallbackWord<4>) _f( \ + proxy_wasm::WasmCallbackWord<5>) _f(proxy_wasm::WasmCallbackWord<6>) \ + _f(proxy_wasm::WasmCallbackWord<7>) _f(proxy_wasm::WasmCallbackWord<8>) _f( \ + proxy_wasm::WasmCallbackWord<9>) _f(proxy_wasm::WasmCallbackWord<10>) \ + _f(proxy_wasm::WasmCallbackWord<12>) _f(proxy_wasm::WasmCallback_WWl) \ + _f(proxy_wasm::WasmCallback_WWlWW) _f(proxy_wasm::WasmCallback_WWm) \ + _f(proxy_wasm::WasmCallback_WWmW) \ + _f(proxy_wasm::WasmCallback_WWWWWWllWW) \ + _f(proxy_wasm::WasmCallback_WWWWmm) \ + _f(proxy_wasm::WasmCallback_WWWWmmW) \ + _f(proxy_wasm::WasmCallback_WWmm) \ + _f(proxy_wasm::WasmCallback_WWmmW) \ + _f(proxy_wasm::WasmCallback_WWWWmW) \ _f(proxy_wasm::WasmCallback_dd) enum class Cloneable { @@ -139,7 +147,13 @@ enum class Cloneable { InstantiatedModule // VMs can be cloned from an instantiated module. }; -enum class AbiVersion { ProxyWasm_0_1_0, ProxyWasm_0_2_0, ProxyWasm_0_2_1, Unknown }; +enum class AbiVersion { + ProxyWasm_0_1_0, + ProxyWasm_0_2_0, + ProxyWasm_0_2_1, + ProxyWasm_0_2_100, + Unknown +}; class NullPlugin; @@ -174,6 +188,7 @@ enum class FailState : int { StartFailed = 5, ConfigureFailed = 6, RuntimeError = 7, + RecoverError = 8, }; // Wasm VM instance. Provides the low level WASM interface. @@ -313,12 +328,27 @@ class WasmVm { void fail(FailState fail_state, std::string_view message) { integration()->error(message); failed_ = fail_state; - for (auto &callback : fail_callbacks_) { + for (auto & [key, callback] : fail_callbacks_) { callback(fail_state); } } - void addFailCallback(std::function fail_callback) { - fail_callbacks_.push_back(fail_callback); + + /** + * Generates id for fail callbacks allowing direct insertion of the function. + * Note: if fail callback needs to be removed later, must provide specific key. + */ + void addFailCallback(std::function fail_callback) { + static int id = 0; + std::string key = std::to_string(id++); + addFailCallback(key, std::move(fail_callback)); + } + + void addFailCallback(const std::string& key, std::function fail_callback) { + fail_callbacks_[key] = std::move(fail_callback); + } + + void removeFailCallback(const std::string& key) { + fail_callbacks_.erase(key); } bool isHostFunctionAllowed(const std::string &name) { @@ -338,7 +368,7 @@ class WasmVm { protected: std::unique_ptr integration_; FailState failed_ = FailState::Ok; - std::vector> fail_callbacks_; + std::unordered_map> fail_callbacks_; private: bool restricted_callback_{false}; diff --git a/src/bytecode_util.cc b/src/bytecode_util.cc index 70a373e01..d0d7dc868 100644 --- a/src/bytecode_util.cc +++ b/src/bytecode_util.cc @@ -83,6 +83,10 @@ bool BytecodeUtil::getAbiVersion(std::string_view bytecode, proxy_wasm::AbiVersi ret = AbiVersion::ProxyWasm_0_2_1; return true; } + if (export_name == "proxy_abi_version_0_2_100") { + ret = AbiVersion::ProxyWasm_0_2_100; + return true; + } } // Skip export's index. if (!parseVarint(pos, end, export_name_size)) { diff --git a/src/context.cc b/src/context.cc index 5353a52a5..42b4a7ec0 100644 --- a/src/context.cc +++ b/src/context.cc @@ -13,6 +13,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include +#include #include #include #include @@ -317,14 +319,21 @@ template static uint32_t headerSize(const P &p) { return p ? p->siz FilterHeadersStatus ContextBase::onRequestHeaders(uint32_t headers, bool end_of_stream) { CHECK_FAIL_HTTP(FilterHeadersStatus::Continue, FilterHeadersStatus::StopAllIterationAndWatermark); - if (!wasm_->on_request_headers_abi_01_ && !wasm_->on_request_headers_abi_02_) { + if (!wasm_->on_request_headers_abi_01_ && !wasm_->on_request_headers_abi_02_ && + !wasm_->on_request_headers_abi_03_) { return FilterHeadersStatus::Continue; } DeferAfterCallActions actions(this); - const auto result = wasm_->on_request_headers_abi_01_ - ? wasm_->on_request_headers_abi_01_(this, id_, headers) - : wasm_->on_request_headers_abi_02_(this, id_, headers, - static_cast(end_of_stream)); + uint64_t result; + if (wasm_->on_request_headers_abi_01_) { + result = wasm_->on_request_headers_abi_01_(this, id_, headers); + } else if (wasm_->on_request_headers_abi_02_) { + result = + wasm_->on_request_headers_abi_02_(this, id_, headers, static_cast(end_of_stream)); + } else if (wasm_->on_request_headers_abi_03_) { + result = + wasm_->on_request_headers_abi_03_(this, id_, headers, static_cast(end_of_stream)); + } CHECK_FAIL_HTTP(FilterHeadersStatus::Continue, FilterHeadersStatus::StopAllIterationAndWatermark); return convertVmCallResultToFilterHeadersStatus(result); } @@ -420,6 +429,14 @@ void ContextBase::onHttpCallResponse(uint32_t token, uint32_t headers, uint32_t wasm_->on_http_call_response_(this, id_, token, headers, body_size, trailers); } +void ContextBase::onRedisCallResponse(uint32_t token, uint32_t status, uint32_t response_size) { + if (isFailed() || !wasm_->on_redis_call_response_) { + return; + } + DeferAfterCallActions actions(this); + wasm_->on_redis_call_response_(this, id_, token, status, response_size); +} + void ContextBase::onQueueReady(uint32_t token) { if (!isFailed() && wasm_->on_queue_ready_) { DeferAfterCallActions actions(this); @@ -493,7 +510,8 @@ FilterHeadersStatus ContextBase::convertVmCallResultToFilterHeadersStatus(uint64 result > static_cast(FilterHeadersStatus::StopAllIterationAndWatermark)) { return FilterHeadersStatus::StopAllIterationAndWatermark; } - if (result == static_cast(FilterHeadersStatus::StopIteration)) { + if ((wasm_->on_request_headers_abi_01_ || wasm_->on_request_headers_abi_02_) && + result == static_cast(FilterHeadersStatus::StopIteration)) { // Always convert StopIteration (pause processing headers, but continue processing body) // to StopAllIterationAndWatermark (pause all processing), since the former breaks all // assumptions about HTTP processing. @@ -527,7 +545,7 @@ FilterMetadataStatus ContextBase::convertVmCallResultToFilterMetadataStatus(uint ContextBase::~ContextBase() { // Do not remove vm context which has the same lifetime as wasm_. - if (id_ != 0U) { + if (id_ != 0U && wasm_ != nullptr) { wasm_->contexts_.erase(id_); } } diff --git a/src/exports.cc b/src/exports.cc index 0290dcf0f..7876cc7aa 100644 --- a/src/exports.cc +++ b/src/exports.cc @@ -159,6 +159,58 @@ Word send_local_response(Word response_code, Word response_code_details_ptr, return WasmResult::Ok; } +Word inject_encoded_data_to_filter_chain(Word body_ptr, Word body_size, Word end_stream) { + auto *context = contextOrEffectiveContext(); + auto body = context->wasmVm()->getMemory(body_ptr, body_size); + if (!body) { + return WasmResult::InvalidMemoryAccess; + } + context->injectEncodedDataToFilterChain(body.value(), end_stream != 0U); + return WasmResult::Ok; +} + +Word get_upstream_hosts(Word ptr_ptr, Word size_ptr) { + auto *context = contextOrEffectiveContext(); + StringPairs pairs; + auto result = context->getUpstreamHosts(&pairs); + if (result != WasmResult::Ok) { + return result; + } + if (pairs.empty()) { + if (!context->wasm()->copyToPointerSize("", ptr_ptr, size_ptr)) { + return WasmResult::InvalidMemoryAccess; + } + return WasmResult::Ok; + } + uint64_t size = PairsUtil::pairsSize(pairs); + uint64_t ptr = 0; + char *buffer = static_cast(context->wasm()->allocMemory(size, &ptr)); + if (buffer == nullptr) { + return WasmResult::InvalidMemoryAccess; + } + if (!PairsUtil::marshalPairs(pairs, buffer, size)) { + return WasmResult::InvalidMemoryAccess; + } + if (!context->wasmVm()->setWord(ptr_ptr, Word(ptr))) { + return WasmResult::InvalidMemoryAccess; + } + if (!context->wasmVm()->setWord(size_ptr, Word(size))) { + return WasmResult::InvalidMemoryAccess; + } + return WasmResult::Ok; +} + +Word set_upstream_override_host(Word address_ptr, Word address_size) { + auto *context = contextOrEffectiveContext(); + auto address = context->wasmVm()->getMemory(address_ptr, address_size); + if (!address) { + return WasmResult::InvalidMemoryAccess; + } + context->setUpstreamOverrideHost(address.value()); + return WasmResult::Ok; +} + + Word clear_route_cache() { auto *context = contextOrEffectiveContext(); context->clearRouteCache(); @@ -497,15 +549,11 @@ Word set_buffer_bytes(Word type, Word start, Word length, Word data_ptr, Word da return WasmResult::BadArgument; } auto *context = contextOrEffectiveContext(); - auto *buffer = context->getBuffer(static_cast(type.u64_)); - if (buffer == nullptr) { - return WasmResult::NotFound; - } auto data = context->wasmVm()->getMemory(data_ptr, data_size); if (!data) { return WasmResult::InvalidMemoryAccess; } - return buffer->copyFrom(start, length, data.value()); + return context->setBuffer(static_cast(type.u64_), start, length, data.value()); } Word http_call(Word uri_ptr, Word uri_size, Word header_pairs_ptr, Word header_pairs_size, @@ -533,6 +581,35 @@ Word http_call(Word uri_ptr, Word uri_size, Word header_pairs_ptr, Word header_p return result; } +Word redis_init(Word service_ptr, Word service_size, Word username_ptr, Word username_size, + Word passowrd_ptr, Word password_size, Word timeout_milliseconds) { + auto *context = contextOrEffectiveContext()->root_context(); + auto service = context->wasmVm()->getMemory(service_ptr, service_size); + auto username = context->wasmVm()->getMemory(username_ptr, username_size); + auto password = context->wasmVm()->getMemory(passowrd_ptr, password_size); + if (!service || !username || !password) { + return WasmResult::InvalidMemoryAccess; + } + return context->redisInit(service.value(), username.value(), password.value(), + timeout_milliseconds); +} + +Word redis_call(Word service_ptr, Word service_size, Word query_ptr, Word query_size, + Word token_ptr) { + auto *context = contextOrEffectiveContext()->root_context(); + auto service = context->wasmVm()->getMemory(service_ptr, service_size); + auto query = context->wasmVm()->getMemory(query_ptr, query_size); + uint32_t token = 0; + // NB: try to write the token to verify the memory before starting the async + // operation. + if (!context->wasm()->setDatatype(token_ptr, token)) { + return WasmResult::InvalidMemoryAccess; + } + auto result = context->redisCall(service.value(), query.value(), &token); + context->wasm()->setDatatype(token_ptr, token); + return result; +} + Word define_metric(Word metric_type, Word name_ptr, Word name_size, Word metric_id_ptr) { auto *context = contextOrEffectiveContext(); auto name = context->wasmVm()->getMemory(name_ptr, name_size); @@ -646,6 +723,9 @@ Word grpc_send(Word token, Word message_ptr, Word message_size, Word end_stream) return context->grpcSend(token, message.value(), end_stream != 0U); } +// WASIp1 typings in comments sourced from +// https://github.com/WebAssembly/wasi-libc/blob/446cb3f1aa21f9b1a1eab372f82d65d19003e924/libc-bottom-half/headers/public/wasi/api.h + // __wasi_errno_t path_open(__wasi_fd_t fd, __wasi_lookupflags_t dirflags, const char *path, // size_t path_len, __wasi_oflags_t oflags, __wasi_rights_t fs_rights_base, __wasi_rights_t // fs_rights_inheriting, __wasi_fdflags_t fdflags, __wasi_fd_t *retptr0) @@ -666,6 +746,11 @@ Word wasi_unstable_fd_prestat_dir_name(Word /*fd*/, Word /*path_ptr*/, Word /*pa return 52; // __WASI_ERRNO_ENOSYS } +//__wasi_errno_t __wasi_fd_filestat_get(__wasi_fd_t fd,__wasi_filestat_t *retptr0) +Word wasi_unstable_fd_filestat_get(Word /*fd*/, Word /*buf_ptr*/) { + return 8; // __WASI_ERRNO_BADF +} + // Implementation of writev-like() syscall that redirects stdout/stderr to Envoy // logs. Word writevImpl(Word fd, Word iovs, Word iovs_len, Word *nwritten_ptr) { @@ -756,6 +841,14 @@ Word wasi_unstable_fd_close(Word /*fd*/) { return 0; } +// __wasi_errno_t __wasi_path_filestat_get(__wasi_fd_t fd,__wasi_lookupflags_t flags,const char +// *path,size_t path_len,__wasi_filestat_t *buf); + +Word wasi_unstable_path_filestat_get(Word /*fd*/, Word /*flags*/, Word /*path*/, Word /*path_len*/, + Word /*buf*/) { + return 58; // __WASI_ENOTSUP +} + // __wasi_errno_t __wasi_fd_fdstat_get(__wasi_fd_t fd, __wasi_fdstat_t *stat) Word wasi_unstable_fd_fdstat_get(Word fd, Word statOut) { // We will only support this interface on stdout and stderr @@ -776,6 +869,13 @@ Word wasi_unstable_fd_fdstat_get(Word fd, Word statOut) { return 0; // __WASI_ESUCCESS } +// __wasi_errno_t __wasi_fd_fdstat_set_flags(__wasi_fd_t fd, __wasi_fdflags_t flags) +Word wasi_unstable_fd_fdstat_set_flags(Word /*fd*/, Word /*flags*/) { + // Flags that can be specified: append, dsync, nonblock, rsync, and sync. Proxy-wasm only supports + // STDOUT and STDERR, but none of these flags have any effect in Proxy-Wasm. + return 52; // __WASI_ERRNO_ENOSYS +} + // __wasi_errno_t __wasi_environ_get(char **environ, char *environ_buf); Word wasi_unstable_environ_get(Word environ_array_ptr, Word environ_buf) { auto *context = contextOrEffectiveContext(); @@ -862,6 +962,375 @@ Word wasi_unstable_clock_time_get(Word clock_id, uint64_t /*precision*/, return 0; // __WASI_ESUCCESS } +// __wasi_errno_t __wasi_clock_res_get(__wasi_clockid_t id, __wasi_timestamp_t *retptr0); +Word wasi_unstable_clock_res_get(Word clock_id, Word result_resolution_uint64_ptr) { + uint64_t result = 0; + auto *context = contextOrEffectiveContext(); + + switch (clock_id) { + case 0 /* realtime */: + case 1 /* monotonic */: + // Return 1 nanosecond as the resolution for both realtime and monotonic clocks + // This is a reasonable default for most systems + result = 1; + break; + default: + // process_cputime_id and thread_cputime_id are not supported yet. + return 58; // __WASI_ENOTSUP + } + + if (!context->wasm()->setDatatype(result_resolution_uint64_ptr, result)) { + return 21; // __WASI_EFAULT + } + return 0; // __WASI_ESUCCESS +} + +// __wasi_errno_t __wasi_fd_advise(__wasi_fd_t fd, __wasi_filesize_t offset, __wasi_filesize_t len, +// __wasi_advice_t advice); +Word wasi_unstable_fd_advise(Word fd, uint64_t offset, uint64_t len, Word advice) { + // fd_advise is used to provide advice about the expected behavior of the application with respect + // to a file. Since we don't have a real file system in proxy-wasm, we can just return success + // without doing anything. This is similar to how other file-related functions are implemented in + // this codebase. + + // We could check if fd is valid (e.g., stdout/stderr), but since this is just a hint and not + // required for correctness, we'll just return success for any fd. + + return 0; // __WASI_ESUCCESS +} + +// __wasi_errno_t __wasi_fd_allocate(__wasi_fd_t fd, __wasi_filesize_t offset, __wasi_filesize_t +// len); +Word wasi_unstable_fd_allocate(Word fd, uint64_t offset, uint64_t len) { + // fd_allocate is used to ensure that space is allocated for a file. + // Since we don't have a real file system in proxy-wasm, we can just return success without doing + // anything. This is similar to how other file-related functions are implemented in this codebase. + + // We only support stdout and stderr in proxy-wasm, which don't need allocation + if (fd != 1 /* stdout */ && fd != 2 /* stderr */) { + return 8; // __WASI_ERRNO_BADF - Bad file descriptor + } + + return 0; // __WASI_ESUCCESS +} + +// __wasi_errno_t __wasi_fd_datasync(__wasi_fd_t fd); +Word wasi_unstable_fd_datasync(Word fd) { + // fd_datasync is used to synchronize the data of a file to disk. + // Since we don't have a real file system in proxy-wasm, we can just return success for + // stdout/stderr and an error for other file descriptors. + + // We only support stdout and stderr in proxy-wasm + if (fd != 1 /* stdout */ && fd != 2 /* stderr */) { + return 8; // __WASI_ERRNO_BADF - Bad file descriptor + } + + // For stdout and stderr, there's no need to sync as they're handled by the host system + return 0; // __WASI_ESUCCESS +} + +// __wasi_errno_t __wasi_fd_fdstat_set_rights(__wasi_fd_t fd, __wasi_rights_t fs_rights_base, +// __wasi_rights_t fs_rights_inheriting); +Word wasi_unstable_fd_fdstat_set_rights(Word fd, uint64_t fs_rights_base, + uint64_t fs_rights_inheriting) { + // fd_fdstat_set_rights is used to adjust the rights associated with a file descriptor. + // Since we don't have a real file system in proxy-wasm, we can just return success for + // stdout/stderr and an error for other file descriptors. + + // We only support stdout and stderr in proxy-wasm + if (fd != 1 /* stdout */ && fd != 2 /* stderr */) { + return 8; // __WASI_ERRNO_BADF - Bad file descriptor + } + + // For stdout and stderr, we don't actually change any rights, but we can pretend it succeeded + // This is similar to how other file-related functions are implemented in this codebase + return 0; // __WASI_ESUCCESS +} + +// __wasi_errno_t __wasi_fd_filestat_set_size(__wasi_fd_t fd, __wasi_filesize_t size); +Word wasi_unstable_fd_filestat_set_size(Word fd, uint64_t size) { + // fd_filestat_set_size is used to adjust the size of a file, similar to ftruncate. + // Since we don't have a real file system in proxy-wasm, we can just return success for + // stdout/stderr and an error for other file descriptors. + + // We only support stdout and stderr in proxy-wasm + if (fd != 1 /* stdout */ && fd != 2 /* stderr */) { + return 8; // __WASI_ERRNO_BADF - Bad file descriptor + } + + // For stdout and stderr, we don't actually change any size, but we can pretend it succeeded + // This is similar to how other file-related functions are implemented in this codebase + return 0; // __WASI_ESUCCESS +} + +// __wasi_errno_t __wasi_fd_filestat_set_times(__wasi_fd_t fd, __wasi_timestamp_t atim, +// __wasi_timestamp_t mtim, __wasi_fstflags_t fst_flags); +Word wasi_unstable_fd_filestat_set_times(Word fd, uint64_t atim, uint64_t mtim, Word fst_flags) { + // fd_filestat_set_times is used to set the access and modification times of a file. + // Since we don't have a real file system in proxy-wasm, we can just return success for + // stdout/stderr and an error for other file descriptors. + + // We only support stdout and stderr in proxy-wasm + if (fd != 1 /* stdout */ && fd != 2 /* stderr */) { + return 8; // __WASI_ERRNO_BADF - Bad file descriptor + } + + // For stdout and stderr, we don't actually change any times, but we can pretend it succeeded + // This is similar to how other file-related functions are implemented in this codebase + return 0; // __WASI_ESUCCESS +} + +// __wasi_errno_t __wasi_fd_pread(__wasi_fd_t fd, const __wasi_iovec_t *iovs, size_t iovs_len, +// __wasi_filesize_t offset, __wasi_size_t *retptr0); +Word wasi_unstable_fd_pread(Word fd, Word iovs_ptr, Word iovs_len, uint64_t offset, + Word nread_ptr) { + // fd_pread is used to read from a file descriptor at a given offset. + // Since we don't have a real file system in proxy-wasm, we can just return an error. + // This is similar to how fd_read is implemented in this codebase. + + // We don't support reading from any files in proxy-wasm + return 52; // __WASI_ERRNO_ENOSYS - Function not implemented +} + +// __wasi_errno_t __wasi_fd_pwrite(__wasi_fd_t fd, const __wasi_ciovec_t *iovs, size_t iovs_len, +// __wasi_filesize_t offset, __wasi_size_t *retptr0); +Word wasi_unstable_fd_pwrite(Word fd, Word iovs_ptr, Word iovs_len, uint64_t offset, + Word nwritten_ptr) { + auto *context = contextOrEffectiveContext(); + + // fd_pwrite is used to write to a file descriptor at a given offset. + // In proxy-wasm, we only support writing to stdout and stderr, and we don't support offsets. + // We'll implement this similar to fd_write but return an error for non-stdout/stderr fds. + + // Check if fd is stdout or stderr + if (fd != 1 /* stdout */ && fd != 2 /* stderr */) { + return 8; // __WASI_ERRNO_BADF - Bad file descriptor + } + + // For stdout and stderr, we'll just ignore the offset and write the data + // This is similar to how fd_write is implemented in this codebase + Word nwritten(0); + auto result = writevImpl(fd, iovs_ptr, iovs_len, &nwritten); + if (result != 0) { // __WASI_ESUCCESS + return result; + } + + if (!context->wasmVm()->setWord(nwritten_ptr, Word(nwritten))) { + return 21; // __WASI_EFAULT + } + + return 0; // __WASI_ESUCCESS +} + +// __wasi_errno_t __wasi_fd_readdir(__wasi_fd_t fd, uint8_t *buf, __wasi_size_t buf_len, +// __wasi_dircookie_t cookie, __wasi_size_t *retptr0); +Word wasi_unstable_fd_readdir(Word fd, Word buf_ptr, Word buf_len, uint64_t cookie, + Word nread_ptr) { + auto *context = contextOrEffectiveContext(); + + // fd_readdir is used to read directory entries from a directory. + // Since we don't have a real file system in proxy-wasm, we can just return an error. + + // Set the number of bytes read to 0 + if (!context->wasmVm()->setWord(nread_ptr, Word(0))) { + return 21; // __WASI_EFAULT + } + + // Return ENOTDIR (Not a directory) error + return 20; // __WASI_ENOTDIR +} + +// __wasi_errno_t __wasi_fd_renumber(__wasi_fd_t fd, __wasi_fd_t to); +Word wasi_unstable_fd_renumber(Word fd, Word to) { + // fd_renumber is used to atomically replace a file descriptor by renumbering another file + // descriptor. In proxy-wasm, we only support stdout and stderr, which are fixed file descriptors. + + // Check if both file descriptors are valid (stdout or stderr) + if ((fd != 1 /* stdout */ && fd != 2 /* stderr */) || + (to != 1 /* stdout */ && to != 2 /* stderr */)) { + return 8; // __WASI_ERRNO_BADF - Bad file descriptor + } + + // We don't actually support renumbering stdout and stderr, so return an error + return 58; // __WASI_ENOTSUP - Not supported +} + +// __wasi_errno_t __wasi_fd_sync(__wasi_fd_t fd); +Word wasi_unstable_fd_sync(Word fd) { + // fd_sync is used to synchronize a file's in-core state with the storage device. + // Since we don't have a real file system in proxy-wasm, we can just return success for + // stdout/stderr and an error for other file descriptors. + + // We only support stdout and stderr in proxy-wasm + if (fd != 1 /* stdout */ && fd != 2 /* stderr */) { + return 8; // __WASI_ERRNO_BADF - Bad file descriptor + } + + // For stdout and stderr, there's no need to sync as they're handled by the host system + return 0; // __WASI_ESUCCESS +} + +// __wasi_errno_t __wasi_fd_tell(__wasi_fd_t fd, __wasi_filesize_t *retptr0); +Word wasi_unstable_fd_tell(Word fd, Word retptr0) { + auto *context = contextOrEffectiveContext(); + + // fd_tell is used to get the current offset of a file descriptor. + // Since we don't have a real file system in proxy-wasm, we can just return an error. + + // We only support stdout and stderr in proxy-wasm, which don't support seeking + if (fd != 1 /* stdout */ && fd != 2 /* stderr */) { + return 8; // __WASI_ERRNO_BADF - Bad file descriptor + } + + // For stdout and stderr, we'll just return 0 as the offset + if (!context->wasm()->setDatatype(retptr0, uint64_t(0))) { + return 21; // __WASI_EFAULT + } + + return 0; // __WASI_ESUCCESS +} + +// __wasi_errno_t __wasi_path_create_directory(__wasi_fd_t fd, const char *path); +Word wasi_unstable_path_create_directory(Word fd, Word path_ptr, Word path_len) { + // path_create_directory is used to create a directory. + // Since we don't have a real file system in proxy-wasm, we can just return an error. + + return 58; // __WASI_ENOTSUP - Not supported +} + +// __wasi_errno_t __wasi_path_filestat_set_times(__wasi_fd_t fd, __wasi_lookupflags_t flags, const +// char *path, __wasi_timestamp_t atim, __wasi_timestamp_t mtim, __wasi_fstflags_t fst_flags); +Word wasi_unstable_path_filestat_set_times(Word fd, Word flags, Word path_ptr, Word path_len, + uint64_t atim, uint64_t mtim, Word fst_flags) { + // path_filestat_set_times is used to set the access and modification times of a file by path. + // Since we don't have a real file system in proxy-wasm, we can just return an error. + + return 58; // __WASI_ENOTSUP - Not supported +} + +// __wasi_errno_t __wasi_path_link(__wasi_fd_t old_fd, __wasi_lookupflags_t old_flags, const char +// *old_path, __wasi_fd_t new_fd, const char *new_path); +Word wasi_unstable_path_link(Word old_fd, Word old_flags, Word old_path_ptr, Word old_path_len, + Word new_fd, Word new_path_ptr) { + // path_link is used to create a hard link. + // Since we don't have a real file system in proxy-wasm, we can just return an error. + + return 58; // __WASI_ENOTSUP - Not supported +} + +// __wasi_errno_t __wasi_path_readlink(__wasi_fd_t fd, const char *path, uint8_t *buf, __wasi_size_t +// buf_len, __wasi_size_t *retptr0); +Word wasi_unstable_path_readlink(Word fd, Word path_ptr, Word path_len, Word buf_ptr, Word buf_len, + Word retptr0) { + auto *context = contextOrEffectiveContext(); + + // path_readlink is used to read the contents of a symbolic link. + // Since we don't have a real file system in proxy-wasm, we can just return an error. + + // Set the number of bytes read to 0 + if (!context->wasmVm()->setWord(retptr0, Word(0))) { + return 21; // __WASI_EFAULT + } + + return 58; // __WASI_ENOTSUP - Not supported +} + +// __wasi_errno_t __wasi_path_remove_directory(__wasi_fd_t fd, const char *path); +Word wasi_unstable_path_remove_directory(Word fd, Word path_ptr, Word path_len) { + // path_remove_directory is used to remove a directory. + // Since we don't have a real file system in proxy-wasm, we can just return an error. + + return 58; // __WASI_ENOTSUP - Not supported +} + +// __wasi_errno_t __wasi_path_rename(__wasi_fd_t fd, const char *old_path, __wasi_fd_t new_fd, const +// char *new_path); +Word wasi_unstable_path_rename(Word fd, Word old_path_ptr, Word old_path_len, Word new_fd, + Word new_path_ptr, Word new_path_len) { + // path_rename is used to rename a file or directory. + // Since we don't have a real file system in proxy-wasm, we can just return an error. + + return 58; // __WASI_ENOTSUP - Not supported +} + +// __wasi_errno_t __wasi_path_symlink(const char *old_path, __wasi_fd_t fd, const char *new_path); +Word wasi_unstable_path_symlink(Word old_path_ptr, Word old_path_len, Word fd, Word new_path_ptr) { + // path_symlink is used to create a symbolic link. + // Since we don't have a real file system in proxy-wasm, we can just return an error. + + return 58; // __WASI_ENOTSUP - Not supported +} + +// __wasi_errno_t __wasi_path_unlink_file(__wasi_fd_t fd, const char *path); +Word wasi_unstable_path_unlink_file(Word fd, Word path_ptr, Word path_len) { + // path_unlink_file is used to unlink a file. + // Since we don't have a real file system in proxy-wasm, we can just return an error. + + return 58; // __WASI_ENOTSUP - Not supported +} + +// __wasi_errno_t __wasi_sock_accept(__wasi_fd_t fd, __wasi_fdflags_t flags, __wasi_fd_t *retptr0); +Word wasi_unstable_sock_accept(Word fd, Word flags, Word retptr0) { + auto *context = contextOrEffectiveContext(); + + // sock_accept is used to accept a new connection on a socket. + // Since we don't have socket support in proxy-wasm, we can just return an error. + + // Set the returned file descriptor to an invalid value + if (!context->wasm()->setDatatype(retptr0, uint32_t(0))) { + return 21; // __WASI_EFAULT + } + + return 58; // __WASI_ENOTSUP - Not supported +} + +// __wasi_errno_t __wasi_sock_recv(__wasi_fd_t fd, const __wasi_iovec_t *ri_data, size_t +// ri_data_len, __wasi_riflags_t ri_flags, __wasi_size_t *retptr0, __wasi_roflags_t *retptr1); +Word wasi_unstable_sock_recv(Word fd, Word ri_data_ptr, Word ri_data_len, Word ri_flags, + Word retptr0, Word retptr1) { + auto *context = contextOrEffectiveContext(); + + // sock_recv is used to receive data from a socket. + // Since we don't have socket support in proxy-wasm, we can just return an error. + + // Set the number of bytes received to 0 + if (!context->wasmVm()->setWord(retptr0, Word(0))) { + return 21; // __WASI_EFAULT + } + + // Set the output flags to 0 + if (!context->wasm()->setDatatype(retptr1, uint16_t(0))) { + return 21; // __WASI_EFAULT + } + + return 58; // __WASI_ENOTSUP - Not supported +} + +// __wasi_errno_t __wasi_sock_send(__wasi_fd_t fd, const __wasi_ciovec_t *si_data, size_t +// si_data_len, __wasi_siflags_t si_flags, __wasi_size_t *retptr0); +Word wasi_unstable_sock_send(Word fd, Word si_data_ptr, Word si_data_len, Word si_flags, + Word retptr0) { + auto *context = contextOrEffectiveContext(); + + // sock_send is used to send data on a socket. + // Since we don't have socket support in proxy-wasm, we can just return an error. + + // Set the number of bytes sent to 0 + if (!context->wasmVm()->setWord(retptr0, Word(0))) { + return 21; // __WASI_EFAULT + } + + return 58; // __WASI_ENOTSUP - Not supported +} + +// __wasi_errno_t __wasi_sock_shutdown(__wasi_fd_t fd, __wasi_sdflags_t how); +Word wasi_unstable_sock_shutdown(Word fd, Word how) { + // sock_shutdown is used to shut down part of a full-duplex connection. + // Since we don't have socket support in proxy-wasm, we can just return an error. + + return 58; // __WASI_ENOTSUP - Not supported +} + // __wasi_errno_t __wasi_random_get(uint8_t *buf, size_t buf_len); Word wasi_unstable_random_get(Word result_buf_ptr, Word buf_len) { if (buf_len > PROXY_WASM_HOST_WASI_RANDOM_GET_MAX_SIZE_BYTES) { @@ -879,6 +1348,28 @@ Word wasi_unstable_random_get(Word result_buf_ptr, Word buf_len) { return 0; // __WASI_ESUCCESS } +// __wasi_errno_t __wasi_sched_yield() +Word wasi_unstable_sched_yield() { + // Per POSIX man pages, it is valid to return success if the calling thread is the only thread in + // the highest priority list. This is vacuously true for wasm without threads. There are no valid + // error cases defined. + return 0; // __WASI_ESUCCESS +} + +// __wasi_errno_t __wasi_poll_oneoff(const __wasi_subscription_t *in, __wasi_event_t *out, +// __wasi_size_t nsubscriptions, __wasi_size_t *nevents) +Word wasi_unstable_poll_oneoff(Word /*in*/, Word /*out*/, Word /*nsubscriptions*/, + Word nevents_ptr) { + auto *context = contextOrEffectiveContext(); + + // Since we are not performing event polling, directly set nevents to 0 + if (!context->wasmVm()->setWord(nevents_ptr, Word(0))) { + return 21; // __WASI_EFAULT - If there is a failure setting memory + } + + return 0; // __WASI_ESUCCESS +} + // void __wasi_proc_exit(__wasi_exitcode_t rval); void wasi_unstable_proc_exit(Word /*exit_code*/) { auto *context = contextOrEffectiveContext(); diff --git a/src/null/null_plugin.cc b/src/null/null_plugin.cc index 0f74496a2..d2a1b3667 100644 --- a/src/null/null_plugin.cc +++ b/src/null/null_plugin.cc @@ -127,6 +127,20 @@ void NullPlugin::getFunction(std::string_view function_name, WasmCallVoid<3> *f) } } +void NullPlugin::getFunction(std::string_view function_name, WasmCallVoid<4> *f) { + auto *plugin = this; + if (function_name == "proxy_on_redis_call_response") { + *f = [plugin](ContextBase *context, Word context_id, Word token, Word status, + Word response_size) { + SaveRestoreContext saved_context(context); + plugin->onRedisCallResponse(context_id, token, status, response_size); + }; + } else if (!wasm_vm_->integration()->getNullVmFunction(function_name, false, 4, this, f)) { + error("Missing getFunction for: " + std::string(function_name)); + *f = nullptr; + } +} + void NullPlugin::getFunction(std::string_view function_name, WasmCallVoid<5> *f) { auto *plugin = this; if (function_name == "proxy_on_http_call_response") { @@ -441,6 +455,12 @@ void NullPlugin::onHttpCallResponse(uint64_t context_id, uint64_t token, uint64_ getRootContext(context_id)->onHttpCallResponse(token, headers, body_size, trailers); } +void NullPlugin::onRedisCallResponse(uint64_t context_id, uint64_t token, uint64_t status, + uint64_t response_size) { + getRootContext(context_id) + ->onRedisCallResponse(token, static_cast(status), response_size); +} + void NullPlugin::onGrpcReceive(uint64_t context_id, uint64_t token, size_t body_size) { getRootContext(context_id)->onGrpcReceive(token, body_size); } diff --git a/src/wamr/wamr.cc b/src/wamr/wamr.cc index ea3d140e7..c02af9e19 100644 --- a/src/wamr/wamr.cc +++ b/src/wamr/wamr.cc @@ -56,7 +56,8 @@ class Wamr : public WasmVm { Wamr() = default; std::string_view getEngineName() override { return "wamr"; } - std::string_view getPrecompiledSectionName() override { return ""; } + // must use the exact name given by test-tools/append-aot-to-wasm/append_aot_to_wasm.py + std::string_view getPrecompiledSectionName() override { return "wamr-aot"; } Cloneable cloneable() override { return Cloneable::CompiledBytecode; } std::unique_ptr clone() override; @@ -118,18 +119,32 @@ class Wamr : public WasmVm { std::unordered_map module_functions_; }; -bool Wamr::load(std::string_view bytecode, std::string_view /*precompiled*/, +bool Wamr::load(std::string_view bytecode, std::string_view precompiled, const std::unordered_map & /*function_names*/) { store_ = wasm_store_new(engine()); if (store_ == nullptr) { return false; } - wasm_byte_vec_t binary = {.size = bytecode.size(), - .data = (char *)bytecode.data(), - .num_elems = bytecode.size(), - .size_of_elem = sizeof(byte_t), - .lock = nullptr}; + wasm_byte_vec_t binary = {0}; + if (precompiled.empty()) { + binary.size = bytecode.size(); + binary.data = const_cast(bytecode.data()); + binary.num_elems = bytecode.size(); + binary.size_of_elem = sizeof(byte_t); + binary.lock = nullptr; + } else { + // skip leading paddings + uint8_t padding_count = static_cast(precompiled[0]); + precompiled.remove_prefix(padding_count + 1); + + binary.size = precompiled.size(); + binary.data = const_cast(precompiled.data()); + binary.num_elems = precompiled.size(); + binary.size_of_elem = sizeof(byte_t); + binary.lock = nullptr; + } + module_ = wasm_module_new(store_.get(), &binary); if (module_ == nullptr) { return false; diff --git a/src/wasm.cc b/src/wasm.cc index cb1dd9b3a..00fb171f1 100644 --- a/src/wasm.cc +++ b/src/wasm.cc @@ -146,10 +146,16 @@ void WasmBase::registerCallbacks() { } else if (abiVersion() == AbiVersion::ProxyWasm_0_2_0) { _REGISTER_PROXY(continue_stream); _REGISTER_PROXY(close_stream); - } else if (abiVersion() == AbiVersion::ProxyWasm_0_2_1) { + } else if (abiVersion() == AbiVersion::ProxyWasm_0_2_1 || + abiVersion() == AbiVersion::ProxyWasm_0_2_100) { _REGISTER_PROXY(continue_stream); _REGISTER_PROXY(close_stream); _REGISTER_PROXY(get_log_level); + _REGISTER_PROXY(redis_init); + _REGISTER_PROXY(redis_call); + _REGISTER_PROXY(inject_encoded_data_to_filter_chain); + _REGISTER_PROXY(set_upstream_override_host); + _REGISTER_PROXY(get_upstream_hosts); } #undef _REGISTER_PROXY @@ -200,6 +206,12 @@ void WasmBase::getFunctions() { _GET_PROXY_ABI(on_request_headers, _abi_02); _GET_PROXY_ABI(on_response_headers, _abi_02); _GET_PROXY(on_foreign_function); + _GET_PROXY(on_redis_call_response); + } else if (abiVersion() == AbiVersion::ProxyWasm_0_2_100) { + _GET_PROXY_ABI(on_request_headers, _abi_03); + _GET_PROXY_ABI(on_response_headers, _abi_02); + _GET_PROXY(on_foreign_function); + _GET_PROXY(on_redis_call_response); } #undef _GET_PROXY_ABI #undef _GET_PROXY @@ -212,7 +224,7 @@ WasmBase::WasmBase(const std::shared_ptr &base_wasm_handle, started_from_(base_wasm_handle->wasm()->wasm_vm()->cloneable()), envs_(base_wasm_handle->wasm()->envs()), allowed_capabilities_(base_wasm_handle->wasm()->allowed_capabilities_), - base_wasm_handle_(base_wasm_handle) { + base_wasm_handle_weak_(base_wasm_handle) { if (started_from_ != Cloneable::NotCloneable) { wasm_vm_ = base_wasm_handle->wasm()->wasm_vm()->clone(); } else { @@ -240,6 +252,7 @@ WasmBase::WasmBase(std::unique_ptr wasm_vm, std::string_view vm_id, } WasmBase::~WasmBase() { + clearWasmInContext(); root_contexts_.clear(); pending_done_.clear(); pending_delete_.clear(); @@ -329,9 +342,14 @@ bool WasmBase::initialize() { } if (started_from_ == Cloneable::NotCloneable) { - auto ok = wasm_vm_->load(base_wasm_handle_->wasm()->moduleBytecode(), - base_wasm_handle_->wasm()->modulePrecompiled(), - base_wasm_handle_->wasm()->functionNames()); + auto base_wasm_handle = base_wasm_handle_weak_.lock(); + if (!base_wasm_handle) { + fail(FailState::UnableToInitializeCode, "Base wasm handle is null"); + return false; + } + auto ok = wasm_vm_->load(base_wasm_handle->wasm()->moduleBytecode(), + base_wasm_handle->wasm()->modulePrecompiled(), + base_wasm_handle->wasm()->functionNames()); if (!ok) { fail(FailState::UnableToInitializeCode, "Failed to load Wasm module from base Wasm"); return false; @@ -339,7 +357,12 @@ bool WasmBase::initialize() { } if (started_from_.has_value()) { - abi_version_ = base_wasm_handle_->wasm()->abiVersion(); + auto base_wasm_handle = base_wasm_handle_weak_.lock(); + if (!base_wasm_handle) { + fail(FailState::UnableToInitializeCode, "Base wasm handle is null"); + return false; + } + abi_version_ = base_wasm_handle->wasm()->abiVersion(); } if (started_from_ != Cloneable::InstantiatedModule) { @@ -378,23 +401,54 @@ ContextBase *WasmBase::getRootContext(const std::shared_ptr &plugin, void WasmBase::startVm(ContextBase *root_context) { // wasi_snapshot_preview1.clock_time_get wasm_vm_->setRestrictedCallback( - true, {// logging (Proxy-Wasm) - "env.proxy_log", - // logging (stdout/stderr) - "wasi_unstable.fd_write", "wasi_snapshot_preview1.fd_write", - // args - "wasi_unstable.args_sizes_get", "wasi_snapshot_preview1.args_sizes_get", - "wasi_unstable.args_get", "wasi_snapshot_preview1.args_get", - // environment variables - "wasi_unstable.environ_sizes_get", "wasi_snapshot_preview1.environ_sizes_get", - "wasi_unstable.environ_get", "wasi_snapshot_preview1.environ_get", - // preopened files/directories - "wasi_unstable.fd_prestat_get", "wasi_snapshot_preview1.fd_prestat_get", - "wasi_unstable.fd_prestat_dir_name", "wasi_snapshot_preview1.fd_prestat_dir_name", - // time - "wasi_unstable.clock_time_get", "wasi_snapshot_preview1.clock_time_get", - // random - "wasi_unstable.random_get", "wasi_snapshot_preview1.random_get"}); + true, + {// logging (Proxy-Wasm) + "env.proxy_log", + // logging (stdout/stderr) + "wasi_unstable.fd_write", "wasi_snapshot_preview1.fd_write", + // args + "wasi_unstable.args_sizes_get", "wasi_snapshot_preview1.args_sizes_get", + "wasi_unstable.args_get", "wasi_snapshot_preview1.args_get", + // environment variables + "wasi_unstable.environ_sizes_get", "wasi_snapshot_preview1.environ_sizes_get", + "wasi_unstable.environ_get", "wasi_snapshot_preview1.environ_get", + // preopened files/directories + "wasi_unstable.fd_prestat_get", "wasi_snapshot_preview1.fd_prestat_get", + "wasi_unstable.fd_prestat_dir_name", "wasi_snapshot_preview1.fd_prestat_dir_name", + // time + "wasi_unstable.clock_time_get", "wasi_snapshot_preview1.clock_time_get", + "wasi_unstable.clock_res_get", "wasi_snapshot_preview1.clock_res_get", + // random + "wasi_unstable.random_get", "wasi_snapshot_preview1.random_get", + // Go runtime initialization + "wasi_unstable.fd_fdstat_get", "wasi_snapshot_preview1.fd_fdstat_get", + "wasi_unstable.fd_fdstat_set_flags", "wasi_snapshot_preview1.fd_fdstat_set_flags", + "wasi_unstable.fd_fdstat_set_rights", "wasi_snapshot_preview1.fd_fdstat_set_rights", + "wasi_unstable.path_filestat_get", "wasi_snapshot_preview1.path_filestat_get", + "wasi_unstable.fd_filestat_get", "wasi_snapshot_preview1.fd_filestat_get", + "wasi_unstable.fd_filestat_set_size", "wasi_snapshot_preview1.fd_filestat_set_size", + "wasi_unstable.fd_filestat_set_times", "wasi_snapshot_preview1.fd_filestat_set_times", + "wasi_unstable.fd_advise", "wasi_snapshot_preview1.fd_advise", "wasi_unstable.fd_allocate", + "wasi_snapshot_preview1.fd_allocate", "wasi_unstable.fd_datasync", + "wasi_snapshot_preview1.fd_datasync", "wasi_unstable.fd_pread", + "wasi_snapshot_preview1.fd_pread", "wasi_unstable.fd_pwrite", + "wasi_snapshot_preview1.fd_pwrite", "wasi_unstable.fd_readdir", + "wasi_snapshot_preview1.fd_readdir", "wasi_unstable.fd_renumber", + "wasi_snapshot_preview1.fd_renumber", "wasi_unstable.fd_sync", + "wasi_snapshot_preview1.fd_sync", "wasi_unstable.fd_tell", "wasi_snapshot_preview1.fd_tell", + "wasi_unstable.path_create_directory", "wasi_snapshot_preview1.path_create_directory", + "wasi_unstable.path_filestat_set_times", "wasi_snapshot_preview1.path_filestat_set_times", + "wasi_unstable.path_link", "wasi_snapshot_preview1.path_link", "wasi_unstable.path_readlink", + "wasi_snapshot_preview1.path_readlink", "wasi_unstable.path_remove_directory", + "wasi_snapshot_preview1.path_remove_directory", "wasi_unstable.path_rename", + "wasi_snapshot_preview1.path_rename", "wasi_unstable.path_symlink", + "wasi_snapshot_preview1.path_symlink", "wasi_unstable.path_unlink_file", + "wasi_snapshot_preview1.path_unlink_file", "wasi_unstable.poll_oneoff", + "wasi_snapshot_preview1.poll_oneoff", "wasi_unstable.sock_accept", + "wasi_snapshot_preview1.sock_accept", "wasi_unstable.sock_recv", + "wasi_snapshot_preview1.sock_recv", "wasi_unstable.sock_send", + "wasi_snapshot_preview1.sock_send", "wasi_unstable.sock_shutdown", + "wasi_snapshot_preview1.sock_shutdown"}); if (_initialize_) { // WASI reactor. _initialize_(root_context); @@ -580,6 +634,76 @@ std::shared_ptr getThreadLocalWasm(std::string_view vm_key) { return nullptr; } +void setWasmFailCallback(const std::string &vm_key, + const std::shared_ptr &wasm_handle) { + wasm_handle->wasm()->wasm_vm()->addFailCallback([vm_key](proxy_wasm::FailState fail_state) { + if (fail_state == proxy_wasm::FailState::RuntimeError) { + // If VM failed, erase the entry so that: + // 1) we can recreate the new thread local VM from the same base_wasm. + // 2) we wouldn't reuse the failed VM for new plugins accidentally. + local_wasms.erase(vm_key); + } + }); +} + +void setWasmRecoverCallback(const std::string &vm_key, + const std::shared_ptr &wasm_handle, + const std::shared_ptr &base_handle, + const WasmHandleCloneFactory &clone_factory) { + std::weak_ptr wasm_handle_for_copy = wasm_handle; + std::weak_ptr base_handle_for_copy = base_handle; + wasm_handle->setRecoverVmCallback([vm_key, wasm_handle_for_copy, base_handle_for_copy, + clone_factory]() -> std::shared_ptr { + const auto base_handle = base_handle_for_copy.lock(); + if (!base_handle) { + std::cerr << "Failed to get base_handle shared_ptr in setRecoverVmCallback" + << "\n"; + return nullptr; + } + const auto &integration = base_handle->wasm()->wasm_vm()->integration(); + integration->trace("Start recover wasm_handle"); + + // Check if this is a proactive rebuild (shouldRebuild flag is set) + auto old_wasm_handle = wasm_handle_for_copy.lock(); + bool is_proactive_rebuild = (old_wasm_handle && old_wasm_handle->wasm()->shouldRebuild()); + + auto it = local_wasms.find(vm_key); + if (it != local_wasms.end()) { + auto wasm_handle = it->second.lock(); + if (wasm_handle && !is_proactive_rebuild) { + integration->trace("Wasm handle already exists, reuse for fail recovery"); + return wasm_handle; + } + // For proactive rebuild, force erase the cache to create new instance + integration->trace("Proactive rebuild: erase existing cache"); + local_wasms.erase(vm_key); + } + removeStaleLocalCacheEntries(local_wasms, local_wasms_keys); + // Try to recover wasm vm + auto new_handle = clone_factory(base_handle); + if (!new_handle) { + std::cerr << "Failed to clone Base Wasm during recover" + << "\n"; + base_handle->wasm()->fail(FailState::RecoverError, + "Failed to clone Base Wasm during recover"); + return nullptr; + } + + if (!new_handle->wasm()->initialize()) { + std::cerr << "Failed to initialize Wasm code during recover" + << "\n"; + base_handle->wasm()->fail(FailState::RecoverError, + "Failed to initialize Wasm code during recover"); + return nullptr; + } + cacheLocalWasm(vm_key, new_handle); + setWasmFailCallback(vm_key, new_handle); + setWasmRecoverCallback(vm_key, new_handle, base_handle, clone_factory); + integration->trace("Wasm handle has been recovered"); + return new_handle; + }); +} + static std::shared_ptr getOrCreateThreadLocalWasm(const std::shared_ptr &base_handle, const WasmHandleCloneFactory &clone_factory) { @@ -606,15 +730,80 @@ getOrCreateThreadLocalWasm(const std::shared_ptr &base_handle, return nullptr; } cacheLocalWasm(vm_key, wasm_handle); - wasm_handle->wasm()->wasm_vm()->addFailCallback([vm_key](proxy_wasm::FailState fail_state) { + setWasmFailCallback(vm_key, wasm_handle); + setWasmRecoverCallback(vm_key, wasm_handle, base_handle, clone_factory); + return wasm_handle; +} + +void setPluginFailCallback(const std::string &key, + const std::shared_ptr &wasm_handle) { + wasm_handle->wasm()->wasm_vm()->addFailCallback(key, [key](proxy_wasm::FailState fail_state) { if (fail_state == proxy_wasm::FailState::RuntimeError) { // If VM failed, erase the entry so that: - // 1) we can recreate the new thread local VM from the same base_wasm. - // 2) we wouldn't reuse the failed VM for new plugins accidentally. - local_wasms.erase(vm_key); - }; + // 1) we can recreate the new thread local plugin from the same base_wasm. + // 2) we wouldn't reuse the failed VM for new plugin configs accidentally. + local_plugins.erase(key); + } }); - return wasm_handle; +} + +void setPluginRecoverCallback(const std::string &key, + const std::shared_ptr &plugin_handle, + const std::shared_ptr &base_handle, + const std::shared_ptr &plugin, + const PluginHandleFactory &plugin_factory) { + std::weak_ptr base_handle_for_copy = base_handle; + + plugin_handle->setRecoverPluginCallback( + [key, base_handle_for_copy, plugin, plugin_factory]( + std::shared_ptr &wasm_handle) -> std::shared_ptr { + const auto base_handle = base_handle_for_copy.lock(); + if (!base_handle) { + std::cerr << "Failed to get base_handle shared_ptr in setRecoverPluginCallback" + << "\n"; + return nullptr; + } + const auto &integration = base_handle->wasm()->wasm_vm()->integration(); + integration->trace("Start recover plugin_handle"); + auto it = local_plugins.find(key); + if (it != local_plugins.end()) { + auto plugin_handle = it->second.lock(); + // Check if the associated wasm needs rebuild + bool should_rebuild = (plugin_handle && plugin_handle->wasmHandle() && + plugin_handle->wasmHandle()->wasm() && + plugin_handle->wasmHandle()->wasm()->shouldRebuild()); + if (plugin_handle && !should_rebuild) { + integration->trace("Plugin handle already exists, reuse for fail recovery"); + return plugin_handle; + } + // For proactive rebuild, force erase the cache to create new instance + integration->trace("Proactive rebuild: erase existing plugin cache"); + local_plugins.erase(key); + } + removeStaleLocalCacheEntries(local_plugins, local_plugins_keys); + // Try to recover wasm plugin + auto *plugin_context = wasm_handle->wasm()->start(plugin); + if (plugin_context == nullptr) { + std::cerr << "Failed to start thread-local Wasm during recover" + << "\n"; + base_handle->wasm()->fail(FailState::RecoverError, + "Failed to start thread-local Wasm during recover"); + return nullptr; + } + if (!wasm_handle->wasm()->configure(plugin_context, plugin)) { + std::cerr << "Failed to configure thread-local Wasm plugin during recover" + << "\n"; + base_handle->wasm()->fail(FailState::RecoverError, + "Failed to configure thread-local Wasm plugin during recover"); + return nullptr; + } + auto new_handle = plugin_factory(wasm_handle, plugin); + cacheLocalPlugin(key, new_handle); + setPluginFailCallback(key, wasm_handle); + setPluginRecoverCallback(key, new_handle, base_handle, plugin, plugin_factory); + integration->trace("Plugin handle has been recovered"); + return new_handle; + }); } std::shared_ptr getOrCreateThreadLocalPlugin( @@ -649,14 +838,9 @@ std::shared_ptr getOrCreateThreadLocalPlugin( } auto plugin_handle = plugin_factory(wasm_handle, plugin); cacheLocalPlugin(key, plugin_handle); - wasm_handle->wasm()->wasm_vm()->addFailCallback([key](proxy_wasm::FailState fail_state) { - if (fail_state == proxy_wasm::FailState::RuntimeError) { - // If VM failed, erase the entry so that: - // 1) we can recreate the new thread local plugin from the same base_wasm. - // 2) we wouldn't reuse the failed VM for new plugin configs accidentally. - local_plugins.erase(key); - }; - }); + plugin_handle->setPluginHandleKey(key); + setPluginFailCallback(key, wasm_handle); + setPluginRecoverCallback(key, plugin_handle, base_handle, plugin, plugin_factory); return plugin_handle; } diff --git a/test/wasm_test.cc b/test/wasm_test.cc index 2a3e60cc4..342dc4c55 100644 --- a/test/wasm_test.cc +++ b/test/wasm_test.cc @@ -117,6 +117,132 @@ TEST_P(TestVm, GetOrCreateThreadLocalWasmFailCallbacks) { ASSERT_NE(thread_local_plugin3->wasm(), thread_local_plugin2->wasm()); } +// Recover only used for WasmVMs - not available for NullVM. +TEST_P(TestVm, RecoverCrashedThreadLocalWasm) { + const auto *const plugin_name = "plugin_name"; + const auto *const root_id = "root_id"; + const auto *const vm_id = "vm_id"; + const auto *const vm_config = "vm_config"; + const auto *const plugin_config = "plugin_config"; + const auto fail_open = false; + + // Create a plugin. + const auto plugin = std::make_shared(plugin_name, root_id, vm_id, engine_, + plugin_config, fail_open, "plugin_key"); + + // Define callbacks. + WasmHandleFactory wasm_handle_factory = + [this, vm_id, vm_config](std::string_view vm_key) -> std::shared_ptr { + auto base_wasm = std::make_shared(makeVm(engine_), vm_id, vm_config, vm_key, + std::unordered_map{}, + AllowedCapabilitiesMap{}); + return std::make_shared(base_wasm); + }; + + WasmHandleCloneFactory wasm_handle_clone_factory = + [this](const std::shared_ptr &base_wasm_handle) + -> std::shared_ptr { + auto wasm = std::make_shared( + base_wasm_handle, [this]() -> std::unique_ptr { return makeVm(engine_); }); + return std::make_shared(wasm); + }; + + PluginHandleFactory plugin_handle_factory = + [](const std::shared_ptr &base_wasm, + const std::shared_ptr &plugin) -> std::shared_ptr { + return std::make_shared(base_wasm, plugin); + }; + + // Read the minimal loadable binary. + auto source = readTestWasmFile("abi_export.wasm"); + + // Create base Wasm via createWasm. + auto base_wasm_handle = + createWasm("vm_key", source, plugin, wasm_handle_factory, wasm_handle_clone_factory, false); + ASSERT_TRUE(base_wasm_handle && base_wasm_handle->wasm()); + + // Create a thread local plugin. + auto plugin_handle = getOrCreateThreadLocalPlugin( + base_wasm_handle, plugin, wasm_handle_clone_factory, plugin_handle_factory); + // Cause runtime crash. + plugin_handle->wasm()->wasm_vm()->fail(FailState::RuntimeError, "runtime error msg"); + ASSERT_TRUE(plugin_handle->wasm()->isFailed()); + + // do recover. + std::shared_ptr new_plugin_handle; + ASSERT_TRUE(plugin_handle->rebuild(new_plugin_handle)); + // Verify recover success. + ASSERT_FALSE(new_plugin_handle->wasm()->isFailed()); + // Verify the pointer to WasmBase is different from the crashed one. + ASSERT_NE(new_plugin_handle->wasm(), plugin_handle->wasm()); + + // Cause runtime crash again. + new_plugin_handle->wasm()->wasm_vm()->fail(FailState::RuntimeError, "runtime error msg"); + ASSERT_TRUE(new_plugin_handle->wasm()->isFailed()); + // Do recover again. + std::shared_ptr new_plugin_handle2; + ASSERT_TRUE(new_plugin_handle->rebuild(new_plugin_handle2)); + // Verify recover again success. + ASSERT_FALSE(new_plugin_handle2->wasm()->isFailed()); + // Verify the pointer to WasmBase is different from the crashed one. + ASSERT_NE(new_plugin_handle2->wasm(), new_plugin_handle->wasm()); + + // This time, create another thread local plugin with *different* plugin key for the same vm_key. + // This one should reuse the recovered VM. + const auto another_plugin = std::make_shared( + plugin_name, root_id, vm_id, engine_, plugin_config, fail_open, "another_plugin_key"); + auto another_handle = getOrCreateThreadLocalPlugin( + base_wasm_handle, another_plugin, wasm_handle_clone_factory, plugin_handle_factory); + ASSERT_TRUE(another_handle && another_handle->plugin()); + ASSERT_FALSE(another_handle->wasm()->isFailed()); + // Verify the pointer to WasmBase is same with recovered one + ASSERT_EQ(another_handle->wasm(), new_plugin_handle2->wasm()); + // Verify plugin handle is different + ASSERT_NE(another_handle, new_plugin_handle2); + + // Cause runtime crash again. + new_plugin_handle2->wasm()->wasm_vm()->fail(FailState::RuntimeError, "runtime error msg"); + ASSERT_TRUE(new_plugin_handle2->wasm()->isFailed()); + // Create another thread local plugin with *different* plugin key before recover. + // This one also should not end up using the failed VM. + auto another_handle2 = getOrCreateThreadLocalPlugin( + base_wasm_handle, another_plugin, wasm_handle_clone_factory, plugin_handle_factory); + ASSERT_TRUE(another_handle2 && another_handle2->plugin()); + ASSERT_FALSE(another_handle2->wasm()->isFailed()); + // Verify the pointer to WasmBase is different from the failed one. + ASSERT_NE(another_handle2->wasm(), new_plugin_handle2->wasm()); + // Do recover again. + std::shared_ptr new_plugin_handle3; + ASSERT_TRUE(new_plugin_handle2->rebuild(new_plugin_handle3)); + // Verify the pointer to WasmBase is different from the crashed one. + ASSERT_NE(new_plugin_handle3->wasm(), new_plugin_handle2->wasm()); + + // Cause the another plugin with same vm_key crash. + another_handle2->wasm()->wasm_vm()->fail(FailState::RuntimeError, "runtime error msg"); + ASSERT_TRUE(another_handle2->wasm()->isFailed()); + // Do recover again + std::shared_ptr new_another_handle2; + ASSERT_TRUE(another_handle2->rebuild(new_another_handle2)); + // Verify the pointer to WasmBase is different from the crashed one. + ASSERT_NE(new_another_handle2->wasm(), another_handle2->wasm()); + + // Cause the another plugin crash again + new_another_handle2->wasm()->wasm_vm()->fail(FailState::RuntimeError, "runtime error msg"); + ASSERT_TRUE(new_another_handle2->wasm()->isFailed()); + // Create thread local plugin with same plugin key + auto another_handle3 = getOrCreateThreadLocalPlugin( + base_wasm_handle, another_plugin, wasm_handle_clone_factory, plugin_handle_factory); + ASSERT_TRUE(another_handle3 && another_handle3->plugin()); + ASSERT_FALSE(another_handle3->wasm()->isFailed()); + // Verify the pointer to WasmBase is different from the failed one. + ASSERT_NE(another_handle3->wasm(), new_another_handle2->wasm()); + // Do recover again. + std::shared_ptr new_another_handle3; + ASSERT_TRUE(new_another_handle2->rebuild(new_another_handle3)); + // Recover should reuse the plugin handle + ASSERT_EQ(new_another_handle3, another_handle3); +} + // Tests the canary is always applied when making a call `createWasm` TEST_P(TestVm, AlwaysApplyCanary) { // Use different root_id, but the others are the same