From efd8398b51eb9c84b68d0806409759ad2e95b6bc Mon Sep 17 00:00:00 2001 From: SergeyRyabinin Date: Wed, 13 Sep 2023 21:58:28 +0000 Subject: [PATCH 1/3] Use curl_multi_handle --- .../curl-multi/CurlMultiHandleContainer.h | 85 ++ .../http/curl-multi/CurlMultiHttpClient.h | 112 ++ .../include/aws/core/utils/ResourceManager.h | 2 +- .../source/http/HttpClientFactory.cpp | 3 +- .../curl-multi/CurlMultiHandleContainer.cpp | 220 ++++ .../http/curl-multi/CurlMultiHttpClient.cpp | 1128 +++++++++++++++++ 6 files changed, 1548 insertions(+), 2 deletions(-) create mode 100644 src/aws-cpp-sdk-core/include/aws/core/http/curl-multi/CurlMultiHandleContainer.h create mode 100644 src/aws-cpp-sdk-core/include/aws/core/http/curl-multi/CurlMultiHttpClient.h create mode 100644 src/aws-cpp-sdk-core/source/http/curl-multi/CurlMultiHandleContainer.cpp create mode 100644 src/aws-cpp-sdk-core/source/http/curl-multi/CurlMultiHttpClient.cpp diff --git a/src/aws-cpp-sdk-core/include/aws/core/http/curl-multi/CurlMultiHandleContainer.h b/src/aws-cpp-sdk-core/include/aws/core/http/curl-multi/CurlMultiHandleContainer.h new file mode 100644 index 00000000000..8fadf89ca9a --- /dev/null +++ b/src/aws-cpp-sdk-core/include/aws/core/http/curl-multi/CurlMultiHandleContainer.h @@ -0,0 +1,85 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#pragma once + +#include +#include + +#include +#include + +namespace Aws +{ +namespace Http +{ + +/** + * Simple Connection pool manager for Curl. It maintains connections in a thread safe manner. You + * can call into acquire a handle, then put it back when finished. It is assumed that reusing an already + * initialized handle is preferable (especially for synchronous clients). The pool doubles in capacity as + * needed up to the maximum amount of connections. + */ +class CurlMultiHandleContainer +{ +public: + /** + * Initializes an empty stack of CURL handles. If you are only making synchronous calls via your http client + * then a small size is best. For async support, a good value would be 6 * number of Processors. * + */ + CurlMultiHandleContainer(unsigned maxSize = 50, long httpRequestTimeout = 0, long connectTimeout = 1000, bool tcpKeepAlive = true, + unsigned long tcpKeepAliveIntervalMs = 30000, long lowSpeedTime = 3000, unsigned long lowSpeedLimit = 1, + Version version = Version::HTTP_VERSION_2TLS); + ~CurlMultiHandleContainer(); + + /** + * Blocks until a curl handle from the pool is available for use. + */ + CURL* AcquireCurlHandle(); + /** + * Returns a handle to the pool for reuse. It is imperative that this is called + * after you are finished with the handle. + */ + void ReleaseCurlHandle(CURL* handle); + + /** + * When the handle has bad DNS entries, problematic live connections, we need to destroy the handle from pool. + */ + void DestroyCurlHandle(CURL* handle); + + inline CURLM* AccessCurlMultiHandle() + { + return m_curlMultiHandle; + } + +private: + CurlMultiHandleContainer(const CurlMultiHandleContainer&) = delete; + const CurlMultiHandleContainer& operator = (const CurlMultiHandleContainer&) = delete; + CurlMultiHandleContainer(const CurlMultiHandleContainer&&) = delete; + const CurlMultiHandleContainer& operator = (const CurlMultiHandleContainer&&) = delete; + + CURL* CreateCurlHandleInPool(); + bool CheckAndGrowPool(); + void SetDefaultOptionsOnHandle(CURL* handle); + static long ConvertHttpVersion(Version version); + + Aws::Utils::ExclusiveOwnershipResourceManager m_handleContainer; + CURLM* m_curlMultiHandle = nullptr; + + unsigned m_maxPoolSize; + unsigned long m_httpRequestTimeout; + unsigned long m_connectTimeout; + bool m_enableTcpKeepAlive; + unsigned long m_tcpKeepAliveIntervalMs; + unsigned long m_lowSpeedTime; + unsigned long m_lowSpeedLimit; + unsigned m_poolSize; + std::mutex m_containerLock; + Version m_version; +}; + +} // namespace Http +} // namespace Aws + diff --git a/src/aws-cpp-sdk-core/include/aws/core/http/curl-multi/CurlMultiHttpClient.h b/src/aws-cpp-sdk-core/include/aws/core/http/curl-multi/CurlMultiHttpClient.h new file mode 100644 index 00000000000..28f258dfd59 --- /dev/null +++ b/src/aws-cpp-sdk-core/include/aws/core/http/curl-multi/CurlMultiHttpClient.h @@ -0,0 +1,112 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace Aws +{ +namespace Http +{ +namespace Standard +{ + class StandardHttpResponse; +} + +//Curl implementation of an http client. Right now it is only synchronous. +class AWS_CORE_API CurlMultiHttpClient: public HttpClient +{ +public: + struct CurlMultiHttpClientConfig + { + struct ProxyConfig + { + bool isEnabled = false; + Aws::String userName; + Aws::String password; + Aws::String scheme; + Aws::String host; + Aws::String sslCertPath; + Aws::String sslCertType; + Aws::String sslKeyPath; + Aws::String sslKeyType; + Aws::String keyPasswd; + unsigned port = 0; + Aws::String nonProxyHosts; + }; + + struct SslConfig + { + bool verifySSL = true; + Aws::String caPath; + Aws::String caFile; + }; + + ProxyConfig proxyConfig; + SslConfig sslConfig; + + bool disableExpectHeader = false; + bool allowRedirects = false; + }; + struct CurlEasyHandleContext; + + using Base = HttpClient; + + //Creates client, initializes curl handle if it hasn't been created already. + CurlMultiHttpClient(const Aws::Client::ClientConfiguration& clientConfig); + virtual ~CurlMultiHttpClient(); + + //Makes request and receives response synchronously + std::shared_ptr MakeRequest(const std::shared_ptr& request, + Aws::Utils::RateLimits::RateLimiterInterface* readLimiter = nullptr, + Aws::Utils::RateLimits::RateLimiterInterface* writeLimiter = nullptr) const override; + + static void InitGlobalState(); + static void CleanupGlobalState(); + +protected: + /** + * Override any configuration on CURL handle for each request before sending. + * The usage is to have a subclass of CurlMultiHttpClient and have your own implementation of this function to configure whatever you want on CURL handle. + */ + virtual void OverrideOptionsOnConnectionHandle(CURL*) const {} + +private: + void SubmitTask(std::shared_ptr pEasyHandleCtx) const; + + static std::shared_ptr HandleCurlResponse(std::shared_ptr pEasyHandleCtx); + static void CurlMultiPerformThread(CurlMultiHttpClient* pClient); + + std::thread m_multiHandleThread; + std::atomic m_isRunning; + mutable std::mutex m_signalMutex; + mutable std::condition_variable m_signalRunning; + + // mutable std::mutex m_tasksMutex; + mutable std::atomic m_tasksQueued; + mutable std::mutex m_tasksMutex; + mutable Aws::UnorderedMap> m_tasks; + + CurlMultiHttpClientConfig m_config; + + mutable CurlMultiHandleContainer m_curlMultiHandleContainer; + + static std::atomic isGlobalStateInit; + std::shared_ptr m_telemetryProvider; +}; + +} // namespace Http +} // namespace Aws + diff --git a/src/aws-cpp-sdk-core/include/aws/core/utils/ResourceManager.h b/src/aws-cpp-sdk-core/include/aws/core/utils/ResourceManager.h index 718c32beba6..2b754919d53 100644 --- a/src/aws-cpp-sdk-core/include/aws/core/utils/ResourceManager.h +++ b/src/aws-cpp-sdk-core/include/aws/core/utils/ResourceManager.h @@ -15,7 +15,7 @@ namespace Aws namespace Utils { /** - * Generic resource manager with Acquire/Release semantics. Acquire will block waiting on a an available resource. Release will + * Generic resource manager with Acquire/Release semantics. Acquire will block waiting on an available resource. Release will * cause one blocked acquisition to unblock. * * You must call ShutdownAndWait() when finished with this container, this unblocks the listening thread and gives you a chance to diff --git a/src/aws-cpp-sdk-core/source/http/HttpClientFactory.cpp b/src/aws-cpp-sdk-core/source/http/HttpClientFactory.cpp index 615d0de0a3f..158c741ffb8 100644 --- a/src/aws-cpp-sdk-core/source/http/HttpClientFactory.cpp +++ b/src/aws-cpp-sdk-core/source/http/HttpClientFactory.cpp @@ -12,6 +12,7 @@ #endif #if ENABLE_CURL_CLIENT #include +#include #include #elif ENABLE_WINDOWS_CLIENT @@ -93,7 +94,7 @@ namespace Aws } #endif // ENABLE_WINDOWS_IXML_HTTP_REQUEST_2_CLIENT #elif ENABLE_CURL_CLIENT - return Aws::MakeShared(HTTP_CLIENT_FACTORY_ALLOCATION_TAG, clientConfiguration); + return Aws::MakeShared(HTTP_CLIENT_FACTORY_ALLOCATION_TAG, clientConfiguration); #else // When neither of these clients is enabled, gcc gives a warning (converted // to error by -Werror) about the unused clientConfiguration parameter. We diff --git a/src/aws-cpp-sdk-core/source/http/curl-multi/CurlMultiHandleContainer.cpp b/src/aws-cpp-sdk-core/source/http/curl-multi/CurlMultiHandleContainer.cpp new file mode 100644 index 00000000000..ea1232737c1 --- /dev/null +++ b/src/aws-cpp-sdk-core/source/http/curl-multi/CurlMultiHandleContainer.cpp @@ -0,0 +1,220 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include +#include + +#include + +using namespace Aws::Utils::Logging; +using namespace Aws::Http; + +static const char* CURL_HANDLE_CONTAINER_TAG = "CurlMultiHandleContainer"; + + +CurlMultiHandleContainer::CurlMultiHandleContainer(unsigned maxSize, long httpRequestTimeout, long connectTimeout, bool enableTcpKeepAlive, + unsigned long tcpKeepAliveIntervalMs, long lowSpeedTime, unsigned long lowSpeedLimit, + Version version) : + m_maxPoolSize(maxSize), m_httpRequestTimeout(httpRequestTimeout), m_connectTimeout(connectTimeout), m_enableTcpKeepAlive(enableTcpKeepAlive), + m_tcpKeepAliveIntervalMs(tcpKeepAliveIntervalMs), m_lowSpeedTime(lowSpeedTime), m_lowSpeedLimit(lowSpeedLimit), m_poolSize(0), + m_version(version) +{ + AWS_LOGSTREAM_INFO(CURL_HANDLE_CONTAINER_TAG, "Initializing CurlMultiHandleContainer with size " << maxSize); + + this->m_curlMultiHandle = curl_multi_init(); + AWS_LOGSTREAM_DEBUG(CURL_HANDLE_CONTAINER_TAG, "CURLMOPT_MAXCONNECTS = " << m_maxPoolSize); + curl_multi_setopt(this->m_curlMultiHandle, CURLMOPT_MAXCONNECTS, m_maxPoolSize); +// this->m_curlShare = curl_share_init(); +// CURLSHcode curlShareCode = curl_share_setopt(this->m_curlShare, CURLSHOPT_SHARE, CURL_LOCK_DATA_COOKIE); +// curlShareCode = curl_share_setopt(this->m_curlShare, CURLSHOPT_SHARE, CURL_LOCK_DATA_SSL_SESSION); +// assert(curlShareCode == CURLSHE_OK); +// AWS_LOGSTREAM_INFO(CURL_HANDLE_CONTAINER_TAG, "Enabled CURLSHOPT_SHARE CURL_LOCK_DATA_SSL_SESSION"); +} + +CurlMultiHandleContainer::~CurlMultiHandleContainer() +{ + AWS_LOGSTREAM_INFO(CURL_HANDLE_CONTAINER_TAG, "Cleaning up CurlMultiHandleContainer."); + for (CURL* handle : m_handleContainer.ShutdownAndWait(m_poolSize)) + { + AWS_LOGSTREAM_DEBUG(CURL_HANDLE_CONTAINER_TAG, "Cleaning up " << handle); + curl_easy_cleanup(handle); + } + + curl_multi_cleanup(m_curlMultiHandle); + m_curlMultiHandle = nullptr; +// curl_share_cleanup(m_curlShare); +// m_curlShare = nullptr; +} + +CURL* CurlMultiHandleContainer::AcquireCurlHandle() +{ + AWS_LOGSTREAM_DEBUG(CURL_HANDLE_CONTAINER_TAG, "Attempting to acquire curl easy handle."); + + if(!m_handleContainer.HasResourcesAvailable()) + { + AWS_LOGSTREAM_DEBUG(CURL_HANDLE_CONTAINER_TAG, "No current connections available in pool. Attempting to create new connections."); + CheckAndGrowPool(); + } + + CURL* handle = m_handleContainer.Acquire(); + AWS_LOGSTREAM_DEBUG(CURL_HANDLE_CONTAINER_TAG, "Connection has been released. Continuing."); + AWS_LOGSTREAM_DEBUG(CURL_HANDLE_CONTAINER_TAG, "Returning connection handle " << handle); + return handle; +} + +void CurlMultiHandleContainer::ReleaseCurlHandle(CURL* handle) +{ + if (handle) + { +#if LIBCURL_VERSION_NUM >= 0x074D00 // 7.77.0 + curl_easy_setopt(handle, CURLOPT_COOKIEFILE, NULL); // workaround a mem leak on curl +#endif + curl_easy_reset(handle); + SetDefaultOptionsOnHandle(handle); + AWS_LOGSTREAM_DEBUG(CURL_HANDLE_CONTAINER_TAG, "Releasing curl handle " << handle); + m_handleContainer.Release(handle); + AWS_LOGSTREAM_DEBUG(CURL_HANDLE_CONTAINER_TAG, "Notified waiting threads."); + } +} + +void CurlMultiHandleContainer::DestroyCurlHandle(CURL* handle) +{ + if (!handle) + { + return; + } + + curl_easy_cleanup(handle); + AWS_LOGSTREAM_DEBUG(CURL_HANDLE_CONTAINER_TAG, "Destroy curl handle: " << handle); + { + std::lock_guard locker(m_containerLock); + // Other threads could be blocked and waiting on m_handleContainer.Acquire() + // If the handle is not released back to the pool, it could create a deadlock + // Create a new handle and release that into the pool + handle = CreateCurlHandleInPool(); + } + if (handle) + { + AWS_LOGSTREAM_DEBUG(CURL_HANDLE_CONTAINER_TAG, "Created replacement handle and released to pool: " << handle); + } +} + + +CURL* CurlMultiHandleContainer::CreateCurlHandleInPool() +{ + CURL* curlHandle = curl_easy_init(); + + if (curlHandle) + { + SetDefaultOptionsOnHandle(curlHandle); + m_handleContainer.Release(curlHandle); + } + else + { + AWS_LOGSTREAM_ERROR(CURL_HANDLE_CONTAINER_TAG, "curl_easy_init failed to allocate."); + } + return curlHandle; +} + +bool CurlMultiHandleContainer::CheckAndGrowPool() +{ + std::lock_guard locker(m_containerLock); + if (m_poolSize < m_maxPoolSize) + { + unsigned multiplier = m_poolSize > 0 ? m_poolSize : 1; + unsigned amountToAdd = (std::min)(multiplier * 2, m_maxPoolSize - m_poolSize); + AWS_LOGSTREAM_DEBUG(CURL_HANDLE_CONTAINER_TAG, "attempting to grow pool size by " << amountToAdd); + + unsigned actuallyAdded = 0; + for (unsigned i = 0; i < amountToAdd; ++i) + { + CURL* curlHandle = CreateCurlHandleInPool(); + + if (curlHandle) + { + ++actuallyAdded; + } + else + { + break; + } + } + + AWS_LOGSTREAM_INFO(CURL_HANDLE_CONTAINER_TAG, "Pool grown by " << actuallyAdded); + m_poolSize += actuallyAdded; + + return actuallyAdded > 0; + } + + AWS_LOGSTREAM_INFO(CURL_HANDLE_CONTAINER_TAG, "Pool cannot be grown any further, already at max size."); + + return false; +} + +void CurlMultiHandleContainer::SetDefaultOptionsOnHandle(CURL* handle) +{ + //for timeouts to work in a multi-threaded context, + //always turn signals off. This also forces dns queries to + //not be included in the timeout calculations. + curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1L); + curl_easy_setopt(handle, CURLOPT_TIMEOUT_MS, m_httpRequestTimeout); + curl_easy_setopt(handle, CURLOPT_CONNECTTIMEOUT_MS, m_connectTimeout); + curl_easy_setopt(handle, CURLOPT_LOW_SPEED_LIMIT, m_lowSpeedLimit); + curl_easy_setopt(handle, CURLOPT_LOW_SPEED_TIME, m_lowSpeedTime < 1000 ? (m_lowSpeedTime == 0 ? 0 : 1) : m_lowSpeedTime / 1000); + curl_easy_setopt(handle, CURLOPT_TCP_KEEPALIVE, m_enableTcpKeepAlive ? 1L : 0L); + curl_easy_setopt(handle, CURLOPT_TCP_KEEPINTVL, m_tcpKeepAliveIntervalMs / 1000); + curl_easy_setopt(handle, CURLOPT_TCP_KEEPIDLE, m_tcpKeepAliveIntervalMs / 1000); + curl_easy_setopt(handle, CURLOPT_HTTP_VERSION, ConvertHttpVersion(m_version)); +} + +long CurlMultiHandleContainer::ConvertHttpVersion(Version version) { + if (version == Version::HTTP_VERSION_NONE) + { + return CURL_HTTP_VERSION_NONE; + } + else if (version == Version::HTTP_VERSION_1_0) + { + return CURL_HTTP_VERSION_1_0; + } + else if (version == Version::HTTP_VERSION_1_1) + { + return CURL_HTTP_VERSION_1_1; + } +#if LIBCURL_VERSION_NUM >= 0x072100 // 7.33.0 + else if (version == Version::HTTP_VERSION_2_0) + { + return CURL_HTTP_VERSION_2_0; + } +#endif +#if LIBCURL_VERSION_NUM >= 0x072F00 // 7.47.0 + else if (version == Version::HTTP_VERSION_2TLS) + { + return CURL_HTTP_VERSION_2TLS; + } +#endif +#if LIBCURL_VERSION_NUM >= 0x073100 // 7.49.0 + else if (version == Version::HTTP_VERSION_2_PRIOR_KNOWLEDGE) + { + return CURL_HTTP_VERSION_2_PRIOR_KNOWLEDGE; + } +#endif +#if LIBCURL_VERSION_NUM >= 0x074200 // 7.66.0 + else if (version == Version::HTTP_VERSION_3) + { + return CURL_HTTP_VERSION_3; + } +#endif +#if LIBCURL_VERSION_NUM >= 0x075800 // 7.88.0 + else if (version == Version::HTTP_VERSION_3ONLY) + { + return CURL_HTTP_VERSION_3ONLY; + } +#endif +#if LIBCURL_VERSION_NUM >= 0x073E00 // 7.62.0 + return CURL_HTTP_VERSION_2TLS; +#else + return CURL_HTTP_VERSION_1_1; +#endif +} diff --git a/src/aws-cpp-sdk-core/source/http/curl-multi/CurlMultiHttpClient.cpp b/src/aws-cpp-sdk-core/source/http/curl-multi/CurlMultiHttpClient.cpp new file mode 100644 index 00000000000..0a32f5c2970 --- /dev/null +++ b/src/aws-cpp-sdk-core/source/http/curl-multi/CurlMultiHttpClient.cpp @@ -0,0 +1,1128 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +using namespace Aws::Client; +using namespace Aws::Http; +using namespace Aws::Http::Standard; +using namespace Aws::Utils; +using namespace Aws::Utils::Logging; +using namespace Aws::Monitoring; + +#ifdef AWS_CUSTOM_MEMORY_MANAGEMENT + +static const char* MemTag = "libcurl"; +static size_t offset = sizeof(size_t); + +static void* malloc_callback(size_t size) +{ + char* newMem = reinterpret_cast(Aws::Malloc(MemTag, size + offset)); + std::size_t* pointerToSize = reinterpret_cast(newMem); + *pointerToSize = size; + return reinterpret_cast(newMem + offset); +} + +static void free_callback(void* ptr) +{ + if(ptr) + { + char* shiftedMemory = reinterpret_cast(ptr); + Aws::Free(shiftedMemory - offset); + } +} + +static void* realloc_callback(void* ptr, size_t size) +{ + if(!ptr) + { + return malloc_callback(size); + } + + + if(!size && ptr) + { + free_callback(ptr); + return nullptr; + } + + char* originalLenCharPtr = reinterpret_cast(ptr) - offset; + size_t originalLen = *reinterpret_cast(originalLenCharPtr); + + char* rawMemory = reinterpret_cast(Aws::Malloc(MemTag, size + offset)); + if(rawMemory) + { + std::size_t* pointerToSize = reinterpret_cast(rawMemory); + *pointerToSize = size; + + size_t copyLength = (std::min)(originalLen, size); +#ifdef _MSC_VER + memcpy_s(rawMemory + offset, size, ptr, copyLength); +#else + memcpy(rawMemory + offset, ptr, copyLength); +#endif + free_callback(ptr); + return reinterpret_cast(rawMemory + offset); + } + else + { + return ptr; + } + +} + +static void* calloc_callback(size_t nmemb, size_t size) +{ + size_t dataSize = nmemb * size; + char* newMem = reinterpret_cast(Aws::Malloc(MemTag, dataSize + offset)); + std::size_t* pointerToSize = reinterpret_cast(newMem); + *pointerToSize = dataSize; +#ifdef _MSC_VER + memset_s(newMem + offset, dataSize, 0, dataSize); +#else + memset(newMem + offset, 0, dataSize); +#endif + + return reinterpret_cast(newMem + offset); +} + +static char* strdup_callback(const char* str) +{ + size_t len = strlen(str) + 1; + size_t newLen = len + offset; + char* newMem = reinterpret_cast(Aws::Malloc(MemTag, newLen)); + + if(newMem) + { + std::size_t* pointerToSize = reinterpret_cast(newMem); + *pointerToSize = len; +#ifdef _MSC_VER + memcpy_s(newMem + offset, len, str, len); +#else + memcpy(newMem + offset, str, len); +#endif + return newMem + offset; + } + return nullptr; +} + +#endif + +struct CurlWriteCallbackContext +{ + CurlWriteCallbackContext(const CurlMultiHttpClient* client, + HttpRequest* request, + std::shared_ptr response, + Aws::Utils::RateLimits::RateLimiterInterface* rateLimiter) : + m_client(client), + m_request(request), + m_response(std::move(response)), + m_rateLimiter(rateLimiter), + m_numBytesResponseReceived(0) + {} + + const CurlMultiHttpClient* m_client; + HttpRequest* m_request; + std::shared_ptr m_response; + Aws::Utils::RateLimits::RateLimiterInterface* m_rateLimiter; + int64_t m_numBytesResponseReceived; +}; + +struct CurlReadCallbackContext +{ + CurlReadCallbackContext(const CurlMultiHttpClient* client, CURL* curlHandle, HttpRequest* request, Aws::Utils::RateLimits::RateLimiterInterface* limiter) : + m_client(client), + m_curlHandle(curlHandle), + m_rateLimiter(limiter), + m_request(request), + m_chunkEnd(false) + {} + + const CurlMultiHttpClient* m_client; + CURL* m_curlHandle; + Aws::Utils::RateLimits::RateLimiterInterface* m_rateLimiter; + HttpRequest* m_request; + bool m_chunkEnd; +}; + +enum class ExecutionPolicy +{ + BLOCKING, + ASYNC +}; + +struct CurlMultiHttpClient::CurlEasyHandleContext +{ + ExecutionPolicy execPolicy; + CurlWriteCallbackContext writeContext; + CurlReadCallbackContext readContext; + Aws::Utils::DateTime startTransmissionTime; + curl_slist* curlHandleHeaders; + + std::function onCurlDoneFn; + // TODO: C++14: add "= nullptr" and other default initializers + CURLcode curlResult; + // ptr acquired by curl_multi_info_read, free-ed by curl + CURLMsg* curlResultMsg; +}; + +static const char* CURL_HTTP_CLIENT_TAG = "CurlMultiHttpClient"; + +static int64_t GetContentLengthFromHeader(CURL* connectionHandle, + bool& hasContentLength) { +#if LIBCURL_VERSION_NUM >= 0x073700 // 7.55.0 + curl_off_t contentLength = {}; + CURLcode res = curl_easy_getinfo( + connectionHandle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &contentLength); +#else + double contentLength = {}; + CURLcode res = curl_easy_getinfo( + connectionHandle, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &contentLength); +#endif + hasContentLength = (res == CURLE_OK) && (contentLength != -1); + return hasContentLength ? static_cast(contentLength) : -1; +} + +static size_t WriteData(char* ptr, size_t size, size_t nmemb, void* userdata) +{ + if (ptr) + { + CurlWriteCallbackContext* context = reinterpret_cast(userdata); + + const CurlMultiHttpClient* client = context->m_client; + if(!client->ContinueRequest(*context->m_request) || !client->IsRequestProcessingEnabled()) + { + return 0; + } + + HttpResponse* response = context->m_response.get(); + size_t sizeToWrite = size * nmemb; + if (context->m_rateLimiter) + { + context->m_rateLimiter->ApplyAndPayForCost(static_cast(sizeToWrite)); + } + + for (const auto& hashIterator : context->m_request->GetResponseValidationHashes()) + { + hashIterator.second->Update(reinterpret_cast(ptr), sizeToWrite); + } + + if (response->GetResponseBody().fail()) { + const auto& ref = response->GetResponseBody(); + AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, "Response output stream in bad state (eof: " + << ref.eof() << ", bad: " << ref.bad() << ")"); + return 0; + } + + size_t cur = response->GetResponseBody().tellp(); + if (response->GetResponseBody().fail()) { + const auto& ref = response->GetResponseBody(); + AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, "Unable to query response output position (eof: " + << ref.eof() << ", bad: " << ref.bad() << ")"); + return 0; + } + + response->GetResponseBody().write(ptr, static_cast(sizeToWrite)); + if (response->GetResponseBody().fail()) { + const auto& ref = response->GetResponseBody(); + AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, "Failed to write " << size << " / " << sizeToWrite << " B response" + << " at " << cur << " (eof: " << ref.eof() << ", bad: " << ref.bad() << ")"); + return 0; + } + if (context->m_request->IsEventStreamRequest() && !response->HasHeader(Aws::Http::X_AMZN_ERROR_TYPE)) + { + response->GetResponseBody().flush(); + if (response->GetResponseBody().fail()) { + const auto& ref = response->GetResponseBody(); + AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, "Failed to flush event response (eof: " + << ref.eof() << ", bad: " << ref.bad() << ")"); + return 0; + } + } + auto& receivedHandler = context->m_request->GetDataReceivedEventHandler(); + if (receivedHandler) + { + receivedHandler(context->m_request, context->m_response.get(), static_cast(sizeToWrite)); + } + + AWS_LOGSTREAM_TRACE(CURL_HTTP_CLIENT_TAG, sizeToWrite << " bytes written to response."); + context->m_numBytesResponseReceived += sizeToWrite; + return sizeToWrite; + } + return 0; +} + +static size_t WriteHeader(char* ptr, size_t size, size_t nmemb, void* userdata) +{ + if (ptr) + { + CurlWriteCallbackContext* context = reinterpret_cast(userdata); + AWS_LOGSTREAM_TRACE(CURL_HTTP_CLIENT_TAG, ptr); + HttpResponse* response = context->m_response.get(); + Aws::String headerLine(ptr); + Aws::Vector keyValuePair = StringUtils::Split(headerLine, ':', 2); + + if (keyValuePair.size() == 2) + { + response->AddHeader(StringUtils::Trim(keyValuePair[0].c_str()), StringUtils::Trim(keyValuePair[1].c_str())); + } + + return size * nmemb; + } + return 0; +} + +static size_t ReadBody(char* ptr, size_t size, size_t nmemb, void* userdata, bool isStreaming) +{ + CurlReadCallbackContext* context = reinterpret_cast(userdata); + if(context == nullptr) + { + return 0; + } + + const CurlMultiHttpClient* client = context->m_client; + if(!client->ContinueRequest(*context->m_request) || !client->IsRequestProcessingEnabled()) + { + return CURL_READFUNC_ABORT; + } + + HttpRequest* request = context->m_request; + const std::shared_ptr& ioStream = request->GetContentBody(); + + size_t amountToRead = size * nmemb; + bool isAwsChunked = request->HasHeader(Aws::Http::CONTENT_ENCODING_HEADER) && + request->GetHeaderValue(Aws::Http::CONTENT_ENCODING_HEADER) == Aws::Http::AWS_CHUNKED_VALUE; + // aws-chunk = hex(chunk-size) + CRLF + chunk-data + CRLF + // Needs to reserve bytes of sizeof(hex(chunk-size)) + sizeof(CRLF) + sizeof(CRLF) + if (isAwsChunked) + { + Aws::String amountToReadHexString = Aws::Utils::StringUtils::ToHexString(amountToRead); + amountToRead -= (amountToReadHexString.size() + 4); + } + + if (ioStream != nullptr && amountToRead > 0) + { + if (isStreaming) + { + if (ioStream->readsome(ptr, amountToRead) == 0 && !ioStream->eof()) + { + return CURL_READFUNC_PAUSE; + } + } + else + { + ioStream->read(ptr, amountToRead); + } + size_t amountRead = static_cast(ioStream->gcount()); + + if (isAwsChunked) + { + if (amountRead > 0) + { + if (request->GetRequestHash().second != nullptr) + { + request->GetRequestHash().second->Update(reinterpret_cast(ptr), amountRead); + } + + Aws::String hex = Aws::Utils::StringUtils::ToHexString(amountRead); + memmove(ptr + hex.size() + 2, ptr, amountRead); + memmove(ptr + hex.size() + 2 + amountRead, "\r\n", 2); + memmove(ptr, hex.c_str(), hex.size()); + memmove(ptr + hex.size(), "\r\n", 2); + amountRead += hex.size() + 4; + } + else if (!context->m_chunkEnd) + { + Aws::StringStream chunkedTrailer; + chunkedTrailer << "0\r\n"; + if (request->GetRequestHash().second != nullptr) + { + chunkedTrailer << "x-amz-checksum-" << request->GetRequestHash().first << ":" + << HashingUtils::Base64Encode(request->GetRequestHash().second->GetHash().GetResult()) << "\r\n"; + } + chunkedTrailer << "\r\n"; + amountRead = chunkedTrailer.str().size(); + memcpy(ptr, chunkedTrailer.str().c_str(), amountRead); + context->m_chunkEnd = true; + } + } + + auto& sentHandler = request->GetDataSentEventHandler(); + if (sentHandler) + { + sentHandler(request, static_cast(amountRead)); + } + + if (context->m_rateLimiter) + { + context->m_rateLimiter->ApplyAndPayForCost(static_cast(amountRead)); + } + + return amountRead; + } + + return 0; +} + +static size_t ReadBodyStreaming(char* ptr, size_t size, size_t nmemb, void* userdata) { + return ReadBody(ptr, size, nmemb, userdata, true); +} + +static size_t ReadBodyFunc(char* ptr, size_t size, size_t nmemb, void* userdata) { + return ReadBody(ptr, size, nmemb, userdata, false); +} + +static size_t SeekBody(void* userdata, curl_off_t offset, int origin) +{ + CurlReadCallbackContext* context = reinterpret_cast(userdata); + if(context == nullptr) + { + return CURL_SEEKFUNC_FAIL; + } + + const CurlMultiHttpClient* client = context->m_client; + if(!client->ContinueRequest(*context->m_request) || !client->IsRequestProcessingEnabled()) + { + return CURL_SEEKFUNC_FAIL; + } + + HttpRequest* request = context->m_request; + const std::shared_ptr& ioStream = request->GetContentBody(); + + std::ios_base::seekdir dir; + switch(origin) + { + case SEEK_SET: + dir = std::ios_base::beg; + break; + case SEEK_CUR: + dir = std::ios_base::cur; + break; + case SEEK_END: + dir = std::ios_base::end; + break; + default: + return CURL_SEEKFUNC_FAIL; + } + + ioStream->clear(); + ioStream->seekg(offset, dir); + if (ioStream->fail()) { + return CURL_SEEKFUNC_CANTSEEK; + } + + return CURL_SEEKFUNC_OK; +} +#if LIBCURL_VERSION_NUM >= 0x072000 // 7.32.0 +static int CurlProgressCallback(void *userdata, curl_off_t, curl_off_t, curl_off_t, curl_off_t) +#else +static int CurlProgressCallback(void *userdata, double, double, double, double) +#endif +{ + CurlReadCallbackContext* context = reinterpret_cast(userdata); + + const std::shared_ptr& ioStream = context->m_request->GetContentBody(); + if (ioStream->eof()) + { + curl_easy_pause(context->m_curlHandle, CURLPAUSE_CONT); + return 0; + } + char output[1]; + if (ioStream->readsome(output, 1) > 0) + { + ioStream->unget(); + if (!ioStream->good()) + { + AWS_LOGSTREAM_WARN(CURL_HTTP_CLIENT_TAG, "Input stream failed to perform unget()."); + } + curl_easy_pause(context->m_curlHandle, CURLPAUSE_CONT); + } + + return 0; +} + +void SetOptCodeForHttpMethod(CURL* const requestHandle, Aws::Http::HttpRequest const * const request) +{ + assert(requestHandle && request); + switch (request->GetMethod()) + { + case HttpMethod::HTTP_GET: + curl_easy_setopt(requestHandle, CURLOPT_HTTPGET, 1L); + break; + case HttpMethod::HTTP_POST: + if (request->HasHeader(Aws::Http::CONTENT_LENGTH_HEADER) && request->GetHeaderValue(Aws::Http::CONTENT_LENGTH_HEADER) == "0") + { + curl_easy_setopt(requestHandle, CURLOPT_CUSTOMREQUEST, "POST"); + } + else + { + curl_easy_setopt(requestHandle, CURLOPT_POST, 1L); + } + break; + case HttpMethod::HTTP_PUT: + if ((!request->HasHeader(Aws::Http::CONTENT_LENGTH_HEADER) || request->GetHeaderValue(Aws::Http::CONTENT_LENGTH_HEADER) == "0") && + !request->HasHeader(Aws::Http::TRANSFER_ENCODING_HEADER)) + { + curl_easy_setopt(requestHandle, CURLOPT_CUSTOMREQUEST, "PUT"); + } + else + { +#if LIBCURL_VERSION_NUM >= 0x070c01 // 7.12.1 + curl_easy_setopt(requestHandle, CURLOPT_UPLOAD, 1L); +#else + curl_easy_setopt(requestHandle, CURLOPT_PUT, 1L); +#endif + } + break; + case HttpMethod::HTTP_HEAD: + curl_easy_setopt(requestHandle, CURLOPT_HTTPGET, 1L); + curl_easy_setopt(requestHandle, CURLOPT_NOBODY, 1L); + break; + case HttpMethod::HTTP_PATCH: + if ((!request->HasHeader(Aws::Http::CONTENT_LENGTH_HEADER)|| request->GetHeaderValue(Aws::Http::CONTENT_LENGTH_HEADER) == "0") && + !request->HasHeader(Aws::Http::TRANSFER_ENCODING_HEADER)) + { + curl_easy_setopt(requestHandle, CURLOPT_CUSTOMREQUEST, "PATCH"); + } + else + { + curl_easy_setopt(requestHandle, CURLOPT_POST, 1L); + curl_easy_setopt(requestHandle, CURLOPT_CUSTOMREQUEST, "PATCH"); + } + + break; + case HttpMethod::HTTP_DELETE: + curl_easy_setopt(requestHandle, CURLOPT_CUSTOMREQUEST, "DELETE"); + break; + default: + assert(0); + curl_easy_setopt(requestHandle, CURLOPT_CUSTOMREQUEST, "GET"); + break; + } +} + + +std::atomic CurlMultiHttpClient::isGlobalStateInit(false); + +void CurlMultiHttpClient::InitGlobalState() +{ + if (!isGlobalStateInit) + { + auto curlVersionData = curl_version_info(CURLVERSION_NOW); + AWS_LOGSTREAM_INFO(CURL_HTTP_CLIENT_TAG, "Initializing Curl library with version: " << curlVersionData->version + << ", ssl version: " << curlVersionData->ssl_version); + isGlobalStateInit = true; +#ifdef AWS_CUSTOM_MEMORY_MANAGEMENT + curl_global_init_mem(CURL_GLOBAL_ALL, &malloc_callback, &free_callback, &realloc_callback, &strdup_callback, &calloc_callback); +#else + curl_global_init(CURL_GLOBAL_ALL); +#endif + } +} + + +void CurlMultiHttpClient::CleanupGlobalState() +{ + curl_global_cleanup(); +} + +Aws::String CurlMultiInfoTypeToString(curl_infotype type) +{ + switch(type) + { + case CURLINFO_TEXT: + return "Text"; + + case CURLINFO_HEADER_IN: + return "HeaderIn"; + + case CURLINFO_HEADER_OUT: + return "HeaderOut"; + + case CURLINFO_DATA_IN: + return "DataIn"; + + case CURLINFO_DATA_OUT: + return "DataOut"; + + case CURLINFO_SSL_DATA_IN: + return "SSLDataIn"; + + case CURLINFO_SSL_DATA_OUT: + return "SSLDataOut"; + + default: + return "Unknown"; + } +} + +int CurlMultiDebugCallback(CURL *handle, curl_infotype type, char *data, size_t size, void *userptr) +{ + AWS_UNREFERENCED_PARAM(handle); + AWS_UNREFERENCED_PARAM(userptr); + + if(type == CURLINFO_SSL_DATA_IN || type == CURLINFO_SSL_DATA_OUT) + { + AWS_LOGSTREAM_DEBUG("CURL", "(" << CurlMultiInfoTypeToString(type) << ") " << size << "bytes"); + } + else + { + Aws::String debugString(data, size); + AWS_LOGSTREAM_DEBUG("CURL", "(" << CurlMultiInfoTypeToString(type) << ") " << debugString); + } + + return 0; +} + +void CurlMultiHttpClient::CurlMultiPerformThread(CurlMultiHttpClient* pClient) +{ + assert(pClient && pClient->m_curlMultiHandleContainer.AccessCurlMultiHandle()); + CURLM *multi_handle = pClient->m_curlMultiHandleContainer.AccessCurlMultiHandle(); + + int stillRunning = 0; + while(pClient->m_isRunning) + { + // wait for new task signal + { + std::unique_lock lockGuard(pClient->m_signalMutex); + pClient->m_signalRunning.wait(lockGuard, + [pClient, &stillRunning] + { + // return true to unlock + if(!pClient->m_isRunning.load()) + return true; + + return pClient->m_tasksQueued || stillRunning; + }); + } + if(!pClient->m_isRunning.load()) + { + break; + } + + pClient->m_tasksQueued = 0; + + CURLMcode mc = curl_multi_perform(multi_handle, &stillRunning); + int msgQueue = 0; + do { + + struct CURLMsg* message = curl_multi_info_read(multi_handle, &msgQueue); + if(message) + { + if(message->msg == CURLMSG_DONE) + { + CURL* easyHandle = message->easy_handle; + std::shared_ptr pEasyHandleCtx; + { + std::unique_lock lock(pClient->m_tasksMutex); + assert(pClient->m_tasks.find(easyHandle) != pClient->m_tasks.end()); + pEasyHandleCtx = pClient->m_tasks[easyHandle]; + } + assert(pEasyHandleCtx); + pEasyHandleCtx->curlResult = message->data.result; + pEasyHandleCtx->onCurlDoneFn(); + } else { + assert(!"Todo"); + } + } + } while(msgQueue > 0); + + if(!mc && stillRunning) + /* wait for activity, timeout or "nothing" */ + mc = curl_multi_poll(multi_handle, NULL, 0, 1000, NULL); + + if(mc) { + fprintf(stderr, "curl_multi_poll() failed, code %d.\n", (int)mc); + break; + } + }; +} + +CurlMultiHttpClient::CurlMultiHttpClient(const ClientConfiguration& clientConfig) : + Base(), + m_curlMultiHandleContainer(clientConfig.maxConnections, clientConfig.httpRequestTimeoutMs, clientConfig.connectTimeoutMs, clientConfig.enableTcpKeepAlive, + clientConfig.tcpKeepAliveIntervalMs, clientConfig.requestTimeoutMs, clientConfig.lowSpeedLimit, clientConfig.version), + m_telemetryProvider(clientConfig.telemetryProvider) +{ + m_config.proxyConfig.isEnabled = !clientConfig.proxyHost.empty(); + m_config.proxyConfig.userName = clientConfig.proxyUserName; + m_config.proxyConfig.password = clientConfig.proxyPassword; + m_config.proxyConfig.scheme = SchemeMapper::ToString(clientConfig.proxyScheme); + m_config.proxyConfig.host = clientConfig.proxyHost; + m_config.proxyConfig.sslCertPath = clientConfig.proxySSLCertPath; + m_config.proxyConfig.sslCertType = clientConfig.proxySSLCertType; + m_config.proxyConfig.sslKeyPath = clientConfig.proxySSLKeyPath; + m_config.proxyConfig.sslKeyType = clientConfig.proxySSLKeyType; + m_config.proxyConfig.keyPasswd = clientConfig.proxySSLKeyPassword; + m_config.proxyConfig.port = clientConfig.proxyPort; + m_config.sslConfig.verifySSL = clientConfig.verifySSL; + m_config.sslConfig.caPath = clientConfig.caPath; + m_config.sslConfig.caFile = clientConfig.caFile; + m_config.disableExpectHeader = clientConfig.disableExpectHeader; + + if (clientConfig.followRedirects == FollowRedirectsPolicy::NEVER || + (clientConfig.followRedirects == FollowRedirectsPolicy::DEFAULT && clientConfig.region == Aws::Region::AWS_GLOBAL)) + { + m_config.allowRedirects = false; + } + else + { + m_config.allowRedirects = true; + } + if(clientConfig.nonProxyHosts.GetLength() > 0) + { + Aws::StringStream ss; + ss << clientConfig.nonProxyHosts.GetItem(0); + for (auto i=1u; i < clientConfig.nonProxyHosts.GetLength(); i++) + { + ss << "," << clientConfig.nonProxyHosts.GetItem(i); + } + m_config.proxyConfig.nonProxyHosts = ss.str(); + } + + // LAUNCH + m_isRunning = true; + m_tasksQueued = 0; + m_multiHandleThread = std::thread(CurlMultiPerformThread, this); +} + +CurlMultiHttpClient::~CurlMultiHttpClient() +{ + m_isRunning.store(false); + { + std::unique_lock lockGuard(m_signalMutex); + m_signalRunning.notify_all(); + } + m_multiHandleThread.join(); + + curl_multi_cleanup(m_curlMultiHandleContainer.AccessCurlMultiHandle()); +} + +struct curl_slist* PrepareHeaders(const HttpRequest* request, const bool disableExpectHeader) +{ + assert(request); + struct curl_slist* headers = NULL; + + Aws::StringStream headerStream; + HeaderValueCollection requestHeaders = request->GetHeaders(); + + AWS_LOGSTREAM_TRACE(CURL_HTTP_CLIENT_TAG, "Including headers:"); + for (auto& requestHeader : requestHeaders) + { + headerStream.str(""); + headerStream << requestHeader.first << ": " << requestHeader.second; + Aws::String headerString = headerStream.str(); + AWS_LOGSTREAM_TRACE(CURL_HTTP_CLIENT_TAG, headerString); + headers = curl_slist_append(headers, headerString.c_str()); + } + + if (!request->HasHeader(Aws::Http::TRANSFER_ENCODING_HEADER)) + { + headers = curl_slist_append(headers, "transfer-encoding:"); + } + + if (!request->HasHeader(Aws::Http::CONTENT_LENGTH_HEADER)) + { + headers = curl_slist_append(headers, "content-length:"); + } + + if (!request->HasHeader(Aws::Http::CONTENT_TYPE_HEADER)) + { + headers = curl_slist_append(headers, "content-type:"); + } + + // Discard Expect header so as to avoid using multiple payloads to send a http request (header + body) + if (disableExpectHeader) + { + headers = curl_slist_append(headers, "Expect:"); + } + + return headers; +} + + +std::shared_ptr ConfigureEasyHandle( + CurlMultiHttpClient const * const pClient, + const Aws::Http::CurlMultiHttpClient::CurlMultiHttpClientConfig& config, + std::function onDoneFn, + CURL* const connectionHandle, + Aws::Http::HttpRequest* const request, + const std::shared_ptr response, + Aws::Utils::RateLimits::RateLimiterInterface* readLimiter, + Aws::Utils::RateLimits::RateLimiterInterface* writeLimiter) +{ + assert(connectionHandle && request && response); + struct curl_slist* headers = PrepareHeaders(request, config.disableExpectHeader); + if (headers) + { + curl_easy_setopt(connectionHandle, CURLOPT_HTTPHEADER, headers); + } + + std::shared_ptr pHandleCtx = + Aws::MakeShared(CURL_HTTP_CLIENT_TAG, + CurlMultiHttpClient::CurlEasyHandleContext{ + ExecutionPolicy::BLOCKING, + CurlWriteCallbackContext(pClient, request, response, readLimiter), + CurlReadCallbackContext(pClient, connectionHandle, request, writeLimiter), + Aws::Utils::DateTime(), + headers, + onDoneFn, + CURLE_FAILED_INIT, + nullptr}); + + CurlMultiHttpClient::CurlEasyHandleContext& handleContext = *pHandleCtx; + + SetOptCodeForHttpMethod(connectionHandle, request); + + URI uri = request->GetUri(); + Aws::String url = uri.GetURIString(); + AWS_LOGSTREAM_TRACE(CURL_HTTP_CLIENT_TAG, "Making request to " << url); + curl_easy_setopt(connectionHandle, CURLOPT_URL, url.c_str()); + curl_easy_setopt(connectionHandle, CURLOPT_WRITEFUNCTION, WriteData); + curl_easy_setopt(connectionHandle, CURLOPT_WRITEDATA, &handleContext.writeContext); + curl_easy_setopt(connectionHandle, CURLOPT_HEADERFUNCTION, WriteHeader); + curl_easy_setopt(connectionHandle, CURLOPT_HEADERDATA, &handleContext.writeContext); + + //we only want to override the default path if someone has explicitly told us to. + if(!config.sslConfig.caPath.empty()) + { + curl_easy_setopt(connectionHandle, CURLOPT_CAPATH, config.sslConfig.caPath.c_str()); + } + if(!config.sslConfig.caFile.empty()) + { + curl_easy_setopt(connectionHandle, CURLOPT_CAINFO, config.sslConfig.caFile.c_str()); + } + + // enable the cookie engine without reading any initial cookies. + curl_easy_setopt(connectionHandle, CURLOPT_COOKIEFILE, ""); + +// only set by android test builds because the emulator is missing a cert needed for aws services +#ifdef TEST_CERT_PATH +curl_easy_setopt(connectionHandle, CURLOPT_CAPATH, TEST_CERT_PATH); +#endif // TEST_CERT_PATH + + if (config.sslConfig.verifySSL) + { + curl_easy_setopt(connectionHandle, CURLOPT_SSL_VERIFYPEER, 1L); + curl_easy_setopt(connectionHandle, CURLOPT_SSL_VERIFYHOST, 2L); + +#if defined(ENFORCE_TLS_V1_3) && LIBCURL_VERSION_NUM >= 0x073400 // 7.52.0 + curl_easy_setopt(connectionHandle, CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1_3); +#elif defined(ENFORCE_TLS_V1_2) && LIBCURL_VERSION_NUM >= 0x072200 // 7.34.0 + curl_easy_setopt(connectionHandle, CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1_2); +#else + curl_easy_setopt(connectionHandle, CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1); +#endif + } + else + { + curl_easy_setopt(connectionHandle, CURLOPT_SSL_VERIFYPEER, 0L); + curl_easy_setopt(connectionHandle, CURLOPT_SSL_VERIFYHOST, 0L); + } + + if (config.allowRedirects) + { + curl_easy_setopt(connectionHandle, CURLOPT_FOLLOWLOCATION, 1L); + } + else + { + curl_easy_setopt(connectionHandle, CURLOPT_FOLLOWLOCATION, 0L); + } + +#ifdef ENABLE_CURL_LOGGING + curl_easy_setopt(connectionHandle, CURLOPT_VERBOSE, 1); + curl_easy_setopt(connectionHandle, CURLOPT_DEBUGFUNCTION, CurlMultiDebugCallback); +#endif + if (config.proxyConfig.isEnabled) + { + Aws::StringStream ss; + ss << config.proxyConfig.scheme << "://" << config.proxyConfig.host; + curl_easy_setopt(connectionHandle, CURLOPT_PROXY, ss.str().c_str()); + curl_easy_setopt(connectionHandle, CURLOPT_PROXYPORT, (long) config.proxyConfig.port); + if (!config.proxyConfig.userName.empty() || !config.proxyConfig.password.empty()) + { + curl_easy_setopt(connectionHandle, CURLOPT_PROXYUSERNAME, config.proxyConfig.userName.c_str()); + curl_easy_setopt(connectionHandle, CURLOPT_PROXYPASSWORD, config.proxyConfig.password.c_str()); + } + curl_easy_setopt(connectionHandle, CURLOPT_NOPROXY, config.proxyConfig.nonProxyHosts.c_str()); +#ifdef CURL_HAS_TLS_PROXY + if (!config.proxyConfig.sslCertPath.empty()) + { + curl_easy_setopt(connectionHandle, CURLOPT_PROXY_SSLCERT, config.proxyConfig.sslCertPath.c_str()); + if (!config.proxyConfig.sslCertType.empty()) + { + curl_easy_setopt(connectionHandle, CURLOPT_PROXY_SSLCERTTYPE, config.proxyConfig.sslCertType.c_str()); + } + } + if (!config.proxyConfig.sslKeyPath.empty()) + { + curl_easy_setopt(connectionHandle, CURLOPT_PROXY_SSLKEY, config.proxyConfig.sslKeyPath.c_str()); + if (!config.proxyConfig.sslKeyType.empty()) + { + curl_easy_setopt(connectionHandle, CURLOPT_PROXY_SSLKEYTYPE, config.proxyConfig.sslKeyType.c_str()); + } + if (!config.proxyConfig.keyPasswd.empty()) + { + curl_easy_setopt(connectionHandle, CURLOPT_PROXY_KEYPASSWD, config.proxyConfig.keyPasswd.c_str()); + } + } +#endif //CURL_HAS_TLS_PROXY + } + else + { + curl_easy_setopt(connectionHandle, CURLOPT_PROXY, ""); + } + + if (request->GetContentBody()) + { + curl_easy_setopt(connectionHandle, CURLOPT_READFUNCTION, ReadBodyFunc); + curl_easy_setopt(connectionHandle, CURLOPT_READDATA, &handleContext.readContext); + curl_easy_setopt(connectionHandle, CURLOPT_SEEKFUNCTION, SeekBody); + curl_easy_setopt(connectionHandle, CURLOPT_SEEKDATA, &handleContext.readContext); + if (request->IsEventStreamRequest() && !response->HasHeader(Aws::Http::X_AMZN_ERROR_TYPE)) + { + curl_easy_setopt(connectionHandle, CURLOPT_READFUNCTION, ReadBodyStreaming); + curl_easy_setopt(connectionHandle, CURLOPT_NOPROGRESS, 0L); +#if LIBCURL_VERSION_NUM >= 0x072000 // 7.32.0 + curl_easy_setopt(connectionHandle, CURLOPT_XFERINFOFUNCTION, CurlProgressCallback); + curl_easy_setopt(connectionHandle, CURLOPT_XFERINFODATA, &handleContext.readContext); +#else + curl_easy_setopt(connectionHandle, CURLOPT_PROGRESSFUNCTION, CurlProgressCallback); + curl_easy_setopt(connectionHandle, CURLOPT_PROGRESSDATA, &readContext); +#endif + } + } + + return pHandleCtx; +} + +std::shared_ptr CurlMultiHttpClient::HandleCurlResponse(std::shared_ptr pEasyHandleCtx) +{ + const CURLcode curlResponseCode = pEasyHandleCtx->curlResult; + Aws::Http::HttpRequest * const request = pEasyHandleCtx->writeContext.m_request; + Aws::Http::HttpResponse * const response = pEasyHandleCtx->writeContext.m_response.get(); + CurlMultiHttpClient const * const client = pEasyHandleCtx->writeContext.m_client; + CURL* const connectionHandle = pEasyHandleCtx->readContext.m_curlHandle; + + { + // TODO: refactor + std::unique_lock lock(client->m_tasksMutex); + assert(client->m_tasks.find(connectionHandle) != client->m_tasks.end()); + client->m_tasks.erase(connectionHandle); + } + + assert(request && response && client && connectionHandle); + + bool shouldContinueRequest = client->ContinueRequest(*request); + if (curlResponseCode != CURLE_OK && shouldContinueRequest) + { + response->SetClientErrorType(CoreErrors::NETWORK_CONNECTION); + Aws::StringStream ss; + ss << "curlCode: " << curlResponseCode << ", " << curl_easy_strerror(curlResponseCode); + response->SetClientErrorMessage(ss.str()); + AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, "Curl returned error code " << curlResponseCode + << " - " << curl_easy_strerror(curlResponseCode)); + } + else if(!shouldContinueRequest) + { + response->SetClientErrorType(CoreErrors::USER_CANCELLED); + response->SetClientErrorMessage("Request cancelled by user's continuation handler"); + } + else + { + long responseCode; + curl_easy_getinfo(connectionHandle, CURLINFO_RESPONSE_CODE, &responseCode); + response->SetResponseCode(static_cast(responseCode)); + AWS_LOGSTREAM_DEBUG(CURL_HTTP_CLIENT_TAG, "Returned http response code " << responseCode); + + char* contentType = nullptr; + curl_easy_getinfo(connectionHandle, CURLINFO_CONTENT_TYPE, &contentType); + if (contentType) + { + response->SetContentType(contentType); + AWS_LOGSTREAM_DEBUG(CURL_HTTP_CLIENT_TAG, "Returned content type " << contentType); + } + + bool hasContentLength = false; + int64_t contentLength = + GetContentLengthFromHeader(connectionHandle, hasContentLength); + + if (request->GetMethod() != HttpMethod::HTTP_HEAD && + pEasyHandleCtx->writeContext.m_client->IsRequestProcessingEnabled() && + hasContentLength) + { + int64_t numBytesResponseReceived = pEasyHandleCtx->writeContext.m_numBytesResponseReceived; + AWS_LOGSTREAM_TRACE(CURL_HTTP_CLIENT_TAG, "Response content-length header: " << contentLength); + AWS_LOGSTREAM_TRACE(CURL_HTTP_CLIENT_TAG, "Response body length: " << numBytesResponseReceived); + if (contentLength != numBytesResponseReceived) + { + response->SetClientErrorType(CoreErrors::NETWORK_CONNECTION); + response->SetClientErrorMessage("Response body length doesn't match the content-length header."); + AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, "Response body length doesn't match the content-length header."); + } + } + + AWS_LOGSTREAM_DEBUG(CURL_HTTP_CLIENT_TAG, "Releasing curl handle " << connectionHandle); + } + + double timep = 0.0; + CURLcode ret = curl_easy_getinfo(connectionHandle, CURLINFO_NAMELOOKUP_TIME, &timep); // DNS Resolve Latency, seconds. + if (ret == CURLE_OK) + { + request->AddRequestMetric(GetHttpClientMetricNameByType(HttpClientMetricsType::DnsLatency), static_cast(timep * 1000));// to milliseconds + } + + ret = curl_easy_getinfo(connectionHandle, CURLINFO_STARTTRANSFER_TIME, &timep); // Connect Latency + if (ret == CURLE_OK) + { + request->AddRequestMetric(GetHttpClientMetricNameByType(HttpClientMetricsType::ConnectLatency), static_cast(timep * 1000)); + } + + ret = curl_easy_getinfo(connectionHandle, CURLINFO_APPCONNECT_TIME, &timep); // Ssl Latency + if (ret == CURLE_OK) + { + request->AddRequestMetric(GetHttpClientMetricNameByType(HttpClientMetricsType::SslLatency), static_cast(timep * 1000)); + } + + curl_off_t speed; +#if LIBCURL_VERSION_NUM >= 0x073700 // 7.55.0 + ret = curl_easy_getinfo(connectionHandle, CURLINFO_SPEED_DOWNLOAD_T, &speed); // throughput +#else + ret = curl_easy_getinfo(connectionHandle, CURLINFO_SPEED_DOWNLOAD, &speed); // throughput +#endif + if (ret == CURLE_OK) + { + request->AddRequestMetric(GetHttpClientMetricNameByType(HttpClientMetricsType::Throughput), static_cast(speed)); + } + + const char* ip = nullptr; + auto curlGetInfoResult = curl_easy_getinfo(connectionHandle, CURLINFO_PRIMARY_IP, &ip); // Get the IP address of the remote endpoint + if (curlGetInfoResult == CURLE_OK && ip) + { + request->SetResolvedRemoteHost(ip); + } + if (curlResponseCode != CURLE_OK) + { + client->m_curlMultiHandleContainer.DestroyCurlHandle(connectionHandle); + } + else + { + client->m_curlMultiHandleContainer.ReleaseCurlHandle(connectionHandle); + } + //go ahead and flush the response body stream + response->GetResponseBody().flush(); + if (response->GetResponseBody().fail()) { + const auto& ref = response->GetResponseBody(); + Aws::StringStream ss; + ss << "Failed to flush response stream (eof: " << ref.eof() << ", bad: " << ref.bad() << ")"; + response->SetClientErrorType(CoreErrors::INTERNAL_FAILURE); + response->SetClientErrorMessage(ss.str()); + AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, ss.str()); + } + request->AddRequestMetric(GetHttpClientMetricNameByType(HttpClientMetricsType::RequestLatency), + (DateTime::Now() - pEasyHandleCtx->startTransmissionTime).count()); + + if (pEasyHandleCtx->curlHandleHeaders) + { + curl_slist_free_all(pEasyHandleCtx->curlHandleHeaders); + pEasyHandleCtx->curlHandleHeaders = nullptr; + } + + curl_multi_remove_handle(client->m_curlMultiHandleContainer.AccessCurlMultiHandle(), connectionHandle); + + return pEasyHandleCtx->writeContext.m_response; +} + +void CurlMultiHttpClient::SubmitTask(std::shared_ptr pEasyHandleCtx) const +{ + assert(pEasyHandleCtx); + { + std::unique_lock lock(m_tasksMutex); + m_tasksQueued++; + m_tasks[pEasyHandleCtx->readContext.m_curlHandle] = std::move(pEasyHandleCtx); + } + { + std::unique_lock lockGuard(m_signalMutex); + m_signalRunning.notify_one(); + curl_multi_wakeup(m_curlMultiHandleContainer.AccessCurlMultiHandle()); + } +} + +// Blocking +std::shared_ptr CurlMultiHttpClient::MakeRequest(const std::shared_ptr& request, + Aws::Utils::RateLimits::RateLimiterInterface* readLimiter, + Aws::Utils::RateLimits::RateLimiterInterface* writeLimiter) const +{ + std::shared_ptr response = Aws::MakeShared(CURL_HTTP_CLIENT_TAG, request); + if(!response) + { + AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, "Failed to allocate shared_ptr"); + return response; + } + + CURL* connectionHandle = m_curlMultiHandleContainer.AcquireCurlHandle(); + if(!connectionHandle) + { + response->SetClientErrorType(CoreErrors::NETWORK_CONNECTION); + response->SetClientErrorMessage("Failed to Acquire curl handle"); + AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, "Failed to Acquire curl handle"); + return response; + } + + AWS_LOGSTREAM_DEBUG(CURL_HTTP_CLIENT_TAG, "Obtained connection handle " << connectionHandle); + + std::mutex taskMutex; + std::condition_variable signal; + bool completed = false; + auto onDone = [&signal, &taskMutex, &completed]() + { + std::unique_lock lockGuard(taskMutex); + completed = true; + signal.notify_one(); + }; + + std::shared_ptr pHandleCtx = ConfigureEasyHandle(this, m_config, onDone, + connectionHandle, request.get(), response, + readLimiter, writeLimiter); + assert(pHandleCtx); + OverrideOptionsOnConnectionHandle(connectionHandle); + + pHandleCtx->startTransmissionTime = Aws::Utils::DateTime::Now(); + + CURLMcode curlMultiResponseCode = curl_multi_add_handle(m_curlMultiHandleContainer.AccessCurlMultiHandle(), connectionHandle); + if (CURLM_OK != curlMultiResponseCode) + { + response->SetClientErrorType(CoreErrors::NETWORK_CONNECTION); + response->SetClientErrorMessage("Failed to add curl_easy_handle to curl_multi_handle."); + AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, "Failed to add curl_easy_handle to curl_multi_handle."); + return response; + } + + SubmitTask(pHandleCtx); + + // Task submitted, wait for it's completion + std::unique_lock lockGuard(taskMutex); + signal.wait(lockGuard, + [this, &completed] + { + return !this->m_isRunning.load() || completed; + }); + + // This is a blocking mode, handle response within the submitter thread + return HandleCurlResponse(pHandleCtx); +} From 7e85c620d99e3304e3e330dd0a08b95de3436d4e Mon Sep 17 00:00:00 2001 From: SergeyRyabinin Date: Tue, 19 Sep 2023 04:55:18 +0000 Subject: [PATCH 2/3] Avoid locking on submitted tasks to multi curl handle by keeping easy handle sdk wrappers in a container --- .../http/curl-multi/CurlEasyHandleContext.h | 99 ++++ .../curl-multi/CurlMultiHandleContainer.h | 129 ++--- .../http/curl-multi/CurlMultiHttpClient.h | 14 +- .../http/curl-multi/CurlEasyHandleContext.cpp | 406 ++++++++++++++++ .../curl-multi/CurlMultiHandleContainer.cpp | 173 +++++-- .../http/curl-multi/CurlMultiHttpClient.cpp | 457 +++--------------- 6 files changed, 766 insertions(+), 512 deletions(-) create mode 100644 src/aws-cpp-sdk-core/include/aws/core/http/curl-multi/CurlEasyHandleContext.h create mode 100644 src/aws-cpp-sdk-core/source/http/curl-multi/CurlEasyHandleContext.cpp diff --git a/src/aws-cpp-sdk-core/include/aws/core/http/curl-multi/CurlEasyHandleContext.h b/src/aws-cpp-sdk-core/include/aws/core/http/curl-multi/CurlEasyHandleContext.h new file mode 100644 index 00000000000..23a9fb75045 --- /dev/null +++ b/src/aws-cpp-sdk-core/include/aws/core/http/curl-multi/CurlEasyHandleContext.h @@ -0,0 +1,99 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#pragma once + +#include +#include +#include + +#include +#include + +namespace Aws +{ +namespace Utils +{ +namespace RateLimits +{ + class RateLimiterInterface; +} +} + +namespace Http +{ + class CurlMultiHttpClient; + class HttpRequest; + class HttpResponse; + + namespace Curl + { + enum class ExecutionPolicy + { + BLOCKING, + ASYNC + }; + + /* A composition wrapper on curl easy handle to have curl handle and SDK context together*/ + struct CurlEasyHandleContext final + { + struct WriteContext final + { + WriteContext() = default; + + bool m_HasBody = false; + HttpRequest *m_request = nullptr; + std::shared_ptr m_response; + Aws::Utils::RateLimits::RateLimiterInterface *m_rateLimiter = nullptr; + int64_t m_numBytesResponseReceived = 0; + }; + + struct ReadContext final + { + ReadContext() = default; + + Aws::Utils::RateLimits::RateLimiterInterface *m_rateLimiter = nullptr; + HttpRequest *m_request = nullptr; + bool m_chunkEnd = false; + }; + + CurlEasyHandleContext() = default; + + /* Curl side */ + CURL* m_curlEasyHandle = nullptr; + // headers set on curl handle, to be cleaned up by SDK after the execution + curl_slist* m_curlHandleHeaders = nullptr; + + /* SDK side */ + const CurlMultiHttpClient* m_client = nullptr; + ExecutionPolicy m_execPolicy; + std::function m_onCurlDoneFn; + Aws::Utils::DateTime startTransmissionTime; + + /* Curl calls the SDK back */ + WriteContext writeContext; + ReadContext readContext; + + /* SDK polls the curl result */ + CURLcode curlResult; + // ptr acquired by curl_multi_info_read, free-ed by curl in easy handle cleanup, do not free in SDK + CURLMsg* curlResultMsg = nullptr; + + /* callbacks set on easy handle */ + static size_t WriteData(char* ptr, size_t size, size_t nmemb, void* userdata); + static size_t WriteHeader(char* ptr, size_t size, size_t nmemb, void* userdata); + static size_t ReadBody(char* ptr, size_t size, size_t nmemb, void* userdata, bool isStreaming); + static size_t ReadBodyStreaming(char* ptr, size_t size, size_t nmemb, void* userdata); + static size_t ReadBodyFunc(char* ptr, size_t size, size_t nmemb, void* userdata); + static size_t SeekBody(void* userdata, curl_off_t offset, int origin); +#if LIBCURL_VERSION_NUM >= 0x072000 // 7.32.0 + static int CurlProgressCallback(void *userdata, curl_off_t, curl_off_t, curl_off_t, curl_off_t); +#else + static int CurlProgressCallback(void *userdata, double, double, double, double); +#endif + }; + } // namespace Curl +} // namespace Http +} // namespace Aws \ No newline at end of file diff --git a/src/aws-cpp-sdk-core/include/aws/core/http/curl-multi/CurlMultiHandleContainer.h b/src/aws-cpp-sdk-core/include/aws/core/http/curl-multi/CurlMultiHandleContainer.h index 8fadf89ca9a..187f96df4da 100644 --- a/src/aws-cpp-sdk-core/include/aws/core/http/curl-multi/CurlMultiHandleContainer.h +++ b/src/aws-cpp-sdk-core/include/aws/core/http/curl-multi/CurlMultiHandleContainer.h @@ -13,73 +13,80 @@ namespace Aws { -namespace Http -{ - -/** - * Simple Connection pool manager for Curl. It maintains connections in a thread safe manner. You - * can call into acquire a handle, then put it back when finished. It is assumed that reusing an already - * initialized handle is preferable (especially for synchronous clients). The pool doubles in capacity as - * needed up to the maximum amount of connections. - */ -class CurlMultiHandleContainer -{ -public: - /** - * Initializes an empty stack of CURL handles. If you are only making synchronous calls via your http client - * then a small size is best. For async support, a good value would be 6 * number of Processors. * - */ - CurlMultiHandleContainer(unsigned maxSize = 50, long httpRequestTimeout = 0, long connectTimeout = 1000, bool tcpKeepAlive = true, - unsigned long tcpKeepAliveIntervalMs = 30000, long lowSpeedTime = 3000, unsigned long lowSpeedLimit = 1, - Version version = Version::HTTP_VERSION_2TLS); - ~CurlMultiHandleContainer(); + namespace Http + { + namespace Curl + { + struct CurlEasyHandleContext; - /** - * Blocks until a curl handle from the pool is available for use. - */ - CURL* AcquireCurlHandle(); - /** - * Returns a handle to the pool for reuse. It is imperative that this is called - * after you are finished with the handle. - */ - void ReleaseCurlHandle(CURL* handle); + /** + * Simple Connection pool manager for Curl. It maintains connections in a thread safe manner. You + * can call into acquire a handle, then put it back when finished. It is assumed that reusing an already + * initialized handle is preferable (especially for synchronous clients). The pool doubles in capacity as + * needed up to the maximum amount of connections. + */ + class CurlMultiHandleContainer + { + public: + /** + * Initializes an empty stack of CURL handles. + */ + CurlMultiHandleContainer(unsigned maxSize = 50, + long httpRequestTimeout = 0, + long connectTimeout = 1000, + bool tcpKeepAlive = true, + unsigned long tcpKeepAliveIntervalMs = 30000, + long lowSpeedTime = 3000, + unsigned long lowSpeedLimit = 1, + Version version = Version::HTTP_VERSION_2TLS); + ~CurlMultiHandleContainer(); - /** - * When the handle has bad DNS entries, problematic live connections, we need to destroy the handle from pool. - */ - void DestroyCurlHandle(CURL* handle); + /** + * Blocks until a curl handle from the pool is available for use. + */ + CurlEasyHandleContext* AcquireCurlHandle(); + /** + * Returns a handle to the pool for reuse. It is imperative that this is called + * after you are finished with the handle. + */ + void ReleaseCurlHandle(CurlEasyHandleContext* handleCtx); - inline CURLM* AccessCurlMultiHandle() - { - return m_curlMultiHandle; - } + /** + * When the handle has bad DNS entries, problematic live connections, we need to destroy the handle from pool. + */ + void DestroyCurlHandle(CurlEasyHandleContext* handleCtx); -private: - CurlMultiHandleContainer(const CurlMultiHandleContainer&) = delete; - const CurlMultiHandleContainer& operator = (const CurlMultiHandleContainer&) = delete; - CurlMultiHandleContainer(const CurlMultiHandleContainer&&) = delete; - const CurlMultiHandleContainer& operator = (const CurlMultiHandleContainer&&) = delete; + inline CURLM* AccessCurlMultiHandle() + { + return m_curlMultiHandle; + } - CURL* CreateCurlHandleInPool(); - bool CheckAndGrowPool(); - void SetDefaultOptionsOnHandle(CURL* handle); - static long ConvertHttpVersion(Version version); + private: + CurlMultiHandleContainer(const CurlMultiHandleContainer&) = delete; + const CurlMultiHandleContainer& operator = (const CurlMultiHandleContainer&) = delete; + CurlMultiHandleContainer(const CurlMultiHandleContainer&&) = delete; + const CurlMultiHandleContainer& operator = (const CurlMultiHandleContainer&&) = delete; - Aws::Utils::ExclusiveOwnershipResourceManager m_handleContainer; - CURLM* m_curlMultiHandle = nullptr; + CurlEasyHandleContext* CreateCurlHandleInPool(); + bool CheckAndGrowPool(); + void SetDefaultOptionsOnHandle(CurlEasyHandleContext& handleCtx); + static long ConvertHttpVersion(Version version); - unsigned m_maxPoolSize; - unsigned long m_httpRequestTimeout; - unsigned long m_connectTimeout; - bool m_enableTcpKeepAlive; - unsigned long m_tcpKeepAliveIntervalMs; - unsigned long m_lowSpeedTime; - unsigned long m_lowSpeedLimit; - unsigned m_poolSize; - std::mutex m_containerLock; - Version m_version; -}; + Aws::Utils::ExclusiveOwnershipResourceManager m_handleContainer; + CURLM* m_curlMultiHandle = nullptr; -} // namespace Http -} // namespace Aws + unsigned m_maxPoolSize; + unsigned long m_httpRequestTimeout; + unsigned long m_connectTimeout; + bool m_enableTcpKeepAlive; + unsigned long m_tcpKeepAliveIntervalMs; + unsigned long m_lowSpeedTime; + unsigned long m_lowSpeedLimit; + unsigned m_poolSize; + std::mutex m_containerLock; + Version m_version; + }; + } // namespace Curl + } // namespace Http +} // namespace Aws \ No newline at end of file diff --git a/src/aws-cpp-sdk-core/include/aws/core/http/curl-multi/CurlMultiHttpClient.h b/src/aws-cpp-sdk-core/include/aws/core/http/curl-multi/CurlMultiHttpClient.h index 28f258dfd59..4d5b8f1e4ed 100644 --- a/src/aws-cpp-sdk-core/include/aws/core/http/curl-multi/CurlMultiHttpClient.h +++ b/src/aws-cpp-sdk-core/include/aws/core/http/curl-multi/CurlMultiHttpClient.h @@ -20,6 +20,10 @@ namespace Aws { namespace Http { +namespace Curl +{ + struct CurlEasyHandleContext; +} namespace Standard { class StandardHttpResponse; @@ -60,8 +64,6 @@ class AWS_CORE_API CurlMultiHttpClient: public HttpClient bool disableExpectHeader = false; bool allowRedirects = false; }; - struct CurlEasyHandleContext; - using Base = HttpClient; //Creates client, initializes curl handle if it hasn't been created already. @@ -84,9 +86,9 @@ class AWS_CORE_API CurlMultiHttpClient: public HttpClient virtual void OverrideOptionsOnConnectionHandle(CURL*) const {} private: - void SubmitTask(std::shared_ptr pEasyHandleCtx) const; + void SubmitTask(Curl::CurlEasyHandleContext* pEasyHandleCtx) const; - static std::shared_ptr HandleCurlResponse(std::shared_ptr pEasyHandleCtx); + static std::shared_ptr HandleCurlResponse(Curl::CurlEasyHandleContext* pEasyHandleCtx); static void CurlMultiPerformThread(CurlMultiHttpClient* pClient); std::thread m_multiHandleThread; @@ -97,11 +99,11 @@ class AWS_CORE_API CurlMultiHttpClient: public HttpClient // mutable std::mutex m_tasksMutex; mutable std::atomic m_tasksQueued; mutable std::mutex m_tasksMutex; - mutable Aws::UnorderedMap> m_tasks; + mutable Aws::UnorderedMap> m_tasks; CurlMultiHttpClientConfig m_config; - mutable CurlMultiHandleContainer m_curlMultiHandleContainer; + mutable Curl::CurlMultiHandleContainer m_curlMultiHandleContainer; static std::atomic isGlobalStateInit; std::shared_ptr m_telemetryProvider; diff --git a/src/aws-cpp-sdk-core/source/http/curl-multi/CurlEasyHandleContext.cpp b/src/aws-cpp-sdk-core/source/http/curl-multi/CurlEasyHandleContext.cpp new file mode 100644 index 00000000000..80137b2af5b --- /dev/null +++ b/src/aws-cpp-sdk-core/source/http/curl-multi/CurlEasyHandleContext.cpp @@ -0,0 +1,406 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace Aws +{ +namespace Http +{ +namespace Curl +{ + +// TODO: enable memory allocation through SDK on curl +#ifdef AWS_CUSTOM_MEMORY_MANAGEMENT + +static const char* MemTag = "libcurl"; +static size_t offset = sizeof(size_t); + +static void* malloc_callback(size_t size) +{ + char* newMem = reinterpret_cast(Aws::Malloc(MemTag, size + offset)); + std::size_t* pointerToSize = reinterpret_cast(newMem); + *pointerToSize = size; + return reinterpret_cast(newMem + offset); +} + +static void free_callback(void* ptr) +{ + if(ptr) + { + char* shiftedMemory = reinterpret_cast(ptr); + Aws::Free(shiftedMemory - offset); + } +} + +static void* realloc_callback(void* ptr, size_t size) +{ + if(!ptr) + { + return malloc_callback(size); + } + + + if(!size && ptr) + { + free_callback(ptr); + return nullptr; + } + + char* originalLenCharPtr = reinterpret_cast(ptr) - offset; + size_t originalLen = *reinterpret_cast(originalLenCharPtr); + + char* rawMemory = reinterpret_cast(Aws::Malloc(MemTag, size + offset)); + if(rawMemory) + { + std::size_t* pointerToSize = reinterpret_cast(rawMemory); + *pointerToSize = size; + + size_t copyLength = (std::min)(originalLen, size); +#ifdef _MSC_VER + memcpy_s(rawMemory + offset, size, ptr, copyLength); +#else + memcpy(rawMemory + offset, ptr, copyLength); +#endif + free_callback(ptr); + return reinterpret_cast(rawMemory + offset); + } + else + { + return ptr; + } + +} + +static void* calloc_callback(size_t nmemb, size_t size) +{ + size_t dataSize = nmemb * size; + char* newMem = reinterpret_cast(Aws::Malloc(MemTag, dataSize + offset)); + std::size_t* pointerToSize = reinterpret_cast(newMem); + *pointerToSize = dataSize; +#ifdef _MSC_VER + memset_s(newMem + offset, dataSize, 0, dataSize); +#else + memset(newMem + offset, 0, dataSize); +#endif + + return reinterpret_cast(newMem + offset); +} + +static char* strdup_callback(const char* str) +{ + size_t len = strlen(str) + 1; + size_t newLen = len + offset; + char* newMem = reinterpret_cast(Aws::Malloc(MemTag, newLen)); + + if(newMem) + { + std::size_t* pointerToSize = reinterpret_cast(newMem); + *pointerToSize = len; +#ifdef _MSC_VER + memcpy_s(newMem + offset, len, str, len); +#else + memcpy(newMem + offset, str, len); +#endif + return newMem + offset; + } + return nullptr; +} + +#endif + +static const char* CURL_EASY_HANDLE_CTX_TAG = "CurlMultiHttpClient"; + +size_t CurlEasyHandleContext::WriteData(char* ptr, size_t size, size_t nmemb, void* userdata) +{ + if (ptr) + { + CurlEasyHandleContext* context = reinterpret_cast(userdata); + assert(context); + + const CurlMultiHttpClient* client = context->m_client; + assert(client); + HttpRequest* pRequest = context->writeContext.m_request; + assert(pRequest); + if(!client->ContinueRequest(*pRequest) || !client->IsRequestProcessingEnabled()) + { + return 0; + } + + HttpResponse* response = context->writeContext.m_response.get(); + assert(response); + + size_t sizeToWrite = size * nmemb; // TODO: check for overflow + if (context->writeContext.m_rateLimiter) + { + context->writeContext.m_rateLimiter->ApplyAndPayForCost(static_cast(sizeToWrite)); + } + + for (const auto& hashIterator : pRequest->GetResponseValidationHashes()) + { + hashIterator.second->Update(reinterpret_cast(ptr), sizeToWrite); + } + + if (response->GetResponseBody().fail()) { + const auto& ref = response->GetResponseBody(); + AWS_LOGSTREAM_ERROR(CURL_EASY_HANDLE_CTX_TAG, "Response output stream in bad state (eof: " + << ref.eof() << ", bad: " << ref.bad() << ")"); + return 0; + } + + size_t cur = response->GetResponseBody().tellp(); + if (response->GetResponseBody().fail()) { + const auto& ref = response->GetResponseBody(); + AWS_LOGSTREAM_ERROR(CURL_EASY_HANDLE_CTX_TAG, "Unable to query response output position (eof: " + << ref.eof() << ", bad: " << ref.bad() << ")"); + return 0; + } + + response->GetResponseBody().write(ptr, static_cast(sizeToWrite)); + if (response->GetResponseBody().fail()) { + const auto& ref = response->GetResponseBody(); + AWS_LOGSTREAM_ERROR(CURL_EASY_HANDLE_CTX_TAG, "Failed to write " << size << " / " << sizeToWrite << " B response" + << " at " << cur << " (eof: " << ref.eof() << ", bad: " << ref.bad() << ")"); + return 0; + } + if (pRequest->IsEventStreamRequest() && !response->HasHeader(Aws::Http::X_AMZN_ERROR_TYPE)) + { + response->GetResponseBody().flush(); + if (response->GetResponseBody().fail()) { + const auto& ref = response->GetResponseBody(); + AWS_LOGSTREAM_ERROR(CURL_EASY_HANDLE_CTX_TAG, "Failed to flush event response (eof: " + << ref.eof() << ", bad: " << ref.bad() << ")"); + return 0; + } + } + auto& receivedHandler = pRequest->GetDataReceivedEventHandler(); + if (receivedHandler) + { + receivedHandler(pRequest, response, static_cast(sizeToWrite)); + } + + AWS_LOGSTREAM_TRACE(CURL_EASY_HANDLE_CTX_TAG, sizeToWrite << " bytes written to response."); + context->writeContext.m_numBytesResponseReceived += sizeToWrite; + return sizeToWrite; + } + return 0; +} + +size_t CurlEasyHandleContext::WriteHeader(char* ptr, size_t size, size_t nmemb, void* userdata) +{ + if (ptr) + { + CurlEasyHandleContext* context = reinterpret_cast(userdata); + assert(context); + AWS_LOGSTREAM_TRACE(CURL_EASY_HANDLE_CTX_TAG, ptr); + + HttpResponse* response = context->writeContext.m_response.get(); + assert(response); + + Aws::String headerLine(ptr); + Aws::Vector keyValuePair = Aws::Utils::StringUtils::Split(headerLine, ':', 2); + + if (keyValuePair.size() == 2) + { + response->AddHeader(Aws::Utils::StringUtils::Trim(keyValuePair[0].c_str()), Aws::Utils::StringUtils::Trim(keyValuePair[1].c_str())); + } + + return size * nmemb; + } + return 0; +} + +size_t CurlEasyHandleContext::ReadBody(char* ptr, size_t size, size_t nmemb, void* userdata, bool isStreaming) +{ + CurlEasyHandleContext* context = reinterpret_cast(userdata); + if(!context->writeContext.m_HasBody) + { + return 0; + } + + const CurlMultiHttpClient* client = context->m_client; + assert(client); + + HttpRequest* request = context->writeContext.m_request; + assert(request); + + if(!client->ContinueRequest(*request) || !client->IsRequestProcessingEnabled()) + { + return CURL_READFUNC_ABORT; + } + + const std::shared_ptr& ioStream = request->GetContentBody(); + + size_t amountToRead = size * nmemb; // TODO: check for overflow + bool isAwsChunked = request->HasHeader(Aws::Http::CONTENT_ENCODING_HEADER) && + request->GetHeaderValue(Aws::Http::CONTENT_ENCODING_HEADER) == Aws::Http::AWS_CHUNKED_VALUE; + // aws-chunk = hex(chunk-size) + CRLF + chunk-data + CRLF + // Needs to reserve bytes of sizeof(hex(chunk-size)) + sizeof(CRLF) + sizeof(CRLF) + if (isAwsChunked) + { + Aws::String amountToReadHexString = Aws::Utils::StringUtils::ToHexString(amountToRead); + amountToRead -= (amountToReadHexString.size() + 4); + } + + if (ioStream != nullptr && amountToRead > 0) + { + if (isStreaming) + { + if (ioStream->readsome(ptr, amountToRead) == 0 && !ioStream->eof()) + { + return CURL_READFUNC_PAUSE; + } + } + else + { + ioStream->read(ptr, amountToRead); + } + size_t amountRead = static_cast(ioStream->gcount()); + + if (isAwsChunked) + { + if (amountRead > 0) + { + if (request->GetRequestHash().second != nullptr) + { + request->GetRequestHash().second->Update(reinterpret_cast(ptr), amountRead); + } + + Aws::String hex = Aws::Utils::StringUtils::ToHexString(amountRead); + memmove(ptr + hex.size() + 2, ptr, amountRead); + memmove(ptr + hex.size() + 2 + amountRead, "\r\n", 2); + memmove(ptr, hex.c_str(), hex.size()); + memmove(ptr + hex.size(), "\r\n", 2); + amountRead += hex.size() + 4; + } + else if (!context->readContext.m_chunkEnd) + { + Aws::StringStream chunkedTrailer; + chunkedTrailer << "0\r\n"; + if (request->GetRequestHash().second != nullptr) + { + chunkedTrailer << "x-amz-checksum-" << request->GetRequestHash().first << ":" + << Aws::Utils::HashingUtils::Base64Encode(request->GetRequestHash().second->GetHash().GetResult()) << "\r\n"; + } + chunkedTrailer << "\r\n"; + amountRead = chunkedTrailer.str().size(); + memcpy(ptr, chunkedTrailer.str().c_str(), amountRead); + context->readContext.m_chunkEnd = true; + } + } + + auto& sentHandler = request->GetDataSentEventHandler(); + if (sentHandler) + { + sentHandler(request, static_cast(amountRead)); + } + + if (context->readContext.m_rateLimiter) + { + context->readContext.m_rateLimiter->ApplyAndPayForCost(static_cast(amountRead)); + } + + return amountRead; + } + + return 0; +} + +size_t CurlEasyHandleContext::ReadBodyStreaming(char* ptr, size_t size, size_t nmemb, void* userdata) { + return ReadBody(ptr, size, nmemb, userdata, true); +} + +size_t CurlEasyHandleContext::ReadBodyFunc(char* ptr, size_t size, size_t nmemb, void* userdata) { + return ReadBody(ptr, size, nmemb, userdata, false); +} + +size_t CurlEasyHandleContext::SeekBody(void* userdata, curl_off_t offset, int origin) +{ + CurlEasyHandleContext* context = reinterpret_cast(userdata); + if(context == nullptr) + { + return CURL_SEEKFUNC_FAIL; + } + + const CurlMultiHttpClient* client = context->m_client; + assert(client); + HttpRequest* request = context->writeContext.m_request; + assert(request); + + if(!client->ContinueRequest(*request) || !client->IsRequestProcessingEnabled()) + { + return CURL_SEEKFUNC_FAIL; + } + + const std::shared_ptr& ioStream = request->GetContentBody(); + + std::ios_base::seekdir dir; + switch(origin) + { + case SEEK_SET: + dir = std::ios_base::beg; + break; + case SEEK_CUR: + dir = std::ios_base::cur; + break; + case SEEK_END: + dir = std::ios_base::end; + break; + default: + return CURL_SEEKFUNC_FAIL; + } + + ioStream->clear(); + ioStream->seekg(offset, dir); + if (ioStream->fail()) { + return CURL_SEEKFUNC_CANTSEEK; + } + + return CURL_SEEKFUNC_OK; +} +#if LIBCURL_VERSION_NUM >= 0x072000 // 7.32.0 +int CurlEasyHandleContext::CurlProgressCallback(void *userdata, curl_off_t, curl_off_t, curl_off_t, curl_off_t) +#else +int CurlEasyHandleContext::CurlProgressCallback(void *userdata, double, double, double, double) +#endif +{ + CurlEasyHandleContext* context = reinterpret_cast(userdata); + assert(context && context->writeContext.m_request); + + const std::shared_ptr& ioStream = context->writeContext.m_request->GetContentBody(); + if (ioStream->eof()) + { + curl_easy_pause(context->m_curlEasyHandle, CURLPAUSE_CONT); + return 0; + } + char output[1]; + if (ioStream->readsome(output, 1) > 0) + { + ioStream->unget(); + if (!ioStream->good()) + { + AWS_LOGSTREAM_WARN(CURL_EASY_HANDLE_CTX_TAG, "Input stream failed to perform unget()."); + } + curl_easy_pause(context->m_curlEasyHandle, CURLPAUSE_CONT); + } + + return 0; +} + +} // namespace Curl +} // namespace Http +} // namespace Aws \ No newline at end of file diff --git a/src/aws-cpp-sdk-core/source/http/curl-multi/CurlMultiHandleContainer.cpp b/src/aws-cpp-sdk-core/source/http/curl-multi/CurlMultiHandleContainer.cpp index ea1232737c1..f07de957531 100644 --- a/src/aws-cpp-sdk-core/source/http/curl-multi/CurlMultiHandleContainer.cpp +++ b/src/aws-cpp-sdk-core/source/http/curl-multi/CurlMultiHandleContainer.cpp @@ -4,51 +4,67 @@ */ #include +#include #include #include -using namespace Aws::Utils::Logging; -using namespace Aws::Http; +namespace Aws +{ +namespace Http +{ +namespace Curl +{ static const char* CURL_HANDLE_CONTAINER_TAG = "CurlMultiHandleContainer"; -CurlMultiHandleContainer::CurlMultiHandleContainer(unsigned maxSize, long httpRequestTimeout, long connectTimeout, bool enableTcpKeepAlive, - unsigned long tcpKeepAliveIntervalMs, long lowSpeedTime, unsigned long lowSpeedLimit, - Version version) : +CurlMultiHandleContainer::CurlMultiHandleContainer(unsigned maxSize, + long httpRequestTimeout, + long connectTimeout, + bool enableTcpKeepAlive, + unsigned long tcpKeepAliveIntervalMs, + long lowSpeedTime, + unsigned long lowSpeedLimit, + Version version) : m_maxPoolSize(maxSize), m_httpRequestTimeout(httpRequestTimeout), m_connectTimeout(connectTimeout), m_enableTcpKeepAlive(enableTcpKeepAlive), m_tcpKeepAliveIntervalMs(tcpKeepAliveIntervalMs), m_lowSpeedTime(lowSpeedTime), m_lowSpeedLimit(lowSpeedLimit), m_poolSize(0), m_version(version) { AWS_LOGSTREAM_INFO(CURL_HANDLE_CONTAINER_TAG, "Initializing CurlMultiHandleContainer with size " << maxSize); - this->m_curlMultiHandle = curl_multi_init(); - AWS_LOGSTREAM_DEBUG(CURL_HANDLE_CONTAINER_TAG, "CURLMOPT_MAXCONNECTS = " << m_maxPoolSize); - curl_multi_setopt(this->m_curlMultiHandle, CURLMOPT_MAXCONNECTS, m_maxPoolSize); -// this->m_curlShare = curl_share_init(); -// CURLSHcode curlShareCode = curl_share_setopt(this->m_curlShare, CURLSHOPT_SHARE, CURL_LOCK_DATA_COOKIE); -// curlShareCode = curl_share_setopt(this->m_curlShare, CURLSHOPT_SHARE, CURL_LOCK_DATA_SSL_SESSION); -// assert(curlShareCode == CURLSHE_OK); -// AWS_LOGSTREAM_INFO(CURL_HANDLE_CONTAINER_TAG, "Enabled CURLSHOPT_SHARE CURL_LOCK_DATA_SSL_SESSION"); + m_curlMultiHandle = curl_multi_init(); + assert(m_curlMultiHandle); + if(!m_curlMultiHandle) + { + AWS_LOGSTREAM_ERROR(CURL_HANDLE_CONTAINER_TAG, "curl_multi_init failed."); + return; + } + curl_multi_setopt(m_curlMultiHandle, CURLMOPT_MAXCONNECTS, m_maxPoolSize); + AWS_LOGSTREAM_INFO(CURL_HANDLE_CONTAINER_TAG, "Initialized curl multi handle " << m_curlMultiHandle); } CurlMultiHandleContainer::~CurlMultiHandleContainer() { AWS_LOGSTREAM_INFO(CURL_HANDLE_CONTAINER_TAG, "Cleaning up CurlMultiHandleContainer."); - for (CURL* handle : m_handleContainer.ShutdownAndWait(m_poolSize)) + for (CurlEasyHandleContext* easyHandleContext : m_handleContainer.ShutdownAndWait(m_poolSize)) { - AWS_LOGSTREAM_DEBUG(CURL_HANDLE_CONTAINER_TAG, "Cleaning up " << handle); - curl_easy_cleanup(handle); + AWS_LOGSTREAM_DEBUG(CURL_HANDLE_CONTAINER_TAG, "Cleaning up " << easyHandleContext->m_curlEasyHandle); + assert(easyHandleContext->m_curlEasyHandle); + curl_easy_cleanup(easyHandleContext->m_curlEasyHandle); + + Aws::Delete(easyHandleContext); } - curl_multi_cleanup(m_curlMultiHandle); - m_curlMultiHandle = nullptr; -// curl_share_cleanup(m_curlShare); -// m_curlShare = nullptr; + if(m_curlMultiHandle) + { + AWS_LOGSTREAM_INFO(CURL_HANDLE_CONTAINER_TAG, "Cleaning up curl multi handdle" << m_curlMultiHandle); + curl_multi_cleanup(m_curlMultiHandle); + m_curlMultiHandle = nullptr; + } } -CURL* CurlMultiHandleContainer::AcquireCurlHandle() +CurlEasyHandleContext* CurlMultiHandleContainer::AcquireCurlHandle() { AWS_LOGSTREAM_DEBUG(CURL_HANDLE_CONTAINER_TAG, "Attempting to acquire curl easy handle."); @@ -58,64 +74,77 @@ CURL* CurlMultiHandleContainer::AcquireCurlHandle() CheckAndGrowPool(); } - CURL* handle = m_handleContainer.Acquire(); + CurlEasyHandleContext* handle = m_handleContainer.Acquire(); AWS_LOGSTREAM_DEBUG(CURL_HANDLE_CONTAINER_TAG, "Connection has been released. Continuing."); AWS_LOGSTREAM_DEBUG(CURL_HANDLE_CONTAINER_TAG, "Returning connection handle " << handle); return handle; } -void CurlMultiHandleContainer::ReleaseCurlHandle(CURL* handle) +void CurlMultiHandleContainer::ReleaseCurlHandle(CurlEasyHandleContext* handleCtx) { + CURL* handle = handleCtx ? handleCtx->m_curlEasyHandle : nullptr; if (handle) { #if LIBCURL_VERSION_NUM >= 0x074D00 // 7.77.0 curl_easy_setopt(handle, CURLOPT_COOKIEFILE, NULL); // workaround a mem leak on curl #endif curl_easy_reset(handle); - SetDefaultOptionsOnHandle(handle); + SetDefaultOptionsOnHandle(*handleCtx); + curl_easy_setopt(handle, CURLOPT_PRIVATE, handleCtx); AWS_LOGSTREAM_DEBUG(CURL_HANDLE_CONTAINER_TAG, "Releasing curl handle " << handle); - m_handleContainer.Release(handle); + m_handleContainer.Release(handleCtx); AWS_LOGSTREAM_DEBUG(CURL_HANDLE_CONTAINER_TAG, "Notified waiting threads."); } } -void CurlMultiHandleContainer::DestroyCurlHandle(CURL* handle) +void CurlMultiHandleContainer::DestroyCurlHandle(CurlEasyHandleContext* handleCtx) { - if (!handle) + if(handleCtx && handleCtx->m_curlEasyHandle) { - return; + AWS_LOGSTREAM_DEBUG(CURL_HANDLE_CONTAINER_TAG, "Destroy curl handle: " << handleCtx->m_curlEasyHandle); + curl_easy_cleanup(handleCtx->m_curlEasyHandle); + handleCtx->m_curlEasyHandle = nullptr; + Aws::Delete(handleCtx); + handleCtx = nullptr; } - curl_easy_cleanup(handle); - AWS_LOGSTREAM_DEBUG(CURL_HANDLE_CONTAINER_TAG, "Destroy curl handle: " << handle); { std::lock_guard locker(m_containerLock); // Other threads could be blocked and waiting on m_handleContainer.Acquire() // If the handle is not released back to the pool, it could create a deadlock // Create a new handle and release that into the pool - handle = CreateCurlHandleInPool(); + handleCtx = CreateCurlHandleInPool(); } - if (handle) + if (handleCtx) { - AWS_LOGSTREAM_DEBUG(CURL_HANDLE_CONTAINER_TAG, "Created replacement handle and released to pool: " << handle); + AWS_LOGSTREAM_DEBUG(CURL_HANDLE_CONTAINER_TAG, "Created replacement handle and released to pool: " << handleCtx->m_curlEasyHandle); } } -CURL* CurlMultiHandleContainer::CreateCurlHandleInPool() +CurlEasyHandleContext* CurlMultiHandleContainer::CreateCurlHandleInPool() { - CURL* curlHandle = curl_easy_init(); + CurlEasyHandleContext* handleCtx = Aws::New(CURL_HANDLE_CONTAINER_TAG); + if(!handleCtx) + { + assert(handleCtx); + AWS_LOGSTREAM_ERROR(CURL_HANDLE_CONTAINER_TAG, "curl_easy_init failed to allocate."); + return nullptr; + } - if (curlHandle) + handleCtx->m_curlEasyHandle = curl_easy_init(); + + if (handleCtx->m_curlEasyHandle) { - SetDefaultOptionsOnHandle(curlHandle); - m_handleContainer.Release(curlHandle); + SetDefaultOptionsOnHandle(*handleCtx); + curl_easy_setopt(handleCtx->m_curlEasyHandle, CURLOPT_PRIVATE, handleCtx); + m_handleContainer.Release(handleCtx); } else { AWS_LOGSTREAM_ERROR(CURL_HANDLE_CONTAINER_TAG, "curl_easy_init failed to allocate."); } - return curlHandle; + return handleCtx; } bool CurlMultiHandleContainer::CheckAndGrowPool() @@ -153,20 +182,56 @@ bool CurlMultiHandleContainer::CheckAndGrowPool() return false; } -void CurlMultiHandleContainer::SetDefaultOptionsOnHandle(CURL* handle) +void CurlMultiHandleContainer::SetDefaultOptionsOnHandle(CurlEasyHandleContext& handleCtx) { - //for timeouts to work in a multi-threaded context, - //always turn signals off. This also forces dns queries to - //not be included in the timeout calculations. - curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1L); - curl_easy_setopt(handle, CURLOPT_TIMEOUT_MS, m_httpRequestTimeout); - curl_easy_setopt(handle, CURLOPT_CONNECTTIMEOUT_MS, m_connectTimeout); - curl_easy_setopt(handle, CURLOPT_LOW_SPEED_LIMIT, m_lowSpeedLimit); - curl_easy_setopt(handle, CURLOPT_LOW_SPEED_TIME, m_lowSpeedTime < 1000 ? (m_lowSpeedTime == 0 ? 0 : 1) : m_lowSpeedTime / 1000); - curl_easy_setopt(handle, CURLOPT_TCP_KEEPALIVE, m_enableTcpKeepAlive ? 1L : 0L); - curl_easy_setopt(handle, CURLOPT_TCP_KEEPINTVL, m_tcpKeepAliveIntervalMs / 1000); - curl_easy_setopt(handle, CURLOPT_TCP_KEEPIDLE, m_tcpKeepAliveIntervalMs / 1000); - curl_easy_setopt(handle, CURLOPT_HTTP_VERSION, ConvertHttpVersion(m_version)); + CURL* handle = handleCtx.m_curlEasyHandle; + assert(handle); + if(handle) { + handleCtx.writeContext.m_HasBody = false; + handleCtx.writeContext.m_request = nullptr; + handleCtx.writeContext.m_rateLimiter = nullptr; + handleCtx.writeContext.m_numBytesResponseReceived = 0; + + handleCtx.readContext.m_rateLimiter = 0; + handleCtx.readContext.m_request = 0; + handleCtx.readContext.m_chunkEnd = false; + if (handleCtx.m_curlHandleHeaders) + { + curl_slist_free_all(handleCtx.m_curlHandleHeaders); + handleCtx.m_curlHandleHeaders = nullptr; + } + handleCtx.curlResult = CURLE_FAILED_INIT; + handleCtx.curlResultMsg = nullptr; + + //for timeouts to work in a multi-threaded context, + //always turn signals off. This also forces dns queries to + //not be included in the timeout calculations. + curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1L); + curl_easy_setopt(handle, CURLOPT_TIMEOUT_MS, m_httpRequestTimeout); + curl_easy_setopt(handle, CURLOPT_CONNECTTIMEOUT_MS, m_connectTimeout); + curl_easy_setopt(handle, CURLOPT_LOW_SPEED_LIMIT, m_lowSpeedLimit); + curl_easy_setopt(handle, CURLOPT_LOW_SPEED_TIME, + m_lowSpeedTime < 1000 ? (m_lowSpeedTime == 0 ? 0 : 1) : m_lowSpeedTime / 1000); + curl_easy_setopt(handle, CURLOPT_TCP_KEEPALIVE, m_enableTcpKeepAlive ? 1L : 0L); + curl_easy_setopt(handle, CURLOPT_TCP_KEEPINTVL, m_tcpKeepAliveIntervalMs / 1000); + curl_easy_setopt(handle, CURLOPT_TCP_KEEPIDLE, m_tcpKeepAliveIntervalMs / 1000); + curl_easy_setopt(handle, CURLOPT_HTTP_VERSION, ConvertHttpVersion(m_version)); + + // Set callbacks and their context + // Curl to SDK write + curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, CurlEasyHandleContext::WriteData); + curl_easy_setopt(handle, CURLOPT_WRITEDATA, &handleCtx); + curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION, CurlEasyHandleContext::WriteHeader); + curl_easy_setopt(handle, CURLOPT_HEADERDATA, &handleCtx); + // SDK to Curl default (non streaming) read + curl_easy_setopt(handle, CURLOPT_READFUNCTION, CurlEasyHandleContext::ReadBodyFunc); + curl_easy_setopt(handle, CURLOPT_READDATA, &handleCtx); + curl_easy_setopt(handle, CURLOPT_SEEKFUNCTION, CurlEasyHandleContext::SeekBody); + curl_easy_setopt(handle, CURLOPT_SEEKDATA, &handleCtx); + + // enable the cookie engine without reading any initial cookies. + curl_easy_setopt(handle, CURLOPT_COOKIEFILE, ""); + } } long CurlMultiHandleContainer::ConvertHttpVersion(Version version) { @@ -218,3 +283,7 @@ long CurlMultiHandleContainer::ConvertHttpVersion(Version version) { return CURL_HTTP_VERSION_1_1; #endif } + +} // namespace Curl +} // namespace Http +} // namespace Aws \ No newline at end of file diff --git a/src/aws-cpp-sdk-core/source/http/curl-multi/CurlMultiHttpClient.cpp b/src/aws-cpp-sdk-core/source/http/curl-multi/CurlMultiHttpClient.cpp index 0a32f5c2970..81e477cbf36 100644 --- a/src/aws-cpp-sdk-core/source/http/curl-multi/CurlMultiHttpClient.cpp +++ b/src/aws-cpp-sdk-core/source/http/curl-multi/CurlMultiHttpClient.cpp @@ -4,6 +4,7 @@ */ #include +#include #include #include #include @@ -123,64 +124,6 @@ static char* strdup_callback(const char* str) #endif -struct CurlWriteCallbackContext -{ - CurlWriteCallbackContext(const CurlMultiHttpClient* client, - HttpRequest* request, - std::shared_ptr response, - Aws::Utils::RateLimits::RateLimiterInterface* rateLimiter) : - m_client(client), - m_request(request), - m_response(std::move(response)), - m_rateLimiter(rateLimiter), - m_numBytesResponseReceived(0) - {} - - const CurlMultiHttpClient* m_client; - HttpRequest* m_request; - std::shared_ptr m_response; - Aws::Utils::RateLimits::RateLimiterInterface* m_rateLimiter; - int64_t m_numBytesResponseReceived; -}; - -struct CurlReadCallbackContext -{ - CurlReadCallbackContext(const CurlMultiHttpClient* client, CURL* curlHandle, HttpRequest* request, Aws::Utils::RateLimits::RateLimiterInterface* limiter) : - m_client(client), - m_curlHandle(curlHandle), - m_rateLimiter(limiter), - m_request(request), - m_chunkEnd(false) - {} - - const CurlMultiHttpClient* m_client; - CURL* m_curlHandle; - Aws::Utils::RateLimits::RateLimiterInterface* m_rateLimiter; - HttpRequest* m_request; - bool m_chunkEnd; -}; - -enum class ExecutionPolicy -{ - BLOCKING, - ASYNC -}; - -struct CurlMultiHttpClient::CurlEasyHandleContext -{ - ExecutionPolicy execPolicy; - CurlWriteCallbackContext writeContext; - CurlReadCallbackContext readContext; - Aws::Utils::DateTime startTransmissionTime; - curl_slist* curlHandleHeaders; - - std::function onCurlDoneFn; - // TODO: C++14: add "= nullptr" and other default initializers - CURLcode curlResult; - // ptr acquired by curl_multi_info_read, free-ed by curl - CURLMsg* curlResultMsg; -}; - static const char* CURL_HTTP_CLIENT_TAG = "CurlMultiHttpClient"; static int64_t GetContentLengthFromHeader(CURL* connectionHandle, @@ -198,264 +141,6 @@ static int64_t GetContentLengthFromHeader(CURL* connectionHandle, return hasContentLength ? static_cast(contentLength) : -1; } -static size_t WriteData(char* ptr, size_t size, size_t nmemb, void* userdata) -{ - if (ptr) - { - CurlWriteCallbackContext* context = reinterpret_cast(userdata); - - const CurlMultiHttpClient* client = context->m_client; - if(!client->ContinueRequest(*context->m_request) || !client->IsRequestProcessingEnabled()) - { - return 0; - } - - HttpResponse* response = context->m_response.get(); - size_t sizeToWrite = size * nmemb; - if (context->m_rateLimiter) - { - context->m_rateLimiter->ApplyAndPayForCost(static_cast(sizeToWrite)); - } - - for (const auto& hashIterator : context->m_request->GetResponseValidationHashes()) - { - hashIterator.second->Update(reinterpret_cast(ptr), sizeToWrite); - } - - if (response->GetResponseBody().fail()) { - const auto& ref = response->GetResponseBody(); - AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, "Response output stream in bad state (eof: " - << ref.eof() << ", bad: " << ref.bad() << ")"); - return 0; - } - - size_t cur = response->GetResponseBody().tellp(); - if (response->GetResponseBody().fail()) { - const auto& ref = response->GetResponseBody(); - AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, "Unable to query response output position (eof: " - << ref.eof() << ", bad: " << ref.bad() << ")"); - return 0; - } - - response->GetResponseBody().write(ptr, static_cast(sizeToWrite)); - if (response->GetResponseBody().fail()) { - const auto& ref = response->GetResponseBody(); - AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, "Failed to write " << size << " / " << sizeToWrite << " B response" - << " at " << cur << " (eof: " << ref.eof() << ", bad: " << ref.bad() << ")"); - return 0; - } - if (context->m_request->IsEventStreamRequest() && !response->HasHeader(Aws::Http::X_AMZN_ERROR_TYPE)) - { - response->GetResponseBody().flush(); - if (response->GetResponseBody().fail()) { - const auto& ref = response->GetResponseBody(); - AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, "Failed to flush event response (eof: " - << ref.eof() << ", bad: " << ref.bad() << ")"); - return 0; - } - } - auto& receivedHandler = context->m_request->GetDataReceivedEventHandler(); - if (receivedHandler) - { - receivedHandler(context->m_request, context->m_response.get(), static_cast(sizeToWrite)); - } - - AWS_LOGSTREAM_TRACE(CURL_HTTP_CLIENT_TAG, sizeToWrite << " bytes written to response."); - context->m_numBytesResponseReceived += sizeToWrite; - return sizeToWrite; - } - return 0; -} - -static size_t WriteHeader(char* ptr, size_t size, size_t nmemb, void* userdata) -{ - if (ptr) - { - CurlWriteCallbackContext* context = reinterpret_cast(userdata); - AWS_LOGSTREAM_TRACE(CURL_HTTP_CLIENT_TAG, ptr); - HttpResponse* response = context->m_response.get(); - Aws::String headerLine(ptr); - Aws::Vector keyValuePair = StringUtils::Split(headerLine, ':', 2); - - if (keyValuePair.size() == 2) - { - response->AddHeader(StringUtils::Trim(keyValuePair[0].c_str()), StringUtils::Trim(keyValuePair[1].c_str())); - } - - return size * nmemb; - } - return 0; -} - -static size_t ReadBody(char* ptr, size_t size, size_t nmemb, void* userdata, bool isStreaming) -{ - CurlReadCallbackContext* context = reinterpret_cast(userdata); - if(context == nullptr) - { - return 0; - } - - const CurlMultiHttpClient* client = context->m_client; - if(!client->ContinueRequest(*context->m_request) || !client->IsRequestProcessingEnabled()) - { - return CURL_READFUNC_ABORT; - } - - HttpRequest* request = context->m_request; - const std::shared_ptr& ioStream = request->GetContentBody(); - - size_t amountToRead = size * nmemb; - bool isAwsChunked = request->HasHeader(Aws::Http::CONTENT_ENCODING_HEADER) && - request->GetHeaderValue(Aws::Http::CONTENT_ENCODING_HEADER) == Aws::Http::AWS_CHUNKED_VALUE; - // aws-chunk = hex(chunk-size) + CRLF + chunk-data + CRLF - // Needs to reserve bytes of sizeof(hex(chunk-size)) + sizeof(CRLF) + sizeof(CRLF) - if (isAwsChunked) - { - Aws::String amountToReadHexString = Aws::Utils::StringUtils::ToHexString(amountToRead); - amountToRead -= (amountToReadHexString.size() + 4); - } - - if (ioStream != nullptr && amountToRead > 0) - { - if (isStreaming) - { - if (ioStream->readsome(ptr, amountToRead) == 0 && !ioStream->eof()) - { - return CURL_READFUNC_PAUSE; - } - } - else - { - ioStream->read(ptr, amountToRead); - } - size_t amountRead = static_cast(ioStream->gcount()); - - if (isAwsChunked) - { - if (amountRead > 0) - { - if (request->GetRequestHash().second != nullptr) - { - request->GetRequestHash().second->Update(reinterpret_cast(ptr), amountRead); - } - - Aws::String hex = Aws::Utils::StringUtils::ToHexString(amountRead); - memmove(ptr + hex.size() + 2, ptr, amountRead); - memmove(ptr + hex.size() + 2 + amountRead, "\r\n", 2); - memmove(ptr, hex.c_str(), hex.size()); - memmove(ptr + hex.size(), "\r\n", 2); - amountRead += hex.size() + 4; - } - else if (!context->m_chunkEnd) - { - Aws::StringStream chunkedTrailer; - chunkedTrailer << "0\r\n"; - if (request->GetRequestHash().second != nullptr) - { - chunkedTrailer << "x-amz-checksum-" << request->GetRequestHash().first << ":" - << HashingUtils::Base64Encode(request->GetRequestHash().second->GetHash().GetResult()) << "\r\n"; - } - chunkedTrailer << "\r\n"; - amountRead = chunkedTrailer.str().size(); - memcpy(ptr, chunkedTrailer.str().c_str(), amountRead); - context->m_chunkEnd = true; - } - } - - auto& sentHandler = request->GetDataSentEventHandler(); - if (sentHandler) - { - sentHandler(request, static_cast(amountRead)); - } - - if (context->m_rateLimiter) - { - context->m_rateLimiter->ApplyAndPayForCost(static_cast(amountRead)); - } - - return amountRead; - } - - return 0; -} - -static size_t ReadBodyStreaming(char* ptr, size_t size, size_t nmemb, void* userdata) { - return ReadBody(ptr, size, nmemb, userdata, true); -} - -static size_t ReadBodyFunc(char* ptr, size_t size, size_t nmemb, void* userdata) { - return ReadBody(ptr, size, nmemb, userdata, false); -} - -static size_t SeekBody(void* userdata, curl_off_t offset, int origin) -{ - CurlReadCallbackContext* context = reinterpret_cast(userdata); - if(context == nullptr) - { - return CURL_SEEKFUNC_FAIL; - } - - const CurlMultiHttpClient* client = context->m_client; - if(!client->ContinueRequest(*context->m_request) || !client->IsRequestProcessingEnabled()) - { - return CURL_SEEKFUNC_FAIL; - } - - HttpRequest* request = context->m_request; - const std::shared_ptr& ioStream = request->GetContentBody(); - - std::ios_base::seekdir dir; - switch(origin) - { - case SEEK_SET: - dir = std::ios_base::beg; - break; - case SEEK_CUR: - dir = std::ios_base::cur; - break; - case SEEK_END: - dir = std::ios_base::end; - break; - default: - return CURL_SEEKFUNC_FAIL; - } - - ioStream->clear(); - ioStream->seekg(offset, dir); - if (ioStream->fail()) { - return CURL_SEEKFUNC_CANTSEEK; - } - - return CURL_SEEKFUNC_OK; -} -#if LIBCURL_VERSION_NUM >= 0x072000 // 7.32.0 -static int CurlProgressCallback(void *userdata, curl_off_t, curl_off_t, curl_off_t, curl_off_t) -#else -static int CurlProgressCallback(void *userdata, double, double, double, double) -#endif -{ - CurlReadCallbackContext* context = reinterpret_cast(userdata); - - const std::shared_ptr& ioStream = context->m_request->GetContentBody(); - if (ioStream->eof()) - { - curl_easy_pause(context->m_curlHandle, CURLPAUSE_CONT); - return 0; - } - char output[1]; - if (ioStream->readsome(output, 1) > 0) - { - ioStream->unget(); - if (!ioStream->good()) - { - AWS_LOGSTREAM_WARN(CURL_HTTP_CLIENT_TAG, "Input stream failed to perform unget()."); - } - curl_easy_pause(context->m_curlHandle, CURLPAUSE_CONT); - } - - return 0; -} - void SetOptCodeForHttpMethod(CURL* const requestHandle, Aws::Http::HttpRequest const * const request) { assert(requestHandle && request); @@ -627,15 +312,13 @@ void CurlMultiHttpClient::CurlMultiPerformThread(CurlMultiHttpClient* pClient) if(message->msg == CURLMSG_DONE) { CURL* easyHandle = message->easy_handle; - std::shared_ptr pEasyHandleCtx; - { - std::unique_lock lock(pClient->m_tasksMutex); - assert(pClient->m_tasks.find(easyHandle) != pClient->m_tasks.end()); - pEasyHandleCtx = pClient->m_tasks[easyHandle]; - } + assert(easyHandle); + Curl::CurlEasyHandleContext* pEasyHandleCtx = nullptr; + curl_easy_getinfo(easyHandle, CURLINFO_PRIVATE, &pEasyHandleCtx); assert(pEasyHandleCtx); + pEasyHandleCtx->curlResult = message->data.result; - pEasyHandleCtx->onCurlDoneFn(); + pEasyHandleCtx->m_onCurlDoneFn(); } else { assert(!"Todo"); } @@ -756,47 +439,41 @@ struct curl_slist* PrepareHeaders(const HttpRequest* request, const bool disable } -std::shared_ptr ConfigureEasyHandle( +void ConfigureEasyHandle( CurlMultiHttpClient const * const pClient, const Aws::Http::CurlMultiHttpClient::CurlMultiHttpClientConfig& config, std::function onDoneFn, - CURL* const connectionHandle, + Curl::CurlEasyHandleContext* const easyHandleContext, Aws::Http::HttpRequest* const request, const std::shared_ptr response, Aws::Utils::RateLimits::RateLimiterInterface* readLimiter, Aws::Utils::RateLimits::RateLimiterInterface* writeLimiter) { - assert(connectionHandle && request && response); + assert(easyHandleContext && request && response); + assert(easyHandleContext->m_curlEasyHandle); struct curl_slist* headers = PrepareHeaders(request, config.disableExpectHeader); if (headers) { - curl_easy_setopt(connectionHandle, CURLOPT_HTTPHEADER, headers); + curl_easy_setopt(easyHandleContext->m_curlEasyHandle, CURLOPT_HTTPHEADER, headers); + easyHandleContext->m_curlHandleHeaders = headers; } - std::shared_ptr pHandleCtx = - Aws::MakeShared(CURL_HTTP_CLIENT_TAG, - CurlMultiHttpClient::CurlEasyHandleContext{ - ExecutionPolicy::BLOCKING, - CurlWriteCallbackContext(pClient, request, response, readLimiter), - CurlReadCallbackContext(pClient, connectionHandle, request, writeLimiter), - Aws::Utils::DateTime(), - headers, - onDoneFn, - CURLE_FAILED_INIT, - nullptr}); - - CurlMultiHttpClient::CurlEasyHandleContext& handleContext = *pHandleCtx; + easyHandleContext->writeContext.m_request = request; + easyHandleContext->writeContext.m_response = response; + easyHandleContext->writeContext.m_rateLimiter = readLimiter; // not a typo, the same in a legacy wrapper + easyHandleContext->readContext.m_request = request; + easyHandleContext->readContext.m_rateLimiter = writeLimiter; // not a typo, the same in a legacy wrapper + easyHandleContext->curlResult = CURLE_FAILED_INIT; + easyHandleContext->m_onCurlDoneFn = onDoneFn; + easyHandleContext->m_client = pClient; + CURL* connectionHandle = easyHandleContext->m_curlEasyHandle; SetOptCodeForHttpMethod(connectionHandle, request); - URI uri = request->GetUri(); + const URI& uri = request->GetUri(); Aws::String url = uri.GetURIString(); AWS_LOGSTREAM_TRACE(CURL_HTTP_CLIENT_TAG, "Making request to " << url); curl_easy_setopt(connectionHandle, CURLOPT_URL, url.c_str()); - curl_easy_setopt(connectionHandle, CURLOPT_WRITEFUNCTION, WriteData); - curl_easy_setopt(connectionHandle, CURLOPT_WRITEDATA, &handleContext.writeContext); - curl_easy_setopt(connectionHandle, CURLOPT_HEADERFUNCTION, WriteHeader); - curl_easy_setopt(connectionHandle, CURLOPT_HEADERDATA, &handleContext.writeContext); //we only want to override the default path if someone has explicitly told us to. if(!config.sslConfig.caPath.empty()) @@ -890,41 +567,33 @@ curl_easy_setopt(connectionHandle, CURLOPT_CAPATH, TEST_CERT_PATH); if (request->GetContentBody()) { - curl_easy_setopt(connectionHandle, CURLOPT_READFUNCTION, ReadBodyFunc); - curl_easy_setopt(connectionHandle, CURLOPT_READDATA, &handleContext.readContext); - curl_easy_setopt(connectionHandle, CURLOPT_SEEKFUNCTION, SeekBody); - curl_easy_setopt(connectionHandle, CURLOPT_SEEKDATA, &handleContext.readContext); + easyHandleContext->writeContext.m_HasBody = true; +// curl_easy_setopt(connectionHandle, CURLOPT_READFUNCTION, ReadBodyFunc); +// curl_easy_setopt(connectionHandle, CURLOPT_READDATA, &handleContext.readContext); +// curl_easy_setopt(connectionHandle, CURLOPT_SEEKFUNCTION, SeekBody); +// curl_easy_setopt(connectionHandle, CURLOPT_SEEKDATA, &handleContext.readContext); if (request->IsEventStreamRequest() && !response->HasHeader(Aws::Http::X_AMZN_ERROR_TYPE)) { - curl_easy_setopt(connectionHandle, CURLOPT_READFUNCTION, ReadBodyStreaming); + curl_easy_setopt(connectionHandle, CURLOPT_READFUNCTION, Curl::CurlEasyHandleContext::ReadBodyStreaming); curl_easy_setopt(connectionHandle, CURLOPT_NOPROGRESS, 0L); #if LIBCURL_VERSION_NUM >= 0x072000 // 7.32.0 - curl_easy_setopt(connectionHandle, CURLOPT_XFERINFOFUNCTION, CurlProgressCallback); - curl_easy_setopt(connectionHandle, CURLOPT_XFERINFODATA, &handleContext.readContext); + curl_easy_setopt(connectionHandle, CURLOPT_XFERINFOFUNCTION, Curl::CurlEasyHandleContext::CurlProgressCallback); + curl_easy_setopt(connectionHandle, CURLOPT_XFERINFODATA, easyHandleContext); #else - curl_easy_setopt(connectionHandle, CURLOPT_PROGRESSFUNCTION, CurlProgressCallback); - curl_easy_setopt(connectionHandle, CURLOPT_PROGRESSDATA, &readContext); + curl_easy_setopt(connectionHandle, CURLOPT_PROGRESSFUNCTION, Curl::CurlEasyHandleContext::CurlProgressCallback); + curl_easy_setopt(connectionHandle, CURLOPT_PROGRESSDATA, easyHandleContext); #endif } } - - return pHandleCtx; } -std::shared_ptr CurlMultiHttpClient::HandleCurlResponse(std::shared_ptr pEasyHandleCtx) +std::shared_ptr CurlMultiHttpClient::HandleCurlResponse(Curl::CurlEasyHandleContext* pEasyHandleCtx) { const CURLcode curlResponseCode = pEasyHandleCtx->curlResult; Aws::Http::HttpRequest * const request = pEasyHandleCtx->writeContext.m_request; Aws::Http::HttpResponse * const response = pEasyHandleCtx->writeContext.m_response.get(); - CurlMultiHttpClient const * const client = pEasyHandleCtx->writeContext.m_client; - CURL* const connectionHandle = pEasyHandleCtx->readContext.m_curlHandle; - - { - // TODO: refactor - std::unique_lock lock(client->m_tasksMutex); - assert(client->m_tasks.find(connectionHandle) != client->m_tasks.end()); - client->m_tasks.erase(connectionHandle); - } + CurlMultiHttpClient const * const client = pEasyHandleCtx->m_client; + CURL* const connectionHandle = pEasyHandleCtx->m_curlEasyHandle; assert(request && response && client && connectionHandle); @@ -963,7 +632,7 @@ std::shared_ptr CurlMultiHttpClient::HandleCurlResponse(std::share GetContentLengthFromHeader(connectionHandle, hasContentLength); if (request->GetMethod() != HttpMethod::HTTP_HEAD && - pEasyHandleCtx->writeContext.m_client->IsRequestProcessingEnabled() && + client->IsRequestProcessingEnabled() && hasContentLength) { int64_t numBytesResponseReceived = pEasyHandleCtx->writeContext.m_numBytesResponseReceived; @@ -1016,14 +685,6 @@ std::shared_ptr CurlMultiHttpClient::HandleCurlResponse(std::share { request->SetResolvedRemoteHost(ip); } - if (curlResponseCode != CURLE_OK) - { - client->m_curlMultiHandleContainer.DestroyCurlHandle(connectionHandle); - } - else - { - client->m_curlMultiHandleContainer.ReleaseCurlHandle(connectionHandle); - } //go ahead and flush the response body stream response->GetResponseBody().flush(); if (response->GetResponseBody().fail()) { @@ -1037,24 +698,36 @@ std::shared_ptr CurlMultiHttpClient::HandleCurlResponse(std::share request->AddRequestMetric(GetHttpClientMetricNameByType(HttpClientMetricsType::RequestLatency), (DateTime::Now() - pEasyHandleCtx->startTransmissionTime).count()); - if (pEasyHandleCtx->curlHandleHeaders) + if (pEasyHandleCtx->m_curlHandleHeaders) { - curl_slist_free_all(pEasyHandleCtx->curlHandleHeaders); - pEasyHandleCtx->curlHandleHeaders = nullptr; + curl_slist_free_all(pEasyHandleCtx->m_curlHandleHeaders); + pEasyHandleCtx->m_curlHandleHeaders = nullptr; } curl_multi_remove_handle(client->m_curlMultiHandleContainer.AccessCurlMultiHandle(), connectionHandle); - return pEasyHandleCtx->writeContext.m_response; + std::shared_ptr res = std::move(pEasyHandleCtx->writeContext.m_response); + pEasyHandleCtx->writeContext.m_response.reset(); + + if (curlResponseCode != CURLE_OK) + { + client->m_curlMultiHandleContainer.DestroyCurlHandle(pEasyHandleCtx); + } + else + { + client->m_curlMultiHandleContainer.ReleaseCurlHandle(pEasyHandleCtx); + } + + return res; } -void CurlMultiHttpClient::SubmitTask(std::shared_ptr pEasyHandleCtx) const +void CurlMultiHttpClient::SubmitTask(Curl::CurlEasyHandleContext* pEasyHandleCtx) const { assert(pEasyHandleCtx); + AWS_UNREFERENCED_PARAM(pEasyHandleCtx); { std::unique_lock lock(m_tasksMutex); m_tasksQueued++; - m_tasks[pEasyHandleCtx->readContext.m_curlHandle] = std::move(pEasyHandleCtx); } { std::unique_lock lockGuard(m_signalMutex); @@ -1075,8 +748,8 @@ std::shared_ptr CurlMultiHttpClient::MakeRequest(const std::shared return response; } - CURL* connectionHandle = m_curlMultiHandleContainer.AcquireCurlHandle(); - if(!connectionHandle) + Curl::CurlEasyHandleContext* easyHandleContext = m_curlMultiHandleContainer.AcquireCurlHandle(); + if(!easyHandleContext) { response->SetClientErrorType(CoreErrors::NETWORK_CONNECTION); response->SetClientErrorMessage("Failed to Acquire curl handle"); @@ -1084,7 +757,7 @@ std::shared_ptr CurlMultiHttpClient::MakeRequest(const std::shared return response; } - AWS_LOGSTREAM_DEBUG(CURL_HTTP_CLIENT_TAG, "Obtained connection handle " << connectionHandle); + AWS_LOGSTREAM_DEBUG(CURL_HTTP_CLIENT_TAG, "Obtained connection handle " << easyHandleContext); std::mutex taskMutex; std::condition_variable signal; @@ -1096,15 +769,13 @@ std::shared_ptr CurlMultiHttpClient::MakeRequest(const std::shared signal.notify_one(); }; - std::shared_ptr pHandleCtx = ConfigureEasyHandle(this, m_config, onDone, - connectionHandle, request.get(), response, - readLimiter, writeLimiter); - assert(pHandleCtx); - OverrideOptionsOnConnectionHandle(connectionHandle); + ConfigureEasyHandle(this, m_config, onDone, easyHandleContext, request.get(), response, readLimiter, writeLimiter); + OverrideOptionsOnConnectionHandle(easyHandleContext->m_curlEasyHandle); - pHandleCtx->startTransmissionTime = Aws::Utils::DateTime::Now(); + easyHandleContext->startTransmissionTime = Aws::Utils::DateTime::Now(); - CURLMcode curlMultiResponseCode = curl_multi_add_handle(m_curlMultiHandleContainer.AccessCurlMultiHandle(), connectionHandle); + CURLMcode curlMultiResponseCode = curl_multi_add_handle(m_curlMultiHandleContainer.AccessCurlMultiHandle(), + easyHandleContext->m_curlEasyHandle); if (CURLM_OK != curlMultiResponseCode) { response->SetClientErrorType(CoreErrors::NETWORK_CONNECTION); @@ -1113,7 +784,7 @@ std::shared_ptr CurlMultiHttpClient::MakeRequest(const std::shared return response; } - SubmitTask(pHandleCtx); + SubmitTask(easyHandleContext); // Task submitted, wait for it's completion std::unique_lock lockGuard(taskMutex); @@ -1124,5 +795,5 @@ std::shared_ptr CurlMultiHttpClient::MakeRequest(const std::shared }); // This is a blocking mode, handle response within the submitter thread - return HandleCurlResponse(pHandleCtx); + return HandleCurlResponse(easyHandleContext); } From fcfea62a325993162f8e0bbbbd163a2c7742823c Mon Sep 17 00:00:00 2001 From: SergeyRyabinin Date: Tue, 19 Sep 2023 18:43:02 +0000 Subject: [PATCH 3/3] Basic error handling for curl multi --- .../http/curl-multi/CurlEasyHandleContext.h | 2 +- .../http/curl-multi/CurlMultiHttpClient.h | 7 +- .../http/curl-multi/CurlMultiHttpClient.cpp | 70 ++++++++++++++----- 3 files changed, 58 insertions(+), 21 deletions(-) diff --git a/src/aws-cpp-sdk-core/include/aws/core/http/curl-multi/CurlEasyHandleContext.h b/src/aws-cpp-sdk-core/include/aws/core/http/curl-multi/CurlEasyHandleContext.h index 23a9fb75045..f947b6475b1 100644 --- a/src/aws-cpp-sdk-core/include/aws/core/http/curl-multi/CurlEasyHandleContext.h +++ b/src/aws-cpp-sdk-core/include/aws/core/http/curl-multi/CurlEasyHandleContext.h @@ -69,7 +69,7 @@ namespace Http /* SDK side */ const CurlMultiHttpClient* m_client = nullptr; ExecutionPolicy m_execPolicy; - std::function m_onCurlDoneFn; + std::function m_onCurlDoneFn; // called in a main multi loop => must be lightweight. Aws::Utils::DateTime startTransmissionTime; /* Curl calls the SDK back */ diff --git a/src/aws-cpp-sdk-core/include/aws/core/http/curl-multi/CurlMultiHttpClient.h b/src/aws-cpp-sdk-core/include/aws/core/http/curl-multi/CurlMultiHttpClient.h index 4d5b8f1e4ed..6b6497d0ce8 100644 --- a/src/aws-cpp-sdk-core/include/aws/core/http/curl-multi/CurlMultiHttpClient.h +++ b/src/aws-cpp-sdk-core/include/aws/core/http/curl-multi/CurlMultiHttpClient.h @@ -12,6 +12,7 @@ #include #include #include +#include #include #include #include @@ -86,10 +87,11 @@ class AWS_CORE_API CurlMultiHttpClient: public HttpClient virtual void OverrideOptionsOnConnectionHandle(CURL*) const {} private: - void SubmitTask(Curl::CurlEasyHandleContext* pEasyHandleCtx) const; + bool SubmitTask(Curl::CurlEasyHandleContext* pEasyHandleCtx) const; static std::shared_ptr HandleCurlResponse(Curl::CurlEasyHandleContext* pEasyHandleCtx); static void CurlMultiPerformThread(CurlMultiHttpClient* pClient); + void CurlMultiPerformReset(); std::thread m_multiHandleThread; std::atomic m_isRunning; @@ -99,7 +101,8 @@ class AWS_CORE_API CurlMultiHttpClient: public HttpClient // mutable std::mutex m_tasksMutex; mutable std::atomic m_tasksQueued; mutable std::mutex m_tasksMutex; - mutable Aws::UnorderedMap> m_tasks; + // used to track tasks sent to multi handle, for handling multi perform errors + mutable Aws::UnorderedSet m_multiTasks; CurlMultiHttpClientConfig m_config; diff --git a/src/aws-cpp-sdk-core/source/http/curl-multi/CurlMultiHttpClient.cpp b/src/aws-cpp-sdk-core/source/http/curl-multi/CurlMultiHttpClient.cpp index 81e477cbf36..6eccc6f053c 100644 --- a/src/aws-cpp-sdk-core/source/http/curl-multi/CurlMultiHttpClient.cpp +++ b/src/aws-cpp-sdk-core/source/http/curl-multi/CurlMultiHttpClient.cpp @@ -274,6 +274,24 @@ int CurlMultiDebugCallback(CURL *handle, curl_infotype type, char *data, size_t return 0; } +void CurlMultiHttpClient::CurlMultiPerformReset() +{ + // TODO: refactor + std::unique_lock lockGuard(m_signalMutex); + std::unique_lock lock(m_tasksMutex); + AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, "Removing all easy handles from a multi handle and triggering callbacks."); + + for(Curl::CurlEasyHandleContext* handleCtx : m_multiTasks) + { + curl_multi_remove_handle(m_curlMultiHandleContainer.AccessCurlMultiHandle(), handleCtx->m_curlEasyHandle); + handleCtx->curlResultMsg = nullptr; + handleCtx->curlResult = static_cast(-1); + handleCtx->m_onCurlDoneFn(); + m_multiTasks.erase(handleCtx); + } + m_multiTasks.clear(); +} + void CurlMultiHttpClient::CurlMultiPerformThread(CurlMultiHttpClient* pClient) { assert(pClient && pClient->m_curlMultiHandleContainer.AccessCurlMultiHandle()); @@ -297,12 +315,18 @@ void CurlMultiHttpClient::CurlMultiPerformThread(CurlMultiHttpClient* pClient) } if(!pClient->m_isRunning.load()) { - break; + break; } pClient->m_tasksQueued = 0; CURLMcode mc = curl_multi_perform(multi_handle, &stillRunning); + if(mc != CURLM_OK) + { + AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, "Curl curl_multi_perform returned error code " << mc + << " resetting multi handle."); + pClient->CurlMultiPerformReset(); + } int msgQueue = 0; do { @@ -326,12 +350,14 @@ void CurlMultiHttpClient::CurlMultiPerformThread(CurlMultiHttpClient* pClient) } while(msgQueue > 0); if(!mc && stillRunning) - /* wait for activity, timeout or "nothing" */ - mc = curl_multi_poll(multi_handle, NULL, 0, 1000, NULL); - - if(mc) { - fprintf(stderr, "curl_multi_poll() failed, code %d.\n", (int)mc); - break; + { + /* wait for activity, timeout or "nothing" */ + mc = curl_multi_poll(multi_handle, NULL, 0, 1000, NULL); + if(mc) + { + AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, "Curl curl_multi_poll returned error code " << mc); + break; + } } }; } @@ -705,6 +731,10 @@ std::shared_ptr CurlMultiHttpClient::HandleCurlResponse(Curl::Curl } curl_multi_remove_handle(client->m_curlMultiHandleContainer.AccessCurlMultiHandle(), connectionHandle); + { + std::unique_lock lock(client->m_tasksMutex); + client->m_multiTasks.erase(pEasyHandleCtx); + } std::shared_ptr res = std::move(pEasyHandleCtx->writeContext.m_response); pEasyHandleCtx->writeContext.m_response.reset(); @@ -721,12 +751,19 @@ std::shared_ptr CurlMultiHttpClient::HandleCurlResponse(Curl::Curl return res; } -void CurlMultiHttpClient::SubmitTask(Curl::CurlEasyHandleContext* pEasyHandleCtx) const +bool CurlMultiHttpClient::SubmitTask(Curl::CurlEasyHandleContext* pEasyHandleCtx) const { assert(pEasyHandleCtx); - AWS_UNREFERENCED_PARAM(pEasyHandleCtx); + CURLMcode curlMultiResponseCode = curl_multi_add_handle(m_curlMultiHandleContainer.AccessCurlMultiHandle(), + pEasyHandleCtx->m_curlEasyHandle); + if (CURLM_OK != curlMultiResponseCode) + { + return false; + } + { std::unique_lock lock(m_tasksMutex); + m_multiTasks.insert(pEasyHandleCtx); m_tasksQueued++; } { @@ -734,6 +771,7 @@ void CurlMultiHttpClient::SubmitTask(Curl::CurlEasyHandleContext* pEasyHandleCtx m_signalRunning.notify_one(); curl_multi_wakeup(m_curlMultiHandleContainer.AccessCurlMultiHandle()); } + return true; } // Blocking @@ -774,18 +812,14 @@ std::shared_ptr CurlMultiHttpClient::MakeRequest(const std::shared easyHandleContext->startTransmissionTime = Aws::Utils::DateTime::Now(); - CURLMcode curlMultiResponseCode = curl_multi_add_handle(m_curlMultiHandleContainer.AccessCurlMultiHandle(), - easyHandleContext->m_curlEasyHandle); - if (CURLM_OK != curlMultiResponseCode) + if(!SubmitTask(easyHandleContext)) { - response->SetClientErrorType(CoreErrors::NETWORK_CONNECTION); - response->SetClientErrorMessage("Failed to add curl_easy_handle to curl_multi_handle."); - AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, "Failed to add curl_easy_handle to curl_multi_handle."); - return response; + response->SetClientErrorType(CoreErrors::NETWORK_CONNECTION); + response->SetClientErrorMessage("Failed to add curl_easy_handle to curl_multi_handle."); + AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, "Failed to add curl_easy_handle to curl_multi_handle."); + return response; } - SubmitTask(easyHandleContext); - // Task submitted, wait for it's completion std::unique_lock lockGuard(taskMutex); signal.wait(lockGuard,