diff --git a/.github/workflows/pre-commit.yml b/.github/workflows/pre-commit.yml index ab4bd951..4fa18732 100644 --- a/.github/workflows/pre-commit.yml +++ b/.github/workflows/pre-commit.yml @@ -1,4 +1,4 @@ -# Copyright 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# Copyright 2023-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions @@ -31,8 +31,8 @@ on: jobs: pre-commit: - runs-on: ubuntu-22.04 + runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 - - uses: actions/setup-python@v3 - - uses: pre-commit/action@v3.0.0 + - uses: actions/checkout@v5.0.0 + - uses: actions/setup-python@v6.0.0 + - uses: pre-commit/action@v3.0.1 diff --git a/.gitignore b/.gitignore index bafd2974..419005f0 100644 --- a/.gitignore +++ b/.gitignore @@ -140,3 +140,4 @@ dmypy.json # vscode .vscode/settings.json +.vscode/c_cpp_properties.json diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 298baab6..3c76a6ed 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,4 +1,4 @@ -# Copyright 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# Copyright 2023-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions @@ -25,7 +25,7 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. repos: -- repo: https://github.com/timothycrosley/isort +- repo: https://github.com/PyCQA/isort rev: 5.12.0 hooks: - id: isort @@ -36,7 +36,7 @@ repos: - id: black types_or: [python, cython] - repo: https://github.com/PyCQA/flake8 - rev: 5.0.4 + rev: 7.3.0 hooks: - id: flake8 args: [--max-line-length=88, --select=C,E,F,W,B,B950, --extend-ignore = E203,E501] @@ -57,7 +57,7 @@ repos: # More details about these pre-commit hooks here: # https://pre-commit.com/hooks.html - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v4.4.0 + rev: v6.0.0 hooks: - id: check-case-conflict - id: check-executables-have-shebangs diff --git a/CMakeLists.txt b/CMakeLists.txt index 69c7c698..f5c5b293 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,4 +1,4 @@ -# Copyright 2020-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# Copyright 2020-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions @@ -24,7 +24,7 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -cmake_minimum_required(VERSION 3.17) +cmake_minimum_required(VERSION 3.31.8) project(tritonpythonbackend LANGUAGES C CXX) @@ -122,10 +122,12 @@ FetchContent_MakeAvailable(dlpack) # # Boost # +set(TRITON_BOOST_URL "https://archives.boost.io/release/1.80.0/source/boost_1_80_0.tar.gz" CACHE STRING "Boost source code URL") + ExternalProject_Add( boostorg - URL https://archives.boost.io/release/1.79.0/source/boost_1_79_0.tar.gz - URL_HASH SHA256=273f1be93238a068aba4f9735a4a2b003019af067b9c183ed227780b8f36062c + URL ${TRITON_BOOST_URL} + URL_HASH SHA256=4b2136f98bdd1f5857f1c3dea9ac2018effe65286cf251534b6ae20cc45e1847 PREFIX "boost-src" CONFIGURE_COMMAND ${CMAKE_COMMAND} -E copy_directory /boost/ ${CMAKE_BINARY_DIR}/boost @@ -237,10 +239,14 @@ set( src/response_sender.h src/pb_stub.h src/pb_stub.cc + src/pb_stub_log.h + src/pb_stub_log.cc src/pb_response_iterator.h src/pb_response_iterator.cc src/pb_cancel.cc src/pb_cancel.h + src/pb_bls_cancel.cc + src/pb_bls_cancel.h ) list(APPEND diff --git a/README.md b/README.md index b00dc0bf..dd5e877a 100644 --- a/README.md +++ b/README.md @@ -1409,14 +1409,52 @@ class TritonPythonModel: A complete example for sync and async BLS for decoupled models is included in the [Examples](#examples) section. +Note: Async BLS is not supported on Python 3.6 or lower due to the `async` +keyword and `asyncio.run` being introduced in Python 3.7. + Starting from the 22.04 release, the lifetime of the BLS output tensors have been improved such that if a tensor is no longer needed in your Python model it will be automatically deallocated. This can increase the number of BLS requests that you can execute in your model without running into the out of GPU or shared memory error. -Note: Async BLS is not supported on Python 3.6 or lower due to the `async` -keyword and `asyncio.run` being introduced in Python 3.7. +### Cancelling decoupled BLS requests +A decoupled BLS inference request may be cancelled by calling the `cancel()` +method on the response iterator returned from the method executing the BLS +inference request. For example, + +```python +import triton_python_backend_utils as pb_utils + +class TritonPythonModel: + ... + def execute(self, requests): + ... + bls_response_iterator = bls_request.exec(decoupled=True) + ... + bls_response_iterator.cancel() + ... +``` + +You may also call the `cancel()` method on the response iterator returned from +the `async_exec()` method of the inference request. For example, + +```python +import triton_python_backend_utils as pb_utils + +class TritonPythonModel: + ... + async def execute(self, requests): + ... + bls_response_iterator = await bls_request.async_exec(decoupled=True) + ... + bls_response_iterator.cancel() + ... +``` + +Note: Whether the decoupled model returns a cancellation error and stops executing +the request depends on the model's backend implementation. Please refer to the +documentation for more details [Handing in Backend](https://github.com/triton-inference-server/server/blob/main/docs/user_guide/request_cancellation.md#handling-in-backend) ## Model Loading API diff --git a/src/infer_payload.cc b/src/infer_payload.cc index 762201e8..6baad307 100644 --- a/src/infer_payload.cc +++ b/src/infer_payload.cc @@ -1,4 +1,4 @@ -// Copyright 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2023-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions @@ -31,7 +31,8 @@ namespace triton { namespace backend { namespace python { InferPayload::InferPayload( const bool is_decoupled, std::function)> callback) - : is_decoupled_(is_decoupled), is_promise_set_(false), callback_(callback) + : is_decoupled_(is_decoupled), is_promise_set_(false), callback_(callback), + request_address_(reinterpret_cast(nullptr)) { promise_.reset(new std::promise>()); } @@ -91,4 +92,31 @@ InferPayload::ResponseAllocUserp() return response_alloc_userp_; } +void +InferPayload::SetRequestAddress(intptr_t request_address) +{ + std::unique_lock lock(request_address_mutex_); + request_address_ = request_address; +} + +void +InferPayload::SetRequestCancellationFunc( + const std::function& request_cancel_func) +{ + request_cancel_func_ = request_cancel_func; +} + +void +InferPayload::SafeCancelRequest() +{ + std::unique_lock lock(request_address_mutex_); + if (request_address_ == 0L) { + return; + } + + if (request_cancel_func_) { + request_cancel_func_(request_address_); + } +} + }}} // namespace triton::backend::python diff --git a/src/infer_payload.h b/src/infer_payload.h index 662e8922..8e4aa7d3 100644 --- a/src/infer_payload.h +++ b/src/infer_payload.h @@ -1,4 +1,4 @@ -// Copyright 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2023-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions @@ -62,6 +62,10 @@ class InferPayload : public std::enable_shared_from_this { void SetResponseAllocUserp( const ResponseAllocatorUserp& response_alloc_userp); std::shared_ptr ResponseAllocUserp(); + void SetRequestAddress(intptr_t request_address); + void SetRequestCancellationFunc( + const std::function& request_cancel_func); + void SafeCancelRequest(); private: std::unique_ptr>> promise_; @@ -70,6 +74,9 @@ class InferPayload : public std::enable_shared_from_this { bool is_promise_set_; std::function)> callback_; std::shared_ptr response_alloc_userp_; + std::mutex request_address_mutex_; + intptr_t request_address_; + std::function request_cancel_func_; }; }}} // namespace triton::backend::python diff --git a/src/infer_response.cc b/src/infer_response.cc index a6b6847d..382756d4 100644 --- a/src/infer_response.cc +++ b/src/infer_response.cc @@ -91,6 +91,7 @@ InferResponse::SaveToSharedMemory( response_shm_ptr->is_error_set = false; shm_handle_ = response_shm_.handle_; response_shm_ptr->is_last_response = is_last_response_; + response_shm_ptr->id = id_; // Only save the output tensors to shared memory when the inference response // doesn't have error. @@ -113,7 +114,6 @@ InferResponse::SaveToSharedMemory( tensor_handle_shm_ptr[j] = output_tensor->ShmHandle(); j++; } - response_shm_ptr->id = id_; parameters_shm_ = PbString::Create(shm_pool, parameters_); response_shm_ptr->parameters = parameters_shm_->ShmHandle(); diff --git a/src/ipc_message.h b/src/ipc_message.h index c3d1472e..c0fab3a3 100644 --- a/src/ipc_message.h +++ b/src/ipc_message.h @@ -1,4 +1,4 @@ -// Copyright 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2021-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions @@ -67,7 +67,8 @@ typedef enum PYTHONSTUB_commandtype_enum { PYTHONSTUB_LoadModelRequest, PYTHONSTUB_UnloadModelRequest, PYTHONSTUB_ModelReadinessRequest, - PYTHONSTUB_IsRequestCancelled + PYTHONSTUB_IsRequestCancelled, + PYTHONSTUB_CancelBLSInferRequest } PYTHONSTUB_CommandType; /// diff --git a/src/message_queue.h b/src/message_queue.h index e9c47afd..06661c66 100644 --- a/src/message_queue.h +++ b/src/message_queue.h @@ -1,4 +1,4 @@ -// Copyright 2021-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2021-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions @@ -32,14 +32,19 @@ #include #include +#include "pb_utils.h" #include "shm_manager.h" +#ifdef TRITON_PB_STUB +#include "pb_stub_log.h" +#endif namespace triton { namespace backend { namespace python { namespace bi = boost::interprocess; /// Struct holding the representation of a message queue inside the shared /// memory. -/// \param size Total size of the message queue. +/// \param size Total size of the message queue. Considered invalid after +/// MessageQueue::LoadFromSharedMemory. Check DLIS-8378 for additional details. /// \param mutex Handle of the mutex variable protecting index. /// \param index Used element index. /// \param sem_empty Semaphore object counting the number of empty buffer slots. @@ -110,7 +115,22 @@ class MessageQueue { { bi::scoped_lock lock{*MutexMutable()}; - Buffer()[Head()] = message; + int head_idx = Head(); + // Additional check to avoid out of bounds read/write. Check DLIS-8378 for + // additional details. + if (head_idx < 0 || static_cast(head_idx) >= Size()) { + std::string error_msg = + "internal error: message queue head index out of bounds. Expects " + "positive integer less than the size of message queue " + + std::to_string(Size()) + " but got " + std::to_string(head_idx); +#ifdef TRITON_PB_STUB + LOG_ERROR << error_msg; +#else + LOG_MESSAGE(TRITONSERVER_LOG_ERROR, error_msg.c_str()); +#endif + return; + } + Buffer()[head_idx] = message; HeadIncrement(); } SemFullMutable()->post(); @@ -145,7 +165,22 @@ class MessageQueue { } success = true; - Buffer()[Head()] = message; + int head_idx = Head(); + // Additional check to avoid out of bounds read/write. Check DLIS-8378 for + // additional details. + if (head_idx < 0 || static_cast(head_idx) >= Size()) { + std::string error_msg = + "internal error: message queue head index out of bounds. Expects " + "positive integer less than the size of message queue " + + std::to_string(Size()) + " but got " + std::to_string(head_idx); +#ifdef TRITON_PB_STUB + LOG_ERROR << error_msg; +#else + LOG_MESSAGE(TRITONSERVER_LOG_ERROR, error_msg.c_str()); +#endif + return; + } + Buffer()[head_idx] = message; HeadIncrement(); } SemFullMutable()->post(); @@ -244,7 +279,7 @@ class MessageQueue { } private: - std::size_t& Size() { return mq_shm_ptr_->size; } + uint32_t Size() { return size_; } const bi::interprocess_mutex& Mutex() { return mq_shm_ptr_->mutex; } bi::interprocess_mutex* MutexMutable() { return &(mq_shm_ptr_->mutex); } int& Head() { return mq_shm_ptr_->head; } @@ -273,6 +308,7 @@ class MessageQueue { MessageQueueShm* mq_shm_ptr_; T* mq_buffer_shm_ptr_; bi::managed_external_buffer::handle_t mq_handle_; + uint32_t size_; /// Create/load a Message queue. /// \param mq_shm Message queue representation in shared memory. @@ -284,6 +320,7 @@ class MessageQueue { mq_buffer_shm_ptr_ = mq_buffer_shm_.data_.get(); mq_shm_ptr_ = mq_shm_.data_.get(); mq_handle_ = mq_shm_.handle_; + size_ = mq_shm_ptr_->size; } }; }}} // namespace triton::backend::python diff --git a/src/pb_bls_cancel.cc b/src/pb_bls_cancel.cc new file mode 100644 index 00000000..4341c037 --- /dev/null +++ b/src/pb_bls_cancel.cc @@ -0,0 +1,93 @@ +// Copyright 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions +// are met: +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// * Neither the name of NVIDIA CORPORATION nor the names of its +// contributors may be used to endorse or promote products derived +// from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY +// OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#include "pb_bls_cancel.h" + +#include "pb_stub.h" +#include "pb_stub_log.h" + +namespace triton { namespace backend { namespace python { + +void +PbBLSCancel::SaveToSharedMemory(std::unique_ptr& shm_pool) +{ + cancel_shm_ = shm_pool->Construct(); + new (&(cancel_shm_.data_->mu)) bi::interprocess_mutex; + new (&(cancel_shm_.data_->cv)) bi::interprocess_condition; + cancel_shm_.data_->waiting_on_stub = false; + cancel_shm_.data_->infer_payload_id = infer_playload_id_; + cancel_shm_.data_->is_cancelled = is_cancelled_; +} + +bi::managed_external_buffer::handle_t +PbBLSCancel::ShmHandle() +{ + return cancel_shm_.handle_; +} + +CancelBLSRequestMessage* +PbBLSCancel::ShmPayload() +{ + return cancel_shm_.data_.get(); +} + +void +PbBLSCancel::Cancel() +{ + // Release the GIL. Python objects are not accessed during the check. + py::gil_scoped_release gil_release; + + std::unique_lock lk(mu_); + // The cancelled flag can only move from false to true, not the other way, so + // it is checked on each query until cancelled and then implicitly cached. + if (is_cancelled_) { + return; + } + if (!updating_) { + std::unique_ptr& stub = Stub::GetOrCreateInstance(); + if (!stub->StubToParentServiceActive()) { + LOG_ERROR << "Cannot communicate with parent service"; + return; + } + + stub->EnqueueCancelBLSRequest(this); + updating_ = true; + } + cv_.wait(lk, [this] { return !updating_; }); +} + +void +PbBLSCancel::ReportIsCancelled(bool is_cancelled) +{ + { + std::lock_guard lk(mu_); + is_cancelled_ = is_cancelled; + updating_ = false; + } + cv_.notify_all(); +} + +}}} // namespace triton::backend::python diff --git a/src/pb_bls_cancel.h b/src/pb_bls_cancel.h new file mode 100644 index 00000000..7fdd3fbf --- /dev/null +++ b/src/pb_bls_cancel.h @@ -0,0 +1,63 @@ +// Copyright 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions +// are met: +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// * Neither the name of NVIDIA CORPORATION nor the names of its +// contributors may be used to endorse or promote products derived +// from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY +// OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#pragma once + +#include +#include + +#include "pb_utils.h" + +namespace triton { namespace backend { namespace python { + +class PbBLSCancel { + public: + PbBLSCancel(void* infer_playload_id) + : updating_(false), infer_playload_id_(infer_playload_id), + is_cancelled_(false) + { + } + DISALLOW_COPY_AND_ASSIGN(PbBLSCancel); + + void SaveToSharedMemory(std::unique_ptr& shm_pool); + bi::managed_external_buffer::handle_t ShmHandle(); + CancelBLSRequestMessage* ShmPayload(); + + void Cancel(); + void ReportIsCancelled(bool is_cancelled); + + private: + AllocatedSharedMemory cancel_shm_; + + std::mutex mu_; + std::condition_variable cv_; + bool updating_; + + void* infer_playload_id_; + bool is_cancelled_; +}; + +}}}; // namespace triton::backend::python diff --git a/src/pb_cancel.cc b/src/pb_cancel.cc index 0774261d..da9daf98 100644 --- a/src/pb_cancel.cc +++ b/src/pb_cancel.cc @@ -1,4 +1,4 @@ -// Copyright 2023-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2023-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions @@ -27,6 +27,7 @@ #include "pb_cancel.h" #include "pb_stub.h" +#include "pb_stub_log.h" namespace triton { namespace backend { namespace python { diff --git a/src/pb_memory.cc b/src/pb_memory.cc index fa32bb1c..5b678f1a 100644 --- a/src/pb_memory.cc +++ b/src/pb_memory.cc @@ -1,4 +1,4 @@ -// Copyright 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2022-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions @@ -26,6 +26,8 @@ #include "pb_memory.h" +#include + namespace triton { namespace backend { namespace python { std::unique_ptr @@ -225,7 +227,6 @@ PbMemory::LoadFromSharedMemory( { MemoryShm* memory_shm_ptr = reinterpret_cast(data_shm); char* memory_data_shm = data_shm + sizeof(MemoryShm); - char* data_ptr = nullptr; bool opened_cuda_ipc_handle = false; if (memory_shm_ptr->memory_type == TRITONSERVER_MEMORY_GPU && @@ -260,6 +261,19 @@ PbMemory::LoadFromSharedMemory( } else { data_ptr = memory_data_shm; } + + // This check only validates CPU shared memory access. + if (memory_shm_ptr->memory_type != TRITONSERVER_MEMORY_GPU && + (data_ptr + memory_shm_ptr->byte_size > + (char*)shm_pool->GetBaseAddress() + shm_pool->GetCurrentCapacity())) { + std::ostringstream oss; + oss << "0x" << std::hex + << (reinterpret_cast(data_ptr) + memory_shm_ptr->byte_size); + throw PythonBackendException( + std::string("Attempted to access out of bounds memory address ") + + oss.str()); + } + return std::unique_ptr(new PbMemory( data_shm, data_ptr, handle, opened_cuda_ipc_handle /* opened_cuda_ipc_handle */)); @@ -309,6 +323,19 @@ PbMemory::LoadFromSharedMemory( } else { data_ptr = memory_data_shm; } + + // This check only validates CPU shared memory access. + if (memory_shm_ptr->memory_type != TRITONSERVER_MEMORY_GPU && + (data_ptr + memory_shm_ptr->byte_size > + (char*)shm_pool->GetBaseAddress() + shm_pool->GetCurrentCapacity())) { + std::ostringstream oss; + oss << "0x" << std::hex + << (reinterpret_cast(data_ptr) + memory_shm_ptr->byte_size); + throw PythonBackendException( + std::string("Attempted to access out of bounds memory address ") + + oss.str()); + } + return std::unique_ptr(new PbMemory( memory_shm, data_ptr, opened_cuda_ipc_handle /* opened_cuda_ipc_handle */)); diff --git a/src/pb_response_iterator.cc b/src/pb_response_iterator.cc index 9abf4997..536d4232 100644 --- a/src/pb_response_iterator.cc +++ b/src/pb_response_iterator.cc @@ -1,4 +1,4 @@ -// Copyright 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2023-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions @@ -40,6 +40,7 @@ ResponseIterator::ResponseIterator( : id_(response->Id()), is_finished_(false), is_cleared_(false), idx_(0) { response_buffer_.push(response); + pb_bls_cancel_ = std::make_shared(response->Id()); } ResponseIterator::~ResponseIterator() @@ -159,4 +160,12 @@ ResponseIterator::GetExistingResponses() return responses; } +void +ResponseIterator::Cancel() +{ + if (!is_finished_) { + pb_bls_cancel_->Cancel(); + } +} + }}} // namespace triton::backend::python diff --git a/src/pb_response_iterator.h b/src/pb_response_iterator.h index cad5ff1f..cb26d6a3 100644 --- a/src/pb_response_iterator.h +++ b/src/pb_response_iterator.h @@ -1,4 +1,4 @@ -// Copyright 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2023-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions @@ -29,6 +29,7 @@ #include #include "infer_response.h" +#include "pb_bls_cancel.h" namespace triton { namespace backend { namespace python { @@ -43,6 +44,7 @@ class ResponseIterator { void* Id(); void Clear(); std::vector> GetExistingResponses(); + void Cancel(); private: std::vector> responses_; @@ -53,6 +55,7 @@ class ResponseIterator { bool is_finished_; bool is_cleared_; size_t idx_; + std::shared_ptr pb_bls_cancel_; }; }}} // namespace triton::backend::python diff --git a/src/pb_stub.cc b/src/pb_stub.cc index 4e09ea1d..56048d78 100644 --- a/src/pb_stub.cc +++ b/src/pb_stub.cc @@ -49,6 +49,7 @@ #include "pb_preferred_memory.h" #include "pb_response_iterator.h" #include "pb_string.h" +#include "pb_stub_log.h" #include "pb_utils.h" #include "response_sender.h" #include "scoped_defer.h" @@ -1039,11 +1040,13 @@ Stub::~Stub() { #ifdef TRITON_ENABLE_GPU try { - CUDAHandler& cuda_api = CUDAHandler::getInstance(); - for (auto& m : - shm_pool_->GetCUDAMemoryPoolManager()->CUDAPoolAddressMap()) { - if (m.second != nullptr) { - cuda_api.CloseCudaHandle(m.first, m.second); + if (shm_pool_ != nullptr) { + CUDAHandler& cuda_api = CUDAHandler::getInstance(); + for (auto& m : + shm_pool_->GetCUDAMemoryPoolManager()->CUDAPoolAddressMap()) { + if (m.second != nullptr) { + cuda_api.CloseCudaHandle(m.first, m.second); + } } } } @@ -1052,13 +1055,14 @@ Stub::~Stub() } #endif - { + // Ensure the interpreter is active before trying to clean up. + if (Py_IsInitialized()) { py::gil_scoped_acquire acquire; py::object async_event_loop_local(std::move(async_event_loop_)); py::object background_futures_local(std::move(background_futures_)); py::object model_instance_local(std::move(model_instance_)); } - stub_instance_.reset(); + stub_message_queue_.reset(); parent_message_queue_.reset(); stub_to_parent_mq_.reset(); @@ -1136,6 +1140,9 @@ Stub::ServiceStubToParentRequests() } else if ( utils_msg_payload->command_type == PYTHONSTUB_IsRequestCancelled) { SendIsCancelled(utils_msg_payload); + } else if ( + utils_msg_payload->command_type == PYTHONSTUB_CancelBLSInferRequest) { + SendCancelBLSRequest(utils_msg_payload); } else { std::cerr << "Error when sending message via stub_to_parent message " "buffer - unknown command\n"; @@ -1221,6 +1228,46 @@ Stub::EnqueueCleanupId(void* id, const PYTHONSTUB_CommandType& command_type) } } +void +Stub::SendCancelBLSRequest( + std::unique_ptr& utils_msg_payload) +{ + PbBLSCancel* pb_bls_cancel = + reinterpret_cast(utils_msg_payload->utils_message_ptr); + pb_bls_cancel->SaveToSharedMemory(shm_pool_); + + CancelBLSRequestMessage* message_payload = pb_bls_cancel->ShmPayload(); + std::unique_ptr ipc_message = + IPCMessage::Create(shm_pool_, false /* inline_response */); + ipc_message->Command() = utils_msg_payload->command_type; + ipc_message->Args() = pb_bls_cancel->ShmHandle(); + + bool is_cancelled = false; + { + bi::scoped_lock lk(message_payload->mu); + + SendIPCUtilsMessage(ipc_message); + while (!message_payload->waiting_on_stub) { + message_payload->cv.wait(lk); + } + + is_cancelled = message_payload->is_cancelled; + message_payload->waiting_on_stub = false; + message_payload->cv.notify_all(); + } + pb_bls_cancel->ReportIsCancelled(is_cancelled); +} + +void +Stub::EnqueueCancelBLSRequest(PbBLSCancel* pb_bls_cancel) +{ + std::unique_ptr utils_msg_payload = + std::make_unique( + PYTHONSTUB_CancelBLSInferRequest, + reinterpret_cast(pb_bls_cancel)); + EnqueueUtilsMessage(std::move(utils_msg_payload)); +} + void Stub::EnqueueIsCancelled(PbCancel* pb_cancel) { @@ -1526,138 +1573,6 @@ Stub::ProcessBLSResponseDecoupled(std::unique_ptr& ipc_message) } } -std::unique_ptr Logger::log_instance_; - -std::unique_ptr& -Logger::GetOrCreateInstance() -{ - if (Logger::log_instance_.get() == nullptr) { - Logger::log_instance_ = std::make_unique(); - } - - return Logger::log_instance_; -} - -// Bound function, called from the python client -void -Logger::Log(const std::string& message, LogLevel level) -{ - std::unique_ptr& stub = Stub::GetOrCreateInstance(); - py::object frame = py::module_::import("inspect").attr("currentframe"); - py::object caller_frame = frame(); - py::object info = py::module_::import("inspect").attr("getframeinfo"); - py::object caller_info = info(caller_frame); - py::object filename_python = caller_info.attr("filename"); - std::string filename = filename_python.cast(); - py::object lineno = caller_info.attr("lineno"); - uint32_t line = lineno.cast(); - - if (!stub->StubToParentServiceActive()) { - Logger::GetOrCreateInstance()->Log(filename, line, level, message); - } else { - std::unique_ptr log_msg(new PbLog(filename, line, message, level)); - stub->EnqueueLogRequest(log_msg); - } -} - -// Called internally (.e.g. LOG_ERROR << "Error"; ) -void -Logger::Log( - const std::string& filename, uint32_t lineno, LogLevel level, - const std::string& message) -{ - // If the log monitor service is not active yet, format - // and pass messages to cerr - if (!BackendLoggingActive()) { - std::string path(filename); - size_t pos = path.rfind(std::filesystem::path::preferred_separator); - if (pos != std::string::npos) { - path = path.substr(pos + 1, std::string::npos); - } -#ifdef _WIN32 - std::stringstream ss; - SYSTEMTIME system_time; - GetSystemTime(&system_time); - ss << LeadingLogChar(level) << std::setfill('0') << std::setw(2) - << system_time.wMonth << std::setw(2) << system_time.wDay << ' ' - << std::setw(2) << system_time.wHour << ':' << std::setw(2) - << system_time.wMinute << ':' << std::setw(2) << system_time.wSecond - << '.' << std::setw(6) << system_time.wMilliseconds * 1000 << ' ' - << static_cast(GetCurrentProcessId()) << ' ' << path << ':' - << lineno << "] "; -#else - std::stringstream ss; - struct timeval tv; - gettimeofday(&tv, NULL); - struct tm tm_time; - gmtime_r(((time_t*)&(tv.tv_sec)), &tm_time); - ss << LeadingLogChar(level) << std::setfill('0') << std::setw(2) - << (tm_time.tm_mon + 1) << std::setw(2) << tm_time.tm_mday << " " - << std::setw(2) << tm_time.tm_hour << ':' << std::setw(2) - << tm_time.tm_min << ':' << std::setw(2) << tm_time.tm_sec << "." - << std::setw(6) << tv.tv_usec << ' ' << static_cast(getpid()) - << ' ' << path << ':' << lineno << "] "; - std::cerr << ss.str() << " " << message << std::endl; -#endif - } else { - // Ensure we do not create a stub instance before it has initialized - std::unique_ptr& stub = Stub::GetOrCreateInstance(); - std::unique_ptr log_msg(new PbLog(filename, lineno, message, level)); - stub->EnqueueLogRequest(log_msg); - } -} - -void -Logger::LogInfo(const std::string& message) -{ - Logger::Log(message, LogLevel::kInfo); -} - -void -Logger::LogWarn(const std::string& message) -{ - Logger::Log(message, LogLevel::kWarning); -} - -void -Logger::LogError(const std::string& message) -{ - Logger::Log(message, LogLevel::kError); -} - -void -Logger::LogVerbose(const std::string& message) -{ - Logger::Log(message, LogLevel::kVerbose); -} - -const std::string -Logger::LeadingLogChar(const LogLevel& level) -{ - switch (level) { - case LogLevel::kWarning: - return "W"; - case LogLevel::kError: - return "E"; - case LogLevel::kInfo: - case LogLevel::kVerbose: - default: - return "I"; - } -} - -void -Logger::SetBackendLoggingActive(bool status) -{ - backend_logging_active_ = status; -} - -bool -Logger::BackendLoggingActive() -{ - return backend_logging_active_; -} - PYBIND11_EMBEDDED_MODULE(c_python_backend_utils, module) { py::class_> triton_error( @@ -1909,7 +1824,8 @@ PYBIND11_EMBEDDED_MODULE(c_python_backend_utils, module) it.Iter(); return it; }) - .def("__next__", &ResponseIterator::Next); + .def("__next__", &ResponseIterator::Next) + .def("cancel", &ResponseIterator::Cancel); py::class_ logger(module, "Logger"); py::enum_(logger, "LogLevel") @@ -2117,6 +2033,7 @@ main(int argc, char** argv) catch (const PythonBackendException& pb_exception) { LOG_INFO << "Failed to preinitialize Python stub: " << pb_exception.what(); logger.reset(); + stub.reset(); exit(1); } diff --git a/src/pb_stub.h b/src/pb_stub.h index 7d76ec9a..942ecd98 100644 --- a/src/pb_stub.h +++ b/src/pb_stub.h @@ -1,4 +1,4 @@ -// Copyright 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2021-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions @@ -30,8 +30,6 @@ #include #include -#include - #include "infer_request.h" #include "infer_response.h" #include "ipc_message.h" @@ -41,7 +39,6 @@ #include "pb_cancel.h" #include "pb_log.h" #include "pb_response_iterator.h" -#include "pb_utils.h" namespace bi = boost::interprocess; @@ -54,105 +51,6 @@ using cudaStream_t = void*; namespace triton { namespace backend { namespace python { -#define LOG_IF_EXCEPTION(X) \ - do { \ - try { \ - (X); \ - } \ - catch (const PythonBackendException& pb_exception) { \ - LOG_INFO << pb_exception.what(); \ - } \ - } while (false) - -#define LOG_EXCEPTION(E) \ - do { \ - LOG_INFO << E.what(); \ - } while (false) - -/// Macros that use current filename and line number. -#define LOG_INFO LOG_FL(__FILE__, __LINE__, LogLevel::kInfo) -#define LOG_WARN LOG_FL(__FILE__, __LINE__, LogLevel::kWarning) -#define LOG_ERROR LOG_FL(__FILE__, __LINE__, LogLevel::kError) -#define LOG_VERBOSE LOG_FL(__FILE__, __LINE__, LogLevel::kVerbose) - -class Logger { - public: - Logger() { backend_logging_active_ = false; }; - ~Logger() { log_instance_.reset(); }; - /// Python client log function - static void Log(const std::string& message, LogLevel level = LogLevel::kInfo); - - /// Python client log info function - static void LogInfo(const std::string& message); - - /// Python client warning function - static void LogWarn(const std::string& message); - - /// Python client log error function - static void LogError(const std::string& message); - - /// Python client log verbose function - static void LogVerbose(const std::string& message); - - /// Internal log function - void Log( - const std::string& filename, uint32_t lineno, LogLevel level, - const std::string& message); - - /// Log format helper function - const std::string LeadingLogChar(const LogLevel& level); - - /// Set PYBE Logging Status - void SetBackendLoggingActive(bool status); - - /// Get PYBE Logging Status - bool BackendLoggingActive(); - - /// Singleton Getter Function - static std::unique_ptr& GetOrCreateInstance(); - - DISALLOW_COPY_AND_ASSIGN(Logger); - - /// Flush the log. - void Flush() { std::cerr << std::flush; } - - private: - static std::unique_ptr log_instance_; - bool backend_logging_active_; -}; - -class LogMessage { - public: - /// Create a log message, stripping the path down to the filename only - LogMessage(const char* file, int line, LogLevel level) : level_(level) - { - std::string path(file); - const char os_slash = std::filesystem::path::preferred_separator; - size_t pos = path.rfind(os_slash); - if (pos != std::string::npos) { - path = path.substr(pos + 1, std::string::npos); - } - file_ = path; - line_ = static_cast(line); - } - /// Log message to console or send to backend (see Logger::Log for details) - ~LogMessage() - { - Logger::GetOrCreateInstance()->Log(file_, line_, level_, stream_.str()); - } - - std::stringstream& stream() { return stream_; } - - private: - std::stringstream stream_; - std::string file_; - uint32_t line_; - LogLevel level_; -}; - -#define LOG_FL(FN, LN, LVL) LogMessage((char*)(FN), LN, LVL).stream() - - class ModelContext { public: // Scans and establishes path for serving the python model. @@ -321,6 +219,15 @@ class Stub { /// and the response factory for BLS decoupled response. void EnqueueCleanupId(void* id, const PYTHONSTUB_CommandType& command_type); + /// Send the id to the python backend for request address retrieval and + /// cancellation + void SendCancelBLSRequest( + std::unique_ptr& utils_msg_payload); + + /// Add infer payload id to queue. This is used for retrieving the request + /// address from the infer_payload + void EnqueueCancelBLSRequest(PbBLSCancel* pb_bls_cancel); + /// Add request cancellation query to queue void EnqueueIsCancelled(PbCancel* pb_cancel); diff --git a/src/pb_stub_log.cc b/src/pb_stub_log.cc new file mode 100644 index 00000000..d0b1ff97 --- /dev/null +++ b/src/pb_stub_log.cc @@ -0,0 +1,170 @@ +// Copyright 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions +// are met: +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// * Neither the name of NVIDIA CORPORATION nor the names of its +// contributors may be used to endorse or promote products derived +// from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY +// OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#include "pb_stub_log.h" + +#include + +#include "pb_stub.h" + + +namespace py = pybind11; + +namespace triton { namespace backend { namespace python { + +std::unique_ptr Logger::log_instance_; + +std::unique_ptr& +Logger::GetOrCreateInstance() +{ + if (Logger::log_instance_.get() == nullptr) { + Logger::log_instance_ = std::make_unique(); + } + + return Logger::log_instance_; +} + +// Bound function, called from the python client +void +Logger::Log(const std::string& message, LogLevel level) +{ + std::unique_ptr& stub = Stub::GetOrCreateInstance(); + py::object frame = py::module_::import("inspect").attr("currentframe"); + py::object caller_frame = frame(); + py::object info = py::module_::import("inspect").attr("getframeinfo"); + py::object caller_info = info(caller_frame); + py::object filename_python = caller_info.attr("filename"); + std::string filename = filename_python.cast(); + py::object lineno = caller_info.attr("lineno"); + uint32_t line = lineno.cast(); + + if (!stub->StubToParentServiceActive()) { + Logger::GetOrCreateInstance()->Log(filename, line, level, message); + } else { + std::unique_ptr log_msg(new PbLog(filename, line, message, level)); + stub->EnqueueLogRequest(log_msg); + } +} + +// Called internally (.e.g. LOG_ERROR << "Error"; ) +void +Logger::Log( + const std::string& filename, uint32_t lineno, LogLevel level, + const std::string& message) +{ + // If the log monitor service is not active yet, format + // and pass messages to cerr + if (!BackendLoggingActive()) { + std::string path(filename); + size_t pos = path.rfind(std::filesystem::path::preferred_separator); + if (pos != std::string::npos) { + path = path.substr(pos + 1, std::string::npos); + } +#ifdef _WIN32 + std::stringstream ss; + SYSTEMTIME system_time; + GetSystemTime(&system_time); + ss << LeadingLogChar(level) << std::setfill('0') << std::setw(2) + << system_time.wMonth << std::setw(2) << system_time.wDay << ' ' + << std::setw(2) << system_time.wHour << ':' << std::setw(2) + << system_time.wMinute << ':' << std::setw(2) << system_time.wSecond + << '.' << std::setw(6) << system_time.wMilliseconds * 1000 << ' ' + << static_cast(GetCurrentProcessId()) << ' ' << path << ':' + << lineno << "] "; +#else + std::stringstream ss; + struct timeval tv; + gettimeofday(&tv, NULL); + struct tm tm_time; + gmtime_r(((time_t*)&(tv.tv_sec)), &tm_time); + ss << LeadingLogChar(level) << std::setfill('0') << std::setw(2) + << (tm_time.tm_mon + 1) << std::setw(2) << tm_time.tm_mday << " " + << std::setw(2) << tm_time.tm_hour << ':' << std::setw(2) + << tm_time.tm_min << ':' << std::setw(2) << tm_time.tm_sec << "." + << std::setw(6) << tv.tv_usec << ' ' << static_cast(getpid()) + << ' ' << path << ':' << lineno << "] "; + std::cerr << ss.str() << " " << message << std::endl; +#endif + } else { + // Ensure we do not create a stub instance before it has initialized + std::unique_ptr& stub = Stub::GetOrCreateInstance(); + std::unique_ptr log_msg(new PbLog(filename, lineno, message, level)); + stub->EnqueueLogRequest(log_msg); + } +} + +void +Logger::LogInfo(const std::string& message) +{ + Logger::Log(message, LogLevel::kInfo); +} + +void +Logger::LogWarn(const std::string& message) +{ + Logger::Log(message, LogLevel::kWarning); +} + +void +Logger::LogError(const std::string& message) +{ + Logger::Log(message, LogLevel::kError); +} + +void +Logger::LogVerbose(const std::string& message) +{ + Logger::Log(message, LogLevel::kVerbose); +} + +const std::string +Logger::LeadingLogChar(const LogLevel& level) +{ + switch (level) { + case LogLevel::kWarning: + return "W"; + case LogLevel::kError: + return "E"; + case LogLevel::kInfo: + case LogLevel::kVerbose: + default: + return "I"; + } +} + +void +Logger::SetBackendLoggingActive(bool status) +{ + backend_logging_active_ = status; +} + +bool +Logger::BackendLoggingActive() +{ + return backend_logging_active_; +} + +}}} // namespace triton::backend::python diff --git a/src/pb_stub_log.h b/src/pb_stub_log.h new file mode 100644 index 00000000..df67eba8 --- /dev/null +++ b/src/pb_stub_log.h @@ -0,0 +1,134 @@ +// Copyright 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions +// are met: +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// * Neither the name of NVIDIA CORPORATION nor the names of its +// contributors may be used to endorse or promote products derived +// from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY +// OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#pragma once + +#include +#include + +#include "pb_utils.h" + +namespace triton { namespace backend { namespace python { + +#define LOG_IF_EXCEPTION(X) \ + do { \ + try { \ + (X); \ + } \ + catch (const PythonBackendException& pb_exception) { \ + LOG_INFO << pb_exception.what(); \ + } \ + } while (false) + +#define LOG_EXCEPTION(E) \ + do { \ + LOG_INFO << E.what(); \ + } while (false) + +/// Macros that use current filename and line number. +#define LOG_INFO LOG_FL(__FILE__, __LINE__, LogLevel::kInfo) +#define LOG_WARN LOG_FL(__FILE__, __LINE__, LogLevel::kWarning) +#define LOG_ERROR LOG_FL(__FILE__, __LINE__, LogLevel::kError) +#define LOG_VERBOSE LOG_FL(__FILE__, __LINE__, LogLevel::kVerbose) + +class Logger { + public: + Logger() { backend_logging_active_ = false; }; + ~Logger() { log_instance_.reset(); }; + /// Python client log function + static void Log(const std::string& message, LogLevel level = LogLevel::kInfo); + + /// Python client log info function + static void LogInfo(const std::string& message); + + /// Python client warning function + static void LogWarn(const std::string& message); + + /// Python client log error function + static void LogError(const std::string& message); + + /// Python client log verbose function + static void LogVerbose(const std::string& message); + + /// Internal log function + void Log( + const std::string& filename, uint32_t lineno, LogLevel level, + const std::string& message); + + /// Log format helper function + const std::string LeadingLogChar(const LogLevel& level); + + /// Set PYBE Logging Status + void SetBackendLoggingActive(bool status); + + /// Get PYBE Logging Status + bool BackendLoggingActive(); + + /// Singleton Getter Function + static std::unique_ptr& GetOrCreateInstance(); + + DISALLOW_COPY_AND_ASSIGN(Logger); + + /// Flush the log. + void Flush() { std::cerr << std::flush; } + + private: + static std::unique_ptr log_instance_; + bool backend_logging_active_; +}; + +class LogMessage { + public: + /// Create a log message, stripping the path down to the filename only + LogMessage(const char* file, int line, LogLevel level) : level_(level) + { + std::string path(file); + const char os_slash = std::filesystem::path::preferred_separator; + size_t pos = path.rfind(os_slash); + if (pos != std::string::npos) { + path = path.substr(pos + 1, std::string::npos); + } + file_ = path; + line_ = static_cast(line); + } + /// Log message to console or send to backend (see Logger::Log for details) + ~LogMessage() + { + Logger::GetOrCreateInstance()->Log(file_, line_, level_, stream_.str()); + } + + std::stringstream& stream() { return stream_; } + + private: + std::stringstream stream_; + std::string file_; + uint32_t line_; + LogLevel level_; +}; + +#define LOG_FL(FN, LN, LVL) LogMessage((char*)(FN), LN, LVL).stream() + +}}} // namespace triton::backend::python diff --git a/src/pb_tensor.cc b/src/pb_tensor.cc index 9fde62fe..26e77586 100644 --- a/src/pb_tensor.cc +++ b/src/pb_tensor.cc @@ -41,9 +41,70 @@ namespace py = pybind11; typedef SSIZE_T ssize_t; #endif +#include +#include +#include + namespace triton { namespace backend { namespace python { #ifdef TRITON_PB_STUB +py::array +deserialize_bytes_tensor_cpp(const uint8_t* data, size_t data_size) +{ + if (data_size == 0) { + py::module numpy = py::module::import("numpy"); + return numpy.attr("empty")(0, py::dtype("object")); + } + + // First pass: count the number of strings and calculate total size + size_t offset = 0; + size_t num_strings = 0; + size_t total_string_size = 0; + + while (offset < data_size) { + if (offset + 4 > data_size) { + throw PythonBackendException( + "Invalid bytes tensor data: incomplete length field"); + } + + // Read 4-byte length (little-endian) + uint32_t length = *reinterpret_cast(data + offset); + offset += 4; + + if (offset + length > data_size) { + throw PythonBackendException( + "Invalid bytes tensor data: string extends beyond buffer"); + } + + num_strings++; + total_string_size += length; + offset += length; + } + + // Create numpy array of objects using pybind11's numpy module + py::module numpy = py::module::import("numpy"); + py::array result = numpy.attr("empty")(num_strings, py::dtype("object")); + auto result_ptr = static_cast(result.request().ptr); + + // Second pass: extract strings + offset = 0; + size_t string_index = 0; + + while (offset < data_size) { + uint32_t length = *reinterpret_cast(data + offset); + offset += 4; + + // Create Python bytes object using pybind11 + py::bytes bytes_obj(reinterpret_cast(data + offset), length); + Py_INCREF(bytes_obj.ptr()); // Increment reference count + result_ptr[string_index] = bytes_obj.ptr(); + string_index++; + offset += length; + } + + return result; +} + PbTensor::PbTensor(const std::string& name, py::array& numpy_array) : name_(name) { @@ -160,14 +221,9 @@ PbTensor::PbTensor( py::array(triton_to_pybind_dtype(dtype_), dims_, (void*)memory_ptr_); numpy_array_ = numpy_array.attr("view")(triton_to_numpy_type(dtype_)); } else { - py::object numpy_array = py::array( - triton_to_pybind_dtype(TRITONSERVER_TYPE_UINT8), {byte_size}, - (void*)memory_ptr_); - py::module triton_pb_utils = - py::module::import("triton_python_backend_utils"); - numpy_array_ = - triton_pb_utils.attr("deserialize_bytes_tensor")(numpy_array) - .attr("reshape")(dims); + py::object numpy_array = deserialize_bytes_tensor_cpp( + static_cast(memory_ptr_), byte_size_); + numpy_array_ = numpy_array.attr("reshape")(dims_); } } else { numpy_array_ = py::none(); @@ -234,6 +290,7 @@ delete_unused_dltensor(PyObject* dlp) } } + std::shared_ptr PbTensor::FromNumpy(const std::string& name, py::array& numpy_array) { @@ -668,14 +725,9 @@ PbTensor::PbTensor( py::array(triton_to_pybind_dtype(dtype_), dims_, (void*)memory_ptr_); numpy_array_ = numpy_array.attr("view")(triton_to_numpy_type(dtype_)); } else { - py::object numpy_array = py::array( - triton_to_pybind_dtype(TRITONSERVER_TYPE_UINT8), {byte_size_}, - (void*)memory_ptr_); - py::module triton_pb_utils = - py::module::import("triton_python_backend_utils"); - numpy_array_ = - triton_pb_utils.attr("deserialize_bytes_tensor")(numpy_array) - .attr("reshape")(dims_); + py::object numpy_array = deserialize_bytes_tensor_cpp( + static_cast(memory_ptr_), byte_size_); + numpy_array_ = numpy_array.attr("reshape")(dims_); } } else { numpy_array_ = py::none(); diff --git a/src/pb_utils.cc b/src/pb_utils.cc index 809531b8..79b45ec2 100644 --- a/src/pb_utils.cc +++ b/src/pb_utils.cc @@ -26,12 +26,21 @@ #include "pb_utils.h" +#include + +#include + #ifdef _WIN32 #include #include #else #include +#include +#endif + +#ifndef _WIN32 +extern char** environ; #endif @@ -315,6 +324,30 @@ WrapTritonErrorInSharedPtr(TRITONSERVER_Error* error) } #endif // NOT TRITON_PB_STUB +bool +IsValidIdentifier(const std::string& input) +{ + // Check for invalid characters + if (input.empty() || + input.find_first_of(INVALID_CHARS) != std::string::npos) { + return false; + } + + return true; +} + +bool +IsExecutableFile(const std::string& filepath) +{ + struct stat file_stat; + if (stat(filepath.c_str(), &file_stat) != 0) { + return false; + } + + // Check if it's a regular file and executable by owner + return S_ISREG(file_stat.st_mode) && (file_stat.st_mode & S_IXUSR); +} + std::string GenerateUUID() { @@ -323,4 +356,85 @@ GenerateUUID() return boost::uuids::to_string(uuid); } +// Helper function to get environment variables for Python virtual environments +std::map +ParseActivationScript(const std::string& activate_path) +{ + std::map env_vars; + + // Read the current environment as baseline +#ifndef _WIN32 + if (environ != nullptr) { + for (char** env = environ; *env != nullptr; env++) { + std::string env_str(*env); + size_t eq_pos = env_str.find('='); + if (eq_pos != std::string::npos) { + std::string key = env_str.substr(0, eq_pos); + std::string value = env_str.substr(eq_pos + 1); + env_vars[key] = value; + } + } + } +#endif + + // Extract virtual environment root from activation script path + std::string venv_path = activate_path; + size_t bin_activate_pos = venv_path.find("/bin/activate"); + if (bin_activate_pos != std::string::npos) { + venv_path = venv_path.substr(0, bin_activate_pos); + } + + // Set standard virtual environment variables + env_vars["VIRTUAL_ENV"] = venv_path; + env_vars["VIRTUAL_ENV_PROMPT"] = "(" + venv_path + ")"; + + // Update PATH to include the virtual environment's bin directory + std::string new_path = venv_path + "/bin"; + if (env_vars.find("PATH") != env_vars.end()) { + new_path += ":" + env_vars["PATH"]; + } + env_vars["PATH"] = new_path; + + // Update LD_LIBRARY_PATH to include the virtual environment's lib directory + std::string new_lib_path = venv_path + "/lib"; + if (env_vars.find("LD_LIBRARY_PATH") != env_vars.end()) { + new_lib_path += ":" + env_vars["LD_LIBRARY_PATH"]; + } + env_vars["LD_LIBRARY_PATH"] = new_lib_path; + + // Remove PYTHONHOME if it exists + env_vars.erase("PYTHONHOME"); + + return env_vars; +} + +// Helper function to prepare environment array for execve +std::pair, std::vector> +PrepareEnvironment( + const std::map& env_vars, + const std::string& additional_lib_path) +{ + std::vector env_strings; + std::vector env_array; + + for (const auto& [key, value] : env_vars) { + std::string env_string; + if (key == "LD_LIBRARY_PATH" && !additional_lib_path.empty()) { + // Prepend the additional library path + env_string = key + "=" + additional_lib_path + ":" + value; + } else { + env_string = key + "=" + value; + } + env_strings.push_back(env_string); + } + + // Convert to char* array + for (auto& env_str : env_strings) { + env_array.push_back(const_cast(env_str.c_str())); + } + env_array.push_back(nullptr); + + return std::make_pair(std::move(env_strings), std::move(env_array)); +} + }}} // namespace triton::backend::python diff --git a/src/pb_utils.h b/src/pb_utils.h index aacf6b49..fa315210 100644 --- a/src/pb_utils.h +++ b/src/pb_utils.h @@ -1,4 +1,4 @@ -// Copyright 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2021-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions @@ -36,10 +36,12 @@ #include #include #include +#include #include #include #include #include +#include #include #include "pb_exception.h" @@ -73,7 +75,7 @@ namespace bi = boost::interprocess; TRITONSERVER_ErrorMessage(pb2_exception.what())); \ } \ } \ - while (false) + } while (false) #define THROW_IF_TRITON_ERROR(X) \ do { \ @@ -187,6 +189,11 @@ struct CleanupMessage : SendMessageBase { void* id; }; +struct CancelBLSRequestMessage : SendMessageBase { + void* infer_payload_id; + bool is_cancelled; +}; + struct IsCancelledMessage : SendMessageBase { intptr_t response_factory_address; intptr_t request_address; @@ -336,6 +343,15 @@ bool IsUsingCUDAPool( // being retrieved from core that are not platform-agnostic. void SanitizePath(std::string& path); +// Invalid characters that are not allowed in user input +constexpr const char* INVALID_CHARS = ";|&$`<>()[]{}\\\"'*?~#!"; + +// Validate that an identifier (model name, region name, etc.) +bool IsValidIdentifier(const std::string& input); + +// Check if a file exists and is executable +bool IsExecutableFile(const std::string& filepath); + #ifndef TRITON_PB_STUB std::shared_ptr WrapTritonErrorInSharedPtr( TRITONSERVER_Error* error); @@ -343,4 +359,12 @@ std::shared_ptr WrapTritonErrorInSharedPtr( std::string GenerateUUID(); +// Environment handling utilities for Python activation scripts +std::map ParseActivationScript( + const std::string& activate_path); + +std::pair, std::vector> PrepareEnvironment( + const std::map& env_vars, + const std::string& additional_lib_path = ""); + }}} // namespace triton::backend::python diff --git a/src/python_be.cc b/src/python_be.cc index bdf7b95f..c152e035 100644 --- a/src/python_be.cc +++ b/src/python_be.cc @@ -1,4 +1,4 @@ -// Copyright 2020-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2020-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions @@ -443,7 +443,9 @@ ModelInstanceState::GetInputTensor( if (input_dtype == TRITONSERVER_TYPE_BYTES) { const char* content = reinterpret_cast(input_tensor->DataPtr()); size_t content_byte_size = input_tensor->ByteSize(); - const size_t request_element_cnt = GetElementCount(input_tensor->Dims()); + int64_t request_element_cnt = 0; + RETURN_IF_ERROR( + GetElementCount(input_tensor->Dims(), &request_element_cnt)); RETURN_IF_ERROR(ValidateStringBuffer( content, content_byte_size, request_element_cnt, input_name, nullptr /* str_list */)); @@ -765,6 +767,10 @@ ModelInstanceState::StubToParentMQMonitor() boost::asio::post(*thread_pool_, std::move(task)); break; } + case PYTHONSTUB_CancelBLSInferRequest: { + ProcessCancelBLSRequest(message); + break; + } default: { LOG_MESSAGE( TRITONSERVER_LOG_ERROR, "Unexpected message type received."); @@ -855,6 +861,40 @@ ModelInstanceState::ProcessCleanupRequest( } } +void +ModelInstanceState::ProcessCancelBLSRequest( + const std::unique_ptr& message) +{ + AllocatedSharedMemory message_shm = + Stub()->ShmPool()->Load(message->Args()); + CancelBLSRequestMessage* message_payload = + reinterpret_cast(message_shm.data_.get()); + + { + bi::scoped_lock lk{message_payload->mu}; + + intptr_t id = reinterpret_cast(message_payload->infer_payload_id); + try { + { + std::lock_guard lock(infer_payload_mu_); + if (infer_payload_.find(id) != infer_payload_.end()) { + infer_payload_[id]->SafeCancelRequest(); + } + } + message_payload->is_cancelled = true; + } + catch (const PythonBackendException& pb_exception) { + LOG_MESSAGE(TRITONSERVER_LOG_ERROR, pb_exception.what()); + } + + message_payload->waiting_on_stub = true; + message_payload->cv.notify_all(); + while (message_payload->waiting_on_stub) { + message_payload->cv.wait(lk); + } + } +} + void ModelInstanceState::ProcessIsRequestCancelled( const std::unique_ptr& message) diff --git a/src/python_be.h b/src/python_be.h index c98e1284..6082c50b 100644 --- a/src/python_be.h +++ b/src/python_be.h @@ -1,4 +1,4 @@ -// Copyright 2022-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2022-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions @@ -403,6 +403,9 @@ class ModelInstanceState : public BackendModelInstance { // Process the decoupled cleanup request for InferPayload and ResponseFactory void ProcessCleanupRequest(const std::unique_ptr& message); + // Process cancelling a BLS request + void ProcessCancelBLSRequest(const std::unique_ptr& message); + // Process request cancellation query void ProcessIsRequestCancelled(const std::unique_ptr& message); diff --git a/src/request_executor.cc b/src/request_executor.cc index 3c51e626..716d3c56 100644 --- a/src/request_executor.cc +++ b/src/request_executor.cc @@ -69,9 +69,15 @@ InferRequestComplete( TRITONSERVER_InferenceRequest* request, const uint32_t flags, void* userp) { if (request != nullptr) { + RequestCompletionUserp* completion_userp = + reinterpret_cast(userp); + completion_userp->infer_payload->SetRequestAddress(0L); + LOG_IF_ERROR( TRITONSERVER_InferenceRequestDelete(request), "Failed to delete inference request."); + + delete completion_userp; } } @@ -85,10 +91,16 @@ InferResponseComplete( std::vector> output_tensors; std::shared_ptr pb_error; std::string parameters_string; + TRITONSERVER_Error_Code error_code = TRITONSERVER_ERROR_INTERNAL; if (response != nullptr) { try { - THROW_IF_TRITON_ERROR(TRITONSERVER_InferenceResponseError(response)); + TRITONSERVER_Error* server_error = + TRITONSERVER_InferenceResponseError(response); + if (server_error != nullptr) { + error_code = TRITONSERVER_ErrorCode(server_error); + } + THROW_IF_TRITON_ERROR(server_error); uint32_t output_count; THROW_IF_TRITON_ERROR( @@ -182,7 +194,7 @@ InferResponseComplete( response = nullptr; } - pb_error = std::make_shared(pb_exception.what()); + pb_error = std::make_shared(pb_exception.what(), error_code); output_tensors.clear(); } @@ -313,6 +325,18 @@ ResponseAlloc( return nullptr; // Success } +void +InferRequestCancel(intptr_t request_address) +{ + if (request_address == 0L) { + return; + } + + TRITONSERVER_InferenceRequest* irequest = + reinterpret_cast(request_address); + THROW_IF_TRITON_ERROR(TRITONSERVER_InferenceRequestCancel(irequest)); +} + TRITONSERVER_Error* OutputBufferQuery( TRITONSERVER_ResponseAllocator* allocator, void* userp, @@ -355,6 +379,7 @@ RequestExecutor::Infer( bool is_ready = false; const char* model_name = infer_request->ModelName().c_str(); TRITONSERVER_InferenceRequest* irequest = nullptr; + RequestCompletionUserp* completion_userp = nullptr; try { int64_t model_version = infer_request->ModelVersion(); @@ -406,8 +431,10 @@ RequestExecutor::Infer( THROW_IF_TRITON_ERROR(TRITONSERVER_InferenceRequestSetTimeoutMicroseconds( irequest, infer_request->Timeout())); + completion_userp = new RequestCompletionUserp(infer_payload); THROW_IF_TRITON_ERROR(TRITONSERVER_InferenceRequestSetReleaseCallback( - irequest, InferRequestComplete, nullptr /* request_release_userp */)); + irequest, InferRequestComplete, + reinterpret_cast(completion_userp))); TRITONSERVER_InferenceTrace* trace = nullptr; if (infer_request->GetTrace().TritonTrace() != nullptr) { @@ -476,11 +503,21 @@ RequestExecutor::Infer( reinterpret_cast(infer_payload->ResponseAllocUserp().get()), InferResponseComplete, reinterpret_cast(infer_payload.get()))); + // Store the inference request address submitted to the Triton server for + // retrieval + infer_payload->SetRequestAddress(reinterpret_cast(irequest)); + infer_payload->SetRequestCancellationFunc(InferRequestCancel); + THROW_IF_TRITON_ERROR( TRITONSERVER_ServerInferAsync(server_, irequest, trace)); } } catch (const PythonBackendException& pb_exception) { + infer_payload->SetRequestAddress(0L); + if (completion_userp != nullptr) { + delete completion_userp; + } + LOG_IF_ERROR( TRITONSERVER_InferenceRequestDelete(irequest), "Failed to delete inference request."); diff --git a/src/request_executor.h b/src/request_executor.h index 1c5eb1fa..07562d6a 100644 --- a/src/request_executor.h +++ b/src/request_executor.h @@ -1,4 +1,4 @@ -// Copyright 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2021-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions @@ -37,6 +37,12 @@ namespace triton { namespace backend { namespace python { TRITONSERVER_Error* CreateTritonErrorFromException( const PythonBackendException& pb_exception); +struct RequestCompletionUserp { + std::shared_ptr infer_payload; + RequestCompletionUserp(std::shared_ptr& infer_payload) + : infer_payload(infer_payload){}; +}; + class RequestExecutor { TRITONSERVER_ResponseAllocator* response_allocator_ = nullptr; TRITONSERVER_Server* server_; diff --git a/src/shm_manager.cc b/src/shm_manager.cc index 1c7c4d65..134cee6f 100644 --- a/src/shm_manager.cc +++ b/src/shm_manager.cc @@ -1,4 +1,4 @@ -// Copyright 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2021-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions @@ -188,8 +188,8 @@ SharedMemoryManager::GrowIfNeeded(uint64_t byte_size) } catch (bi::interprocess_exception& ex) { std::string error_message = - ("Failed to increase the shared memory pool size for key '" + - shm_region_name_ + "' to " + std::to_string(*total_size_) + + ("Failed to increase the shared memory pool size to " + + std::to_string(*total_size_) + " bytes. If you are running Triton inside docker, use '--shm-size' " "flag to control the shared memory region size. Error: " + ex.what()); diff --git a/src/shm_manager.h b/src/shm_manager.h index 25e04570..8517faf3 100644 --- a/src/shm_manager.h +++ b/src/shm_manager.h @@ -1,4 +1,4 @@ -// Copyright 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2021-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions @@ -43,6 +43,9 @@ namespace triton { namespace backend { namespace python { namespace bi = boost::interprocess; +static constexpr bi::managed_external_buffer::handle_t kShmControlRegionHandle{ + 1}; + class CUDAMemoryPoolManager { public: CUDAMemoryPoolManager() : triton_memory_manager_(nullptr) {} @@ -166,6 +169,10 @@ class SharedMemoryManager { void Deallocate(bi::managed_external_buffer::handle_t handle) { + // Do not delete the control region, to avoid undefined behavior. + if (handle == kShmControlRegionHandle) { + return; + } bi::scoped_lock guard{*shm_mutex_}; GrowIfNeeded(0); void* ptr = managed_buffer_->get_address_from_handle(handle); @@ -174,6 +181,10 @@ class SharedMemoryManager { void DeallocateUnsafe(bi::managed_external_buffer::handle_t handle) { + // Do not delete the control region, to avoid undefined behavior. + if (handle == kShmControlRegionHandle) { + return; + } void* ptr = managed_buffer_->get_address_from_handle(handle); managed_buffer_->deallocate(ptr); } @@ -188,6 +199,9 @@ class SharedMemoryManager { return cuda_memory_pool_manager_; } + uint64_t GetCurrentCapacity() { return current_capacity_; } + void* GetBaseAddress() { return managed_buffer_->get_address(); } + ~SharedMemoryManager() noexcept(false); private: diff --git a/src/shm_monitor/CMakeLists.txt b/src/shm_monitor/CMakeLists.txt index 0f7d4b86..2ae8bd45 100644 --- a/src/shm_monitor/CMakeLists.txt +++ b/src/shm_monitor/CMakeLists.txt @@ -1,4 +1,4 @@ -# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# Copyright (c) 2022-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions @@ -24,7 +24,7 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -cmake_minimum_required (VERSION 3.18) +cmake_minimum_required (VERSION 3.31.8) pybind11_add_module( triton-shm-monitor diff --git a/src/stub_launcher.cc b/src/stub_launcher.cc index 828228e6..32f5d1bd 100644 --- a/src/stub_launcher.cc +++ b/src/stub_launcher.cc @@ -28,12 +28,15 @@ #include +#include "pb_utils.h" #include "python_be.h" #ifdef _WIN32 #include // getpid() #endif +extern char** environ; + namespace triton { namespace backend { namespace python { StubLauncher::StubLauncher(const std::string stub_process_kind) @@ -280,7 +283,9 @@ StubLauncher::Launch() // Push a dummy message to the message queue so that the stub // process is notified that it can release the object stored in // shared memory. - stub_message_queue_->Push(DUMMY_MESSAGE); + if (stub_message_queue_) { + stub_message_queue_->Push(DUMMY_MESSAGE); + } // If the model is not initialized, wait for the stub process to exit. if (!is_initialized_) { @@ -299,11 +304,23 @@ StubLauncher::Launch() // // The reason it is broken into two steps is that creation of the health // monitoring thread may take longer which can make the server process think - // that the stub process is unhealthy and return early. Waiting until the - // health thread is spawn would make sure would prevent this issue. - parent_message_queue_->Pop(); + // that the stub process is unhealthy and return early. Waiting with a longer + // timeout prevents this issue. + const uint64_t initialization_timeout_ms = 10000; // 10 sec + LOG_MESSAGE( + TRITONSERVER_LOG_VERBOSE, + "Waiting for the stub health monitoring thread to start"); + + bi::managed_external_buffer::handle_t message; + auto err = ReceiveMessageFromStub(message, initialization_timeout_ms); + if (err != nullptr) { + KillStubProcess(); + } if (stub_process_kind_ == "AUTOCOMPLETE_STUB") { + if (err != nullptr) { + throw BackendModelException(err); + } try { AutocompleteStubProcess(); } @@ -314,6 +331,7 @@ StubLauncher::Launch() TRITONSERVER_ErrorNew(TRITONSERVER_ERROR_INTERNAL, ex.what())); } } else if (stub_process_kind_ == "MODEL_INSTANCE_STUB") { + RETURN_IF_ERROR(err); RETURN_IF_ERROR(ModelInstanceStubProcess()); } else { return TRITONSERVER_ErrorNew( @@ -337,10 +355,17 @@ StubLauncher::Launch() stub_name = model_instance_name_; } - const char* stub_args[4]; - stub_args[0] = "bash"; - stub_args[1] = "-c"; - stub_args[3] = nullptr; // Last argument must be nullptr + if (!IsValidIdentifier(stub_name)) { + return TRITONSERVER_ErrorNew( + TRITONSERVER_ERROR_INVALID_ARG, + "Invalid stub name: contains invalid characters"); + } + + if (!IsValidIdentifier(shm_region_name_)) { + return TRITONSERVER_ErrorNew( + TRITONSERVER_ERROR_INVALID_ARG, + "Invalid shared memory region name: contains invalid characters"); + } // Default Python backend stub std::string python_backend_stub = python_lib_ + "/triton_python_backend_stub"; @@ -353,48 +378,7 @@ StubLauncher::Launch() python_backend_stub = model_python_backend_stub; } - std::string bash_argument; - - // This shared memory variable indicates whether the stub process should - // revert the LD_LIBRARY_PATH changes to avoid shared library issues in - // executables and libraries. - ipc_control_->uses_env = false; - if (python_execution_env_ != "") { - std::stringstream ss; - - // Need to properly set the LD_LIBRARY_PATH so that Python environments - // using different python versions load properly. - ss << "source " << path_to_activate_ - << " && exec env LD_LIBRARY_PATH=" << path_to_libpython_ - << ":$LD_LIBRARY_PATH " << python_backend_stub << " " << model_path_ - << " " << shm_region_name_ << " " << shm_default_byte_size_ << " " - << shm_growth_byte_size_ << " " << parent_pid_ << " " << python_lib_ - << " " << ipc_control_handle_ << " " << stub_name << " " - << runtime_modeldir_; - ipc_control_->uses_env = true; - bash_argument = ss.str(); - } else { - std::stringstream ss; - ss << " exec " << python_backend_stub << " " << model_path_ << " " - << shm_region_name_ << " " << shm_default_byte_size_ << " " - << shm_growth_byte_size_ << " " << parent_pid_ << " " << python_lib_ - << " " << ipc_control_handle_ << " " << stub_name << " " - << runtime_modeldir_; - bash_argument = ss.str(); - } - LOG_MESSAGE( - TRITONSERVER_LOG_VERBOSE, - (std::string("Starting Python backend stub: ") + bash_argument).c_str()); - - stub_args[2] = bash_argument.c_str(); - - int stub_status_code = - system((python_backend_stub + "> /dev/null 2>&1").c_str()); - - // If running stub process without any arguments returns any status code, - // other than 1, it can indicate a permission issue as a result of - // downloading the stub process from a cloud object storage service. - if (WEXITSTATUS(stub_status_code) != 1) { + if (!IsExecutableFile(python_backend_stub)) { // Give the execute permission for the triton_python_backend_stub to the // owner. int error = chmod(python_backend_stub.c_str(), S_IXUSR); @@ -409,79 +393,196 @@ StubLauncher::Launch() } } - pid_t pid = fork(); - if (pid < 0) { - return TRITONSERVER_ErrorNew( - TRITONSERVER_ERROR_INTERNAL, - "Failed to fork the stub process for auto-complete."); - } - if (pid == 0) { - // Replace this child process with the new stub process. - execvp("bash", (char**)stub_args); - // execvp() never return if succeeded. Otherwise, an error has occurred. - std::stringstream ss; - ss << "Failed to run python backend stub. Errno = " << errno << '\n' - << "Python backend stub path: " << python_backend_stub << '\n' - << "Shared Memory Region Name: " << shm_region_name_ << '\n' - << "Shared Memory Default Byte Size: " << shm_default_byte_size_ << '\n' - << "Shared Memory Growth Byte Size: " << shm_growth_byte_size_ << '\n'; - // Print the error message directly because the underlying mutexes in - // LOG_MESSAGE() could be forked when it is locked by other thread(s). - std::cerr << '\n' << ss.str() << '\n'; - // Terminate the child execution immediately to avoid any issues. - _Exit(1); - } else { - ScopedDefer _([&] { - // Push a dummy message to the message queue so that the stub - // process is notified that it can release the object stored in - // shared memory. - stub_message_queue_->Push(DUMMY_MESSAGE); + // Prepare arguments for execution + std::vector arg_strings; + std::vector exec_args; - // If the model is not initialized, wait for the stub process to exit. - if (!is_initialized_) { - stub_message_queue_.reset(); - parent_message_queue_.reset(); - memory_manager_.reset(); - WaitForStubProcess(); - } - }); - - stub_pid_ = pid; - - // The stub process would send two messages to the parent process during the - // initialization. - // 1. When the stub process's health monitoring thread has started. - // 2. When the initialization is fully completed and the Python model is - // loaded. - // - // The reason it is broken into two steps is that creation of the health - // monitoring thread may take longer which can make the server process think - // that the stub process is unhealthy and return early. Waiting until the - // health thread is spawn would prevent this issue. - parent_message_queue_->Pop(); - - if (stub_process_kind_ == "AUTOCOMPLETE_STUB") { - try { - AutocompleteStubProcess(); - } - catch (const PythonBackendException& ex) { - // Need to kill the stub process first - KillStubProcess(); - throw BackendModelException( - TRITONSERVER_ErrorNew(TRITONSERVER_ERROR_INTERNAL, ex.what())); - } - } else if (stub_process_kind_ == "MODEL_INSTANCE_STUB") { - RETURN_IF_ERROR(ModelInstanceStubProcess()); + // This shared memory variable indicates whether the stub process should + // revert the LD_LIBRARY_PATH changes to avoid shared library issues in + // executables and libraries. + ipc_control_->uses_env = false; + + if (python_execution_env_ != "") { + ipc_control_->uses_env = true; + + // Parse environment variables from activation script + std::map env_vars = + ParseActivationScript(path_to_activate_); + + // Prepare environment with additional library path + auto [env_strings, custom_env] = + PrepareEnvironment(env_vars, path_to_libpython_); + + // Set up arguments for direct execution + arg_strings.push_back(python_backend_stub); + arg_strings.push_back(model_path_); + arg_strings.push_back(shm_region_name_); + arg_strings.push_back(std::to_string(shm_default_byte_size_)); + arg_strings.push_back(std::to_string(shm_growth_byte_size_)); + arg_strings.push_back(std::to_string(parent_pid_)); + arg_strings.push_back(python_lib_); + arg_strings.push_back(std::to_string(ipc_control_handle_)); + arg_strings.push_back(stub_name); + arg_strings.push_back(runtime_modeldir_); + + // Convert strings to char* array for exec + for (const auto& arg : arg_strings) { + exec_args.push_back(arg.c_str()); + } + exec_args.push_back(nullptr); // exec requires null termination + + // Log the command being executed + std::ostringstream log_cmd; + for (size_t i = 0; i < arg_strings.size(); ++i) { + if (i > 0) + log_cmd << " "; + log_cmd << "'" << arg_strings[i] << "'"; + } + LOG_MESSAGE( + TRITONSERVER_LOG_VERBOSE, + (std::string("Starting Python backend stub with custom environment: ") + + log_cmd.str()) + .c_str()); + + pid_t pid = fork(); + if (pid < 0) { + return TRITONSERVER_ErrorNew( + TRITONSERVER_ERROR_INTERNAL, + "Failed to fork the stub process for auto-complete."); + } + if (pid == 0) { + // Replace this child process with the new stub process using custom + // environment + execve( + python_backend_stub.c_str(), const_cast(exec_args.data()), + custom_env.data()); + // execve() never returns if succeeded. Otherwise, an error has occurred. + std::stringstream ss; + ss << "Failed to run python backend stub with custom environment. Errno " + "= " + << errno << '\n' + << "Python backend stub path: " << python_backend_stub << '\n' + << "Activation script: " << path_to_activate_ << '\n' + << "Library path: " << path_to_libpython_ << '\n'; + std::cerr << '\n' << ss.str() << '\n'; + _Exit(1); } else { + stub_pid_ = pid; + } + + } else { + arg_strings.push_back(python_backend_stub); + arg_strings.push_back(model_path_); + arg_strings.push_back(shm_region_name_); + arg_strings.push_back(std::to_string(shm_default_byte_size_)); + arg_strings.push_back(std::to_string(shm_growth_byte_size_)); + arg_strings.push_back(std::to_string(parent_pid_)); + arg_strings.push_back(python_lib_); + arg_strings.push_back(std::to_string(ipc_control_handle_)); + arg_strings.push_back(stub_name); + arg_strings.push_back(runtime_modeldir_); + + // Convert strings to char* array for exec + for (const auto& arg : arg_strings) { + exec_args.push_back(arg.c_str()); + } + exec_args.push_back(nullptr); // exec requires null termination + + // Log the command being executed + std::ostringstream log_cmd; + for (size_t i = 0; i < arg_strings.size(); ++i) { + if (i > 0) + log_cmd << " "; + log_cmd << "'" << arg_strings[i] << "'"; + } + LOG_MESSAGE( + TRITONSERVER_LOG_VERBOSE, + (std::string("Starting Python backend stub: ") + log_cmd.str()) + .c_str()); + + pid_t pid = fork(); + if (pid < 0) { return TRITONSERVER_ErrorNew( TRITONSERVER_ERROR_INTERNAL, - (std::string("Unknown stub_process_kind: ") + stub_process_kind_) - .c_str()); + "Failed to fork the stub process for auto-complete."); + } + if (pid == 0) { + // Replace this child process with the new stub process. + execv(python_backend_stub.c_str(), const_cast(exec_args.data())); + // execv() never returns if succeeded. Otherwise, an error has occurred. + std::stringstream ss; + ss << "Failed to run python backend stub. Errno = " << errno << '\n' + << "Python backend stub path: " << python_backend_stub << '\n'; + std::cerr << '\n' << ss.str() << '\n'; + _Exit(1); + } else { + stub_pid_ = pid; + } + } + + ScopedDefer _([&] { + // Push a dummy message to the message queue so that the stub + // process is notified that it can release the object stored in + // shared memory. + if (stub_message_queue_) { + stub_message_queue_->Push(DUMMY_MESSAGE); + } + + // If the model is not initialized, wait for the stub process to exit. + if (!is_initialized_) { + stub_message_queue_.reset(); + parent_message_queue_.reset(); + memory_manager_.reset(); + WaitForStubProcess(); } + }); + + // The stub process would send two messages to the parent process during the + // initialization. + // 1. When the stub process's health monitoring thread has started. + // 2. When the initialization is fully completed and the Python model is + // loaded. + // + // The reason it is broken into two steps is that creation of the health + // monitoring thread may take longer which can make the server process think + // that the stub process is unhealthy and return early. Waiting with a + // longer timeout prevents this issue. + const uint64_t initialization_timeout_ms = 10000; // 10 sec + LOG_MESSAGE( + TRITONSERVER_LOG_VERBOSE, + "Waiting for the stub health monitoring thread to start"); + + bi::managed_external_buffer::handle_t message; + auto err = ReceiveMessageFromStub(message, initialization_timeout_ms); + if (err != nullptr) { + KillStubProcess(); + } - is_initialized_ = true; + if (stub_process_kind_ == "AUTOCOMPLETE_STUB") { + if (err != nullptr) { + throw BackendModelException(err); + } + try { + AutocompleteStubProcess(); + } + catch (const PythonBackendException& ex) { + // Need to kill the stub process first + KillStubProcess(); + throw BackendModelException( + TRITONSERVER_ErrorNew(TRITONSERVER_ERROR_INTERNAL, ex.what())); + } + } else if (stub_process_kind_ == "MODEL_INSTANCE_STUB") { + RETURN_IF_ERROR(err); + RETURN_IF_ERROR(ModelInstanceStubProcess()); + } else { + return TRITONSERVER_ErrorNew( + TRITONSERVER_ERROR_INTERNAL, + (std::string("Unknown stub_process_kind: ") + stub_process_kind_) + .c_str()); } + is_initialized_ = true; + return nullptr; } @@ -592,8 +693,13 @@ StubLauncher::ModelInstanceStubProcess() initialize_message->Args() = initialize_map_handle; stub_message_queue_->Push(initialize_message->ShmHandle()); + const uint64_t initialization_timeout_ms = 5000; // 5 sec + LOG_MESSAGE( + TRITONSERVER_LOG_VERBOSE, + "Waiting for the stub process initialization response"); + bi::managed_external_buffer::handle_t message; - RETURN_IF_ERROR(ReceiveMessageFromStub(message)); + RETURN_IF_ERROR(ReceiveMessageFromStub(message, initialization_timeout_ms)); std::unique_ptr initialize_response_message = IPCMessage::LoadFromSharedMemory(shm_pool_, message); @@ -726,11 +832,11 @@ StubLauncher::KillStubProcess() TRITONSERVER_Error* StubLauncher::ReceiveMessageFromStub( - bi::managed_external_buffer::handle_t& message) + bi::managed_external_buffer::handle_t& message, + uint64_t timeout_miliseconds) { bool success = false; while (!success) { - uint64_t timeout_miliseconds = 1000; { boost::posix_time::ptime timeout = boost::get_system_time() + diff --git a/src/stub_launcher.h b/src/stub_launcher.h index 6c8dd910..58cdcc61 100644 --- a/src/stub_launcher.h +++ b/src/stub_launcher.h @@ -147,7 +147,8 @@ class StubLauncher { // Get a message from the stub process TRITONSERVER_Error* ReceiveMessageFromStub( - bi::managed_external_buffer::handle_t& message); + bi::managed_external_buffer::handle_t& message, + uint64_t timeout_miliseconds = 1000); // Wait for stub process void WaitForStubProcess();