Skip to content

Commit 080f956

Browse files
SergeyRyabininSergey Ryabinin
authored andcommitted
Avoid locking on submitted tasks to multi curl handle by keeping easy handle sdk wrappers in a container
1 parent 90a2ee5 commit 080f956

File tree

6 files changed

+766
-512
lines changed

6 files changed

+766
-512
lines changed
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/**
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0.
4+
*/
5+
6+
#pragma once
7+
8+
#include <aws/core/utils/DateTime.h>
9+
#include <aws/core/utils/ResourceManager.h>
10+
#include <aws/core/http/Version.h>
11+
12+
#include <utility>
13+
#include <curl/curl.h>
14+
15+
namespace Aws
16+
{
17+
namespace Utils
18+
{
19+
namespace RateLimits
20+
{
21+
class RateLimiterInterface;
22+
}
23+
}
24+
25+
namespace Http
26+
{
27+
class CurlMultiHttpClient;
28+
class HttpRequest;
29+
class HttpResponse;
30+
31+
namespace Curl
32+
{
33+
enum class ExecutionPolicy
34+
{
35+
BLOCKING,
36+
ASYNC
37+
};
38+
39+
/* A composition wrapper on curl easy handle to have curl handle and SDK context together*/
40+
struct CurlEasyHandleContext final
41+
{
42+
struct WriteContext final
43+
{
44+
WriteContext() = default;
45+
46+
bool m_HasBody = false;
47+
HttpRequest *m_request = nullptr;
48+
std::shared_ptr<HttpResponse> m_response;
49+
Aws::Utils::RateLimits::RateLimiterInterface *m_rateLimiter = nullptr;
50+
int64_t m_numBytesResponseReceived = 0;
51+
};
52+
53+
struct ReadContext final
54+
{
55+
ReadContext() = default;
56+
57+
Aws::Utils::RateLimits::RateLimiterInterface *m_rateLimiter = nullptr;
58+
HttpRequest *m_request = nullptr;
59+
bool m_chunkEnd = false;
60+
};
61+
62+
CurlEasyHandleContext() = default;
63+
64+
/* Curl side */
65+
CURL* m_curlEasyHandle = nullptr;
66+
// headers set on curl handle, to be cleaned up by SDK after the execution
67+
curl_slist* m_curlHandleHeaders = nullptr;
68+
69+
/* SDK side */
70+
const CurlMultiHttpClient* m_client = nullptr;
71+
ExecutionPolicy m_execPolicy;
72+
std::function<void()> m_onCurlDoneFn;
73+
Aws::Utils::DateTime startTransmissionTime;
74+
75+
/* Curl calls the SDK back */
76+
WriteContext writeContext;
77+
ReadContext readContext;
78+
79+
/* SDK polls the curl result */
80+
CURLcode curlResult;
81+
// ptr acquired by curl_multi_info_read, free-ed by curl in easy handle cleanup, do not free in SDK
82+
CURLMsg* curlResultMsg = nullptr;
83+
84+
/* callbacks set on easy handle */
85+
static size_t WriteData(char* ptr, size_t size, size_t nmemb, void* userdata);
86+
static size_t WriteHeader(char* ptr, size_t size, size_t nmemb, void* userdata);
87+
static size_t ReadBody(char* ptr, size_t size, size_t nmemb, void* userdata, bool isStreaming);
88+
static size_t ReadBodyStreaming(char* ptr, size_t size, size_t nmemb, void* userdata);
89+
static size_t ReadBodyFunc(char* ptr, size_t size, size_t nmemb, void* userdata);
90+
static size_t SeekBody(void* userdata, curl_off_t offset, int origin);
91+
#if LIBCURL_VERSION_NUM >= 0x072000 // 7.32.0
92+
static int CurlProgressCallback(void *userdata, curl_off_t, curl_off_t, curl_off_t, curl_off_t);
93+
#else
94+
static int CurlProgressCallback(void *userdata, double, double, double, double);
95+
#endif
96+
};
97+
} // namespace Curl
98+
} // namespace Http
99+
} // namespace Aws

src/aws-cpp-sdk-core/include/aws/core/http/curl-multi/CurlMultiHandleContainer.h

Lines changed: 68 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -13,73 +13,80 @@
1313

