diff --git a/.github/workflows/pre-commit.yml b/.github/workflows/pre-commit.yml index ab4bd951..4fa18732 100644 --- a/.github/workflows/pre-commit.yml +++ b/.github/workflows/pre-commit.yml @@ -1,4 +1,4 @@ -# Copyright 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# Copyright 2023-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions @@ -31,8 +31,8 @@ on: jobs: pre-commit: - runs-on: ubuntu-22.04 + runs-on: ubuntu-latest steps: - - uses: actions/checkout@v3 - - uses: actions/setup-python@v3 - - uses: pre-commit/action@v3.0.0 + - uses: actions/checkout@v5.0.0 + - uses: actions/setup-python@v6.0.0 + - uses: pre-commit/action@v3.0.1 diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 298baab6..3c76a6ed 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,4 +1,4 @@ -# Copyright 2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# Copyright 2023-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions @@ -25,7 +25,7 @@ # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
repos: -- repo: https://github.com/timothycrosley/isort +- repo: https://github.com/PyCQA/isort rev: 5.12.0 hooks: - id: isort @@ -36,7 +36,7 @@ repos: - id: black types_or: [python, cython] - repo: https://github.com/PyCQA/flake8 - rev: 5.0.4 + rev: 7.3.0 hooks: - id: flake8 args: [--max-line-length=88, --select=C,E,F,W,B,B950, --extend-ignore = E203,E501] @@ -57,7 +57,7 @@ repos: # More details about these pre-commit hooks here: # https://pre-commit.com/hooks.html - repo: https://github.com/pre-commit/pre-commit-hooks - rev: v4.4.0 + rev: v6.0.0 hooks: - id: check-case-conflict - id: check-executables-have-shebangs diff --git a/CMakeLists.txt b/CMakeLists.txt index 0aaa95af..f5c5b293 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,4 +1,4 @@ -# Copyright 2020-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# Copyright 2020-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. # # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions @@ -24,7 +24,7 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 
-cmake_minimum_required(VERSION 3.17) +cmake_minimum_required(VERSION 3.31.8) project(tritonpythonbackend LANGUAGES C CXX) @@ -122,10 +122,12 @@ FetchContent_MakeAvailable(dlpack) # # Boost # +set(TRITON_BOOST_URL "https://archives.boost.io/release/1.80.0/source/boost_1_80_0.tar.gz" CACHE STRING "Boost source code URL") + ExternalProject_Add( boostorg - URL https://archives.boost.io/release/1.79.0/source/boost_1_79_0.tar.gz - URL_HASH SHA256=273f1be93238a068aba4f9735a4a2b003019af067b9c183ed227780b8f36062c + URL ${TRITON_BOOST_URL} + URL_HASH SHA256=4b2136f98bdd1f5857f1c3dea9ac2018effe65286cf251534b6ae20cc45e1847 PREFIX "boost-src" CONFIGURE_COMMAND ${CMAKE_COMMAND} -E copy_directory /boost/ ${CMAKE_BINARY_DIR}/boost @@ -237,6 +239,8 @@ set( src/response_sender.h src/pb_stub.h src/pb_stub.cc + src/pb_stub_log.h + src/pb_stub_log.cc src/pb_response_iterator.h src/pb_response_iterator.cc src/pb_cancel.cc diff --git a/src/message_queue.h b/src/message_queue.h index e9c47afd..06661c66 100644 --- a/src/message_queue.h +++ b/src/message_queue.h @@ -1,4 +1,4 @@ -// Copyright 2021-2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2021-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions @@ -32,14 +32,19 @@ #include #include +#include "pb_utils.h" #include "shm_manager.h" +#ifdef TRITON_PB_STUB +#include "pb_stub_log.h" +#endif namespace triton { namespace backend { namespace python { namespace bi = boost::interprocess; /// Struct holding the representation of a message queue inside the shared /// memory. -/// \param size Total size of the message queue. +/// \param size Total size of the message queue. Considered invalid after +/// MessageQueue::LoadFromSharedMemory. Check DLIS-8378 for additional details. /// \param mutex Handle of the mutex variable protecting index. 
/// \param index Used element index. /// \param sem_empty Semaphore object counting the number of empty buffer slots. @@ -110,7 +115,22 @@ class MessageQueue { { bi::scoped_lock lock{*MutexMutable()}; - Buffer()[Head()] = message; + int head_idx = Head(); + // Additional check to avoid out of bounds read/write. Check DLIS-8378 for + // additional details. + if (head_idx < 0 || static_cast(head_idx) >= Size()) { + std::string error_msg = + "internal error: message queue head index out of bounds. Expects " + "positive integer less than the size of message queue " + + std::to_string(Size()) + " but got " + std::to_string(head_idx); +#ifdef TRITON_PB_STUB + LOG_ERROR << error_msg; +#else + LOG_MESSAGE(TRITONSERVER_LOG_ERROR, error_msg.c_str()); +#endif + return; + } + Buffer()[head_idx] = message; HeadIncrement(); } SemFullMutable()->post(); @@ -145,7 +165,22 @@ class MessageQueue { } success = true; - Buffer()[Head()] = message; + int head_idx = Head(); + // Additional check to avoid out of bounds read/write. Check DLIS-8378 for + // additional details. + if (head_idx < 0 || static_cast(head_idx) >= Size()) { + std::string error_msg = + "internal error: message queue head index out of bounds. 
Expects " + "positive integer less than the size of message queue " + + std::to_string(Size()) + " but got " + std::to_string(head_idx); +#ifdef TRITON_PB_STUB + LOG_ERROR << error_msg; +#else + LOG_MESSAGE(TRITONSERVER_LOG_ERROR, error_msg.c_str()); +#endif + return; + } + Buffer()[head_idx] = message; HeadIncrement(); } SemFullMutable()->post(); @@ -244,7 +279,7 @@ class MessageQueue { } private: - std::size_t& Size() { return mq_shm_ptr_->size; } + uint32_t Size() { return size_; } const bi::interprocess_mutex& Mutex() { return mq_shm_ptr_->mutex; } bi::interprocess_mutex* MutexMutable() { return &(mq_shm_ptr_->mutex); } int& Head() { return mq_shm_ptr_->head; } @@ -273,6 +308,7 @@ class MessageQueue { MessageQueueShm* mq_shm_ptr_; T* mq_buffer_shm_ptr_; bi::managed_external_buffer::handle_t mq_handle_; + uint32_t size_; /// Create/load a Message queue. /// \param mq_shm Message queue representation in shared memory. @@ -284,6 +320,7 @@ class MessageQueue { mq_buffer_shm_ptr_ = mq_buffer_shm_.data_.get(); mq_shm_ptr_ = mq_shm_.data_.get(); mq_handle_ = mq_shm_.handle_; + size_ = mq_shm_ptr_->size; } }; }}} // namespace triton::backend::python diff --git a/src/pb_bls_cancel.cc b/src/pb_bls_cancel.cc index 0df4492b..4341c037 100644 --- a/src/pb_bls_cancel.cc +++ b/src/pb_bls_cancel.cc @@ -27,6 +27,7 @@ #include "pb_bls_cancel.h" #include "pb_stub.h" +#include "pb_stub_log.h" namespace triton { namespace backend { namespace python { diff --git a/src/pb_cancel.cc b/src/pb_cancel.cc index 0774261d..da9daf98 100644 --- a/src/pb_cancel.cc +++ b/src/pb_cancel.cc @@ -1,4 +1,4 @@ -// Copyright 2023-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2023-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
// // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions @@ -27,6 +27,7 @@ #include "pb_cancel.h" #include "pb_stub.h" +#include "pb_stub_log.h" namespace triton { namespace backend { namespace python { diff --git a/src/pb_memory.cc b/src/pb_memory.cc index fa32bb1c..5b678f1a 100644 --- a/src/pb_memory.cc +++ b/src/pb_memory.cc @@ -1,4 +1,4 @@ -// Copyright 2022-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2022-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions @@ -26,6 +26,8 @@ #include "pb_memory.h" +#include + namespace triton { namespace backend { namespace python { std::unique_ptr @@ -225,7 +227,6 @@ PbMemory::LoadFromSharedMemory( { MemoryShm* memory_shm_ptr = reinterpret_cast(data_shm); char* memory_data_shm = data_shm + sizeof(MemoryShm); - char* data_ptr = nullptr; bool opened_cuda_ipc_handle = false; if (memory_shm_ptr->memory_type == TRITONSERVER_MEMORY_GPU && @@ -260,6 +261,19 @@ PbMemory::LoadFromSharedMemory( } else { data_ptr = memory_data_shm; } + + // This check only validates CPU shared memory access. + if (memory_shm_ptr->memory_type != TRITONSERVER_MEMORY_GPU && + (data_ptr + memory_shm_ptr->byte_size > + (char*)shm_pool->GetBaseAddress() + shm_pool->GetCurrentCapacity())) { + std::ostringstream oss; + oss << "0x" << std::hex + << (reinterpret_cast(data_ptr) + memory_shm_ptr->byte_size); + throw PythonBackendException( + std::string("Attempted to access out of bounds memory address ") + + oss.str()); + } + return std::unique_ptr(new PbMemory( data_shm, data_ptr, handle, opened_cuda_ipc_handle /* opened_cuda_ipc_handle */)); @@ -309,6 +323,19 @@ PbMemory::LoadFromSharedMemory( } else { data_ptr = memory_data_shm; } + + // This check only validates CPU shared memory access. 
+ if (memory_shm_ptr->memory_type != TRITONSERVER_MEMORY_GPU && + (data_ptr + memory_shm_ptr->byte_size > + (char*)shm_pool->GetBaseAddress() + shm_pool->GetCurrentCapacity())) { + std::ostringstream oss; + oss << "0x" << std::hex + << (reinterpret_cast(data_ptr) + memory_shm_ptr->byte_size); + throw PythonBackendException( + std::string("Attempted to access out of bounds memory address ") + + oss.str()); + } + return std::unique_ptr(new PbMemory( memory_shm, data_ptr, opened_cuda_ipc_handle /* opened_cuda_ipc_handle */)); diff --git a/src/pb_stub.cc b/src/pb_stub.cc index 0a2279ec..56048d78 100644 --- a/src/pb_stub.cc +++ b/src/pb_stub.cc @@ -49,6 +49,7 @@ #include "pb_preferred_memory.h" #include "pb_response_iterator.h" #include "pb_string.h" +#include "pb_stub_log.h" #include "pb_utils.h" #include "response_sender.h" #include "scoped_defer.h" @@ -1039,11 +1040,13 @@ Stub::~Stub() { #ifdef TRITON_ENABLE_GPU try { - CUDAHandler& cuda_api = CUDAHandler::getInstance(); - for (auto& m : - shm_pool_->GetCUDAMemoryPoolManager()->CUDAPoolAddressMap()) { - if (m.second != nullptr) { - cuda_api.CloseCudaHandle(m.first, m.second); + if (shm_pool_ != nullptr) { + CUDAHandler& cuda_api = CUDAHandler::getInstance(); + for (auto& m : + shm_pool_->GetCUDAMemoryPoolManager()->CUDAPoolAddressMap()) { + if (m.second != nullptr) { + cuda_api.CloseCudaHandle(m.first, m.second); + } } } } @@ -1052,13 +1055,14 @@ Stub::~Stub() } #endif - { + // Ensure the interpreter is active before trying to clean up. 
+ if (Py_IsInitialized()) { py::gil_scoped_acquire acquire; py::object async_event_loop_local(std::move(async_event_loop_)); py::object background_futures_local(std::move(background_futures_)); py::object model_instance_local(std::move(model_instance_)); } - stub_instance_.reset(); + stub_message_queue_.reset(); parent_message_queue_.reset(); stub_to_parent_mq_.reset(); @@ -1569,138 +1573,6 @@ Stub::ProcessBLSResponseDecoupled(std::unique_ptr& ipc_message) } } -std::unique_ptr Logger::log_instance_; - -std::unique_ptr& -Logger::GetOrCreateInstance() -{ - if (Logger::log_instance_.get() == nullptr) { - Logger::log_instance_ = std::make_unique(); - } - - return Logger::log_instance_; -} - -// Bound function, called from the python client -void -Logger::Log(const std::string& message, LogLevel level) -{ - std::unique_ptr& stub = Stub::GetOrCreateInstance(); - py::object frame = py::module_::import("inspect").attr("currentframe"); - py::object caller_frame = frame(); - py::object info = py::module_::import("inspect").attr("getframeinfo"); - py::object caller_info = info(caller_frame); - py::object filename_python = caller_info.attr("filename"); - std::string filename = filename_python.cast(); - py::object lineno = caller_info.attr("lineno"); - uint32_t line = lineno.cast(); - - if (!stub->StubToParentServiceActive()) { - Logger::GetOrCreateInstance()->Log(filename, line, level, message); - } else { - std::unique_ptr log_msg(new PbLog(filename, line, message, level)); - stub->EnqueueLogRequest(log_msg); - } -} - -// Called internally (.e.g. 
LOG_ERROR << "Error"; ) -void -Logger::Log( - const std::string& filename, uint32_t lineno, LogLevel level, - const std::string& message) -{ - // If the log monitor service is not active yet, format - // and pass messages to cerr - if (!BackendLoggingActive()) { - std::string path(filename); - size_t pos = path.rfind(std::filesystem::path::preferred_separator); - if (pos != std::string::npos) { - path = path.substr(pos + 1, std::string::npos); - } -#ifdef _WIN32 - std::stringstream ss; - SYSTEMTIME system_time; - GetSystemTime(&system_time); - ss << LeadingLogChar(level) << std::setfill('0') << std::setw(2) - << system_time.wMonth << std::setw(2) << system_time.wDay << ' ' - << std::setw(2) << system_time.wHour << ':' << std::setw(2) - << system_time.wMinute << ':' << std::setw(2) << system_time.wSecond - << '.' << std::setw(6) << system_time.wMilliseconds * 1000 << ' ' - << static_cast(GetCurrentProcessId()) << ' ' << path << ':' - << lineno << "] "; -#else - std::stringstream ss; - struct timeval tv; - gettimeofday(&tv, NULL); - struct tm tm_time; - gmtime_r(((time_t*)&(tv.tv_sec)), &tm_time); - ss << LeadingLogChar(level) << std::setfill('0') << std::setw(2) - << (tm_time.tm_mon + 1) << std::setw(2) << tm_time.tm_mday << " " - << std::setw(2) << tm_time.tm_hour << ':' << std::setw(2) - << tm_time.tm_min << ':' << std::setw(2) << tm_time.tm_sec << "." 
- << std::setw(6) << tv.tv_usec << ' ' << static_cast(getpid()) - << ' ' << path << ':' << lineno << "] "; - std::cerr << ss.str() << " " << message << std::endl; -#endif - } else { - // Ensure we do not create a stub instance before it has initialized - std::unique_ptr& stub = Stub::GetOrCreateInstance(); - std::unique_ptr log_msg(new PbLog(filename, lineno, message, level)); - stub->EnqueueLogRequest(log_msg); - } -} - -void -Logger::LogInfo(const std::string& message) -{ - Logger::Log(message, LogLevel::kInfo); -} - -void -Logger::LogWarn(const std::string& message) -{ - Logger::Log(message, LogLevel::kWarning); -} - -void -Logger::LogError(const std::string& message) -{ - Logger::Log(message, LogLevel::kError); -} - -void -Logger::LogVerbose(const std::string& message) -{ - Logger::Log(message, LogLevel::kVerbose); -} - -const std::string -Logger::LeadingLogChar(const LogLevel& level) -{ - switch (level) { - case LogLevel::kWarning: - return "W"; - case LogLevel::kError: - return "E"; - case LogLevel::kInfo: - case LogLevel::kVerbose: - default: - return "I"; - } -} - -void -Logger::SetBackendLoggingActive(bool status) -{ - backend_logging_active_ = status; -} - -bool -Logger::BackendLoggingActive() -{ - return backend_logging_active_; -} - PYBIND11_EMBEDDED_MODULE(c_python_backend_utils, module) { py::class_> triton_error( @@ -2161,6 +2033,7 @@ main(int argc, char** argv) catch (const PythonBackendException& pb_exception) { LOG_INFO << "Failed to preinitialize Python stub: " << pb_exception.what(); logger.reset(); + stub.reset(); exit(1); } diff --git a/src/pb_stub.h b/src/pb_stub.h index 172c04a8..942ecd98 100644 --- a/src/pb_stub.h +++ b/src/pb_stub.h @@ -30,8 +30,6 @@ #include #include -#include - #include "infer_request.h" #include "infer_response.h" #include "ipc_message.h" @@ -41,7 +39,6 @@ #include "pb_cancel.h" #include "pb_log.h" #include "pb_response_iterator.h" -#include "pb_utils.h" namespace bi = boost::interprocess; @@ -54,105 +51,6 @@ using 
cudaStream_t = void*; namespace triton { namespace backend { namespace python { -#define LOG_IF_EXCEPTION(X) \ - do { \ - try { \ - (X); \ - } \ - catch (const PythonBackendException& pb_exception) { \ - LOG_INFO << pb_exception.what(); \ - } \ - } while (false) - -#define LOG_EXCEPTION(E) \ - do { \ - LOG_INFO << E.what(); \ - } while (false) - -/// Macros that use current filename and line number. -#define LOG_INFO LOG_FL(__FILE__, __LINE__, LogLevel::kInfo) -#define LOG_WARN LOG_FL(__FILE__, __LINE__, LogLevel::kWarning) -#define LOG_ERROR LOG_FL(__FILE__, __LINE__, LogLevel::kError) -#define LOG_VERBOSE LOG_FL(__FILE__, __LINE__, LogLevel::kVerbose) - -class Logger { - public: - Logger() { backend_logging_active_ = false; }; - ~Logger() { log_instance_.reset(); }; - /// Python client log function - static void Log(const std::string& message, LogLevel level = LogLevel::kInfo); - - /// Python client log info function - static void LogInfo(const std::string& message); - - /// Python client warning function - static void LogWarn(const std::string& message); - - /// Python client log error function - static void LogError(const std::string& message); - - /// Python client log verbose function - static void LogVerbose(const std::string& message); - - /// Internal log function - void Log( - const std::string& filename, uint32_t lineno, LogLevel level, - const std::string& message); - - /// Log format helper function - const std::string LeadingLogChar(const LogLevel& level); - - /// Set PYBE Logging Status - void SetBackendLoggingActive(bool status); - - /// Get PYBE Logging Status - bool BackendLoggingActive(); - - /// Singleton Getter Function - static std::unique_ptr& GetOrCreateInstance(); - - DISALLOW_COPY_AND_ASSIGN(Logger); - - /// Flush the log. 
- void Flush() { std::cerr << std::flush; } - - private: - static std::unique_ptr log_instance_; - bool backend_logging_active_; -}; - -class LogMessage { - public: - /// Create a log message, stripping the path down to the filename only - LogMessage(const char* file, int line, LogLevel level) : level_(level) - { - std::string path(file); - const char os_slash = std::filesystem::path::preferred_separator; - size_t pos = path.rfind(os_slash); - if (pos != std::string::npos) { - path = path.substr(pos + 1, std::string::npos); - } - file_ = path; - line_ = static_cast(line); - } - /// Log message to console or send to backend (see Logger::Log for details) - ~LogMessage() - { - Logger::GetOrCreateInstance()->Log(file_, line_, level_, stream_.str()); - } - - std::stringstream& stream() { return stream_; } - - private: - std::stringstream stream_; - std::string file_; - uint32_t line_; - LogLevel level_; -}; - -#define LOG_FL(FN, LN, LVL) LogMessage((char*)(FN), LN, LVL).stream() - - class ModelContext { public: // Scans and establishes path for serving the python model. diff --git a/src/pb_stub_log.cc b/src/pb_stub_log.cc new file mode 100644 index 00000000..d0b1ff97 --- /dev/null +++ b/src/pb_stub_log.cc @@ -0,0 +1,170 @@ +// Copyright 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions +// are met: +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. +// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. 
+// * Neither the name of NVIDIA CORPORATION nor the names of its +// contributors may be used to endorse or promote products derived +// from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY +// OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#include "pb_stub_log.h" + +#include + +#include "pb_stub.h" + + +namespace py = pybind11; + +namespace triton { namespace backend { namespace python { + +std::unique_ptr Logger::log_instance_; + +std::unique_ptr& +Logger::GetOrCreateInstance() +{ + if (Logger::log_instance_.get() == nullptr) { + Logger::log_instance_ = std::make_unique(); + } + + return Logger::log_instance_; +} + +// Bound function, called from the python client +void +Logger::Log(const std::string& message, LogLevel level) +{ + std::unique_ptr& stub = Stub::GetOrCreateInstance(); + py::object frame = py::module_::import("inspect").attr("currentframe"); + py::object caller_frame = frame(); + py::object info = py::module_::import("inspect").attr("getframeinfo"); + py::object caller_info = info(caller_frame); + py::object filename_python = caller_info.attr("filename"); + std::string filename = filename_python.cast(); + py::object lineno = caller_info.attr("lineno"); + uint32_t line = lineno.cast(); + + if 
(!stub->StubToParentServiceActive()) { + Logger::GetOrCreateInstance()->Log(filename, line, level, message); + } else { + std::unique_ptr log_msg(new PbLog(filename, line, message, level)); + stub->EnqueueLogRequest(log_msg); + } +} + +// Called internally (e.g. LOG_ERROR << "Error"; ) +void +Logger::Log( + const std::string& filename, uint32_t lineno, LogLevel level, + const std::string& message) +{ + // If the log monitor service is not active yet, format + // and pass messages to cerr + if (!BackendLoggingActive()) { + std::string path(filename); + size_t pos = path.rfind(std::filesystem::path::preferred_separator); + if (pos != std::string::npos) { + path = path.substr(pos + 1, std::string::npos); + } +#ifdef _WIN32 + std::stringstream ss; + SYSTEMTIME system_time; + GetSystemTime(&system_time); + ss << LeadingLogChar(level) << std::setfill('0') << std::setw(2) + << system_time.wMonth << std::setw(2) << system_time.wDay << ' ' + << std::setw(2) << system_time.wHour << ':' << std::setw(2) + << system_time.wMinute << ':' << std::setw(2) << system_time.wSecond + << '.' << std::setw(6) << system_time.wMilliseconds * 1000 << ' ' + << static_cast(GetCurrentProcessId()) << ' ' << path << ':' + << lineno << "] "; +#else + std::stringstream ss; + struct timeval tv; + gettimeofday(&tv, NULL); + struct tm tm_time; + gmtime_r(((time_t*)&(tv.tv_sec)), &tm_time); + ss << LeadingLogChar(level) << std::setfill('0') << std::setw(2) + << (tm_time.tm_mon + 1) << std::setw(2) << tm_time.tm_mday << " " + << std::setw(2) << tm_time.tm_hour << ':' << std::setw(2) + << tm_time.tm_min << ':' << std::setw(2) << tm_time.tm_sec << "." 
+ << std::setw(6) << tv.tv_usec << ' ' << static_cast(getpid()) + << ' ' << path << ':' << lineno << "] "; + std::cerr << ss.str() << " " << message << std::endl; +#endif + } else { + // Ensure we do not create a stub instance before it has initialized + std::unique_ptr& stub = Stub::GetOrCreateInstance(); + std::unique_ptr log_msg(new PbLog(filename, lineno, message, level)); + stub->EnqueueLogRequest(log_msg); + } +} + +void +Logger::LogInfo(const std::string& message) +{ + Logger::Log(message, LogLevel::kInfo); +} + +void +Logger::LogWarn(const std::string& message) +{ + Logger::Log(message, LogLevel::kWarning); +} + +void +Logger::LogError(const std::string& message) +{ + Logger::Log(message, LogLevel::kError); +} + +void +Logger::LogVerbose(const std::string& message) +{ + Logger::Log(message, LogLevel::kVerbose); +} + +const std::string +Logger::LeadingLogChar(const LogLevel& level) +{ + switch (level) { + case LogLevel::kWarning: + return "W"; + case LogLevel::kError: + return "E"; + case LogLevel::kInfo: + case LogLevel::kVerbose: + default: + return "I"; + } +} + +void +Logger::SetBackendLoggingActive(bool status) +{ + backend_logging_active_ = status; +} + +bool +Logger::BackendLoggingActive() +{ + return backend_logging_active_; +} + +}}} // namespace triton::backend::python diff --git a/src/pb_stub_log.h b/src/pb_stub_log.h new file mode 100644 index 00000000..df67eba8 --- /dev/null +++ b/src/pb_stub_log.h @@ -0,0 +1,134 @@ +// Copyright 2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// +// Redistribution and use in source and binary forms, with or without +// modification, are permitted provided that the following conditions +// are met: +// * Redistributions of source code must retain the above copyright +// notice, this list of conditions and the following disclaimer. 
+// * Redistributions in binary form must reproduce the above copyright +// notice, this list of conditions and the following disclaimer in the +// documentation and/or other materials provided with the distribution. +// * Neither the name of NVIDIA CORPORATION nor the names of its +// contributors may be used to endorse or promote products derived +// from this software without specific prior written permission. +// +// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY +// EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE +// IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR +// PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR +// CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, +// EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, +// PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR +// PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY +// OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +#pragma once + +#include +#include + +#include "pb_utils.h" + +namespace triton { namespace backend { namespace python { + +#define LOG_IF_EXCEPTION(X) \ + do { \ + try { \ + (X); \ + } \ + catch (const PythonBackendException& pb_exception) { \ + LOG_INFO << pb_exception.what(); \ + } \ + } while (false) + +#define LOG_EXCEPTION(E) \ + do { \ + LOG_INFO << E.what(); \ + } while (false) + +/// Macros that use current filename and line number. 
+#define LOG_INFO LOG_FL(__FILE__, __LINE__, LogLevel::kInfo) +#define LOG_WARN LOG_FL(__FILE__, __LINE__, LogLevel::kWarning) +#define LOG_ERROR LOG_FL(__FILE__, __LINE__, LogLevel::kError) +#define LOG_VERBOSE LOG_FL(__FILE__, __LINE__, LogLevel::kVerbose) + +class Logger { + public: + Logger() { backend_logging_active_ = false; }; + ~Logger() { log_instance_.reset(); }; + /// Python client log function + static void Log(const std::string& message, LogLevel level = LogLevel::kInfo); + + /// Python client log info function + static void LogInfo(const std::string& message); + + /// Python client warning function + static void LogWarn(const std::string& message); + + /// Python client log error function + static void LogError(const std::string& message); + + /// Python client log verbose function + static void LogVerbose(const std::string& message); + + /// Internal log function + void Log( + const std::string& filename, uint32_t lineno, LogLevel level, + const std::string& message); + + /// Log format helper function + const std::string LeadingLogChar(const LogLevel& level); + + /// Set PYBE Logging Status + void SetBackendLoggingActive(bool status); + + /// Get PYBE Logging Status + bool BackendLoggingActive(); + + /// Singleton Getter Function + static std::unique_ptr& GetOrCreateInstance(); + + DISALLOW_COPY_AND_ASSIGN(Logger); + + /// Flush the log. 
+ void Flush() { std::cerr << std::flush; } + + private: + static std::unique_ptr log_instance_; + bool backend_logging_active_; +}; + +class LogMessage { + public: + /// Create a log message, stripping the path down to the filename only + LogMessage(const char* file, int line, LogLevel level) : level_(level) + { + std::string path(file); + const char os_slash = std::filesystem::path::preferred_separator; + size_t pos = path.rfind(os_slash); + if (pos != std::string::npos) { + path = path.substr(pos + 1, std::string::npos); + } + file_ = path; + line_ = static_cast(line); + } + /// Log message to console or send to backend (see Logger::Log for details) + ~LogMessage() + { + Logger::GetOrCreateInstance()->Log(file_, line_, level_, stream_.str()); + } + + std::stringstream& stream() { return stream_; } + + private: + std::stringstream stream_; + std::string file_; + uint32_t line_; + LogLevel level_; +}; + +#define LOG_FL(FN, LN, LVL) LogMessage((char*)(FN), LN, LVL).stream() + +}}} // namespace triton::backend::python diff --git a/src/pb_tensor.cc b/src/pb_tensor.cc index 9fde62fe..26e77586 100644 --- a/src/pb_tensor.cc +++ b/src/pb_tensor.cc @@ -41,9 +41,70 @@ namespace py = pybind11; typedef SSIZE_T ssize_t; #endif +#include +#include +#include + namespace triton { namespace backend { namespace python { #ifdef TRITON_PB_STUB +py::array +deserialize_bytes_tensor_cpp(const uint8_t* data, size_t data_size) +{ + if (data_size == 0) { + py::module numpy = py::module::import("numpy"); + return numpy.attr("empty")(0, py::dtype("object")); + } + + // First pass: count the number of strings and calculate total size + size_t offset = 0; + size_t num_strings = 0; + size_t total_string_size = 0; + + while (offset < data_size) { + if (offset + 4 > data_size) { + throw PythonBackendException( + "Invalid bytes tensor data: incomplete length field"); + } + + // Read 4-byte length (little-endian) + uint32_t length = *reinterpret_cast(data + offset); + offset += 4; + + if (offset + 
length > data_size) { + throw PythonBackendException( + "Invalid bytes tensor data: string extends beyond buffer"); + } + + num_strings++; + total_string_size += length; + offset += length; + } + + // Create numpy array of objects using pybind11's numpy module + py::module numpy = py::module::import("numpy"); + py::array result = numpy.attr("empty")(num_strings, py::dtype("object")); + auto result_ptr = static_cast(result.request().ptr); + + // Second pass: extract strings + offset = 0; + size_t string_index = 0; + + while (offset < data_size) { + uint32_t length = *reinterpret_cast(data + offset); + offset += 4; + + // Create Python bytes object using pybind11 + py::bytes bytes_obj(reinterpret_cast(data + offset), length); + Py_INCREF(bytes_obj.ptr()); // Increment reference count + result_ptr[string_index] = bytes_obj.ptr(); + string_index++; + offset += length; + } + + return result; +} + PbTensor::PbTensor(const std::string& name, py::array& numpy_array) : name_(name) { @@ -160,14 +221,9 @@ PbTensor::PbTensor( py::array(triton_to_pybind_dtype(dtype_), dims_, (void*)memory_ptr_); numpy_array_ = numpy_array.attr("view")(triton_to_numpy_type(dtype_)); } else { - py::object numpy_array = py::array( - triton_to_pybind_dtype(TRITONSERVER_TYPE_UINT8), {byte_size}, - (void*)memory_ptr_); - py::module triton_pb_utils = - py::module::import("triton_python_backend_utils"); - numpy_array_ = - triton_pb_utils.attr("deserialize_bytes_tensor")(numpy_array) - .attr("reshape")(dims); + py::object numpy_array = deserialize_bytes_tensor_cpp( + static_cast(memory_ptr_), byte_size_); + numpy_array_ = numpy_array.attr("reshape")(dims_); } } else { numpy_array_ = py::none(); @@ -234,6 +290,7 @@ delete_unused_dltensor(PyObject* dlp) } } + std::shared_ptr PbTensor::FromNumpy(const std::string& name, py::array& numpy_array) { @@ -668,14 +725,9 @@ PbTensor::PbTensor( py::array(triton_to_pybind_dtype(dtype_), dims_, (void*)memory_ptr_); numpy_array_ = 
numpy_array.attr("view")(triton_to_numpy_type(dtype_)); } else { - py::object numpy_array = py::array( - triton_to_pybind_dtype(TRITONSERVER_TYPE_UINT8), {byte_size_}, - (void*)memory_ptr_); - py::module triton_pb_utils = - py::module::import("triton_python_backend_utils"); - numpy_array_ = - triton_pb_utils.attr("deserialize_bytes_tensor")(numpy_array) - .attr("reshape")(dims_); + py::object numpy_array = deserialize_bytes_tensor_cpp( + static_cast(memory_ptr_), byte_size_); + numpy_array_ = numpy_array.attr("reshape")(dims_); } } else { numpy_array_ = py::none(); diff --git a/src/pb_utils.cc b/src/pb_utils.cc index 809531b8..79b45ec2 100644 --- a/src/pb_utils.cc +++ b/src/pb_utils.cc @@ -26,12 +26,21 @@ #include "pb_utils.h" +#include + +#include + #ifdef _WIN32 #include #include #else #include +#include +#endif + +#ifndef _WIN32 +extern char** environ; #endif @@ -315,6 +324,30 @@ WrapTritonErrorInSharedPtr(TRITONSERVER_Error* error) } #endif // NOT TRITON_PB_STUB +bool +IsValidIdentifier(const std::string& input) +{ + // Check for invalid characters + if (input.empty() || + input.find_first_of(INVALID_CHARS) != std::string::npos) { + return false; + } + + return true; +} + +bool +IsExecutableFile(const std::string& filepath) +{ + struct stat file_stat; + if (stat(filepath.c_str(), &file_stat) != 0) { + return false; + } + + // Check if it's a regular file and executable by owner + return S_ISREG(file_stat.st_mode) && (file_stat.st_mode & S_IXUSR); +} + std::string GenerateUUID() { @@ -323,4 +356,85 @@ GenerateUUID() return boost::uuids::to_string(uuid); } +// Helper function to get environment variables for Python virtual environments +std::map +ParseActivationScript(const std::string& activate_path) +{ + std::map env_vars; + + // Read the current environment as baseline +#ifndef _WIN32 + if (environ != nullptr) { + for (char** env = environ; *env != nullptr; env++) { + std::string env_str(*env); + size_t eq_pos = env_str.find('='); + if (eq_pos != 
std::string::npos) { + std::string key = env_str.substr(0, eq_pos); + std::string value = env_str.substr(eq_pos + 1); + env_vars[key] = value; + } + } + } +#endif + + // Extract virtual environment root from activation script path + std::string venv_path = activate_path; + size_t bin_activate_pos = venv_path.find("/bin/activate"); + if (bin_activate_pos != std::string::npos) { + venv_path = venv_path.substr(0, bin_activate_pos); + } + + // Set standard virtual environment variables + env_vars["VIRTUAL_ENV"] = venv_path; + env_vars["VIRTUAL_ENV_PROMPT"] = "(" + venv_path + ")"; + + // Update PATH to include the virtual environment's bin directory + std::string new_path = venv_path + "/bin"; + if (env_vars.find("PATH") != env_vars.end()) { + new_path += ":" + env_vars["PATH"]; + } + env_vars["PATH"] = new_path; + + // Update LD_LIBRARY_PATH to include the virtual environment's lib directory + std::string new_lib_path = venv_path + "/lib"; + if (env_vars.find("LD_LIBRARY_PATH") != env_vars.end()) { + new_lib_path += ":" + env_vars["LD_LIBRARY_PATH"]; + } + env_vars["LD_LIBRARY_PATH"] = new_lib_path; + + // Remove PYTHONHOME if it exists + env_vars.erase("PYTHONHOME"); + + return env_vars; +} + +// Helper function to prepare environment array for execve +std::pair, std::vector> +PrepareEnvironment( + const std::map& env_vars, + const std::string& additional_lib_path) +{ + std::vector env_strings; + std::vector env_array; + + for (const auto& [key, value] : env_vars) { + std::string env_string; + if (key == "LD_LIBRARY_PATH" && !additional_lib_path.empty()) { + // Prepend the additional library path + env_string = key + "=" + additional_lib_path + ":" + value; + } else { + env_string = key + "=" + value; + } + env_strings.push_back(env_string); + } + + // Convert to char* array + for (auto& env_str : env_strings) { + env_array.push_back(const_cast(env_str.c_str())); + } + env_array.push_back(nullptr); + + return std::make_pair(std::move(env_strings), 
std::move(env_array)); +} + }}} // namespace triton::backend::python diff --git a/src/pb_utils.h b/src/pb_utils.h index 1306f375..fa315210 100644 --- a/src/pb_utils.h +++ b/src/pb_utils.h @@ -36,10 +36,12 @@ #include #include #include +#include #include #include #include #include +#include #include #include "pb_exception.h" @@ -73,7 +75,7 @@ namespace bi = boost::interprocess; TRITONSERVER_ErrorMessage(pb2_exception.what())); \ } \ } \ - while (false) + } while (false) #define THROW_IF_TRITON_ERROR(X) \ do { \ @@ -341,6 +343,15 @@ bool IsUsingCUDAPool( // being retrieved from core that are not platform-agnostic. void SanitizePath(std::string& path); +// Invalid characters that are not allowed in user input +constexpr const char* INVALID_CHARS = ";|&$`<>()[]{}\\\"'*?~#!"; + +// Validate that an identifier (model name, region name, etc.) +bool IsValidIdentifier(const std::string& input); + +// Check if a file exists and is executable +bool IsExecutableFile(const std::string& filepath); + #ifndef TRITON_PB_STUB std::shared_ptr WrapTritonErrorInSharedPtr( TRITONSERVER_Error* error); @@ -348,4 +359,12 @@ std::shared_ptr WrapTritonErrorInSharedPtr( std::string GenerateUUID(); +// Environment handling utilities for Python activation scripts +std::map ParseActivationScript( + const std::string& activate_path); + +std::pair, std::vector> PrepareEnvironment( + const std::map& env_vars, + const std::string& additional_lib_path = ""); + }}} // namespace triton::backend::python diff --git a/src/shm_manager.cc b/src/shm_manager.cc index 1c7c4d65..134cee6f 100644 --- a/src/shm_manager.cc +++ b/src/shm_manager.cc @@ -1,4 +1,4 @@ -// Copyright 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2021-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
// // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions @@ -188,8 +188,8 @@ SharedMemoryManager::GrowIfNeeded(uint64_t byte_size) } catch (bi::interprocess_exception& ex) { std::string error_message = - ("Failed to increase the shared memory pool size for key '" + - shm_region_name_ + "' to " + std::to_string(*total_size_) + + ("Failed to increase the shared memory pool size to " + + std::to_string(*total_size_) + " bytes. If you are running Triton inside docker, use '--shm-size' " "flag to control the shared memory region size. Error: " + ex.what()); diff --git a/src/shm_manager.h b/src/shm_manager.h index 25e04570..e0799a07 100644 --- a/src/shm_manager.h +++ b/src/shm_manager.h @@ -1,4 +1,4 @@ -// Copyright 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +// Copyright 2021-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. // // Redistribution and use in source and binary forms, with or without // modification, are permitted provided that the following conditions @@ -188,6 +188,9 @@ class SharedMemoryManager { return cuda_memory_pool_manager_; } + uint64_t GetCurrentCapacity() { return current_capacity_; } + void* GetBaseAddress() { return managed_buffer_->get_address(); } + ~SharedMemoryManager() noexcept(false); private: diff --git a/src/shm_monitor/CMakeLists.txt b/src/shm_monitor/CMakeLists.txt index 0f7d4b86..2ae8bd45 100644 --- a/src/shm_monitor/CMakeLists.txt +++ b/src/shm_monitor/CMakeLists.txt @@ -1,4 +1,4 @@ -# Copyright (c) 2022, NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# Copyright (c) 2022-2025, NVIDIA CORPORATION & AFFILIATES. All rights reserved. 
# # Redistribution and use in source and binary forms, with or without # modification, are permitted provided that the following conditions @@ -24,7 +24,7 @@ # (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE # OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. -cmake_minimum_required (VERSION 3.18) +cmake_minimum_required (VERSION 3.31.8) pybind11_add_module( triton-shm-monitor diff --git a/src/stub_launcher.cc b/src/stub_launcher.cc index 828228e6..32f5d1bd 100644 --- a/src/stub_launcher.cc +++ b/src/stub_launcher.cc @@ -28,12 +28,15 @@ #include +#include "pb_utils.h" #include "python_be.h" #ifdef _WIN32 #include // getpid() #endif +extern char** environ; + namespace triton { namespace backend { namespace python { StubLauncher::StubLauncher(const std::string stub_process_kind) @@ -280,7 +283,9 @@ StubLauncher::Launch() // Push a dummy message to the message queue so that the stub // process is notified that it can release the object stored in // shared memory. - stub_message_queue_->Push(DUMMY_MESSAGE); + if (stub_message_queue_) { + stub_message_queue_->Push(DUMMY_MESSAGE); + } // If the model is not initialized, wait for the stub process to exit. if (!is_initialized_) { @@ -299,11 +304,23 @@ StubLauncher::Launch() // // The reason it is broken into two steps is that creation of the health // monitoring thread may take longer which can make the server process think - // that the stub process is unhealthy and return early. Waiting until the - // health thread is spawn would make sure would prevent this issue. - parent_message_queue_->Pop(); + // that the stub process is unhealthy and return early. Waiting with a longer + // timeout prevents this issue. 
+ const uint64_t initialization_timeout_ms = 10000; // 10 sec + LOG_MESSAGE( + TRITONSERVER_LOG_VERBOSE, + "Waiting for the stub health monitoring thread to start"); + + bi::managed_external_buffer::handle_t message; + auto err = ReceiveMessageFromStub(message, initialization_timeout_ms); + if (err != nullptr) { + KillStubProcess(); + } if (stub_process_kind_ == "AUTOCOMPLETE_STUB") { + if (err != nullptr) { + throw BackendModelException(err); + } try { AutocompleteStubProcess(); } @@ -314,6 +331,7 @@ StubLauncher::Launch() TRITONSERVER_ErrorNew(TRITONSERVER_ERROR_INTERNAL, ex.what())); } } else if (stub_process_kind_ == "MODEL_INSTANCE_STUB") { + RETURN_IF_ERROR(err); RETURN_IF_ERROR(ModelInstanceStubProcess()); } else { return TRITONSERVER_ErrorNew( @@ -337,10 +355,17 @@ StubLauncher::Launch() stub_name = model_instance_name_; } - const char* stub_args[4]; - stub_args[0] = "bash"; - stub_args[1] = "-c"; - stub_args[3] = nullptr; // Last argument must be nullptr + if (!IsValidIdentifier(stub_name)) { + return TRITONSERVER_ErrorNew( + TRITONSERVER_ERROR_INVALID_ARG, + "Invalid stub name: contains invalid characters"); + } + + if (!IsValidIdentifier(shm_region_name_)) { + return TRITONSERVER_ErrorNew( + TRITONSERVER_ERROR_INVALID_ARG, + "Invalid shared memory region name: contains invalid characters"); + } // Default Python backend stub std::string python_backend_stub = python_lib_ + "/triton_python_backend_stub"; @@ -353,48 +378,7 @@ StubLauncher::Launch() python_backend_stub = model_python_backend_stub; } - std::string bash_argument; - - // This shared memory variable indicates whether the stub process should - // revert the LD_LIBRARY_PATH changes to avoid shared library issues in - // executables and libraries. - ipc_control_->uses_env = false; - if (python_execution_env_ != "") { - std::stringstream ss; - - // Need to properly set the LD_LIBRARY_PATH so that Python environments - // using different python versions load properly. 
- ss << "source " << path_to_activate_ - << " && exec env LD_LIBRARY_PATH=" << path_to_libpython_ - << ":$LD_LIBRARY_PATH " << python_backend_stub << " " << model_path_ - << " " << shm_region_name_ << " " << shm_default_byte_size_ << " " - << shm_growth_byte_size_ << " " << parent_pid_ << " " << python_lib_ - << " " << ipc_control_handle_ << " " << stub_name << " " - << runtime_modeldir_; - ipc_control_->uses_env = true; - bash_argument = ss.str(); - } else { - std::stringstream ss; - ss << " exec " << python_backend_stub << " " << model_path_ << " " - << shm_region_name_ << " " << shm_default_byte_size_ << " " - << shm_growth_byte_size_ << " " << parent_pid_ << " " << python_lib_ - << " " << ipc_control_handle_ << " " << stub_name << " " - << runtime_modeldir_; - bash_argument = ss.str(); - } - LOG_MESSAGE( - TRITONSERVER_LOG_VERBOSE, - (std::string("Starting Python backend stub: ") + bash_argument).c_str()); - - stub_args[2] = bash_argument.c_str(); - - int stub_status_code = - system((python_backend_stub + "> /dev/null 2>&1").c_str()); - - // If running stub process without any arguments returns any status code, - // other than 1, it can indicate a permission issue as a result of - // downloading the stub process from a cloud object storage service. - if (WEXITSTATUS(stub_status_code) != 1) { + if (!IsExecutableFile(python_backend_stub)) { // Give the execute permission for the triton_python_backend_stub to the // owner. int error = chmod(python_backend_stub.c_str(), S_IXUSR); @@ -409,79 +393,196 @@ StubLauncher::Launch() } } - pid_t pid = fork(); - if (pid < 0) { - return TRITONSERVER_ErrorNew( - TRITONSERVER_ERROR_INTERNAL, - "Failed to fork the stub process for auto-complete."); - } - if (pid == 0) { - // Replace this child process with the new stub process. - execvp("bash", (char**)stub_args); - // execvp() never return if succeeded. Otherwise, an error has occurred. - std::stringstream ss; - ss << "Failed to run python backend stub. 
Errno = " << errno << '\n' - << "Python backend stub path: " << python_backend_stub << '\n' - << "Shared Memory Region Name: " << shm_region_name_ << '\n' - << "Shared Memory Default Byte Size: " << shm_default_byte_size_ << '\n' - << "Shared Memory Growth Byte Size: " << shm_growth_byte_size_ << '\n'; - // Print the error message directly because the underlying mutexes in - // LOG_MESSAGE() could be forked when it is locked by other thread(s). - std::cerr << '\n' << ss.str() << '\n'; - // Terminate the child execution immediately to avoid any issues. - _Exit(1); - } else { - ScopedDefer _([&] { - // Push a dummy message to the message queue so that the stub - // process is notified that it can release the object stored in - // shared memory. - stub_message_queue_->Push(DUMMY_MESSAGE); + // Prepare arguments for execution + std::vector arg_strings; + std::vector exec_args; - // If the model is not initialized, wait for the stub process to exit. - if (!is_initialized_) { - stub_message_queue_.reset(); - parent_message_queue_.reset(); - memory_manager_.reset(); - WaitForStubProcess(); - } - }); - - stub_pid_ = pid; - - // The stub process would send two messages to the parent process during the - // initialization. - // 1. When the stub process's health monitoring thread has started. - // 2. When the initialization is fully completed and the Python model is - // loaded. - // - // The reason it is broken into two steps is that creation of the health - // monitoring thread may take longer which can make the server process think - // that the stub process is unhealthy and return early. Waiting until the - // health thread is spawn would prevent this issue. 
- parent_message_queue_->Pop(); - - if (stub_process_kind_ == "AUTOCOMPLETE_STUB") { - try { - AutocompleteStubProcess(); - } - catch (const PythonBackendException& ex) { - // Need to kill the stub process first - KillStubProcess(); - throw BackendModelException( - TRITONSERVER_ErrorNew(TRITONSERVER_ERROR_INTERNAL, ex.what())); - } - } else if (stub_process_kind_ == "MODEL_INSTANCE_STUB") { - RETURN_IF_ERROR(ModelInstanceStubProcess()); + // This shared memory variable indicates whether the stub process should + // revert the LD_LIBRARY_PATH changes to avoid shared library issues in + // executables and libraries. + ipc_control_->uses_env = false; + + if (python_execution_env_ != "") { + ipc_control_->uses_env = true; + + // Parse environment variables from activation script + std::map env_vars = + ParseActivationScript(path_to_activate_); + + // Prepare environment with additional library path + auto [env_strings, custom_env] = + PrepareEnvironment(env_vars, path_to_libpython_); + + // Set up arguments for direct execution + arg_strings.push_back(python_backend_stub); + arg_strings.push_back(model_path_); + arg_strings.push_back(shm_region_name_); + arg_strings.push_back(std::to_string(shm_default_byte_size_)); + arg_strings.push_back(std::to_string(shm_growth_byte_size_)); + arg_strings.push_back(std::to_string(parent_pid_)); + arg_strings.push_back(python_lib_); + arg_strings.push_back(std::to_string(ipc_control_handle_)); + arg_strings.push_back(stub_name); + arg_strings.push_back(runtime_modeldir_); + + // Convert strings to char* array for exec + for (const auto& arg : arg_strings) { + exec_args.push_back(arg.c_str()); + } + exec_args.push_back(nullptr); // exec requires null termination + + // Log the command being executed + std::ostringstream log_cmd; + for (size_t i = 0; i < arg_strings.size(); ++i) { + if (i > 0) + log_cmd << " "; + log_cmd << "'" << arg_strings[i] << "'"; + } + LOG_MESSAGE( + TRITONSERVER_LOG_VERBOSE, + (std::string("Starting Python 
backend stub with custom environment: ") + + log_cmd.str()) + .c_str()); + + pid_t pid = fork(); + if (pid < 0) { + return TRITONSERVER_ErrorNew( + TRITONSERVER_ERROR_INTERNAL, + "Failed to fork the stub process for auto-complete."); + } + if (pid == 0) { + // Replace this child process with the new stub process using custom + // environment + execve( + python_backend_stub.c_str(), const_cast(exec_args.data()), + custom_env.data()); + // execve() never returns if succeeded. Otherwise, an error has occurred. + std::stringstream ss; + ss << "Failed to run python backend stub with custom environment. Errno " + "= " + << errno << '\n' + << "Python backend stub path: " << python_backend_stub << '\n' + << "Activation script: " << path_to_activate_ << '\n' + << "Library path: " << path_to_libpython_ << '\n'; + std::cerr << '\n' << ss.str() << '\n'; + _Exit(1); } else { + stub_pid_ = pid; + } + + } else { + arg_strings.push_back(python_backend_stub); + arg_strings.push_back(model_path_); + arg_strings.push_back(shm_region_name_); + arg_strings.push_back(std::to_string(shm_default_byte_size_)); + arg_strings.push_back(std::to_string(shm_growth_byte_size_)); + arg_strings.push_back(std::to_string(parent_pid_)); + arg_strings.push_back(python_lib_); + arg_strings.push_back(std::to_string(ipc_control_handle_)); + arg_strings.push_back(stub_name); + arg_strings.push_back(runtime_modeldir_); + + // Convert strings to char* array for exec + for (const auto& arg : arg_strings) { + exec_args.push_back(arg.c_str()); + } + exec_args.push_back(nullptr); // exec requires null termination + + // Log the command being executed + std::ostringstream log_cmd; + for (size_t i = 0; i < arg_strings.size(); ++i) { + if (i > 0) + log_cmd << " "; + log_cmd << "'" << arg_strings[i] << "'"; + } + LOG_MESSAGE( + TRITONSERVER_LOG_VERBOSE, + (std::string("Starting Python backend stub: ") + log_cmd.str()) + .c_str()); + + pid_t pid = fork(); + if (pid < 0) { return TRITONSERVER_ErrorNew( 
TRITONSERVER_ERROR_INTERNAL, - (std::string("Unknown stub_process_kind: ") + stub_process_kind_) - .c_str()); + "Failed to fork the stub process for auto-complete."); + } + if (pid == 0) { + // Replace this child process with the new stub process. + execv(python_backend_stub.c_str(), const_cast(exec_args.data())); + // execv() never returns if succeeded. Otherwise, an error has occurred. + std::stringstream ss; + ss << "Failed to run python backend stub. Errno = " << errno << '\n' + << "Python backend stub path: " << python_backend_stub << '\n'; + std::cerr << '\n' << ss.str() << '\n'; + _Exit(1); + } else { + stub_pid_ = pid; + } + } + + ScopedDefer _([&] { + // Push a dummy message to the message queue so that the stub + // process is notified that it can release the object stored in + // shared memory. + if (stub_message_queue_) { + stub_message_queue_->Push(DUMMY_MESSAGE); + } + + // If the model is not initialized, wait for the stub process to exit. + if (!is_initialized_) { + stub_message_queue_.reset(); + parent_message_queue_.reset(); + memory_manager_.reset(); + WaitForStubProcess(); } + }); + + // The stub process would send two messages to the parent process during the + // initialization. + // 1. When the stub process's health monitoring thread has started. + // 2. When the initialization is fully completed and the Python model is + // loaded. + // + // The reason it is broken into two steps is that creation of the health + // monitoring thread may take longer which can make the server process think + // that the stub process is unhealthy and return early. Waiting with a + // longer timeout prevents this issue. 
+ const uint64_t initialization_timeout_ms = 10000; // 10 sec + LOG_MESSAGE( + TRITONSERVER_LOG_VERBOSE, + "Waiting for the stub health monitoring thread to start"); + + bi::managed_external_buffer::handle_t message; + auto err = ReceiveMessageFromStub(message, initialization_timeout_ms); + if (err != nullptr) { + KillStubProcess(); + } - is_initialized_ = true; + if (stub_process_kind_ == "AUTOCOMPLETE_STUB") { + if (err != nullptr) { + throw BackendModelException(err); + } + try { + AutocompleteStubProcess(); + } + catch (const PythonBackendException& ex) { + // Need to kill the stub process first + KillStubProcess(); + throw BackendModelException( + TRITONSERVER_ErrorNew(TRITONSERVER_ERROR_INTERNAL, ex.what())); + } + } else if (stub_process_kind_ == "MODEL_INSTANCE_STUB") { + RETURN_IF_ERROR(err); + RETURN_IF_ERROR(ModelInstanceStubProcess()); + } else { + return TRITONSERVER_ErrorNew( + TRITONSERVER_ERROR_INTERNAL, + (std::string("Unknown stub_process_kind: ") + stub_process_kind_) + .c_str()); } + is_initialized_ = true; + return nullptr; } @@ -592,8 +693,13 @@ StubLauncher::ModelInstanceStubProcess() initialize_message->Args() = initialize_map_handle; stub_message_queue_->Push(initialize_message->ShmHandle()); + const uint64_t initialization_timeout_ms = 5000; // 5 sec + LOG_MESSAGE( + TRITONSERVER_LOG_VERBOSE, + "Waiting for the stub process initialization response"); + bi::managed_external_buffer::handle_t message; - RETURN_IF_ERROR(ReceiveMessageFromStub(message)); + RETURN_IF_ERROR(ReceiveMessageFromStub(message, initialization_timeout_ms)); std::unique_ptr initialize_response_message = IPCMessage::LoadFromSharedMemory(shm_pool_, message); @@ -726,11 +832,11 @@ StubLauncher::KillStubProcess() TRITONSERVER_Error* StubLauncher::ReceiveMessageFromStub( - bi::managed_external_buffer::handle_t& message) + bi::managed_external_buffer::handle_t& message, + uint64_t timeout_miliseconds) { bool success = false; while (!success) { - uint64_t timeout_miliseconds = 
1000; { boost::posix_time::ptime timeout = boost::get_system_time() + diff --git a/src/stub_launcher.h b/src/stub_launcher.h index 6c8dd910..58cdcc61 100644 --- a/src/stub_launcher.h +++ b/src/stub_launcher.h @@ -147,7 +147,8 @@ class StubLauncher { // Get a message from the stub process TRITONSERVER_Error* ReceiveMessageFromStub( - bi::managed_external_buffer::handle_t& message); + bi::managed_external_buffer::handle_t& message, + uint64_t timeout_miliseconds = 1000); // Wait for stub process void WaitForStubProcess();