Skip to content

Commit 4798c5f

Browse files
SergeyRyabininSergey Ryabinin
authored andcommitted
Add queueing to curl multi
1 parent 09d9cc3 commit 4798c5f

File tree

6 files changed

+198
-15
lines changed

6 files changed

+198
-15
lines changed

src/aws-cpp-sdk-core/include/aws/core/http/curl-multi/CurlMultiHandleContainer.h

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,12 @@ namespace Aws
4141
Version version = Version::HTTP_VERSION_2TLS);
4242
~CurlMultiHandleContainer();
4343

44+
45+
/**
46+
* Gets an available curl handle from the pool, or returns nullptr if no handle is available
47+
*/
48+
CurlEasyHandleContext* TryAcquireCurlHandle();
49+
4450
/**
4551
* Blocks until a curl handle from the pool is available for use.
4652
*/
@@ -56,6 +62,11 @@ namespace Aws
5662
*/
5763
void DestroyCurlHandle(CurlEasyHandleContext* handleCtx);
5864

65+
/**
66+
* Resets handle to an initial state and returns it; or Destroys and creates a new handle if the curl code is bad.
67+
*/
68+
CurlEasyHandleContext* ResetCurlHandle(CurlEasyHandleContext* handleCtx, const CURLcode code);
69+
5970
inline CURLM* AccessCurlMultiHandle()
6071
{
6172
return m_curlMultiHandle;
@@ -67,7 +78,7 @@ namespace Aws
6778
CurlMultiHandleContainer(const CurlMultiHandleContainer&&) = delete;
6879
const CurlMultiHandleContainer& operator = (const CurlMultiHandleContainer&&) = delete;
6980

70-
CurlEasyHandleContext* CreateCurlHandleInPool();
81+
CurlEasyHandleContext* CreateCurlHandleInPool(bool release = true);
7182
bool CheckAndGrowPool();
7283
void SetDefaultOptionsOnHandle(CurlEasyHandleContext& handleCtx);
7384
static long ConvertHttpVersion(Version version);

src/aws-cpp-sdk-core/include/aws/core/http/curl-multi/CurlMultiHttpClient.h

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -103,6 +103,26 @@ class AWS_CORE_API CurlMultiHttpClient: public HttpClient
103103
virtual void OverrideOptionsOnConnectionHandle(CURL*) const {}
104104

105105
private:
106+
struct CurlMultiHttpClientTask
107+
{
108+
std::shared_ptr<HttpRequest> request;
109+
std::shared_ptr<Aws::Utils::Threading::Executor> pExecutor;
110+
HttpClient::HttpAsyncOnDoneHandler onDoneHandler;
111+
Aws::Utils::RateLimits::RateLimiterInterface* readLimiter = nullptr;
112+
Aws::Utils::RateLimits::RateLimiterInterface* writeLimiter = nullptr;
113+
};
114+
115+
mutable Aws::Queue<Aws::UniquePtr<CurlMultiHttpClientTask>> m_MultiCurlTasks;
116+
mutable std::mutex m_MultiCurlTasksLock;
117+
118+
/* Actually submits a request for processing */
119+
virtual void SubmitAsyncRequest(Curl::CurlEasyHandleContext* easyHandleContext,
120+
const std::shared_ptr<HttpRequest>& request,
121+
std::shared_ptr<Aws::Utils::Threading::Executor> pExecutor,
122+
HttpClient::HttpAsyncOnDoneHandler onDoneHandler,
123+
Aws::Utils::RateLimits::RateLimiterInterface* readLimiter,
124+
Aws::Utils::RateLimits::RateLimiterInterface* writeLimiter) const;
125+
106126
bool SubmitTask(Curl::CurlEasyHandleContext* pEasyHandleCtx) const;
107127

108128
static std::shared_ptr<HttpResponse> HandleCurlResponse(Curl::CurlEasyHandleContext* pEasyHandleCtx);

src/aws-cpp-sdk-core/include/aws/core/utils/ResourceManager.h

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,31 @@ namespace Aws
2828
public:
2929
ExclusiveOwnershipResourceManager() : m_shutdown(false) {}
3030

31+
/**
32+
* Returns a resource with exclusive ownership. You must call Release on the resource when you are finished or other
33+
* threads will block waiting to acquire it.
34+
*
35+
* @return instance of RESOURCE_TYPE
36+
*/
37+
bool TryAcquire(RESOURCE_TYPE& result)
38+
{
39+
std::unique_lock<std::mutex> locker(m_queueLock);
40+
if (m_shutdown.load() || m_resources.size() == 0)
41+
{
42+
result = {};
43+
return false;
44+
}
45+
else
46+
{
47+
assert(!m_shutdown.load());
48+
49+
result = std::move(m_resources.back());
50+
m_resources.pop_back();
51+
52+
return true;
53+
}
54+
}
55+
3156
/**
3257
* Returns a resource with exclusive ownership. You must call Release on the resource when you are finished or other
3358
* threads will block waiting to acquire it.

src/aws-cpp-sdk-core/source/client/AWSClient.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -797,7 +797,7 @@ void AWSClient::HandleExhaustiveAsyncReply(std::shared_ptr<Aws::Client::AwsClien
797797
}
798798
}
799799
return false;
800-
} (); // <- IIFE (immideately invoked lambda)
800+
} (); // <- IIFE (immediately invoked lambda)
801801

802802
long sleepMillis = TracingUtils::MakeCallWithTiming<long>(
803803
[&]() -> long {

src/aws-cpp-sdk-core/source/http/curl-multi/CurlMultiHandleContainer.cpp

Lines changed: 62 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,27 @@ CurlMultiHandleContainer::~CurlMultiHandleContainer()
6464
}
6565
}
6666

67+
CurlEasyHandleContext* CurlMultiHandleContainer::TryAcquireCurlHandle()
68+
{
69+
AWS_LOGSTREAM_DEBUG(CURL_HANDLE_CONTAINER_TAG, "Attempting to acquire curl easy handle.");
70+
71+
if(!m_handleContainer.HasResourcesAvailable())
72+
{
73+
AWS_LOGSTREAM_DEBUG(CURL_HANDLE_CONTAINER_TAG, "No current connections available in pool. Attempting to create new connections.");
74+
CheckAndGrowPool();
75+
}
76+
77+
CurlEasyHandleContext* handle = nullptr;
78+
if(m_handleContainer.TryAcquire(handle))
79+
{
80+
AWS_LOGSTREAM_DEBUG(CURL_HANDLE_CONTAINER_TAG, "Returning connection handle " << handle);
81+
} else {
82+
AWS_LOGSTREAM_DEBUG(CURL_HANDLE_CONTAINER_TAG, "No connection handle available");
83+
}
84+
85+
return handle;
86+
}
87+
6788
CurlEasyHandleContext* CurlMultiHandleContainer::AcquireCurlHandle()
6889
{
6990
AWS_LOGSTREAM_DEBUG(CURL_HANDLE_CONTAINER_TAG, "Attempting to acquire curl easy handle.");
@@ -121,8 +142,45 @@ void CurlMultiHandleContainer::DestroyCurlHandle(CurlEasyHandleContext* handleCt
121142
}
122143
}
123144

145+
CurlEasyHandleContext* CurlMultiHandleContainer::ResetCurlHandle(CurlEasyHandleContext* handleCtx, const CURLcode code)
146+
{
147+
if (code != CURLE_OK)
148+
{
149+
if(handleCtx && handleCtx->m_curlEasyHandle)
150+
{
151+
AWS_LOGSTREAM_DEBUG(CURL_HANDLE_CONTAINER_TAG, "Destroy curl handle: " << handleCtx->m_curlEasyHandle);
152+
curl_easy_cleanup(handleCtx->m_curlEasyHandle);
153+
handleCtx->m_curlEasyHandle = nullptr;
154+
Aws::Delete(handleCtx);
155+
handleCtx = nullptr;
156+
}
157+
158+
{
159+
std::lock_guard<std::mutex> locker(m_containerLock);
160+
handleCtx = CreateCurlHandleInPool(false);
161+
}
162+
if (handleCtx)
163+
{
164+
AWS_LOGSTREAM_DEBUG(CURL_HANDLE_CONTAINER_TAG, "Created replacement handle: " << handleCtx->m_curlEasyHandle);
165+
}
166+
}
167+
else
168+
{
169+
CURL* handle = handleCtx ? handleCtx->m_curlEasyHandle : nullptr;
170+
if (handle)
171+
{
172+
#if LIBCURL_VERSION_NUM >= 0x074D00 // 7.77.0
173+
curl_easy_setopt(handle, CURLOPT_COOKIEFILE, NULL); // workaround a mem leak on curl
174+
#endif
175+
curl_easy_reset(handle);
176+
SetDefaultOptionsOnHandle(*handleCtx);
177+
curl_easy_setopt(handle, CURLOPT_PRIVATE, handleCtx);
178+
}
179+
}
180+
return handleCtx;
181+
}
124182

125-
CurlEasyHandleContext* CurlMultiHandleContainer::CreateCurlHandleInPool()
183+
CurlEasyHandleContext* CurlMultiHandleContainer::CreateCurlHandleInPool(bool release /* = true */)
126184
{
127185
CurlEasyHandleContext* handleCtx = Aws::New<CurlEasyHandleContext>(CURL_HANDLE_CONTAINER_TAG);
128186
if(!handleCtx)
@@ -138,7 +196,9 @@ CurlEasyHandleContext* CurlMultiHandleContainer::CreateCurlHandleInPool()
138196
{
139197
SetDefaultOptionsOnHandle(*handleCtx);
140198
curl_easy_setopt(handleCtx->m_curlEasyHandle, CURLOPT_PRIVATE, handleCtx);
141-
m_handleContainer.Release(handleCtx);
199+
if (release) {
200+
m_handleContainer.Release(handleCtx);
201+
}
142202
}
143203
else
144204
{

src/aws-cpp-sdk-core/source/http/curl-multi/CurlMultiHttpClient.cpp

Lines changed: 78 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -427,8 +427,6 @@ CurlMultiHttpClient::~CurlMultiHttpClient()
427427
m_signalRunning.notify_all();
428428
}
429429
m_multiHandleThread.join();
430-
431-
curl_multi_cleanup(m_curlMultiHandleContainer.AccessCurlMultiHandle());
432430
}
433431

