@@ -918,8 +918,11 @@ Stub::ServiceStubToParentRequests()
918918 break ;
919919 } else {
920920 bls_response_cleanup_buffer_.pop ();
921+ {
922+ std::lock_guard<std::mutex> lock (response_iterator_map_mu_);
923+ response_iterator_map_.erase (id);
924+ }
921925 SendCleanupId (id);
922- response_iterator_map_.erase (id);
923926 }
924927 }
925928 }
@@ -1093,7 +1096,11 @@ Stub::ParentToStubMQMonitor()
10931096 response_iterator_map_[infer_response->Id ()]->EnqueueResponse (
10941097 std::move (infer_response));
10951098 } else {
1096- LOG_INFO << " Failed to enqueue the response to its response iterator." ;
1099+ auto response_iterator =
1100+ std::make_shared<ResponseIterator>(std::move (infer_response));
1101+ response_iterator_map_.insert (
1102+ std::pair<void *, std::shared_ptr<ResponseIterator>>(
1103+ response_iterator->Id (), response_iterator));
10971104 }
10981105 }
10991106
@@ -1115,13 +1122,31 @@ Stub::ParentToStubServiceActive()
11151122 return parent_to_stub_thread_;
11161123}
11171124
1118- void
1119- Stub::SaveResponseIterator (std::shared_ptr<ResponseIterator> response_iterator )
1125+ std::shared_ptr<ResponseIterator>
1126+ Stub::GetResponseIterator (std::shared_ptr<InferResponse> infer_response )
11201127{
11211128 std::lock_guard<std::mutex> lock (response_iterator_map_mu_);
1122- response_iterator_map_.insert (
1123- std::pair<void *, std::shared_ptr<ResponseIterator>>(
1124- response_iterator->Id (), response_iterator));
1129+ if (response_iterator_map_.find (infer_response->Id ()) !=
1130+ response_iterator_map_.end ()) {
1131+ // Need to re-construct the 'ResponseIterator' and update the
1132+ // 'response_iterator_map_' to make sure the 'ResponseIterator' object has
1133+ // the correct first response.
1134+ auto response_iterator = std::make_shared<ResponseIterator>(infer_response);
1135+ std::vector<std::shared_ptr<InferResponse>> existing_responses =
1136+ response_iterator_map_[infer_response->Id ()]->GetExistingResponses ();
1137+ for (auto & response : existing_responses) {
1138+ response_iterator->EnqueueResponse (response);
1139+ }
1140+
1141+ response_iterator_map_[infer_response->Id ()] = response_iterator;
1142+ } else {
1143+ auto response_iterator = std::make_shared<ResponseIterator>(infer_response);
1144+ response_iterator_map_.insert (
1145+ std::pair<void *, std::shared_ptr<ResponseIterator>>(
1146+ response_iterator->Id (), response_iterator));
1147+ }
1148+
1149+ return response_iterator_map_[infer_response->Id ()];
11251150}
11261151
11271152bool
@@ -1304,12 +1329,8 @@ PYBIND11_EMBEDDED_MODULE(c_python_backend_utils, module)
13041329 infer_request->Exec (decoupled);
13051330 py::object response_object;
13061331 if (decoupled) {
1307- auto response_iterator =
1308- std::make_shared<ResponseIterator>(response);
1332+ auto response_iterator = stub->GetResponseIterator (response);
13091333 response_object = py::cast (response_iterator);
1310- if (response_iterator->Id () != nullptr ) {
1311- stub->SaveResponseIterator (response_iterator);
1312- }
13131334 } else {
13141335 response_object = py::cast (response);
13151336 }
@@ -1334,12 +1355,8 @@ PYBIND11_EMBEDDED_MODULE(c_python_backend_utils, module)
13341355 infer_request->Exec (decoupled);
13351356 py::object response_object;
13361357 if (decoupled) {
1337- auto response_iterator =
1338- std::make_shared<ResponseIterator>(response);
1358+ auto response_iterator = stub->GetResponseIterator (response);
13391359 response_object = py::cast (response_iterator);
1340- if (response_iterator->Id () != nullptr ) {
1341- stub->SaveResponseIterator (response_iterator);
1342- }
13431360 } else {
13441361 response_object = py::cast (response);
13451362 }
0 commit comments