1414
namespace Aws
1515
{
16-
namespace Http
17-
{
18-
19-
/**
20-
* Simple Connection pool manager for Curl. It maintains connections in a thread safe manner. You
21-
* can call into acquire a handle, then put it back when finished. It is assumed that reusing an already
22-
* initialized handle is preferable (especially for synchronous clients). The pool doubles in capacity as
23-
* needed up to the maximum amount of connections.
24-
*/
25-
class CurlMultiHandleContainer
26-
{
27-
public:
28-
/**
29-
* Initializes an empty stack of CURL handles. If you are only making synchronous calls via your http client
30-
* then a small size is best. For async support, a good value would be 6 * number of Processors. *
31-
*/
32-
CurlMultiHandleContainer(unsigned maxSize = 50, long httpRequestTimeout = 0, long connectTimeout = 1000, bool tcpKeepAlive = true,
33-
unsigned long tcpKeepAliveIntervalMs = 30000, long lowSpeedTime = 3000, unsigned long lowSpeedLimit = 1,
34-
Version version = Version::HTTP_VERSION_2TLS);
35-
~CurlMultiHandleContainer();
16+
namespace Http
17+
{
18+
namespace Curl
19+
{
20+
struct CurlEasyHandleContext;
3621

37-
/**
38-
* Blocks until a curl handle from the pool is available for use.
39-
*/
40-
CURL* AcquireCurlHandle();
41-
/**
42-
* Returns a handle to the pool for reuse. It is imperative that this is called
43-
* after you are finished with the handle.
44-
*/
45-
void ReleaseCurlHandle(CURL* handle);
22+
/**
23+
* Simple Connection pool manager for Curl. It maintains connections in a thread safe manner. You
24+
* can call into acquire a handle, then put it back when finished. It is assumed that reusing an already
25+
* initialized handle is preferable (especially for synchronous clients). The pool doubles in capacity as
26+
* needed up to the maximum amount of connections.
27+
*/
28+
class CurlMultiHandleContainer
29+
{
30+
public:
31+
/**
32+
* Initializes an empty stack of CURL handles.
33+
*/
34+
CurlMultiHandleContainer(unsigned maxSize = 50,
35+
long httpRequestTimeout = 0,
36+
long connectTimeout = 1000,
37+
bool tcpKeepAlive = true,
38+
unsigned long tcpKeepAliveIntervalMs = 30000,
39+
long lowSpeedTime = 3000,
40+
unsigned long lowSpeedLimit = 1,
41+
Version version = Version::HTTP_VERSION_2TLS);
42+
~CurlMultiHandleContainer();
4643

47-
/**
48-
* When the handle has bad DNS entries, problematic live connections, we need to destroy the handle from pool.
49-
*/
50-
void DestroyCurlHandle(CURL* handle);
44+
/**
45+
* Blocks until a curl handle from the pool is available for use.
46+
*/
47+
CurlEasyHandleContext* AcquireCurlHandle();
48+
/**
49+
* Returns a handle to the pool for reuse. It is imperative that this is called
50+
* after you are finished with the handle.
51+
*/
52+
void ReleaseCurlHandle(CurlEasyHandleContext* handleCtx);
5153

52-
inline CURLM* AccessCurlMultiHandle()
53-
{
54-
return m_curlMultiHandle;
55-
}
54+
/**
55+
* When the handle has bad DNS entries, problematic live connections, we need to destroy the handle from pool.
56+
*/
57+
void DestroyCurlHandle(CurlEasyHandleContext* handleCtx);
5658

57-
private:
58-
CurlMultiHandleContainer(const CurlMultiHandleContainer&) = delete;
59-
const CurlMultiHandleContainer& operator = (const CurlMultiHandleContainer&) = delete;
60-
CurlMultiHandleContainer(const CurlMultiHandleContainer&&) = delete;
61-
const CurlMultiHandleContainer& operator = (const CurlMultiHandleContainer&&) = delete;
59+
inline CURLM* AccessCurlMultiHandle()
60+
{
61+
return m_curlMultiHandle;
62+
}
6263

63-
CURL* CreateCurlHandleInPool();
64-
bool CheckAndGrowPool();
65-
void SetDefaultOptionsOnHandle(CURL* handle);
66-
static long ConvertHttpVersion(Version version);
64+
private:
65+
CurlMultiHandleContainer(const CurlMultiHandleContainer&) = delete;
66+
const CurlMultiHandleContainer& operator = (const CurlMultiHandleContainer&) = delete;
67+
CurlMultiHandleContainer(const CurlMultiHandleContainer&&) = delete;
68+
const CurlMultiHandleContainer& operator = (const CurlMultiHandleContainer&&) = delete;
6769

68-
Aws::Utils::ExclusiveOwnershipResourceManager<CURL*> m_handleContainer;
69-
CURLM* m_curlMultiHandle = nullptr;
70+
CurlEasyHandleContext* CreateCurlHandleInPool();
71+
bool CheckAndGrowPool();
72+
void SetDefaultOptionsOnHandle(CurlEasyHandleContext& handleCtx);
73+
static long ConvertHttpVersion(Version version);
7074

71-
unsigned m_maxPoolSize;
72-
unsigned long m_httpRequestTimeout;
73-
unsigned long m_connectTimeout;
74-
bool m_enableTcpKeepAlive;
75-
unsigned long m_tcpKeepAliveIntervalMs;
76-
unsigned long m_lowSpeedTime;
77-
unsigned long m_lowSpeedLimit;
78-
unsigned m_poolSize;
79-
std::mutex m_containerLock;
80-
Version m_version;
81-
};
75+
Aws::Utils::ExclusiveOwnershipResourceManager<CurlEasyHandleContext*> m_handleContainer;
76+
CURLM* m_curlMultiHandle = nullptr;
8277

83-
} // namespace Http
84-
} // namespace Aws
78+
unsigned m_maxPoolSize;
79+
unsigned long m_httpRequestTimeout;
80+
unsigned long m_connectTimeout;
81+
bool m_enableTcpKeepAlive;
82+
unsigned long m_tcpKeepAliveIntervalMs;
83+
unsigned long m_lowSpeedTime;
84+
unsigned long m_lowSpeedLimit;
85+
unsigned m_poolSize;
86+
std::mutex m_containerLock;
87+
Version m_version;
88+
};
8589

90+
} // namespace Curl
91+
} // namespace Http
92+
} // namespace Aws

