diff --git a/src/aws-cpp-sdk-core/include/aws/core/http/curl-multi/CurlEasyHandleContext.h b/src/aws-cpp-sdk-core/include/aws/core/http/curl-multi/CurlEasyHandleContext.h new file mode 100644 index 00000000000..f947b6475b1 --- /dev/null +++ b/src/aws-cpp-sdk-core/include/aws/core/http/curl-multi/CurlEasyHandleContext.h @@ -0,0 +1,99 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#pragma once + +#include +#include +#include + +#include +#include + +namespace Aws +{ +namespace Utils +{ +namespace RateLimits +{ + class RateLimiterInterface; +} +} + +namespace Http +{ + class CurlMultiHttpClient; + class HttpRequest; + class HttpResponse; + + namespace Curl + { + enum class ExecutionPolicy + { + BLOCKING, + ASYNC + }; + + /* A composition wrapper on curl easy handle to have curl handle and SDK context together*/ + struct CurlEasyHandleContext final + { + struct WriteContext final + { + WriteContext() = default; + + bool m_HasBody = false; + HttpRequest *m_request = nullptr; + std::shared_ptr m_response; + Aws::Utils::RateLimits::RateLimiterInterface *m_rateLimiter = nullptr; + int64_t m_numBytesResponseReceived = 0; + }; + + struct ReadContext final + { + ReadContext() = default; + + Aws::Utils::RateLimits::RateLimiterInterface *m_rateLimiter = nullptr; + HttpRequest *m_request = nullptr; + bool m_chunkEnd = false; + }; + + CurlEasyHandleContext() = default; + + /* Curl side */ + CURL* m_curlEasyHandle = nullptr; + // headers set on curl handle, to be cleaned up by SDK after the execution + curl_slist* m_curlHandleHeaders = nullptr; + + /* SDK side */ + const CurlMultiHttpClient* m_client = nullptr; + ExecutionPolicy m_execPolicy; + std::function m_onCurlDoneFn; // called in a main multi loop => must be lightweight. + Aws::Utils::DateTime startTransmissionTime; + + /* Curl calls the SDK back */ + WriteContext writeContext; + ReadContext readContext; + + /* SDK polls the curl result */ + CURLcode curlResult; + // ptr acquired by curl_multi_info_read, free-ed by curl in easy handle cleanup, do not free in SDK + CURLMsg* curlResultMsg = nullptr; + + /* callbacks set on easy handle */ + static size_t WriteData(char* ptr, size_t size, size_t nmemb, void* userdata); + static size_t WriteHeader(char* ptr, size_t size, size_t nmemb, void* userdata); + static size_t ReadBody(char* ptr, size_t size, size_t nmemb, void* userdata, bool isStreaming); + static size_t ReadBodyStreaming(char* ptr, size_t size, size_t nmemb, void* userdata); + static size_t ReadBodyFunc(char* ptr, size_t size, size_t nmemb, void* userdata); + static size_t SeekBody(void* userdata, curl_off_t offset, int origin); +#if LIBCURL_VERSION_NUM >= 0x072000 // 7.32.0 + static int CurlProgressCallback(void *userdata, curl_off_t, curl_off_t, curl_off_t, curl_off_t); +#else + static int CurlProgressCallback(void *userdata, double, double, double, double); +#endif + }; + } // namespace Curl +} // namespace Http +} // namespace Aws \ No newline at end of file diff --git a/src/aws-cpp-sdk-core/include/aws/core/http/curl-multi/CurlMultiHandleContainer.h b/src/aws-cpp-sdk-core/include/aws/core/http/curl-multi/CurlMultiHandleContainer.h new file mode 100644 index 00000000000..187f96df4da --- /dev/null +++ b/src/aws-cpp-sdk-core/include/aws/core/http/curl-multi/CurlMultiHandleContainer.h @@ -0,0 +1,92 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#pragma once + +#include +#include + +#include +#include + +namespace Aws +{ + namespace Http + { + namespace Curl + { + struct CurlEasyHandleContext; + + /** + * Simple Connection pool manager for Curl. It maintains connections in a thread safe manner. You + * can call into acquire a handle, then put it back when finished. It is assumed that reusing an already + * initialized handle is preferable (especially for synchronous clients). The pool doubles in capacity as + * needed up to the maximum amount of connections. + */ + class CurlMultiHandleContainer + { + public: + /** + * Initializes an empty stack of CURL handles. + */ + CurlMultiHandleContainer(unsigned maxSize = 50, + long httpRequestTimeout = 0, + long connectTimeout = 1000, + bool tcpKeepAlive = true, + unsigned long tcpKeepAliveIntervalMs = 30000, + long lowSpeedTime = 3000, + unsigned long lowSpeedLimit = 1, + Version version = Version::HTTP_VERSION_2TLS); + ~CurlMultiHandleContainer(); + + /** + * Blocks until a curl handle from the pool is available for use. + */ + CurlEasyHandleContext* AcquireCurlHandle(); + /** + * Returns a handle to the pool for reuse. It is imperative that this is called + * after you are finished with the handle. + */ + void ReleaseCurlHandle(CurlEasyHandleContext* handleCtx); + + /** + * When the handle has bad DNS entries, problematic live connections, we need to destroy the handle from pool. + */ + void DestroyCurlHandle(CurlEasyHandleContext* handleCtx); + + inline CURLM* AccessCurlMultiHandle() + { + return m_curlMultiHandle; + } + + private: + CurlMultiHandleContainer(const CurlMultiHandleContainer&) = delete; + const CurlMultiHandleContainer& operator = (const CurlMultiHandleContainer&) = delete; + CurlMultiHandleContainer(const CurlMultiHandleContainer&&) = delete; + const CurlMultiHandleContainer& operator = (const CurlMultiHandleContainer&&) = delete; + + CurlEasyHandleContext* CreateCurlHandleInPool(); + bool CheckAndGrowPool(); + void SetDefaultOptionsOnHandle(CurlEasyHandleContext& handleCtx); + static long ConvertHttpVersion(Version version); + + Aws::Utils::ExclusiveOwnershipResourceManager m_handleContainer; + CURLM* m_curlMultiHandle = nullptr; + + unsigned m_maxPoolSize; + unsigned long m_httpRequestTimeout; + unsigned long m_connectTimeout; + bool m_enableTcpKeepAlive; + unsigned long m_tcpKeepAliveIntervalMs; + unsigned long m_lowSpeedTime; + unsigned long m_lowSpeedLimit; + unsigned m_poolSize; + std::mutex m_containerLock; + Version m_version; + }; + + } // namespace Curl + } // namespace Http +} // namespace Aws \ No newline at end of file diff --git a/src/aws-cpp-sdk-core/include/aws/core/http/curl-multi/CurlMultiHttpClient.h b/src/aws-cpp-sdk-core/include/aws/core/http/curl-multi/CurlMultiHttpClient.h new file mode 100644 index 00000000000..6b6497d0ce8 --- /dev/null +++ b/src/aws-cpp-sdk-core/include/aws/core/http/curl-multi/CurlMultiHttpClient.h @@ -0,0 +1,117 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + + +#pragma once + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace Aws +{ +namespace Http +{ +namespace Curl +{ + struct CurlEasyHandleContext; +} +namespace Standard +{ + class StandardHttpResponse; +} + +//Curl implementation of an http client. Right now it is only synchronous. +class AWS_CORE_API CurlMultiHttpClient: public HttpClient +{ +public: + struct CurlMultiHttpClientConfig + { + struct ProxyConfig + { + bool isEnabled = false; + Aws::String userName; + Aws::String password; + Aws::String scheme; + Aws::String host; + Aws::String sslCertPath; + Aws::String sslCertType; + Aws::String sslKeyPath; + Aws::String sslKeyType; + Aws::String keyPasswd; + unsigned port = 0; + Aws::String nonProxyHosts; + }; + + struct SslConfig + { + bool verifySSL = true; + Aws::String caPath; + Aws::String caFile; + }; + + ProxyConfig proxyConfig; + SslConfig sslConfig; + + bool disableExpectHeader = false; + bool allowRedirects = false; + }; + using Base = HttpClient; + + //Creates client, initializes curl handle if it hasn't been created already. + CurlMultiHttpClient(const Aws::Client::ClientConfiguration& clientConfig); + virtual ~CurlMultiHttpClient(); + + //Makes request and receives response synchronously + std::shared_ptr MakeRequest(const std::shared_ptr& request, + Aws::Utils::RateLimits::RateLimiterInterface* readLimiter = nullptr, + Aws::Utils::RateLimits::RateLimiterInterface* writeLimiter = nullptr) const override; + + static void InitGlobalState(); + static void CleanupGlobalState(); + +protected: + /** + * Override any configuration on CURL handle for each request before sending. + * The usage is to have a subclass of CurlMultiHttpClient and have your own implementation of this function to configure whatever you want on CURL handle. + */ + virtual void OverrideOptionsOnConnectionHandle(CURL*) const {} + +private: + bool SubmitTask(Curl::CurlEasyHandleContext* pEasyHandleCtx) const; + + static std::shared_ptr HandleCurlResponse(Curl::CurlEasyHandleContext* pEasyHandleCtx); + static void CurlMultiPerformThread(CurlMultiHttpClient* pClient); + void CurlMultiPerformReset(); + + std::thread m_multiHandleThread; + std::atomic m_isRunning; + mutable std::mutex m_signalMutex; + mutable std::condition_variable m_signalRunning; + + // mutable std::mutex m_tasksMutex; + mutable std::atomic m_tasksQueued; + mutable std::mutex m_tasksMutex; + // used to track tasks sent to multi handle, for handling multi perform errors + mutable Aws::UnorderedSet m_multiTasks; + + CurlMultiHttpClientConfig m_config; + + mutable Curl::CurlMultiHandleContainer m_curlMultiHandleContainer; + + static std::atomic isGlobalStateInit; + std::shared_ptr m_telemetryProvider; +}; + +} // namespace Http +} // namespace Aws + diff --git a/src/aws-cpp-sdk-core/include/aws/core/utils/ResourceManager.h b/src/aws-cpp-sdk-core/include/aws/core/utils/ResourceManager.h index 718c32beba6..2b754919d53 100644 --- a/src/aws-cpp-sdk-core/include/aws/core/utils/ResourceManager.h +++ b/src/aws-cpp-sdk-core/include/aws/core/utils/ResourceManager.h @@ -15,7 +15,7 @@ namespace Aws namespace Utils { /** - * Generic resource manager with Acquire/Release semantics. Acquire will block waiting on a an available resource. Release will + * Generic resource manager with Acquire/Release semantics. Acquire will block waiting on an available resource. Release will * cause one blocked acquisition to unblock. * * You must call ShutdownAndWait() when finished with this container, this unblocks the listening thread and gives you a chance to diff --git a/src/aws-cpp-sdk-core/source/http/HttpClientFactory.cpp b/src/aws-cpp-sdk-core/source/http/HttpClientFactory.cpp index 615d0de0a3f..158c741ffb8 100644 --- a/src/aws-cpp-sdk-core/source/http/HttpClientFactory.cpp +++ b/src/aws-cpp-sdk-core/source/http/HttpClientFactory.cpp @@ -12,6 +12,7 @@ #endif #if ENABLE_CURL_CLIENT #include +#include #include #elif ENABLE_WINDOWS_CLIENT @@ -93,7 +94,7 @@ namespace Aws } #endif // ENABLE_WINDOWS_IXML_HTTP_REQUEST_2_CLIENT #elif ENABLE_CURL_CLIENT - return Aws::MakeShared(HTTP_CLIENT_FACTORY_ALLOCATION_TAG, clientConfiguration); + return Aws::MakeShared(HTTP_CLIENT_FACTORY_ALLOCATION_TAG, clientConfiguration); #else // When neither of these clients is enabled, gcc gives a warning (converted // to error by -Werror) about the unused clientConfiguration parameter. We diff --git a/src/aws-cpp-sdk-core/source/http/curl-multi/CurlEasyHandleContext.cpp b/src/aws-cpp-sdk-core/source/http/curl-multi/CurlEasyHandleContext.cpp new file mode 100644 index 00000000000..80137b2af5b --- /dev/null +++ b/src/aws-cpp-sdk-core/source/http/curl-multi/CurlEasyHandleContext.cpp @@ -0,0 +1,406 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace Aws +{ +namespace Http +{ +namespace Curl +{ + +// TODO: enable memory allocation through SDK on curl +#ifdef AWS_CUSTOM_MEMORY_MANAGEMENT + +static const char* MemTag = "libcurl"; +static size_t offset = sizeof(size_t); + +static void* malloc_callback(size_t size) +{ + char* newMem = reinterpret_cast(Aws::Malloc(MemTag, size + offset)); + std::size_t* pointerToSize = reinterpret_cast(newMem); + *pointerToSize = size; + return reinterpret_cast(newMem + offset); +} + +static void free_callback(void* ptr) +{ + if(ptr) + { + char* shiftedMemory = reinterpret_cast(ptr); + Aws::Free(shiftedMemory - offset); + } +} + +static void* realloc_callback(void* ptr, size_t size) +{ + if(!ptr) + { + return malloc_callback(size); + } + + + if(!size && ptr) + { + free_callback(ptr); + return nullptr; + } + + char* originalLenCharPtr = reinterpret_cast(ptr) - offset; + size_t originalLen = *reinterpret_cast(originalLenCharPtr); + + char* rawMemory = reinterpret_cast(Aws::Malloc(MemTag, size + offset)); + if(rawMemory) + { + std::size_t* pointerToSize = reinterpret_cast(rawMemory); + *pointerToSize = size; + + size_t copyLength = (std::min)(originalLen, size); +#ifdef _MSC_VER + memcpy_s(rawMemory + offset, size, ptr, copyLength); +#else + memcpy(rawMemory + offset, ptr, copyLength); +#endif + free_callback(ptr); + return reinterpret_cast(rawMemory + offset); + } + else + { + return ptr; + } + +} + +static void* calloc_callback(size_t nmemb, size_t size) +{ + size_t dataSize = nmemb * size; + char* newMem = reinterpret_cast(Aws::Malloc(MemTag, dataSize + offset)); + std::size_t* pointerToSize = reinterpret_cast(newMem); + *pointerToSize = dataSize; +#ifdef _MSC_VER + memset_s(newMem + offset, dataSize, 0, dataSize); +#else + memset(newMem + offset, 0, dataSize); +#endif + + return reinterpret_cast(newMem + offset); +} + +static char* strdup_callback(const char* str) +{ + size_t len = strlen(str) + 1; + size_t newLen = len + offset; + char* newMem = reinterpret_cast(Aws::Malloc(MemTag, newLen)); + + if(newMem) + { + std::size_t* pointerToSize = reinterpret_cast(newMem); + *pointerToSize = len; +#ifdef _MSC_VER + memcpy_s(newMem + offset, len, str, len); +#else + memcpy(newMem + offset, str, len); +#endif + return newMem + offset; + } + return nullptr; +} + +#endif + +static const char* CURL_EASY_HANDLE_CTX_TAG = "CurlMultiHttpClient"; + +size_t CurlEasyHandleContext::WriteData(char* ptr, size_t size, size_t nmemb, void* userdata) +{ + if (ptr) + { + CurlEasyHandleContext* context = reinterpret_cast(userdata); + assert(context); + + const CurlMultiHttpClient* client = context->m_client; + assert(client); + HttpRequest* pRequest = context->writeContext.m_request; + assert(pRequest); + if(!client->ContinueRequest(*pRequest) || !client->IsRequestProcessingEnabled()) + { + return 0; + } + + HttpResponse* response = context->writeContext.m_response.get(); + assert(response); + + size_t sizeToWrite = size * nmemb; // TODO: check for overflow + if (context->writeContext.m_rateLimiter) + { + context->writeContext.m_rateLimiter->ApplyAndPayForCost(static_cast(sizeToWrite)); + } + + for (const auto& hashIterator : pRequest->GetResponseValidationHashes()) + { + hashIterator.second->Update(reinterpret_cast(ptr), sizeToWrite); + } + + if (response->GetResponseBody().fail()) { + const auto& ref = response->GetResponseBody(); + AWS_LOGSTREAM_ERROR(CURL_EASY_HANDLE_CTX_TAG, "Response output stream in bad state (eof: " + << ref.eof() << ", bad: " << ref.bad() << ")"); + return 0; + } + + size_t cur = response->GetResponseBody().tellp(); + if (response->GetResponseBody().fail()) { + const auto& ref = response->GetResponseBody(); + AWS_LOGSTREAM_ERROR(CURL_EASY_HANDLE_CTX_TAG, "Unable to query response output position (eof: " + << ref.eof() << ", bad: " << ref.bad() << ")"); + return 0; + } + + response->GetResponseBody().write(ptr, static_cast(sizeToWrite)); + if (response->GetResponseBody().fail()) { + const auto& ref = response->GetResponseBody(); + AWS_LOGSTREAM_ERROR(CURL_EASY_HANDLE_CTX_TAG, "Failed to write " << size << " / " << sizeToWrite << " B response" + << " at " << cur << " (eof: " << ref.eof() << ", bad: " << ref.bad() << ")"); + return 0; + } + if (pRequest->IsEventStreamRequest() && !response->HasHeader(Aws::Http::X_AMZN_ERROR_TYPE)) + { + response->GetResponseBody().flush(); + if (response->GetResponseBody().fail()) { + const auto& ref = response->GetResponseBody(); + AWS_LOGSTREAM_ERROR(CURL_EASY_HANDLE_CTX_TAG, "Failed to flush event response (eof: " + << ref.eof() << ", bad: " << ref.bad() << ")"); + return 0; + } + } + auto& receivedHandler = pRequest->GetDataReceivedEventHandler(); + if (receivedHandler) + { + receivedHandler(pRequest, response, static_cast(sizeToWrite)); + } + + AWS_LOGSTREAM_TRACE(CURL_EASY_HANDLE_CTX_TAG, sizeToWrite << " bytes written to response."); + context->writeContext.m_numBytesResponseReceived += sizeToWrite; + return sizeToWrite; + } + return 0; +} + +size_t CurlEasyHandleContext::WriteHeader(char* ptr, size_t size, size_t nmemb, void* userdata) +{ + if (ptr) + { + CurlEasyHandleContext* context = reinterpret_cast(userdata); + assert(context); + AWS_LOGSTREAM_TRACE(CURL_EASY_HANDLE_CTX_TAG, ptr); + + HttpResponse* response = context->writeContext.m_response.get(); + assert(response); + + Aws::String headerLine(ptr); + Aws::Vector keyValuePair = Aws::Utils::StringUtils::Split(headerLine, ':', 2); + + if (keyValuePair.size() == 2) + { + response->AddHeader(Aws::Utils::StringUtils::Trim(keyValuePair[0].c_str()), Aws::Utils::StringUtils::Trim(keyValuePair[1].c_str())); + } + + return size * nmemb; + } + return 0; +} + +size_t CurlEasyHandleContext::ReadBody(char* ptr, size_t size, size_t nmemb, void* userdata, bool isStreaming) +{ + CurlEasyHandleContext* context = reinterpret_cast(userdata); + if(!context->writeContext.m_HasBody) + { + return 0; + } + + const CurlMultiHttpClient* client = context->m_client; + assert(client); + + HttpRequest* request = context->writeContext.m_request; + assert(request); + + if(!client->ContinueRequest(*request) || !client->IsRequestProcessingEnabled()) + { + return CURL_READFUNC_ABORT; + } + + const std::shared_ptr& ioStream = request->GetContentBody(); + + size_t amountToRead = size * nmemb; // TODO: check for overflow + bool isAwsChunked = request->HasHeader(Aws::Http::CONTENT_ENCODING_HEADER) && + request->GetHeaderValue(Aws::Http::CONTENT_ENCODING_HEADER) == Aws::Http::AWS_CHUNKED_VALUE; + // aws-chunk = hex(chunk-size) + CRLF + chunk-data + CRLF + // Needs to reserve bytes of sizeof(hex(chunk-size)) + sizeof(CRLF) + sizeof(CRLF) + if (isAwsChunked) + { + Aws::String amountToReadHexString = Aws::Utils::StringUtils::ToHexString(amountToRead); + amountToRead -= (amountToReadHexString.size() + 4); + } + + if (ioStream != nullptr && amountToRead > 0) + { + if (isStreaming) + { + if (ioStream->readsome(ptr, amountToRead) == 0 && !ioStream->eof()) + { + return CURL_READFUNC_PAUSE; + } + } + else + { + ioStream->read(ptr, amountToRead); + } + size_t amountRead = static_cast(ioStream->gcount()); + + if (isAwsChunked) + { + if (amountRead > 0) + { + if (request->GetRequestHash().second != nullptr) + { + request->GetRequestHash().second->Update(reinterpret_cast(ptr), amountRead); + } + + Aws::String hex = Aws::Utils::StringUtils::ToHexString(amountRead); + memmove(ptr + hex.size() + 2, ptr, amountRead); + memmove(ptr + hex.size() + 2 + amountRead, "\r\n", 2); + memmove(ptr, hex.c_str(), hex.size()); + memmove(ptr + hex.size(), "\r\n", 2); + amountRead += hex.size() + 4; + } + else if (!context->readContext.m_chunkEnd) + { + Aws::StringStream chunkedTrailer; + chunkedTrailer << "0\r\n"; + if (request->GetRequestHash().second != nullptr) + { + chunkedTrailer << "x-amz-checksum-" << request->GetRequestHash().first << ":" + << Aws::Utils::HashingUtils::Base64Encode(request->GetRequestHash().second->GetHash().GetResult()) << "\r\n"; + } + chunkedTrailer << "\r\n"; + amountRead = chunkedTrailer.str().size(); + memcpy(ptr, chunkedTrailer.str().c_str(), amountRead); + context->readContext.m_chunkEnd = true; + } + } + + auto& sentHandler = request->GetDataSentEventHandler(); + if (sentHandler) + { + sentHandler(request, static_cast(amountRead)); + } + + if (context->readContext.m_rateLimiter) + { + context->readContext.m_rateLimiter->ApplyAndPayForCost(static_cast(amountRead)); + } + + return amountRead; + } + + return 0; +} + +size_t CurlEasyHandleContext::ReadBodyStreaming(char* ptr, size_t size, size_t nmemb, void* userdata) { + return ReadBody(ptr, size, nmemb, userdata, true); +} + +size_t CurlEasyHandleContext::ReadBodyFunc(char* ptr, size_t size, size_t nmemb, void* userdata) { + return ReadBody(ptr, size, nmemb, userdata, false); +} + +size_t CurlEasyHandleContext::SeekBody(void* userdata, curl_off_t offset, int origin) +{ + CurlEasyHandleContext* context = reinterpret_cast(userdata); + if(context == nullptr) + { + return CURL_SEEKFUNC_FAIL; + } + + const CurlMultiHttpClient* client = context->m_client; + assert(client); + HttpRequest* request = context->writeContext.m_request; + assert(request); + + if(!client->ContinueRequest(*request) || !client->IsRequestProcessingEnabled()) + { + return CURL_SEEKFUNC_FAIL; + } + + const std::shared_ptr& ioStream = request->GetContentBody(); + + std::ios_base::seekdir dir; + switch(origin) + { + case SEEK_SET: + dir = std::ios_base::beg; + break; + case SEEK_CUR: + dir = std::ios_base::cur; + break; + case SEEK_END: + dir = std::ios_base::end; + break; + default: + return CURL_SEEKFUNC_FAIL; + } + + ioStream->clear(); + ioStream->seekg(offset, dir); + if (ioStream->fail()) { + return CURL_SEEKFUNC_CANTSEEK; + } + + return CURL_SEEKFUNC_OK; +} +#if LIBCURL_VERSION_NUM >= 0x072000 // 7.32.0 +int CurlEasyHandleContext::CurlProgressCallback(void *userdata, curl_off_t, curl_off_t, curl_off_t, curl_off_t) +#else +int CurlEasyHandleContext::CurlProgressCallback(void *userdata, double, double, double, double) +#endif +{ + CurlEasyHandleContext* context = reinterpret_cast(userdata); + assert(context && context->writeContext.m_request); + + const std::shared_ptr& ioStream = context->writeContext.m_request->GetContentBody(); + if (ioStream->eof()) + { + curl_easy_pause(context->m_curlEasyHandle, CURLPAUSE_CONT); + return 0; + } + char output[1]; + if (ioStream->readsome(output, 1) > 0) + { + ioStream->unget(); + if (!ioStream->good()) + { + AWS_LOGSTREAM_WARN(CURL_EASY_HANDLE_CTX_TAG, "Input stream failed to perform unget()."); + } + curl_easy_pause(context->m_curlEasyHandle, CURLPAUSE_CONT); + } + + return 0; +} + +} // namespace Curl +} // namespace Http +} // namespace Aws \ No newline at end of file diff --git a/src/aws-cpp-sdk-core/source/http/curl-multi/CurlMultiHandleContainer.cpp b/src/aws-cpp-sdk-core/source/http/curl-multi/CurlMultiHandleContainer.cpp new file mode 100644 index 00000000000..f07de957531 --- /dev/null +++ b/src/aws-cpp-sdk-core/source/http/curl-multi/CurlMultiHandleContainer.cpp @@ -0,0 +1,289 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include +#include +#include + +#include + +namespace Aws +{ +namespace Http +{ +namespace Curl +{ + +static const char* CURL_HANDLE_CONTAINER_TAG = "CurlMultiHandleContainer"; + + +CurlMultiHandleContainer::CurlMultiHandleContainer(unsigned maxSize, + long httpRequestTimeout, + long connectTimeout, + bool enableTcpKeepAlive, + unsigned long tcpKeepAliveIntervalMs, + long lowSpeedTime, + unsigned long lowSpeedLimit, + Version version) : + m_maxPoolSize(maxSize), m_httpRequestTimeout(httpRequestTimeout), m_connectTimeout(connectTimeout), m_enableTcpKeepAlive(enableTcpKeepAlive), + m_tcpKeepAliveIntervalMs(tcpKeepAliveIntervalMs), m_lowSpeedTime(lowSpeedTime), m_lowSpeedLimit(lowSpeedLimit), m_poolSize(0), + m_version(version) +{ + AWS_LOGSTREAM_INFO(CURL_HANDLE_CONTAINER_TAG, "Initializing CurlMultiHandleContainer with size " << maxSize); + + m_curlMultiHandle = curl_multi_init(); + assert(m_curlMultiHandle); + if(!m_curlMultiHandle) + { + AWS_LOGSTREAM_ERROR(CURL_HANDLE_CONTAINER_TAG, "curl_multi_init failed."); + return; + } + curl_multi_setopt(m_curlMultiHandle, CURLMOPT_MAXCONNECTS, m_maxPoolSize); + AWS_LOGSTREAM_INFO(CURL_HANDLE_CONTAINER_TAG, "Initialized curl multi handle " << m_curlMultiHandle); +} + +CurlMultiHandleContainer::~CurlMultiHandleContainer() +{ + AWS_LOGSTREAM_INFO(CURL_HANDLE_CONTAINER_TAG, "Cleaning up CurlMultiHandleContainer."); + for (CurlEasyHandleContext* easyHandleContext : m_handleContainer.ShutdownAndWait(m_poolSize)) + { + AWS_LOGSTREAM_DEBUG(CURL_HANDLE_CONTAINER_TAG, "Cleaning up " << easyHandleContext->m_curlEasyHandle); + assert(easyHandleContext->m_curlEasyHandle); + curl_easy_cleanup(easyHandleContext->m_curlEasyHandle); + + Aws::Delete(easyHandleContext); + } + + if(m_curlMultiHandle) + { + AWS_LOGSTREAM_INFO(CURL_HANDLE_CONTAINER_TAG, "Cleaning up curl multi handdle" << m_curlMultiHandle); + curl_multi_cleanup(m_curlMultiHandle); + m_curlMultiHandle = nullptr; + } +} + +CurlEasyHandleContext* CurlMultiHandleContainer::AcquireCurlHandle() +{ + AWS_LOGSTREAM_DEBUG(CURL_HANDLE_CONTAINER_TAG, "Attempting to acquire curl easy handle."); + + if(!m_handleContainer.HasResourcesAvailable()) + { + AWS_LOGSTREAM_DEBUG(CURL_HANDLE_CONTAINER_TAG, "No current connections available in pool. Attempting to create new connections."); + CheckAndGrowPool(); + } + + CurlEasyHandleContext* handle = m_handleContainer.Acquire(); + AWS_LOGSTREAM_DEBUG(CURL_HANDLE_CONTAINER_TAG, "Connection has been released. Continuing."); + AWS_LOGSTREAM_DEBUG(CURL_HANDLE_CONTAINER_TAG, "Returning connection handle " << handle); + return handle; +} + +void CurlMultiHandleContainer::ReleaseCurlHandle(CurlEasyHandleContext* handleCtx) +{ + CURL* handle = handleCtx ? handleCtx->m_curlEasyHandle : nullptr; + if (handle) + { +#if LIBCURL_VERSION_NUM >= 0x074D00 // 7.77.0 + curl_easy_setopt(handle, CURLOPT_COOKIEFILE, NULL); // workaround a mem leak on curl +#endif + curl_easy_reset(handle); + SetDefaultOptionsOnHandle(*handleCtx); + curl_easy_setopt(handle, CURLOPT_PRIVATE, handleCtx); + AWS_LOGSTREAM_DEBUG(CURL_HANDLE_CONTAINER_TAG, "Releasing curl handle " << handle); + m_handleContainer.Release(handleCtx); + AWS_LOGSTREAM_DEBUG(CURL_HANDLE_CONTAINER_TAG, "Notified waiting threads."); + } +} + +void CurlMultiHandleContainer::DestroyCurlHandle(CurlEasyHandleContext* handleCtx) +{ + if(handleCtx && handleCtx->m_curlEasyHandle) + { + AWS_LOGSTREAM_DEBUG(CURL_HANDLE_CONTAINER_TAG, "Destroy curl handle: " << handleCtx->m_curlEasyHandle); + curl_easy_cleanup(handleCtx->m_curlEasyHandle); + handleCtx->m_curlEasyHandle = nullptr; + Aws::Delete(handleCtx); + handleCtx = nullptr; + } + + { + std::lock_guard locker(m_containerLock); + // Other threads could be blocked and waiting on m_handleContainer.Acquire() + // If the handle is not released back to the pool, it could create a deadlock + // Create a new handle and release that into the pool + handleCtx = CreateCurlHandleInPool(); + } + if (handleCtx) + { + AWS_LOGSTREAM_DEBUG(CURL_HANDLE_CONTAINER_TAG, "Created replacement handle and released to pool: " << handleCtx->m_curlEasyHandle); + } +} + + +CurlEasyHandleContext* CurlMultiHandleContainer::CreateCurlHandleInPool() +{ + CurlEasyHandleContext* handleCtx = Aws::New(CURL_HANDLE_CONTAINER_TAG); + if(!handleCtx) + { + assert(handleCtx); + AWS_LOGSTREAM_ERROR(CURL_HANDLE_CONTAINER_TAG, "curl_easy_init failed to allocate."); + return nullptr; + } + + handleCtx->m_curlEasyHandle = curl_easy_init(); + + if (handleCtx->m_curlEasyHandle) + { + SetDefaultOptionsOnHandle(*handleCtx); + curl_easy_setopt(handleCtx->m_curlEasyHandle, CURLOPT_PRIVATE, handleCtx); + m_handleContainer.Release(handleCtx); + } + else + { + AWS_LOGSTREAM_ERROR(CURL_HANDLE_CONTAINER_TAG, "curl_easy_init failed to allocate."); + } + return handleCtx; +} + +bool CurlMultiHandleContainer::CheckAndGrowPool() +{ + std::lock_guard locker(m_containerLock); + if (m_poolSize < m_maxPoolSize) + { + unsigned multiplier = m_poolSize > 0 ? m_poolSize : 1; + unsigned amountToAdd = (std::min)(multiplier * 2, m_maxPoolSize - m_poolSize); + AWS_LOGSTREAM_DEBUG(CURL_HANDLE_CONTAINER_TAG, "attempting to grow pool size by " << amountToAdd); + + unsigned actuallyAdded = 0; + for (unsigned i = 0; i < amountToAdd; ++i) + { + CURL* curlHandle = CreateCurlHandleInPool(); + + if (curlHandle) + { + ++actuallyAdded; + } + else + { + break; + } + } + + AWS_LOGSTREAM_INFO(CURL_HANDLE_CONTAINER_TAG, "Pool grown by " << actuallyAdded); + m_poolSize += actuallyAdded; + + return actuallyAdded > 0; + } + + AWS_LOGSTREAM_INFO(CURL_HANDLE_CONTAINER_TAG, "Pool cannot be grown any further, already at max size."); + + return false; +} + +void CurlMultiHandleContainer::SetDefaultOptionsOnHandle(CurlEasyHandleContext& handleCtx) +{ + CURL* handle = handleCtx.m_curlEasyHandle; + assert(handle); + if(handle) { + handleCtx.writeContext.m_HasBody = false; + handleCtx.writeContext.m_request = nullptr; + handleCtx.writeContext.m_rateLimiter = nullptr; + handleCtx.writeContext.m_numBytesResponseReceived = 0; + + handleCtx.readContext.m_rateLimiter = 0; + handleCtx.readContext.m_request = 0; + handleCtx.readContext.m_chunkEnd = false; + if (handleCtx.m_curlHandleHeaders) + { + curl_slist_free_all(handleCtx.m_curlHandleHeaders); + handleCtx.m_curlHandleHeaders = nullptr; + } + handleCtx.curlResult = CURLE_FAILED_INIT; + handleCtx.curlResultMsg = nullptr; + + //for timeouts to work in a multi-threaded context, + //always turn signals off. This also forces dns queries to + //not be included in the timeout calculations. + curl_easy_setopt(handle, CURLOPT_NOSIGNAL, 1L); + curl_easy_setopt(handle, CURLOPT_TIMEOUT_MS, m_httpRequestTimeout); + curl_easy_setopt(handle, CURLOPT_CONNECTTIMEOUT_MS, m_connectTimeout); + curl_easy_setopt(handle, CURLOPT_LOW_SPEED_LIMIT, m_lowSpeedLimit); + curl_easy_setopt(handle, CURLOPT_LOW_SPEED_TIME, + m_lowSpeedTime < 1000 ? (m_lowSpeedTime == 0 ? 0 : 1) : m_lowSpeedTime / 1000); + curl_easy_setopt(handle, CURLOPT_TCP_KEEPALIVE, m_enableTcpKeepAlive ? 1L : 0L); + curl_easy_setopt(handle, CURLOPT_TCP_KEEPINTVL, m_tcpKeepAliveIntervalMs / 1000); + curl_easy_setopt(handle, CURLOPT_TCP_KEEPIDLE, m_tcpKeepAliveIntervalMs / 1000); + curl_easy_setopt(handle, CURLOPT_HTTP_VERSION, ConvertHttpVersion(m_version)); + + // Set callbacks and their context + // Curl to SDK write + curl_easy_setopt(handle, CURLOPT_WRITEFUNCTION, CurlEasyHandleContext::WriteData); + curl_easy_setopt(handle, CURLOPT_WRITEDATA, &handleCtx); + curl_easy_setopt(handle, CURLOPT_HEADERFUNCTION, CurlEasyHandleContext::WriteHeader); + curl_easy_setopt(handle, CURLOPT_HEADERDATA, &handleCtx); + // SDK to Curl default (non streaming) read + curl_easy_setopt(handle, CURLOPT_READFUNCTION, CurlEasyHandleContext::ReadBodyFunc); + curl_easy_setopt(handle, CURLOPT_READDATA, &handleCtx); + curl_easy_setopt(handle, CURLOPT_SEEKFUNCTION, CurlEasyHandleContext::SeekBody); + curl_easy_setopt(handle, CURLOPT_SEEKDATA, &handleCtx); + + // enable the cookie engine without reading any initial cookies. + curl_easy_setopt(handle, CURLOPT_COOKIEFILE, ""); + } +} + +long CurlMultiHandleContainer::ConvertHttpVersion(Version version) { + if (version == Version::HTTP_VERSION_NONE) + { + return CURL_HTTP_VERSION_NONE; + } + else if (version == Version::HTTP_VERSION_1_0) + { + return CURL_HTTP_VERSION_1_0; + } + else if (version == Version::HTTP_VERSION_1_1) + { + return CURL_HTTP_VERSION_1_1; + } +#if LIBCURL_VERSION_NUM >= 0x072100 // 7.33.0 + else if (version == Version::HTTP_VERSION_2_0) + { + return CURL_HTTP_VERSION_2_0; + } +#endif +#if LIBCURL_VERSION_NUM >= 0x072F00 // 7.47.0 + else if (version == Version::HTTP_VERSION_2TLS) + { + return CURL_HTTP_VERSION_2TLS; + } +#endif +#if LIBCURL_VERSION_NUM >= 0x073100 // 7.49.0 + else if (version == Version::HTTP_VERSION_2_PRIOR_KNOWLEDGE) + { + return CURL_HTTP_VERSION_2_PRIOR_KNOWLEDGE; + } +#endif +#if LIBCURL_VERSION_NUM >= 0x074200 // 7.66.0 + else if (version == Version::HTTP_VERSION_3) + { + return CURL_HTTP_VERSION_3; + } +#endif +#if LIBCURL_VERSION_NUM >= 0x075800 // 7.88.0 + else if (version == Version::HTTP_VERSION_3ONLY) + { + return CURL_HTTP_VERSION_3ONLY; + } +#endif +#if LIBCURL_VERSION_NUM >= 0x073E00 // 7.62.0 + return CURL_HTTP_VERSION_2TLS; +#else + return CURL_HTTP_VERSION_1_1; +#endif +} + +} // namespace Curl +} // namespace Http +} // namespace Aws \ No newline at end of file diff --git a/src/aws-cpp-sdk-core/source/http/curl-multi/CurlMultiHttpClient.cpp b/src/aws-cpp-sdk-core/source/http/curl-multi/CurlMultiHttpClient.cpp new file mode 100644 index 00000000000..6eccc6f053c --- /dev/null +++ b/src/aws-cpp-sdk-core/source/http/curl-multi/CurlMultiHttpClient.cpp @@ -0,0 +1,833 @@ +/** + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * SPDX-License-Identifier: Apache-2.0. + */ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + + +using namespace Aws::Client; +using namespace Aws::Http; +using namespace Aws::Http::Standard; +using namespace Aws::Utils; +using namespace Aws::Utils::Logging; +using namespace Aws::Monitoring; + +#ifdef AWS_CUSTOM_MEMORY_MANAGEMENT + +static const char* MemTag = "libcurl"; +static size_t offset = sizeof(size_t); + +static void* malloc_callback(size_t size) +{ + char* newMem = reinterpret_cast(Aws::Malloc(MemTag, size + offset)); + std::size_t* pointerToSize = reinterpret_cast(newMem); + *pointerToSize = size; + return reinterpret_cast(newMem + offset); +} + +static void free_callback(void* ptr) +{ + if(ptr) + { + char* shiftedMemory = reinterpret_cast(ptr); + Aws::Free(shiftedMemory - offset); + } +} + +static void* realloc_callback(void* ptr, size_t size) +{ + if(!ptr) + { + return malloc_callback(size); + } + + + if(!size && ptr) + { + free_callback(ptr); + return nullptr; + } + + char* originalLenCharPtr = reinterpret_cast(ptr) - offset; + size_t originalLen = *reinterpret_cast(originalLenCharPtr); + + char* rawMemory = reinterpret_cast(Aws::Malloc(MemTag, size + offset)); + if(rawMemory) + { + std::size_t* pointerToSize = reinterpret_cast(rawMemory); + *pointerToSize = size; + + size_t copyLength = (std::min)(originalLen, size); +#ifdef _MSC_VER + memcpy_s(rawMemory + offset, size, ptr, copyLength); +#else + memcpy(rawMemory + offset, ptr, copyLength); +#endif + free_callback(ptr); + return reinterpret_cast(rawMemory + offset); + } + else + { + return ptr; + } + +} + +static void* calloc_callback(size_t nmemb, size_t size) +{ + size_t dataSize = nmemb * size; + char* newMem = reinterpret_cast(Aws::Malloc(MemTag, dataSize + offset)); + std::size_t* pointerToSize = reinterpret_cast(newMem); + *pointerToSize = dataSize; +#ifdef _MSC_VER + memset_s(newMem + offset, dataSize, 0, dataSize); +#else + memset(newMem + offset, 0, dataSize); +#endif + + return reinterpret_cast(newMem + offset); +} + +static char* strdup_callback(const char* str) +{ + size_t len = strlen(str) + 1; + size_t newLen = len + offset; + char* newMem = reinterpret_cast(Aws::Malloc(MemTag, newLen)); + + if(newMem) + { + std::size_t* pointerToSize = reinterpret_cast(newMem); + *pointerToSize = len; +#ifdef _MSC_VER + memcpy_s(newMem + offset, len, str, len); +#else + memcpy(newMem + offset, str, len); +#endif + return newMem + offset; + } + return nullptr; +} + +#endif + +static const char* CURL_HTTP_CLIENT_TAG = "CurlMultiHttpClient"; + +static int64_t GetContentLengthFromHeader(CURL* connectionHandle, + bool& hasContentLength) { +#if LIBCURL_VERSION_NUM >= 0x073700 // 7.55.0 + curl_off_t contentLength = {}; + CURLcode res = curl_easy_getinfo( + connectionHandle, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &contentLength); +#else + double contentLength = {}; + CURLcode res = curl_easy_getinfo( + connectionHandle, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &contentLength); +#endif + hasContentLength = (res == CURLE_OK) && (contentLength != -1); + return hasContentLength ? static_cast(contentLength) : -1; +} + +void SetOptCodeForHttpMethod(CURL* const requestHandle, Aws::Http::HttpRequest const * const request) +{ + assert(requestHandle && request); + switch (request->GetMethod()) + { + case HttpMethod::HTTP_GET: + curl_easy_setopt(requestHandle, CURLOPT_HTTPGET, 1L); + break; + case HttpMethod::HTTP_POST: + if (request->HasHeader(Aws::Http::CONTENT_LENGTH_HEADER) && request->GetHeaderValue(Aws::Http::CONTENT_LENGTH_HEADER) == "0") + { + curl_easy_setopt(requestHandle, CURLOPT_CUSTOMREQUEST, "POST"); + } + else + { + curl_easy_setopt(requestHandle, CURLOPT_POST, 1L); + } + break; + case HttpMethod::HTTP_PUT: + if ((!request->HasHeader(Aws::Http::CONTENT_LENGTH_HEADER) || request->GetHeaderValue(Aws::Http::CONTENT_LENGTH_HEADER) == "0") && + !request->HasHeader(Aws::Http::TRANSFER_ENCODING_HEADER)) + { + curl_easy_setopt(requestHandle, CURLOPT_CUSTOMREQUEST, "PUT"); + } + else + { +#if LIBCURL_VERSION_NUM >= 0x070c01 // 7.12.1 + curl_easy_setopt(requestHandle, CURLOPT_UPLOAD, 1L); +#else + curl_easy_setopt(requestHandle, CURLOPT_PUT, 1L); +#endif + } + break; + case HttpMethod::HTTP_HEAD: + curl_easy_setopt(requestHandle, CURLOPT_HTTPGET, 1L); + curl_easy_setopt(requestHandle, CURLOPT_NOBODY, 1L); + break; + case HttpMethod::HTTP_PATCH: + if ((!request->HasHeader(Aws::Http::CONTENT_LENGTH_HEADER)|| request->GetHeaderValue(Aws::Http::CONTENT_LENGTH_HEADER) == "0") && + !request->HasHeader(Aws::Http::TRANSFER_ENCODING_HEADER)) + { + curl_easy_setopt(requestHandle, CURLOPT_CUSTOMREQUEST, "PATCH"); + } + else + { + curl_easy_setopt(requestHandle, CURLOPT_POST, 1L); + curl_easy_setopt(requestHandle, CURLOPT_CUSTOMREQUEST, "PATCH"); + } + + break; + case HttpMethod::HTTP_DELETE: + curl_easy_setopt(requestHandle, CURLOPT_CUSTOMREQUEST, "DELETE"); + break; + default: + assert(0); + curl_easy_setopt(requestHandle, CURLOPT_CUSTOMREQUEST, "GET"); + break; + } +} + + +std::atomic CurlMultiHttpClient::isGlobalStateInit(false); + +void CurlMultiHttpClient::InitGlobalState() +{ + if (!isGlobalStateInit) + { + auto curlVersionData = curl_version_info(CURLVERSION_NOW); + AWS_LOGSTREAM_INFO(CURL_HTTP_CLIENT_TAG, "Initializing Curl library with version: " << curlVersionData->version + << ", ssl version: " << curlVersionData->ssl_version); + isGlobalStateInit = true; +#ifdef AWS_CUSTOM_MEMORY_MANAGEMENT + curl_global_init_mem(CURL_GLOBAL_ALL, &malloc_callback, &free_callback, &realloc_callback, &strdup_callback, &calloc_callback); +#else + curl_global_init(CURL_GLOBAL_ALL); +#endif + } +} + + +void CurlMultiHttpClient::CleanupGlobalState() +{ + curl_global_cleanup(); +} + +Aws::String CurlMultiInfoTypeToString(curl_infotype type) +{ + switch(type) + { + case CURLINFO_TEXT: + return "Text"; + + case CURLINFO_HEADER_IN: + return "HeaderIn"; + + case CURLINFO_HEADER_OUT: + return "HeaderOut"; + + case CURLINFO_DATA_IN: + return "DataIn"; + + case CURLINFO_DATA_OUT: + return "DataOut"; + + case CURLINFO_SSL_DATA_IN: + return "SSLDataIn"; + + case CURLINFO_SSL_DATA_OUT: + return "SSLDataOut"; + + default: + return "Unknown"; + } +} + +int CurlMultiDebugCallback(CURL *handle, curl_infotype type, char *data, size_t size, void *userptr) +{ + AWS_UNREFERENCED_PARAM(handle); + AWS_UNREFERENCED_PARAM(userptr); + + if(type == CURLINFO_SSL_DATA_IN || type == CURLINFO_SSL_DATA_OUT) + { + AWS_LOGSTREAM_DEBUG("CURL", "(" << CurlMultiInfoTypeToString(type) << ") " << size << "bytes"); + } + else + { + Aws::String debugString(data, size); + AWS_LOGSTREAM_DEBUG("CURL", "(" << CurlMultiInfoTypeToString(type) << ") " << debugString); + } + + return 0; +} + +void CurlMultiHttpClient::CurlMultiPerformReset() +{ + // TODO: refactor + std::unique_lock lockGuard(m_signalMutex); + std::unique_lock lock(m_tasksMutex); + AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, "Removing all easy handles from a multi handle and triggering callbacks."); + + for(Curl::CurlEasyHandleContext* handleCtx : m_multiTasks) + { + curl_multi_remove_handle(m_curlMultiHandleContainer.AccessCurlMultiHandle(), handleCtx->m_curlEasyHandle); + handleCtx->curlResultMsg = nullptr; + handleCtx->curlResult = static_cast(-1); + handleCtx->m_onCurlDoneFn(); + m_multiTasks.erase(handleCtx); + } + m_multiTasks.clear(); +} + +void CurlMultiHttpClient::CurlMultiPerformThread(CurlMultiHttpClient* pClient) +{ + assert(pClient && pClient->m_curlMultiHandleContainer.AccessCurlMultiHandle()); + CURLM *multi_handle = pClient->m_curlMultiHandleContainer.AccessCurlMultiHandle(); + + int stillRunning = 0; + while(pClient->m_isRunning) + { + // wait for new task signal + { + std::unique_lock lockGuard(pClient->m_signalMutex); + pClient->m_signalRunning.wait(lockGuard, + [pClient, &stillRunning] + { + // return true to unlock + if(!pClient->m_isRunning.load()) + return true; + + return pClient->m_tasksQueued || stillRunning; + }); + } + if(!pClient->m_isRunning.load()) + { + break; + } + + pClient->m_tasksQueued = 0; + + CURLMcode mc = curl_multi_perform(multi_handle, &stillRunning); + if(mc != CURLM_OK) + { + AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, "Curl curl_multi_perform returned error code " << mc + << " resetting multi handle."); + pClient->CurlMultiPerformReset(); + } + int msgQueue = 0; + do { + + struct CURLMsg* message = curl_multi_info_read(multi_handle, &msgQueue); + if(message) + { + if(message->msg == CURLMSG_DONE) + { + CURL* easyHandle = message->easy_handle; + assert(easyHandle); + Curl::CurlEasyHandleContext* pEasyHandleCtx = nullptr; + curl_easy_getinfo(easyHandle, CURLINFO_PRIVATE, &pEasyHandleCtx); + assert(pEasyHandleCtx); + + pEasyHandleCtx->curlResult = message->data.result; + pEasyHandleCtx->m_onCurlDoneFn(); + } else { + assert(!"Todo"); + } + } + } while(msgQueue > 0); + + if(!mc && stillRunning) + { + /* wait for activity, timeout or "nothing" */ + mc = curl_multi_poll(multi_handle, NULL, 0, 1000, NULL); + if(mc) + { + AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, "Curl curl_multi_poll returned error code " << mc); + break; + } + } + }; +} + +CurlMultiHttpClient::CurlMultiHttpClient(const ClientConfiguration& clientConfig) : + Base(), + m_curlMultiHandleContainer(clientConfig.maxConnections, clientConfig.httpRequestTimeoutMs, clientConfig.connectTimeoutMs, clientConfig.enableTcpKeepAlive, + clientConfig.tcpKeepAliveIntervalMs, clientConfig.requestTimeoutMs, clientConfig.lowSpeedLimit, clientConfig.version), + m_telemetryProvider(clientConfig.telemetryProvider) +{ + m_config.proxyConfig.isEnabled = !clientConfig.proxyHost.empty(); + m_config.proxyConfig.userName = clientConfig.proxyUserName; + m_config.proxyConfig.password = clientConfig.proxyPassword; + m_config.proxyConfig.scheme = SchemeMapper::ToString(clientConfig.proxyScheme); + m_config.proxyConfig.host = clientConfig.proxyHost; + m_config.proxyConfig.sslCertPath = clientConfig.proxySSLCertPath; + m_config.proxyConfig.sslCertType = clientConfig.proxySSLCertType; + m_config.proxyConfig.sslKeyPath = clientConfig.proxySSLKeyPath; + m_config.proxyConfig.sslKeyType = clientConfig.proxySSLKeyType; + m_config.proxyConfig.keyPasswd = clientConfig.proxySSLKeyPassword; + m_config.proxyConfig.port = clientConfig.proxyPort; + m_config.sslConfig.verifySSL = clientConfig.verifySSL; + m_config.sslConfig.caPath = clientConfig.caPath; + m_config.sslConfig.caFile = clientConfig.caFile; + m_config.disableExpectHeader = clientConfig.disableExpectHeader; + + if (clientConfig.followRedirects == FollowRedirectsPolicy::NEVER || + (clientConfig.followRedirects == FollowRedirectsPolicy::DEFAULT && clientConfig.region == Aws::Region::AWS_GLOBAL)) + { + m_config.allowRedirects = false; + } + else + { + m_config.allowRedirects = true; + } + if(clientConfig.nonProxyHosts.GetLength() > 0) + { + Aws::StringStream ss; + ss << clientConfig.nonProxyHosts.GetItem(0); + for (auto i=1u; i < clientConfig.nonProxyHosts.GetLength(); i++) + { + ss << "," << clientConfig.nonProxyHosts.GetItem(i); + } + m_config.proxyConfig.nonProxyHosts = ss.str(); + } + + // LAUNCH + m_isRunning = true; + m_tasksQueued = 0; + m_multiHandleThread = std::thread(CurlMultiPerformThread, this); +} + +CurlMultiHttpClient::~CurlMultiHttpClient() +{ + m_isRunning.store(false); + { + std::unique_lock lockGuard(m_signalMutex); + m_signalRunning.notify_all(); + } + m_multiHandleThread.join(); + + curl_multi_cleanup(m_curlMultiHandleContainer.AccessCurlMultiHandle()); +} + +struct curl_slist* PrepareHeaders(const HttpRequest* request, const bool disableExpectHeader) +{ + assert(request); + struct curl_slist* headers = NULL; + + Aws::StringStream headerStream; + HeaderValueCollection requestHeaders = request->GetHeaders(); + + AWS_LOGSTREAM_TRACE(CURL_HTTP_CLIENT_TAG, "Including headers:"); + for (auto& requestHeader : requestHeaders) + { + headerStream.str(""); + headerStream << requestHeader.first << ": " << requestHeader.second; + Aws::String headerString = headerStream.str(); + AWS_LOGSTREAM_TRACE(CURL_HTTP_CLIENT_TAG, headerString); + headers = curl_slist_append(headers, headerString.c_str()); + } + + if (!request->HasHeader(Aws::Http::TRANSFER_ENCODING_HEADER)) + { + headers = curl_slist_append(headers, "transfer-encoding:"); + } + + if (!request->HasHeader(Aws::Http::CONTENT_LENGTH_HEADER)) + { + headers = curl_slist_append(headers, "content-length:"); + } + + if (!request->HasHeader(Aws::Http::CONTENT_TYPE_HEADER)) + { + headers = curl_slist_append(headers, "content-type:"); + } + + // Discard Expect header so as to avoid using multiple payloads to send a http request (header + body) + if (disableExpectHeader) + { + headers = curl_slist_append(headers, "Expect:"); + } + + return headers; +} + + +void ConfigureEasyHandle( + CurlMultiHttpClient const * const pClient, + const Aws::Http::CurlMultiHttpClient::CurlMultiHttpClientConfig& config, + std::function onDoneFn, + Curl::CurlEasyHandleContext* const easyHandleContext, + Aws::Http::HttpRequest* const request, + const std::shared_ptr response, + Aws::Utils::RateLimits::RateLimiterInterface* readLimiter, + Aws::Utils::RateLimits::RateLimiterInterface* writeLimiter) +{ + assert(easyHandleContext && request && response); + assert(easyHandleContext->m_curlEasyHandle); + struct curl_slist* headers = PrepareHeaders(request, config.disableExpectHeader); + if (headers) + { + curl_easy_setopt(easyHandleContext->m_curlEasyHandle, CURLOPT_HTTPHEADER, headers); + easyHandleContext->m_curlHandleHeaders = headers; + } + + easyHandleContext->writeContext.m_request = request; + easyHandleContext->writeContext.m_response = response; + easyHandleContext->writeContext.m_rateLimiter = readLimiter; // not a typo, the same in a legacy wrapper + easyHandleContext->readContext.m_request = request; + easyHandleContext->readContext.m_rateLimiter = writeLimiter; // not a typo, the same in a legacy wrapper + easyHandleContext->curlResult = CURLE_FAILED_INIT; + easyHandleContext->m_onCurlDoneFn = onDoneFn; + easyHandleContext->m_client = pClient; + + CURL* connectionHandle = easyHandleContext->m_curlEasyHandle; + SetOptCodeForHttpMethod(connectionHandle, request); + + const URI& uri = request->GetUri(); + Aws::String url = uri.GetURIString(); + AWS_LOGSTREAM_TRACE(CURL_HTTP_CLIENT_TAG, "Making request to " << url); + curl_easy_setopt(connectionHandle, CURLOPT_URL, url.c_str()); + + //we only want to override the default path if someone has explicitly told us to. + if(!config.sslConfig.caPath.empty()) + { + curl_easy_setopt(connectionHandle, CURLOPT_CAPATH, config.sslConfig.caPath.c_str()); + } + if(!config.sslConfig.caFile.empty()) + { + curl_easy_setopt(connectionHandle, CURLOPT_CAINFO, config.sslConfig.caFile.c_str()); + } + + // enable the cookie engine without reading any initial cookies. + curl_easy_setopt(connectionHandle, CURLOPT_COOKIEFILE, ""); + +// only set by android test builds because the emulator is missing a cert needed for aws services +#ifdef TEST_CERT_PATH +curl_easy_setopt(connectionHandle, CURLOPT_CAPATH, TEST_CERT_PATH); +#endif // TEST_CERT_PATH + + if (config.sslConfig.verifySSL) + { + curl_easy_setopt(connectionHandle, CURLOPT_SSL_VERIFYPEER, 1L); + curl_easy_setopt(connectionHandle, CURLOPT_SSL_VERIFYHOST, 2L); + +#if defined(ENFORCE_TLS_V1_3) && LIBCURL_VERSION_NUM >= 0x073400 // 7.52.0 + curl_easy_setopt(connectionHandle, CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1_3); +#elif defined(ENFORCE_TLS_V1_2) && LIBCURL_VERSION_NUM >= 0x072200 // 7.34.0 + curl_easy_setopt(connectionHandle, CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1_2); +#else + curl_easy_setopt(connectionHandle, CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1); +#endif + } + else + { + curl_easy_setopt(connectionHandle, CURLOPT_SSL_VERIFYPEER, 0L); + curl_easy_setopt(connectionHandle, CURLOPT_SSL_VERIFYHOST, 0L); + } + + if (config.allowRedirects) + { + curl_easy_setopt(connectionHandle, CURLOPT_FOLLOWLOCATION, 1L); + } + else + { + curl_easy_setopt(connectionHandle, CURLOPT_FOLLOWLOCATION, 0L); + } + +#ifdef ENABLE_CURL_LOGGING + curl_easy_setopt(connectionHandle, CURLOPT_VERBOSE, 1); + curl_easy_setopt(connectionHandle, CURLOPT_DEBUGFUNCTION, CurlMultiDebugCallback); +#endif + if (config.proxyConfig.isEnabled) + { + Aws::StringStream ss; + ss << config.proxyConfig.scheme << "://" << config.proxyConfig.host; + curl_easy_setopt(connectionHandle, CURLOPT_PROXY, ss.str().c_str()); + curl_easy_setopt(connectionHandle, CURLOPT_PROXYPORT, (long) config.proxyConfig.port); + if (!config.proxyConfig.userName.empty() || !config.proxyConfig.password.empty()) + { + curl_easy_setopt(connectionHandle, CURLOPT_PROXYUSERNAME, config.proxyConfig.userName.c_str()); + curl_easy_setopt(connectionHandle, CURLOPT_PROXYPASSWORD, config.proxyConfig.password.c_str()); + } + curl_easy_setopt(connectionHandle, CURLOPT_NOPROXY, config.proxyConfig.nonProxyHosts.c_str()); +#ifdef CURL_HAS_TLS_PROXY + if (!config.proxyConfig.sslCertPath.empty()) + { + curl_easy_setopt(connectionHandle, CURLOPT_PROXY_SSLCERT, config.proxyConfig.sslCertPath.c_str()); + if (!config.proxyConfig.sslCertType.empty()) + { + curl_easy_setopt(connectionHandle, CURLOPT_PROXY_SSLCERTTYPE, config.proxyConfig.sslCertType.c_str()); + } + } + if (!config.proxyConfig.sslKeyPath.empty()) + { + curl_easy_setopt(connectionHandle, CURLOPT_PROXY_SSLKEY, config.proxyConfig.sslKeyPath.c_str()); + if (!config.proxyConfig.sslKeyType.empty()) + { + curl_easy_setopt(connectionHandle, CURLOPT_PROXY_SSLKEYTYPE, config.proxyConfig.sslKeyType.c_str()); + } + if (!config.proxyConfig.keyPasswd.empty()) + { + curl_easy_setopt(connectionHandle, CURLOPT_PROXY_KEYPASSWD, config.proxyConfig.keyPasswd.c_str()); + } + } +#endif //CURL_HAS_TLS_PROXY + } + else + { + curl_easy_setopt(connectionHandle, CURLOPT_PROXY, ""); + } + + if (request->GetContentBody()) + { + easyHandleContext->writeContext.m_HasBody = true; +// curl_easy_setopt(connectionHandle, CURLOPT_READFUNCTION, ReadBodyFunc); +// curl_easy_setopt(connectionHandle, CURLOPT_READDATA, &handleContext.readContext); +// curl_easy_setopt(connectionHandle, CURLOPT_SEEKFUNCTION, SeekBody); +// curl_easy_setopt(connectionHandle, CURLOPT_SEEKDATA, &handleContext.readContext); + if (request->IsEventStreamRequest() && !response->HasHeader(Aws::Http::X_AMZN_ERROR_TYPE)) + { + curl_easy_setopt(connectionHandle, CURLOPT_READFUNCTION, Curl::CurlEasyHandleContext::ReadBodyStreaming); + curl_easy_setopt(connectionHandle, CURLOPT_NOPROGRESS, 0L); +#if LIBCURL_VERSION_NUM >= 0x072000 // 7.32.0 + curl_easy_setopt(connectionHandle, CURLOPT_XFERINFOFUNCTION, Curl::CurlEasyHandleContext::CurlProgressCallback); + curl_easy_setopt(connectionHandle, CURLOPT_XFERINFODATA, easyHandleContext); +#else + curl_easy_setopt(connectionHandle, CURLOPT_PROGRESSFUNCTION, Curl::CurlEasyHandleContext::CurlProgressCallback); + curl_easy_setopt(connectionHandle, CURLOPT_PROGRESSDATA, easyHandleContext); +#endif + } + } +} + +std::shared_ptr CurlMultiHttpClient::HandleCurlResponse(Curl::CurlEasyHandleContext* pEasyHandleCtx) +{ + const CURLcode curlResponseCode = pEasyHandleCtx->curlResult; + Aws::Http::HttpRequest * const request = pEasyHandleCtx->writeContext.m_request; + Aws::Http::HttpResponse * const response = pEasyHandleCtx->writeContext.m_response.get(); + CurlMultiHttpClient const * const client = pEasyHandleCtx->m_client; + CURL* const connectionHandle = pEasyHandleCtx->m_curlEasyHandle; + + assert(request && response && client && connectionHandle); + + bool shouldContinueRequest = client->ContinueRequest(*request); + if (curlResponseCode != CURLE_OK && shouldContinueRequest) + { + response->SetClientErrorType(CoreErrors::NETWORK_CONNECTION); + Aws::StringStream ss; + ss << "curlCode: " << curlResponseCode << ", " << curl_easy_strerror(curlResponseCode); + response->SetClientErrorMessage(ss.str()); + AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, "Curl returned error code " << curlResponseCode + << " - " << curl_easy_strerror(curlResponseCode)); + } + else if(!shouldContinueRequest) + { + response->SetClientErrorType(CoreErrors::USER_CANCELLED); + response->SetClientErrorMessage("Request cancelled by user's continuation handler"); + } + else + { + long responseCode; + curl_easy_getinfo(connectionHandle, CURLINFO_RESPONSE_CODE, &responseCode); + response->SetResponseCode(static_cast(responseCode)); + AWS_LOGSTREAM_DEBUG(CURL_HTTP_CLIENT_TAG, "Returned http response code " << responseCode); + + char* contentType = nullptr; + curl_easy_getinfo(connectionHandle, CURLINFO_CONTENT_TYPE, &contentType); + if (contentType) + { + response->SetContentType(contentType); + AWS_LOGSTREAM_DEBUG(CURL_HTTP_CLIENT_TAG, "Returned content type " << contentType); + } + + bool hasContentLength = false; + int64_t contentLength = + GetContentLengthFromHeader(connectionHandle, hasContentLength); + + if (request->GetMethod() != HttpMethod::HTTP_HEAD && + client->IsRequestProcessingEnabled() && + hasContentLength) + { + int64_t numBytesResponseReceived = pEasyHandleCtx->writeContext.m_numBytesResponseReceived; + AWS_LOGSTREAM_TRACE(CURL_HTTP_CLIENT_TAG, "Response content-length header: " << contentLength); + AWS_LOGSTREAM_TRACE(CURL_HTTP_CLIENT_TAG, "Response body length: " << numBytesResponseReceived); + if (contentLength != numBytesResponseReceived) + { + response->SetClientErrorType(CoreErrors::NETWORK_CONNECTION); + response->SetClientErrorMessage("Response body length doesn't match the content-length header."); + AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, "Response body length doesn't match the content-length header."); + } + } + + AWS_LOGSTREAM_DEBUG(CURL_HTTP_CLIENT_TAG, "Releasing curl handle " << connectionHandle); + } + + double timep = 0.0; + CURLcode ret = curl_easy_getinfo(connectionHandle, CURLINFO_NAMELOOKUP_TIME, &timep); // DNS Resolve Latency, seconds. + if (ret == CURLE_OK) + { + request->AddRequestMetric(GetHttpClientMetricNameByType(HttpClientMetricsType::DnsLatency), static_cast(timep * 1000));// to milliseconds + } + + ret = curl_easy_getinfo(connectionHandle, CURLINFO_STARTTRANSFER_TIME, &timep); // Connect Latency + if (ret == CURLE_OK) + { + request->AddRequestMetric(GetHttpClientMetricNameByType(HttpClientMetricsType::ConnectLatency), static_cast(timep * 1000)); + } + + ret = curl_easy_getinfo(connectionHandle, CURLINFO_APPCONNECT_TIME, &timep); // Ssl Latency + if (ret == CURLE_OK) + { + request->AddRequestMetric(GetHttpClientMetricNameByType(HttpClientMetricsType::SslLatency), static_cast(timep * 1000)); + } + + curl_off_t speed; +#if LIBCURL_VERSION_NUM >= 0x073700 // 7.55.0 + ret = curl_easy_getinfo(connectionHandle, CURLINFO_SPEED_DOWNLOAD_T, &speed); // throughput +#else + ret = curl_easy_getinfo(connectionHandle, CURLINFO_SPEED_DOWNLOAD, &speed); // throughput +#endif + if (ret == CURLE_OK) + { + request->AddRequestMetric(GetHttpClientMetricNameByType(HttpClientMetricsType::Throughput), static_cast(speed)); + } + + const char* ip = nullptr; + auto curlGetInfoResult = curl_easy_getinfo(connectionHandle, CURLINFO_PRIMARY_IP, &ip); // Get the IP address of the remote endpoint + if (curlGetInfoResult == CURLE_OK && ip) + { + request->SetResolvedRemoteHost(ip); + } + //go ahead and flush the response body stream + response->GetResponseBody().flush(); + if (response->GetResponseBody().fail()) { + const auto& ref = response->GetResponseBody(); + Aws::StringStream ss; + ss << "Failed to flush response stream (eof: " << ref.eof() << ", bad: " << ref.bad() << ")"; + response->SetClientErrorType(CoreErrors::INTERNAL_FAILURE); + response->SetClientErrorMessage(ss.str()); + AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, ss.str()); + } + request->AddRequestMetric(GetHttpClientMetricNameByType(HttpClientMetricsType::RequestLatency), + (DateTime::Now() - pEasyHandleCtx->startTransmissionTime).count()); + + if (pEasyHandleCtx->m_curlHandleHeaders) + { + curl_slist_free_all(pEasyHandleCtx->m_curlHandleHeaders); + pEasyHandleCtx->m_curlHandleHeaders = nullptr; + } + + curl_multi_remove_handle(client->m_curlMultiHandleContainer.AccessCurlMultiHandle(), connectionHandle); + { + std::unique_lock lock(client->m_tasksMutex); + client->m_multiTasks.erase(pEasyHandleCtx); + } + + std::shared_ptr res = std::move(pEasyHandleCtx->writeContext.m_response); + pEasyHandleCtx->writeContext.m_response.reset(); + + if (curlResponseCode != CURLE_OK) + { + client->m_curlMultiHandleContainer.DestroyCurlHandle(pEasyHandleCtx); + } + else + { + client->m_curlMultiHandleContainer.ReleaseCurlHandle(pEasyHandleCtx); + } + + return res; +} + +bool CurlMultiHttpClient::SubmitTask(Curl::CurlEasyHandleContext* pEasyHandleCtx) const +{ + assert(pEasyHandleCtx); + CURLMcode curlMultiResponseCode = curl_multi_add_handle(m_curlMultiHandleContainer.AccessCurlMultiHandle(), + pEasyHandleCtx->m_curlEasyHandle); + if (CURLM_OK != curlMultiResponseCode) + { + return false; + } + + { + std::unique_lock lock(m_tasksMutex); + m_multiTasks.insert(pEasyHandleCtx); + m_tasksQueued++; + } + { + std::unique_lock lockGuard(m_signalMutex); + m_signalRunning.notify_one(); + curl_multi_wakeup(m_curlMultiHandleContainer.AccessCurlMultiHandle()); + } + return true; +} + +// Blocking +std::shared_ptr CurlMultiHttpClient::MakeRequest(const std::shared_ptr& request, + Aws::Utils::RateLimits::RateLimiterInterface* readLimiter, + Aws::Utils::RateLimits::RateLimiterInterface* writeLimiter) const +{ + std::shared_ptr response = Aws::MakeShared(CURL_HTTP_CLIENT_TAG, request); + if(!response) + { + AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, "Failed to allocate shared_ptr"); + return response; + } + + Curl::CurlEasyHandleContext* easyHandleContext = m_curlMultiHandleContainer.AcquireCurlHandle(); + if(!easyHandleContext) + { + response->SetClientErrorType(CoreErrors::NETWORK_CONNECTION); + response->SetClientErrorMessage("Failed to Acquire curl handle"); + AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, "Failed to Acquire curl handle"); + return response; + } + + AWS_LOGSTREAM_DEBUG(CURL_HTTP_CLIENT_TAG, "Obtained connection handle " << easyHandleContext); + + std::mutex taskMutex; + std::condition_variable signal; + bool completed = false; + auto onDone = [&signal, &taskMutex, &completed]() + { + std::unique_lock lockGuard(taskMutex); + completed = true; + signal.notify_one(); + }; + + ConfigureEasyHandle(this, m_config, onDone, easyHandleContext, request.get(), response, readLimiter, writeLimiter); + OverrideOptionsOnConnectionHandle(easyHandleContext->m_curlEasyHandle); + + easyHandleContext->startTransmissionTime = Aws::Utils::DateTime::Now(); + + if(!SubmitTask(easyHandleContext)) + { + response->SetClientErrorType(CoreErrors::NETWORK_CONNECTION); + response->SetClientErrorMessage("Failed to add curl_easy_handle to curl_multi_handle."); + AWS_LOGSTREAM_ERROR(CURL_HTTP_CLIENT_TAG, "Failed to add curl_easy_handle to curl_multi_handle."); + return response; + } + + // Task submitted, wait for it's completion + std::unique_lock lockGuard(taskMutex); + signal.wait(lockGuard, + [this, &completed] + { + return !this->m_isRunning.load() || completed; + }); + + // This is a blocking mode, handle response within the submitter thread + return HandleCurlResponse(easyHandleContext); +}