Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .changes/next-release/feature-AWSSDKforJavav2-ceb5337.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"type": "feature",
"category": "AWS SDK for Java v2",
"contributor": "",
"description": "Add support for signing async payloads in the default `AwsV4aHttpSigner`."
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,19 @@
import static software.amazon.awssdk.http.auth.aws.internal.signer.util.SignerConstant.STREAMING_ECDSA_SIGNED_PAYLOAD;
import static software.amazon.awssdk.http.auth.aws.internal.signer.util.SignerConstant.STREAMING_ECDSA_SIGNED_PAYLOAD_TRAILER;
import static software.amazon.awssdk.http.auth.aws.internal.signer.util.SignerConstant.STREAMING_UNSIGNED_PAYLOAD_TRAILER;
import static software.amazon.awssdk.http.auth.aws.internal.signer.util.SignerConstant.X_AMZ_DECODED_CONTENT_LENGTH;
import static software.amazon.awssdk.http.auth.aws.internal.signer.util.SignerConstant.X_AMZ_TRAILER;
import static software.amazon.awssdk.http.auth.aws.internal.signer.util.SignerUtils.computeAndMoveContentLength;

import java.io.InputStream;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.reactivestreams.Publisher;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.checksums.SdkChecksum;
import software.amazon.awssdk.checksums.spi.ChecksumAlgorithm;
Expand All @@ -35,8 +41,12 @@
import software.amazon.awssdk.http.SdkHttpRequest;
import software.amazon.awssdk.http.auth.aws.internal.signer.CredentialScope;
import software.amazon.awssdk.http.auth.aws.internal.signer.NoOpPayloadChecksumStore;
import software.amazon.awssdk.http.auth.aws.internal.signer.chunkedencoding.AsyncChunkEncodedPayload;
import software.amazon.awssdk.http.auth.aws.internal.signer.chunkedencoding.ChecksumTrailerProvider;
import software.amazon.awssdk.http.auth.aws.internal.signer.chunkedencoding.ChunkedEncodedInputStream;
import software.amazon.awssdk.http.auth.aws.internal.signer.chunkedencoding.ChunkedEncodedPayload;
import software.amazon.awssdk.http.auth.aws.internal.signer.chunkedencoding.ChunkedEncodedPublisher;
import software.amazon.awssdk.http.auth.aws.internal.signer.chunkedencoding.SyncChunkEncodedPayload;
import software.amazon.awssdk.http.auth.aws.internal.signer.chunkedencoding.TrailerProvider;
import software.amazon.awssdk.http.auth.aws.internal.signer.io.ChecksumInputStream;
import software.amazon.awssdk.http.auth.aws.internal.signer.io.ResettableContentStreamProvider;
Expand Down Expand Up @@ -83,39 +93,73 @@ public ContentStreamProvider sign(ContentStreamProvider payload, V4aRequestSigni
.chunkSize(chunkSize)
.header(chunk -> Integer.toHexString(chunk.remaining()).getBytes(StandardCharsets.UTF_8));

preExistingTrailers.forEach(trailer -> chunkedEncodedInputStreamBuilder.addTrailer(() -> trailer));
SyncChunkEncodedPayload chunkedPayload = new SyncChunkEncodedPayload(chunkedEncodedInputStreamBuilder);

signCommon(chunkedPayload, requestSigningResult);

return new ResettableContentStreamProvider(chunkedEncodedInputStreamBuilder::build);
}

/**
* Given a payload and result of request signing, sign the payload via the SigV4 process.
*/
@Override
public Publisher<ByteBuffer> signAsync(Publisher<ByteBuffer> payload, V4aRequestSigningResult requestSigningResult) {
ChunkedEncodedPublisher.Builder chunkedStreamBuilder = ChunkedEncodedPublisher.builder()
.publisher(payload)
.chunkSize(chunkSize)
.addEmptyTrailingChunk(true);
AsyncChunkEncodedPayload chunkedPayload = new AsyncChunkEncodedPayload(chunkedStreamBuilder);

signCommon(chunkedPayload, requestSigningResult);

return chunkedStreamBuilder.build();
}

private ChunkedEncodedPayload signCommon(ChunkedEncodedPayload payload, V4aRequestSigningResult requestSigningResult) {
SdkHttpRequest.Builder request = requestSigningResult.getSignedRequest();

payload.decodedContentLength(request.firstMatchingHeader(X_AMZ_DECODED_CONTENT_LENGTH)
.map(Long::parseLong)
.orElseThrow(() -> {
String msg = String.format("Expected header '%s' to be present",
X_AMZ_DECODED_CONTENT_LENGTH);
return new RuntimeException(msg);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be a more specific exception?

}));

preExistingTrailers.forEach(trailer -> payload.addTrailer(() -> trailer));

switch (requestSigningResult.getSigningConfig().getSignedBodyValue()) {
case STREAMING_ECDSA_SIGNED_PAYLOAD: {
RollingSigner rollingSigner = new RollingSigner(requestSigningResult.getSignature(),
requestSigningResult.getSigningConfig());
chunkedEncodedInputStreamBuilder.addExtension(new SigV4aChunkExtensionProvider(rollingSigner, credentialScope));
payload.addExtension(new SigV4aChunkExtensionProvider(rollingSigner, credentialScope));
break;
}
case STREAMING_UNSIGNED_PAYLOAD_TRAILER:
setupChecksumTrailerIfNeeded(chunkedEncodedInputStreamBuilder);
setupChecksumTrailerIfNeeded(payload);
break;
case STREAMING_ECDSA_SIGNED_PAYLOAD_TRAILER: {
RollingSigner rollingSigner = new RollingSigner(requestSigningResult.getSignature(),
requestSigningResult.getSigningConfig());
chunkedEncodedInputStreamBuilder.addExtension(new SigV4aChunkExtensionProvider(rollingSigner, credentialScope));
setupChecksumTrailerIfNeeded(chunkedEncodedInputStreamBuilder);
chunkedEncodedInputStreamBuilder.addTrailer(
new SigV4aTrailerProvider(chunkedEncodedInputStreamBuilder.trailers(), rollingSigner, credentialScope)
payload.addExtension(new SigV4aChunkExtensionProvider(rollingSigner, credentialScope));
setupChecksumTrailerIfNeeded(payload);
payload.addTrailer(
new SigV4aTrailerProvider(payload.trailers(), rollingSigner, credentialScope)
);
break;
}
default:
throw new UnsupportedOperationException();
}

return new ResettableContentStreamProvider(chunkedEncodedInputStreamBuilder::build);
return payload;
}

@Override
public void beforeSigning(SdkHttpRequest.Builder request, ContentStreamProvider payload, String checksum) {
long encodedContentLength = 0;
long contentLength = SignerUtils.computeAndMoveContentLength(request, payload);
long contentLength = computeAndMoveContentLength(request, payload);
setupPreExistingTrailers(request);

// pre-existing trailers
Expand Down Expand Up @@ -157,6 +201,72 @@ public void beforeSigning(SdkHttpRequest.Builder request, ContentStreamProvider
// CRT-signed request doesn't expect 'aws-chunked' Content-Encoding, so we don't add it
}

@Override
public CompletableFuture<Pair<SdkHttpRequest.Builder, Optional<Publisher<ByteBuffer>>>> beforeSigningAsync(
SdkHttpRequest.Builder request, Publisher<ByteBuffer> payload, String checksum) {

return SignerUtils.moveContentLength(request, payload)
.thenApply(p -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Question: is there any difference between sigv4a and sigv4 for this logic here?

SdkHttpRequest.Builder requestBuilder = p.left();
setupPreExistingTrailers(requestBuilder);

long decodedContentLength =
requestBuilder.firstMatchingHeader(X_AMZ_DECODED_CONTENT_LENGTH)
.map(Long::parseLong)
// should not happen, this header is added by
// moveContentLength
.orElseThrow(() -> new RuntimeException(
X_AMZ_DECODED_CONTENT_LENGTH + " header not present"));

long encodedContentLength = calculateEncodedContentLength(request, decodedContentLength, checksum);

if (checksumAlgorithm != null) {
String checksumHeaderName = checksumHeaderName(checksumAlgorithm);
request.appendHeader(X_AMZ_TRAILER, checksumHeaderName);
}
request.putHeader(Header.CONTENT_LENGTH, Long.toString(encodedContentLength));

return Pair.of(requestBuilder, p.right());
});
}

private long calculateEncodedContentLength(SdkHttpRequest.Builder requestBuilder, long decodedContentLength,
String checksum) {
long encodedContentLength = 0;

encodedContentLength += calculateExistingTrailersLength();

switch (checksum) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question here, can we reuse the logic somehow?

case STREAMING_ECDSA_SIGNED_PAYLOAD: {
long extensionsLength = 161; // ;chunk-signature:<sigv4a-ecsda hex signature, 144 bytes>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit - make this is a constant

encodedContentLength += calculateChunksLength(decodedContentLength, extensionsLength);
break;
}
case STREAMING_UNSIGNED_PAYLOAD_TRAILER:
if (checksumAlgorithm != null) {
encodedContentLength += calculateChecksumTrailerLength(checksumHeaderName(checksumAlgorithm));
}
encodedContentLength += calculateChunksLength(decodedContentLength, 0);
break;
case STREAMING_ECDSA_SIGNED_PAYLOAD_TRAILER: {
long extensionsLength = 161; // ;chunk-signature:<sigv4a-ecsda hex signature, 144 bytes>
encodedContentLength += calculateChunksLength(decodedContentLength, extensionsLength);
if (checksumAlgorithm != null) {
encodedContentLength += calculateChecksumTrailerLength(checksumHeaderName(checksumAlgorithm));
}
encodedContentLength += 170; // x-amz-trailer-signature:<sigv4a-ecsda hex signature, 144 bytes>\r\n
break;
}
default:
throw new UnsupportedOperationException();
}

// terminating \r\n
encodedContentLength += 2;

return encodedContentLength;
}