src/aws-cpp-sdk-core/include/aws/core/http/curl-multi/CurlMultiHttpClient.h

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,10 @@ namespace Aws
2020
{
2121
namespace Http
2222
{
23+
namespace Curl
24+
{
25+
struct CurlEasyHandleContext;
26+
}
2327
namespace Standard
2428
{
2529
class StandardHttpResponse;
@@ -60,8 +64,6 @@ class AWS_CORE_API CurlMultiHttpClient: public HttpClient
6064
bool disableExpectHeader = false;
6165
bool allowRedirects = false;
6266
};
63-
struct CurlEasyHandleContext;
64-
6567
using Base = HttpClient;
6668

6769
//Creates client, initializes curl handle if it hasn't been created already.
@@ -84,9 +86,9 @@ class AWS_CORE_API CurlMultiHttpClient: public HttpClient
8486
virtual void OverrideOptionsOnConnectionHandle(CURL*) const {}
8587

8688
private:
87-
void SubmitTask(std::shared_ptr<CurlEasyHandleContext> pEasyHandleCtx) const;
89+
void SubmitTask(Curl::CurlEasyHandleContext* pEasyHandleCtx) const;
8890

89-
static std::shared_ptr<HttpResponse> HandleCurlResponse(std::shared_ptr<CurlEasyHandleContext> pEasyHandleCtx);
91+
static std::shared_ptr<HttpResponse> HandleCurlResponse(Curl::CurlEasyHandleContext* pEasyHandleCtx);
9092
static void CurlMultiPerformThread(CurlMultiHttpClient* pClient);
9193

9294
std::thread m_multiHandleThread;
@@ -97,11 +99,11 @@ class AWS_CORE_API CurlMultiHttpClient: public HttpClient
9799
// mutable std::mutex m_tasksMutex;
98100
mutable std::atomic<size_t> m_tasksQueued;
99101
mutable std::mutex m_tasksMutex;
100-
mutable Aws::UnorderedMap<CURL*, std::shared_ptr<CurlEasyHandleContext>> m_tasks;
102+
mutable Aws::UnorderedMap<CURL*, std::shared_ptr<Curl::CurlEasyHandleContext>> m_tasks;
101103

102104
CurlMultiHttpClientConfig m_config;
103105

104-
mutable CurlMultiHandleContainer m_curlMultiHandleContainer;
106+
mutable Curl::CurlMultiHandleContainer m_curlMultiHandleContainer;
105107

106108
static std::atomic<bool> isGlobalStateInit;
107109
std::shared_ptr<smithy::components::tracing::TelemetryProvider> m_telemetryProvider;

0 commit comments

Comments
 (0)