Commit 0cdcaf3

Decoupled Async Execute (triton-inference-server#350)
* Add async decoupled execute
* Enable decoupled bls async exec
* Improve handling for async execute future object
* Add docs for async execute for decoupled model
* Fix link on docs
* Improve docs wording
* Improve destruction steps for async execute future object
* Piggy back on GIL for protection
* Document model should not modify event loop
* Use Python add_done_callback
* Protect infer_payload_
* Use traceback API that supports Python 3.8 and 3.9
* Update docs
1 parent 4d42111 commit 0cdcaf3

File tree (5 files changed, +107 -14 lines changed):

- README.md
- src/pb_stub.cc
- src/pb_stub.h
- src/python_be.cc
- src/python_be.h

README.md

Lines changed: 18 additions & 3 deletions
@@ -49,7 +49,7 @@ any C++ code.
 - [Request Cancellation Handling](#request-cancellation-handling)
 - [Decoupled mode](#decoupled-mode)
 - [Use Cases](#use-cases)
-- [Known Issues](#known-issues)
+- [Async Execute](#async-execute)
 - [Request Rescheduling](#request-rescheduling)
 - [`finalize`](#finalize)
 - [Model Config File](#model-config-file)
@@ -620,9 +620,24 @@ full power of what can be achieved from decoupled API. Read
 [Decoupled Backends and Models](https://github.com/triton-inference-server/server/blob/main/docs/user_guide/decoupled_models.md)
 for more details on how to host a decoupled model.

-##### Known Issues
+##### Async Execute

-* Currently, decoupled Python models can not make async infer requests.
+Starting from 24.04, `async def execute(self, requests):` is supported for
+decoupled Python models. Its coroutine will be executed by an AsyncIO event loop
+shared with requests executing in the same model instance. The next request for
+the model instance can start executing while the current request is waiting.
+
+This is useful for minimizing the number of model instances for models that
+spend the majority of its time waiting, given requests can be executed
+concurrently by AsyncIO. To take full advantage of the concurrency, it is vital
+for the async execute function to not block the event loop from making progress
+while it is waiting, i.e. downloading over the network.
+
+Notes:
+* The model should not modify the running event loop, as this might cause
+  unexpected issues.
+* The server/backend do not control how many requests are added to the event
+  loop by a model instance.

 #### Request Rescheduling

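For illustration only (not part of this commit's diff), a minimal sketch of a decoupled model using the async execute path described in the README text above. The tensor names `INPUT0`/`OUTPUT0` are placeholders for whatever the model's `config.pbtxt` defines, and `asyncio.sleep` stands in for genuinely non-blocking work such as a network download:

```python
import asyncio

import triton_python_backend_utils as pb_utils


class TritonPythonModel:
    async def execute(self, requests):
        for request in requests:
            # Stand-in for non-blocking work (e.g. an HTTP download). A blocking
            # call such as time.sleep() here would stall every request sharing
            # this model instance's event loop.
            await asyncio.sleep(1)

            input_tensor = pb_utils.get_input_tensor_by_name(request, "INPUT0")
            output_tensor = pb_utils.Tensor("OUTPUT0", input_tensor.as_numpy())

            response_sender = request.get_response_sender()
            response_sender.send(
                pb_utils.InferenceResponse(output_tensors=[output_tensor]),
                flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL,
            )
        # Decoupled execute does not return responses; they go through each
        # request's response sender, and the coroutine returns None.
        return None
```

Because the coroutine runs on the event loop shared by all requests on the model instance, any blocking call inside `execute` stalls every in-flight request on that instance.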
src/pb_stub.cc

Lines changed: 80 additions & 10 deletions
@@ -104,6 +104,32 @@ PyDefaultArgumentToMutableType(const py::object& argument)
       std::string(py::str(argument.get_type())));
 }

+void
+AsyncEventFutureDoneCallback(const py::object& py_future)
+{
+  // TODO: Why using `py_future.result()` with error hangs on exit?
+  try {
+    py::object exception = py_future.attr("exception")();
+    if (!py::isinstance<py::none>(exception)) {
+      std::string err_msg = "";
+      py::object traceback = py::module_::import("traceback")
+                                 .attr("TracebackException")
+                                 .attr("from_exception")(exception)
+                                 .attr("format")();
+      for (py::handle line : traceback) {
+        err_msg += py::str(line);
+      }
+      LOG_ERROR << err_msg;
+    }
+  }
+  catch (const PythonBackendException& pb_exception) {
+    LOG_ERROR << pb_exception.what();
+  }
+  catch (const py::error_already_set& error) {
+    LOG_ERROR << error.what();
+  }
+}
+
 void
 Stub::Instantiate(
     int64_t shm_growth_size, int64_t shm_default_size,
@@ -533,6 +559,8 @@ Stub::Initialize(bi::managed_external_buffer::handle_t map_handle)
       c_python_backend_utils.attr("InferenceResponse"));
   c_python_backend_utils.attr("shared_memory") = py::cast(shm_pool_.get());

+  async_event_loop_ = py::none();
+
   py::object TritonPythonModel = sys.attr("TritonPythonModel");
   deserialize_bytes_ = python_backend_utils.attr("deserialize_bytes_tensor");
   serialize_bytes_ = python_backend_utils.attr("serialize_byte_tensor");
@@ -690,11 +718,18 @@ Stub::ProcessRequestsDecoupled(RequestBatch* request_batch_shm_ptr)

     py::object execute_return =
         model_instance_.attr("execute")(py_request_list);
-    if (!py::isinstance<py::none>(execute_return)) {
-      throw PythonBackendException(
-          "Python model '" + name_ +
-          "' is using the decoupled mode and the execute function must "
-          "return None.");
+    bool is_coroutine = py::module::import("asyncio")
+                            .attr("iscoroutine")(execute_return)
+                            .cast<bool>();
+    if (is_coroutine) {
+      RunCoroutine(execute_return);
+    } else {
+      if (!py::isinstance<py::none>(execute_return)) {
+        throw PythonBackendException(
+            "Python model '" + name_ +
+            "' is using the decoupled mode and the execute function must "
+            "return None.");
+      }
     }
   }
 }
@@ -870,6 +905,35 @@ Stub::ProcessRequests(RequestBatch* request_batch_shm_ptr)
   }
 }

+py::object
+Stub::GetAsyncEventLoop()
+{
+  if (py::isinstance<py::none>(async_event_loop_)) {
+    // Create the event loop if not already.
+    py::module asyncio = py::module_::import("asyncio");
+    async_event_loop_ = asyncio.attr("new_event_loop")();
+    asyncio.attr("set_event_loop")(async_event_loop_);
+    py::object py_thread =
+        py::module_::import("threading")
+            .attr("Thread")(
+                "target"_a = async_event_loop_.attr("run_forever"),
+                "daemon"_a = true);
+    py_thread.attr("start")();
+  }
+  return async_event_loop_;
+}
+
+void
+Stub::RunCoroutine(py::object coroutine)
+{
+  py::object loop = GetAsyncEventLoop();
+  py::object py_future = py::module_::import("asyncio").attr(
+      "run_coroutine_threadsafe")(coroutine, loop);
+  py_future.attr("add_done_callback")(
+      py::module_::import("c_python_backend_utils")
+          .attr("async_event_future_done_callback"));
+}
+
 void
 Stub::UpdateHealth()
 {
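As a rough pure-Python mirror of the `GetAsyncEventLoop` and `RunCoroutine` additions above (a simplification for readers; the real implementation goes through pybind11 and the embedded `c_python_backend_utils` module):

```python
import asyncio
import threading
import traceback

# One event loop per model instance, driven by a daemon thread, so coroutines
# keep running while the stub picks up the next request.
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
threading.Thread(target=loop.run_forever, daemon=True).start()


def _future_done_callback(future):
    # Mirror of AsyncEventFutureDoneCallback: log the traceback of any
    # exception raised by the model's async execute coroutine.
    exception = future.exception()
    if exception is not None:
        print("".join(
            traceback.TracebackException.from_exception(exception).format()))


def run_coroutine(coroutine):
    # Mirror of Stub::RunCoroutine: schedule the coroutine on the shared loop
    # and attach the done callback.
    future = asyncio.run_coroutine_threadsafe(coroutine, loop)
    future.add_done_callback(_future_done_callback)
```

Running the loop on a daemon thread is what lets `ProcessRequestsDecoupled` return immediately and accept the next request while earlier coroutines are still awaiting.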
@@ -881,6 +945,10 @@
 Stub::Finalize()
 {
   finalizing_ = true;
+  // Stop async event loop if created.
+  if (!py::isinstance<py::none>(async_event_loop_)) {
+    async_event_loop_.attr("stop")();
+  }
   // Call finalize if exists.
   if (initialized_ && py::hasattr(model_instance_, "finalize")) {
     try {
@@ -943,6 +1011,7 @@ Stub::~Stub()

   {
     py::gil_scoped_acquire acquire;
+    async_event_loop_ = py::none();
     model_instance_ = py::none();
   }
   stub_instance_.reset();
@@ -1729,11 +1798,6 @@ PYBIND11_EMBEDDED_MODULE(c_python_backend_utils, module)
       [](std::shared_ptr<InferRequest>& infer_request,
          const bool decoupled) {
         std::unique_ptr<Stub>& stub = Stub::GetOrCreateInstance();
-        if (stub->IsDecoupled()) {
-          throw PythonBackendException(
-              "Async BLS request execution is not support in the decoupled "
-              "API.");
-        }
         py::object loop =
             py::module_::import("asyncio").attr("get_running_loop")();
         py::cpp_function callback = [&stub, infer_request, decoupled]() {
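The hunk above removes the guard that previously raised "Async BLS request execution is not support in the decoupled API." For illustration, a hedged sketch of an async BLS call made from a decoupled model's async `execute` (the upstream model name and tensor names are hypothetical):

```python
import numpy as np
import triton_python_backend_utils as pb_utils


class TritonPythonModel:
    async def execute(self, requests):
        for request in requests:
            # Build a BLS request to another model; the names here are made up.
            bls_request = pb_utils.InferenceRequest(
                model_name="preprocessor",
                requested_output_names=["OUTPUT0"],
                inputs=[pb_utils.Tensor("INPUT0",
                                        np.array([1.0], dtype=np.float32))],
            )
            # Awaiting the BLS call is now allowed in decoupled models; other
            # requests on this instance keep running while this one waits.
            bls_response = await bls_request.async_exec()
            out = pb_utils.get_output_tensor_by_name(bls_response, "OUTPUT0")

            request.get_response_sender().send(
                pb_utils.InferenceResponse(
                    output_tensors=[pb_utils.Tensor("OUTPUT0", out.as_numpy())]
                ),
                flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL,
            )
        return None
```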
@@ -1860,6 +1924,12 @@ PYBIND11_EMBEDDED_MODULE(c_python_backend_utils, module)
       "is_model_ready", &IsModelReady, py::arg("model_name").none(false),
       py::arg("model_version").none(false) = "");

+  // This function is not part of the public API for Python backend. This is
+  // only used for internal callbacks.
+  module.def(
+      "async_event_future_done_callback", &AsyncEventFutureDoneCallback,
+      py::arg("py_future").none(false));
+
   // This class is not part of the public API for Python backend. This is only
   // used for internal testing purposes.
   py::class_<SharedMemoryManager>(module, "SharedMemory")

src/pb_stub.h

Lines changed: 6 additions & 1 deletion
@@ -1,4 +1,4 @@
-// Copyright 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
+// Copyright 2021-2024, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
 //
 // Redistribution and use in source and binary forms, with or without
 // modification, are permitted provided that the following conditions
@@ -255,6 +255,10 @@

   void ProcessRequestsDecoupled(RequestBatch* request_batch_shm_ptr);

+  py::object GetAsyncEventLoop();
+
+  void RunCoroutine(py::object coroutine);
+
   /// Get the memory manager message queue
   std::unique_ptr<MessageQueue<uint64_t>>& MemoryManagerQueue();

@@ -363,6 +367,7 @@
   py::object model_instance_;
   py::object deserialize_bytes_;
   py::object serialize_bytes_;
+  py::object async_event_loop_;
   std::unique_ptr<MessageQueue<bi::managed_external_buffer::handle_t>>
       stub_message_queue_;
   std::unique_ptr<MessageQueue<bi::managed_external_buffer::handle_t>>

src/python_be.cc

Lines changed: 2 additions & 0 deletions
@@ -768,6 +768,7 @@ ModelInstanceState::ExecuteBLSRequest(
   if (is_decoupled && (infer_response->Id() != nullptr)) {
     // Need to manage the lifetime of InferPayload object for bls
     // decoupled responses.
+    std::lock_guard<std::mutex> lock(infer_payload_mu_);
     infer_payload_[reinterpret_cast<intptr_t>(infer_payload.get())] =
         infer_payload;
   }
@@ -961,6 +962,7 @@ ModelInstanceState::ProcessCleanupRequest(
   intptr_t id = reinterpret_cast<intptr_t>(cleanup_message_ptr->id);
   if (message->Command() == PYTHONSTUB_BLSDecoupledInferPayloadCleanup) {
     // Remove the InferPayload object from the map.
+    std::lock_guard<std::mutex> lock(infer_payload_mu_);
     infer_payload_.erase(id);
   } else if (message->Command() == PYTHONSTUB_DecoupledResponseFactoryCleanup) {
     // Delete response factory

src/python_be.h

Lines changed: 1 addition & 0 deletions
@@ -296,6 +296,7 @@ class ModelInstanceState : public BackendModelInstance {
   std::vector<std::future<void>> futures_;
   std::unique_ptr<boost::asio::thread_pool> thread_pool_;
   std::unordered_map<intptr_t, std::shared_ptr<InferPayload>> infer_payload_;
+  std::mutex infer_payload_mu_;
   std::unique_ptr<RequestExecutor> request_executor_;

  public:
