Skip to content

Commit c5f304d

Browse files
authored
Fix segfault for decoupled models (triton-inference-server#327)
* Set release flags and clean up response factory map before returning error
* Address comments
* Move the cleanup function to the outside scope
* Delete response factory when response sender goes out of scope
1 parent 8b0fa4c commit c5f304d

File tree

10 files changed

+47
-85
lines changed

10 files changed

+47
-85
lines changed

src/infer_request.cc

Lines changed: 0 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -405,20 +405,6 @@ InferRequest::InferRequest(
405405
#endif
406406
}
407407

408-
#ifndef TRITON_PB_STUB
409-
TRITONSERVER_Error*
410-
InferRequest::DeleteResponseFactory()
411-
{
412-
TRITONBACKEND_ResponseFactory* response_factory =
413-
reinterpret_cast<TRITONBACKEND_ResponseFactory*>(
414-
response_factory_address_);
415-
TRITONSERVER_Error* error =
416-
TRITONBACKEND_ResponseFactoryDelete(response_factory);
417-
418-
return error;
419-
}
420-
#endif
421-
422408
#ifdef TRITON_PB_STUB
423409
bool
424410
InferRequest::IsCancelled()

src/infer_request.h

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -137,10 +137,6 @@ class InferRequest {
137137
intptr_t RequestAddress();
138138
~InferRequest() {}
139139

140-
#ifndef TRITON_PB_STUB
141-
TRITONSERVER_Error* DeleteResponseFactory();
142-
#endif
143-
144140
private:
145141
InferRequest(
146142
AllocatedSharedMemory<char>& infer_request_shm,

src/ipc_message.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ typedef enum PYTHONSTUB_commandtype_enum {
5454
PYTHONSTUB_AutoCompleteRequest,
5555
PYTHONSTUB_AutoCompleteResponse,
5656
PYTHONSTUB_LogRequest,
57-
PYTHONSTUB_CleanupRequest,
57+
PYTHONSTUB_BLSDecoupledInferPayloadCleanup,
58+
PYTHONSTUB_BLSDecoupledResponseFactoryCleanup,
5859
PYTHONSTUB_MetricFamilyRequestNew,
5960
PYTHONSTUB_MetricFamilyRequestDelete,
6061
PYTHONSTUB_MetricRequestNew,

src/pb_response_iterator.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -133,7 +133,7 @@ void
133133
ResponseIterator::Clear()
134134
{
135135
std::unique_ptr<Stub>& stub = Stub::GetOrCreateInstance();
136-
stub->EnqueueCleanupId(id_);
136+
stub->EnqueueCleanupId(id_, PYTHONSTUB_BLSDecoupledInferPayloadCleanup);
137137
{
138138
std::lock_guard<std::mutex> lock{mu_};
139139
response_buffer_.push(DUMMY_MESSAGE);

src/pb_stub.cc

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -993,8 +993,12 @@ Stub::ServiceStubToParentRequests()
993993
stub_to_parent_buffer_.pop();
994994
if (utils_msg_payload->command_type == PYTHONSTUB_LogRequest) {
995995
SendLogMessage(utils_msg_payload);
996-
} else if (utils_msg_payload->command_type == PYTHONSTUB_CleanupRequest) {
997-
SendCleanupId(utils_msg_payload);
996+
} else if (
997+
(utils_msg_payload->command_type ==
998+
PYTHONSTUB_BLSDecoupledInferPayloadCleanup) ||
999+
(utils_msg_payload->command_type ==
1000+
PYTHONSTUB_BLSDecoupledResponseFactoryCleanup)) {
1001+
SendCleanupId(utils_msg_payload, utils_msg_payload->command_type);
9981002
} else if (
9991003
utils_msg_payload->command_type == PYTHONSTUB_IsRequestCancelled) {
10001004
SendIsCancelled(utils_msg_payload);
@@ -1040,7 +1044,9 @@ Stub::SendLogMessage(std::unique_ptr<UtilsMessagePayload>& utils_msg_payload)
10401044
}
10411045

10421046
void
1043-
Stub::SendCleanupId(std::unique_ptr<UtilsMessagePayload>& utils_msg_payload)
1047+
Stub::SendCleanupId(
1048+
std::unique_ptr<UtilsMessagePayload>& utils_msg_payload,
1049+
const PYTHONSTUB_CommandType& command_type)
10441050
{
10451051
void* id = utils_msg_payload->utils_message_ptr;
10461052
{
@@ -1050,7 +1056,7 @@ Stub::SendCleanupId(std::unique_ptr<UtilsMessagePayload>& utils_msg_payload)
10501056

10511057
std::unique_ptr<IPCMessage> ipc_message =
10521058
IPCMessage::Create(shm_pool_, true /* inline_response */);
1053-
ipc_message->Command() = PYTHONSTUB_CleanupRequest;
1059+
ipc_message->Command() = command_type;
10541060
AllocatedSharedMemory<char> cleanup_request_message =
10551061
shm_pool_->Construct<char>(
10561062
sizeof(CleanupMessage) +
@@ -1072,11 +1078,11 @@ Stub::SendCleanupId(std::unique_ptr<UtilsMessagePayload>& utils_msg_payload)
10721078
}
10731079

10741080
void
1075-
Stub::EnqueueCleanupId(void* id)
1081+
Stub::EnqueueCleanupId(void* id, const PYTHONSTUB_CommandType& command_type)
10761082
{
10771083
if (id != nullptr) {
10781084
std::unique_ptr<UtilsMessagePayload> utils_msg_payload =
1079-
std::make_unique<UtilsMessagePayload>(PYTHONSTUB_CleanupRequest, id);
1085+
std::make_unique<UtilsMessagePayload>(command_type, id);
10801086
EnqueueUtilsMessage(std::move(utils_msg_payload));
10811087
}
10821088
}

src/pb_stub.h

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -315,10 +315,13 @@ class Stub {
315315
std::shared_ptr<InferResponse> infer_response);
316316

317317
/// Send the id to the python backend for object cleanup
318-
void SendCleanupId(std::unique_ptr<UtilsMessagePayload>& utils_msg_payload);
318+
void SendCleanupId(
319+
std::unique_ptr<UtilsMessagePayload>& utils_msg_payload,
320+
const PYTHONSTUB_CommandType& command_type);
319321

320-
/// Add cleanup id to queue
321-
void EnqueueCleanupId(void* id);
322+
/// Add cleanup id to queue. This is used for cleaning up the infer_payload
323+
/// and the response factory for BLS decoupled response.
324+
void EnqueueCleanupId(void* id, const PYTHONSTUB_CommandType& command_type);
322325

323326
/// Add request cancellation query to queue
324327
void EnqueueIsCancelled(PbCancel* pb_cancel);

src/python_be.cc

Lines changed: 16 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -379,21 +379,7 @@ ModelInstanceState::SaveRequestsToSharedMemory(
379379
std::unique_ptr<InferRequest> infer_request;
380380
if (model_state->IsDecoupled()) {
381381
TRITONBACKEND_ResponseFactory* factory_ptr;
382-
// Reuse the response factory if there is already a response factory
383-
// associated with the request
384-
std::lock_guard<std::mutex> guard{response_factory_map_mutex_};
385-
{
386-
if (response_factory_map_.find(reinterpret_cast<intptr_t>(request)) !=
387-
response_factory_map_.end()) {
388-
factory_ptr =
389-
response_factory_map_[reinterpret_cast<intptr_t>(request)];
390-
} else {
391-
RETURN_IF_ERROR(
392-
TRITONBACKEND_ResponseFactoryNew(&factory_ptr, request));
393-
response_factory_map_[reinterpret_cast<intptr_t>(request)] =
394-
factory_ptr;
395-
}
396-
}
382+
RETURN_IF_ERROR(TRITONBACKEND_ResponseFactoryNew(&factory_ptr, request));
397383

398384
infer_request = std::make_unique<InferRequest>(
399385
id, correlation_id, pb_input_tensors, requested_output_names,
@@ -843,7 +829,8 @@ ModelInstanceState::StubToParentMQMonitor()
843829
ProcessLogRequest(message);
844830
break;
845831
}
846-
case PYTHONSTUB_CleanupRequest: {
832+
case PYTHONSTUB_BLSDecoupledInferPayloadCleanup:
833+
case PYTHONSTUB_BLSDecoupledResponseFactoryCleanup: {
847834
ProcessBLSCleanupRequest(message);
848835
break;
849836
}
@@ -941,9 +928,17 @@ ModelInstanceState::ProcessBLSCleanupRequest(
941928
Stub()->ShmPool()->Load<char>(message->Args());
942929
CleanupMessage* cleanup_message_ptr =
943930
reinterpret_cast<CleanupMessage*>(cleanup_request_message.data_.get());
944-
945-
void* id = cleanup_message_ptr->id;
946-
infer_payload_.erase(reinterpret_cast<intptr_t>(id));
931+
intptr_t id = reinterpret_cast<intptr_t>(cleanup_message_ptr->id);
932+
if (message->Command() == PYTHONSTUB_BLSDecoupledInferPayloadCleanup) {
933+
// Remove the InferPayload object from the map.
934+
infer_payload_.erase(id);
935+
} else if (
936+
message->Command() == PYTHONSTUB_BLSDecoupledResponseFactoryCleanup) {
937+
// Delete response factory
938+
std::unique_ptr<
939+
TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter>
940+
response_factory(reinterpret_cast<TRITONBACKEND_ResponseFactory*>(id));
941+
}
947942

948943
{
949944
bi::scoped_lock<bi::interprocess_mutex> lock{*(message->ResponseMutex())};
@@ -1172,12 +1167,6 @@ ModelInstanceState::ResponseSendDecoupled(
11721167
std::lock_guard<std::mutex> guard{closed_requests_mutex_};
11731168
closed_requests_.push_back(send_message_payload->request_address);
11741169
}
1175-
1176-
// Clean up the response factory map.
1177-
{
1178-
std::lock_guard<std::mutex> guard{response_factory_map_mutex_};
1179-
response_factory_map_.erase(send_message_payload->request_address);
1180-
}
11811170
}
11821171

11831172
if (send_message_payload->response != 0) {
@@ -1195,14 +1184,7 @@ ModelInstanceState::ResponseSendDecoupled(
11951184
error_message);
11961185

11971186
std::vector<std::pair<std::unique_ptr<PbMemory>, void*>> gpu_output_buffers;
1198-
std::unique_ptr<
1199-
TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter>
1200-
response_factory_ptr;
12011187
GPUBuffersHelper gpu_buffer_helper;
1202-
if (send_message_payload->flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) {
1203-
response_factory_ptr.reset(
1204-
reinterpret_cast<TRITONBACKEND_ResponseFactory*>(response_factory));
1205-
}
12061188

12071189
#ifdef TRITON_ENABLE_GPU
12081190
for (auto& output_tensor : infer_response->OutputTensors()) {
@@ -1289,13 +1271,6 @@ ModelInstanceState::ResponseSendDecoupled(
12891271
response_factory, send_message_payload->flags);
12901272
SetErrorForResponseSendMessage(
12911273
send_message_payload, WrapTritonErrorInSharedPtr(error), error_message);
1292-
1293-
if (send_message_payload->flags == TRITONSERVER_RESPONSE_COMPLETE_FINAL) {
1294-
std::unique_ptr<
1295-
TRITONBACKEND_ResponseFactory, backend::ResponseFactoryDeleter>
1296-
response_factory(reinterpret_cast<TRITONBACKEND_ResponseFactory*>(
1297-
send_message_payload->response_factory_address));
1298-
}
12991274
}
13001275
}
13011276

@@ -1368,11 +1343,6 @@ ModelInstanceState::ProcessRequestsDecoupled(
13681343
TRITONSERVER_ERROR_INTERNAL, error->String().c_str());
13691344
}
13701345

1371-
// Reset the release flags for all the requests.
1372-
for (auto& infer_request : pb_infer_requests) {
1373-
infer_request->SetReleaseFlags(TRITONSERVER_REQUEST_RELEASE_ALL);
1374-
}
1375-
13761346
return TRITONSERVER_ErrorNew(
13771347
TRITONSERVER_ERROR_INTERNAL, "Failed to process the requests.");
13781348
}
@@ -2499,15 +2469,9 @@ TRITONBACKEND_ModelInstanceExecute(
24992469
}
25002470
}
25012471

2502-
// We should only delete the response factory for the requests that have
2503-
// not been closed.
25042472
for (auto& infer_request : infer_requests) {
2505-
if (!instance_state->ExistsInClosedRequests(
2506-
infer_request->RequestAddress())) {
2507-
LOG_IF_ERROR(
2508-
infer_request->DeleteResponseFactory(),
2509-
"Failed to delete the response factory.");
2510-
}
2473+
// Reset the release flags for all the requests.
2474+
infer_request->SetReleaseFlags(TRITONSERVER_REQUEST_RELEASE_ALL);
25112475
}
25122476
}
25132477
}

src/python_be.h

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -288,9 +288,6 @@ class ModelInstanceState : public BackendModelInstance {
288288
std::unique_ptr<boost::asio::thread_pool> thread_pool_;
289289
std::unordered_map<intptr_t, std::shared_ptr<InferPayload>> infer_payload_;
290290
std::unique_ptr<RequestExecutor> request_executor_;
291-
std::mutex response_factory_map_mutex_;
292-
std::unordered_map<intptr_t, TRITONBACKEND_ResponseFactory*>
293-
response_factory_map_;
294291

295292
public:
296293
static TRITONSERVER_Error* Create(
@@ -403,7 +400,8 @@ class ModelInstanceState : public BackendModelInstance {
403400
std::unique_ptr<InferResponse>* infer_response,
404401
bi::managed_external_buffer::handle_t* response_handle);
405402

406-
// Process the bls decoupled cleanup request
403+
// Process the bls decoupled cleanup request for InferPayload and
404+
// ResponseFactory
407405
void ProcessBLSCleanupRequest(const std::unique_ptr<IPCMessage>& message);
408406

409407
// Process request cancellation query

src/response_sender.cc

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,13 @@ ResponseSender::ResponseSender(
4545
{
4646
}
4747

48+
ResponseSender::~ResponseSender()
49+
{
50+
std::unique_ptr<Stub>& stub = Stub::GetOrCreateInstance();
51+
stub->EnqueueCleanupId(
52+
reinterpret_cast<void*>(response_factory_address_),
53+
PYTHONSTUB_BLSDecoupledResponseFactoryCleanup);
54+
}
4855

4956
void
5057
ResponseSender::Send(

src/response_sender.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ class ResponseSender {
3838
intptr_t request_address, intptr_t response_factory_address,
3939
std::unique_ptr<SharedMemoryManager>& shm_pool,
4040
const std::shared_ptr<PbCancel>& pb_cancel);
41+
~ResponseSender();
4142
void Send(std::shared_ptr<InferResponse> response, const uint32_t flags);
4243
bool IsCancelled();
4344

0 commit comments

Comments (0)