/**
* Set up a map of pre-existing trailer (headers) for the given request to be used when chunk-encoding the payload.
* <p>
Expand Down Expand Up @@ -270,6 +380,30 @@ private void setupChecksumTrailerIfNeeded(ChunkedEncodedInputStream.Builder buil
builder.inputStream(checksumInputStream).addTrailer(checksumTrailer);
}

private void setupChecksumTrailerIfNeeded(ChunkedEncodedPayload payload) {
if (checksumAlgorithm == null) {
return;
}
String checksumHeaderName = checksumHeaderName(checksumAlgorithm);

String cachedChecksum = getCachedChecksum();

if (cachedChecksum != null) {
LOG.debug(() -> String.format("Cached payload checksum available for algorithm %s: %s. Using cached value",
checksumAlgorithm.algorithmId(), checksumHeaderName));
payload.addTrailer(() -> Pair.of(checksumHeaderName, Collections.singletonList(cachedChecksum)));
return;
}

SdkChecksum sdkChecksum = fromChecksumAlgorithm(checksumAlgorithm);
payload.checksumPayload(sdkChecksum);

TrailerProvider checksumTrailer =
new ChecksumTrailerProvider(sdkChecksum, checksumHeaderName, checksumAlgorithm, payloadChecksumStore);

payload.addTrailer(checksumTrailer);
}

private String getCachedChecksum() {
byte[] checksumBytes = payloadChecksumStore.getChecksumValue(checksumAlgorithm);
if (checksumBytes != null) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.http.auth.aws.crt.internal.signer;

import java.nio.ByteBuffer;
import org.reactivestreams.Publisher;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.crt.http.HttpRequestBodyStream;
import software.amazon.awssdk.utils.async.ByteBufferStoringSubscriber;
import software.amazon.awssdk.utils.async.ByteBufferStoringSubscriber.TransferResult;

@SdkInternalApi
public final class CrtRequestBodyAdapter implements HttpRequestBodyStream {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we move this existing class https://github.com/aws/aws-sdk-java-v2/blob/master/http-clients/aws-crt-client/src/main/java/software/amazon/awssdk/http/crt/internal/request/CrtRequestBodyAdapter.java to crt-core and use it directly? I understand this means we'd need to make it protected APIs, but the API surface area seems small enough...

private static final int BUFFER_SIZE = 4 * 1024 * 1024; // 4 MB
private final Publisher<ByteBuffer> requestPublisher;
private final long contentLength;
private ByteBufferStoringSubscriber requestBodySubscriber;

public CrtRequestBodyAdapter(Publisher<ByteBuffer> requestPublisher, long contentLength) {
this.requestPublisher = requestPublisher;
this.contentLength = contentLength;
this.requestBodySubscriber = new ByteBufferStoringSubscriber(BUFFER_SIZE);
}

@Override
public boolean sendRequestBody(ByteBuffer bodyBytesOut) {
return requestBodySubscriber.transferTo(bodyBytesOut) == TransferResult.END_OF_STREAM;
}

@Override
public boolean resetPosition() {
requestBodySubscriber = new ByteBufferStoringSubscriber(BUFFER_SIZE);
requestPublisher.subscribe(requestBodySubscriber);
return true;
}

@Override
public long getLength() {
return contentLength;
}
}
Loading
Loading