434432
struct curl_slist* PrepareHeaders(const HttpRequest* request, const bool disableExpectHeader)
@@ -680,8 +678,6 @@ std::shared_ptr<HttpResponse> CurlMultiHttpClient::HandleCurlResponse(Curl::Curl
680678
AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, "Response body length doesn't match the content-length header.");
681679
}
682680
}
683-
684-
AWS_LOGSTREAM_DEBUG(CURL_HTTP_CLIENT_TAG, "Releasing curl handle " << connectionHandle);
685681
}
686682

687683
double timep = 0.0;
@@ -748,13 +744,38 @@ std::shared_ptr<HttpResponse> CurlMultiHttpClient::HandleCurlResponse(Curl::Curl
748744
std::shared_ptr<HttpResponse> res = std::move(pEasyHandleCtx->writeContext.m_response);
749745
pEasyHandleCtx->writeContext.m_response.reset();
750746

751-
if (curlResponseCode != CURLE_OK)
747+
Aws::UniquePtr<CurlMultiHttpClientTask> newTask;
752748
{
753-
client->m_curlMultiHandleContainer.DestroyCurlHandle(pEasyHandleCtx);
749+
std::unique_lock<std::mutex> lockGuard(client->m_MultiCurlTasksLock);
750+
if(!client->m_MultiCurlTasks.empty()) {
751+
newTask = std::move(client->m_MultiCurlTasks.front());
752+
client->m_MultiCurlTasks.pop();
753+
}
754+
}
755+
756+
if(newTask)
757+
{
758+
AWS_LOGSTREAM_DEBUG(CURL_HTTP_CLIENT_TAG, "Submitting a new task from the queue to the curl handle " << connectionHandle);
759+
pEasyHandleCtx = client->m_curlMultiHandleContainer.ResetCurlHandle(pEasyHandleCtx, curlResponseCode);
760+
761+
client->SubmitAsyncRequest(pEasyHandleCtx,
762+
std::move(newTask->request),
763+
std::move(newTask->pExecutor),
764+
std::move(newTask->onDoneHandler),
765+
newTask->readLimiter,
766+
newTask->writeLimiter);
754767
}
755768
else
756769
{
770+
AWS_LOGSTREAM_DEBUG(CURL_HTTP_CLIENT_TAG, "Releasing curl handle " << connectionHandle);
771+
if (curlResponseCode != CURLE_OK)
772+
{
773+
client->m_curlMultiHandleContainer.DestroyCurlHandle(pEasyHandleCtx);
774+
}
775+
else
776+
{
757777
client->m_curlMultiHandleContainer.ReleaseCurlHandle(pEasyHandleCtx);
778+
}
758779
}
759780

760781
return res;
@@ -788,10 +809,57 @@ bool CurlMultiHttpClient::SubmitTask(Curl::CurlEasyHandleContext* pEasyHandleCtx
788809

789810
// Async
790811
void CurlMultiHttpClient::MakeAsyncRequest(const std::shared_ptr<HttpRequest>& request,
791-
std::shared_ptr<Aws::Utils::Threading::Executor> pExecutor,
792-
HttpClient::HttpAsyncOnDoneHandler onDoneHandler,
793-
Aws::Utils::RateLimits::RateLimiterInterface* readLimiter,
794-
Aws::Utils::RateLimits::RateLimiterInterface* writeLimiter) const
812+
std::shared_ptr<Aws::Utils::Threading::Executor> pExecutor,
813+
HttpClient::HttpAsyncOnDoneHandler onDoneHandler,
814+
Aws::Utils::RateLimits::RateLimiterInterface* readLimiter,
815+
Aws::Utils::RateLimits::RateLimiterInterface* writeLimiter) const
816+
{
817+
if(!pExecutor)
818+
{
819+
assert(pExecutor);
820+
AWS_LOGSTREAM_FATAL(CURL_HTTP_CLIENT_TAG, "Failed to submit async http request: executor is a nullptr");
821+
}
822+
823+
Curl::CurlEasyHandleContext* easyHandleContext = m_curlMultiHandleContainer.TryAcquireCurlHandle();
824+
if(easyHandleContext)
825+
{
826+
return SubmitAsyncRequest(easyHandleContext, std::move(request), std::move(pExecutor), std::move(onDoneHandler), readLimiter, writeLimiter);
827+
}
828+
else
829+
{
830+
AWS_LOGSTREAM_DEBUG(CURL_HTTP_CLIENT_TAG, "No available curl handle, queueing the request.");
831+
Aws::UniquePtr<CurlMultiHttpClientTask> task = Aws::MakeUnique<CurlMultiHttpClientTask>(CURL_HTTP_CLIENT_TAG);
832+
if(!task) {
833+
AWS_LOGSTREAM_FATAL(CURL_HTTP_CLIENT_TAG, "Failed to allocate a UniquePtr for holding enqueued http client task!");
834+
pExecutor->Submit([request, onDoneHandler]()
835+
{
836+
std::shared_ptr<HttpResponse> response = Aws::MakeShared<StandardHttpResponse>(CURL_HTTP_CLIENT_TAG, request);
837+
if(response)
838+
{
839+
response->SetClientErrorType(CoreErrors::MEMORY_ALLOCATION);
840+
response->SetClientErrorMessage("Failed to allocate a UniquePtr for holding enqueued http client task");
841+
}
842+
onDoneHandler(response);
843+
} );
844+
return;
845+
}
846+
task->request = std::move(request);
847+
task->pExecutor = std::move(pExecutor);
848+
task->onDoneHandler = std::move(onDoneHandler);
849+
task->readLimiter = readLimiter;
850+
task->writeLimiter = writeLimiter;
851+
852+
std::unique_lock<std::mutex> lockGuard(m_MultiCurlTasksLock);
853+
m_MultiCurlTasks.emplace(std::move(task));
854+
}
855+
}
856+
857+
void CurlMultiHttpClient::SubmitAsyncRequest(Curl::CurlEasyHandleContext* easyHandleContext,
858+
const std::shared_ptr<HttpRequest>& request,
859+
std::shared_ptr<Aws::Utils::Threading::Executor> pExecutor,
860+
HttpClient::HttpAsyncOnDoneHandler onDoneHandler,
861+
Aws::Utils::RateLimits::RateLimiterInterface* readLimiter,
862+
Aws::Utils::RateLimits::RateLimiterInterface* writeLimiter) const
795863
{
796864
if(!pExecutor)
797865
{
@@ -810,7 +878,6 @@ void CurlMultiHttpClient::MakeAsyncRequest(const std::shared_ptr<HttpRequest>& r
810878
return;
811879
}
812880

813-
Curl::CurlEasyHandleContext* easyHandleContext = m_curlMultiHandleContainer.AcquireCurlHandle();
814881
if(!easyHandleContext)
815882
{
816883
response->SetClientErrorType(CoreErrors::NETWORK_CONNECTION);

0 commit comments

Comments
 (0)