diff --git a/.evergreen/.evg.yml b/.evergreen/.evg.yml index b1685115e7d..c20533906ec 100644 --- a/.evergreen/.evg.yml +++ b/.evergreen/.evg.yml @@ -207,8 +207,12 @@ functions: file: mo-expansion.yml "bootstrap mongohoused": + - command: ec2.assume_role + params: + role_arn: ${aws_test_secrets_role} - command: shell.exec params: + include_expansions_in_env: [ "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_SESSION_TOKEN" ] script: | DRIVERS_TOOLS="${DRIVERS_TOOLS}" bash ${DRIVERS_TOOLS}/.evergreen/atlas_data_lake/pull-mongohouse-image.sh - command: shell.exec @@ -831,13 +835,7 @@ functions: set -o errexit ${PREPARE_SHELL} export K8S_VARIANT=${VARIANT} - cd src - git add . - git commit --allow-empty -m "add files" - # uncompressed tar used to allow appending .git folder - export K8S_DRIVERS_TAR_FILE=/tmp/mongo-java-driver.tar - git archive -o $K8S_DRIVERS_TAR_FILE HEAD - tar -rf $K8S_DRIVERS_TAR_FILE .git + export K8S_DRIVERS_TAR_FILE=$(./.evergreen/git-archive.sh) export K8S_TEST_CMD="OIDC_ENV=k8s VARIANT=${VARIANT} ./.evergreen/run-mongodb-oidc-test.sh" bash $DRIVERS_TOOLS/.evergreen/auth_oidc/k8s/setup-pod.sh bash $DRIVERS_TOOLS/.evergreen/auth_oidc/k8s/run-self-test.sh @@ -937,13 +935,7 @@ tasks: script: |- set -o errexit ${PREPARE_SHELL} - cd src - git add . - git commit --allow-empty -m "add files" - # uncompressed tar used to allow appending .git folder - export AZUREOIDC_DRIVERS_TAR_FILE=/tmp/mongo-java-driver.tar - git archive -o $AZUREOIDC_DRIVERS_TAR_FILE HEAD - tar -rf $AZUREOIDC_DRIVERS_TAR_FILE .git + export AZUREOIDC_DRIVERS_TAR_FILE=$(./.evergreen/git-archive.sh) export AZUREOIDC_TEST_CMD="OIDC_ENV=azure ./.evergreen/run-mongodb-oidc-test.sh" bash $DRIVERS_TOOLS/.evergreen/auth_oidc/azure/run-driver-test.sh @@ -955,13 +947,7 @@ tasks: script: |- set -o errexit ${PREPARE_SHELL} - cd src - git add . - git commit --allow-empty -m "add files" - # uncompressed tar used to allow appending .git folder - export GCPOIDC_DRIVERS_TAR_FILE=/tmp/mongo-java-driver.tar - git archive -o $GCPOIDC_DRIVERS_TAR_FILE HEAD - tar -rf $GCPOIDC_DRIVERS_TAR_FILE .git + export GCPOIDC_DRIVERS_TAR_FILE=$(./.evergreen/git-archive.sh) # Define the command to run on the VM. # Ensure that we source the environment file created for us, set up any other variables we need, # and then run our test suite on the vm. @@ -969,6 +955,8 @@ tasks: bash $DRIVERS_TOOLS/.evergreen/auth_oidc/gcp/run-driver-test.sh - name: "oidc-auth-test-k8s" + # Might exceed 1 hour of execution. + exec_timeout_secs: 7200 commands: - command: ec2.assume_role params: @@ -983,6 +971,8 @@ tasks: - func: "oidc-auth-test-k8s-func" vars: VARIANT: gke + params: + include_expansions_in_env: [ "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_SESSION_TOKEN" ] - name: serverless-test commands: diff --git a/.evergreen/git-archive.sh b/.evergreen/git-archive.sh new file mode 100755 index 00000000000..5c22c9170a4 --- /dev/null +++ b/.evergreen/git-archive.sh @@ -0,0 +1,20 @@ +#!/bin/bash + +# Exit the script with error if any of the commands fail +set -o errexit + +# Returns the path to the root archive file which includes all git submodules. + +echo "Creating root archive" +export GIT_ARCHIVE_FILE="/tmp/mongo-java-driver.tar" + +# create root archive +git archive --output $GIT_ARCHIVE_FILE HEAD + +echo "Appending submodule archives" +git submodule status --recursive | awk '{ print $2 }' | xargs tar -rf $GIT_ARCHIVE_FILE + +echo "Appending .git directory to the root archive" +tar -rf $GIT_ARCHIVE_FILE .git + +echo "$GIT_ARCHIVE_FILE" diff --git a/driver-core/src/main/com/mongodb/internal/connection/Authenticator.java b/driver-core/src/main/com/mongodb/internal/connection/Authenticator.java index cd1809966b0..b1579cd1190 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/Authenticator.java +++ b/driver-core/src/main/com/mongodb/internal/connection/Authenticator.java @@ -103,13 +103,18 @@ abstract void authenticateAsync(InternalConnection connection, ConnectionDescrip OperationContext operationContext, SingleResultCallback callback); public void reauthenticate(final InternalConnection connection, final OperationContext operationContext) { - authenticate(connection, connection.getDescription(), operationContext); + authenticate(connection, connection.getDescription(), operationContextWithoutSession(operationContext)); } public void reauthenticateAsync(final InternalConnection connection, final OperationContext operationContext, final SingleResultCallback callback) { beginAsync().thenRun((c) -> { - authenticateAsync(connection, connection.getDescription(), operationContext, c); + authenticateAsync(connection, connection.getDescription(), operationContextWithoutSession(operationContext), c); }).finish(callback); } + + private static OperationContext operationContextWithoutSession(final OperationContext operationContext) { + return operationContext.withSessionContext( + new ReadConcernAwareNoOpSessionContext(operationContext.getSessionContext().getReadConcern())); + } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java b/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java index 03a0309a10e..e24950105bb 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java +++ b/driver-core/src/main/com/mongodb/internal/connection/DefaultServerMonitor.java @@ -30,6 +30,7 @@ import com.mongodb.event.ServerHeartbeatSucceededEvent; import com.mongodb.event.ServerMonitorListener; import com.mongodb.internal.TimeoutContext; +import com.mongodb.internal.VisibleForTesting; import com.mongodb.internal.diagnostics.logging.Logger; import com.mongodb.internal.diagnostics.logging.Loggers; import com.mongodb.internal.inject.Provider; @@ -55,6 +56,7 @@ import static com.mongodb.connection.ServerType.UNKNOWN; import static com.mongodb.internal.Locks.checkedWithLock; import static com.mongodb.internal.Locks.withLock; +import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE; import static com.mongodb.internal.connection.CommandHelper.HELLO; import static com.mongodb.internal.connection.CommandHelper.LEGACY_HELLO; import static com.mongodb.internal.connection.CommandHelper.executeCommand; @@ -149,8 +151,14 @@ public void cancelCurrentCheck() { monitor.cancelCurrentCheck(); } + @VisibleForTesting(otherwise = PRIVATE) + ServerMonitor getServerMonitor() { + return monitor; + } + class ServerMonitor extends Thread implements AutoCloseable { private volatile InternalConnection connection = null; + private volatile boolean alreadyLoggedHeartBeatStarted = false; private volatile boolean currentCheckCancelled; ServerMonitor() { @@ -213,9 +221,13 @@ public void run() { private ServerDescription lookupServerDescription(final ServerDescription currentServerDescription) { try { + boolean shouldStreamResponses = shouldStreamResponses(currentServerDescription); if (connection == null || connection.isClosed()) { + alreadyLoggedHeartBeatStarted = true; currentCheckCancelled = false; InternalConnection newConnection = internalConnectionFactory.create(serverId); + serverMonitorListener.serverHearbeatStarted(new ServerHeartbeatStartedEvent( + newConnection.getDescription().getConnectionId(), shouldStreamResponses)); newConnection.open(operationContextFactory.create()); connection = newConnection; roundTripTimeSampler.addSample(connection.getInitialServerDescription().getRoundTripTimeNanos()); @@ -225,9 +237,11 @@ private ServerDescription lookupServerDescription(final ServerDescription curren if (LOGGER.isDebugEnabled()) { LOGGER.debug(format("Checking status of %s", serverId.getAddress())); } - boolean shouldStreamResponses = shouldStreamResponses(currentServerDescription); - serverMonitorListener.serverHearbeatStarted(new ServerHeartbeatStartedEvent( - connection.getDescription().getConnectionId(), shouldStreamResponses)); + if (!alreadyLoggedHeartBeatStarted) { + serverMonitorListener.serverHearbeatStarted(new ServerHeartbeatStartedEvent( + connection.getDescription().getConnectionId(), shouldStreamResponses)); + } + alreadyLoggedHeartBeatStarted = false; long start = System.nanoTime(); try { diff --git a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnectionInitializer.java b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnectionInitializer.java index 6fca357b080..79c21f33356 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnectionInitializer.java +++ b/driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnectionInitializer.java @@ -40,7 +40,6 @@ import static com.mongodb.internal.connection.CommandHelper.LEGACY_HELLO; import static com.mongodb.internal.connection.CommandHelper.executeCommand; import static com.mongodb.internal.connection.CommandHelper.executeCommandAsync; -import static com.mongodb.internal.connection.CommandHelper.executeCommandWithoutCheckingForFailure; import static com.mongodb.internal.connection.DefaultAuthenticator.USER_NOT_FOUND_CODE; import static com.mongodb.internal.connection.DescriptionHelper.createConnectionDescription; import static com.mongodb.internal.connection.DescriptionHelper.createServerDescription; @@ -88,7 +87,8 @@ public InternalConnectionInitializationDescription finishHandshake(final Interna if (Authenticator.shouldAuthenticate(authenticator, connectionDescription)) { authenticator.authenticate(internalConnection, connectionDescription, operationContext); } - return completeConnectionDescriptionInitialization(internalConnection, description, operationContext); + + return description; } @Override @@ -121,14 +121,14 @@ public void finishHandshakeAsync(final InternalConnection internalConnection, ConnectionDescription connectionDescription = description.getConnectionDescription(); if (!Authenticator.shouldAuthenticate(authenticator, connectionDescription)) { - completeConnectionDescriptionInitializationAsync(internalConnection, description, operationContext, callback); + callback.onResult(description, null); } else { authenticator.authenticateAsync(internalConnection, connectionDescription, operationContext, (result1, t1) -> { if (t1 != null) { callback.onResult(null, t1); } else { - completeConnectionDescriptionInitializationAsync(internalConnection, description, operationContext, callback); + callback.onResult(description, null); } }); } @@ -203,21 +203,6 @@ private BsonDocument createHelloCommand(final Authenticator authenticator, final return helloCommandDocument; } - private InternalConnectionInitializationDescription completeConnectionDescriptionInitialization( - final InternalConnection internalConnection, - final InternalConnectionInitializationDescription description, - final OperationContext operationContext) { - - if (description.getConnectionDescription().getConnectionId().getServerValue() != null) { - return description; - } - - return applyGetLastErrorResult(executeCommandWithoutCheckingForFailure("admin", - new BsonDocument("getlasterror", new BsonInt32(1)), clusterConnectionMode, serverApi, - internalConnection, operationContext), - description); - } - private void setSpeculativeAuthenticateResponse(final BsonDocument helloResult) { if (authenticator instanceof SpeculativeAuthenticator) { ((SpeculativeAuthenticator) authenticator).setSpeculativeAuthenticateResponse( @@ -225,28 +210,6 @@ private void setSpeculativeAuthenticateResponse(final BsonDocument helloResult) } } - private void completeConnectionDescriptionInitializationAsync( - final InternalConnection internalConnection, - final InternalConnectionInitializationDescription description, - final OperationContext operationContext, - final SingleResultCallback callback) { - - if (description.getConnectionDescription().getConnectionId().getServerValue() != null) { - callback.onResult(description, null); - return; - } - - executeCommandAsync("admin", new BsonDocument("getlasterror", new BsonInt32(1)), clusterConnectionMode, serverApi, - internalConnection, operationContext, - (result, t) -> { - if (t != null) { - callback.onResult(description, null); - } else { - callback.onResult(applyGetLastErrorResult(result, description), null); - } - }); - } - private InternalConnectionInitializationDescription applyGetLastErrorResult( final BsonDocument getLastErrorResult, final InternalConnectionInitializationDescription description) { diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerMonitorSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerMonitorSpecification.groovy deleted file mode 100644 index c452d757a28..00000000000 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerMonitorSpecification.groovy +++ /dev/null @@ -1,272 +0,0 @@ -/* - * Copyright 2008-present MongoDB, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.mongodb.internal.connection - -import com.mongodb.MongoSocketReadTimeoutException -import com.mongodb.ServerAddress -import com.mongodb.connection.ClusterConnectionMode -import com.mongodb.connection.ClusterId -import com.mongodb.connection.ConnectionDescription -import com.mongodb.connection.ServerConnectionState -import com.mongodb.connection.ServerDescription -import com.mongodb.connection.ServerId -import com.mongodb.connection.ServerSettings -import com.mongodb.connection.ServerType -import com.mongodb.event.ServerHeartbeatFailedEvent -import com.mongodb.event.ServerHeartbeatStartedEvent -import com.mongodb.event.ServerHeartbeatSucceededEvent -import com.mongodb.event.ServerMonitorListener -import com.mongodb.internal.inject.SameObjectProvider -import org.bson.BsonDocument -import org.bson.ByteBufNIO -import spock.lang.Specification - -import java.nio.ByteBuffer -import java.util.concurrent.CountDownLatch -import java.util.concurrent.TimeUnit - -import static com.mongodb.ClusterFixture.OPERATION_CONTEXT_FACTORY -import static com.mongodb.internal.connection.MessageHelper.LEGACY_HELLO_LOWER - -@SuppressWarnings('BusyWait') -class DefaultServerMonitorSpecification extends Specification { - - DefaultServerMonitor monitor - - def 'close should not send a sendStateChangedEvent'() { - given: - def stateChanged = false - def sdam = new SdamServerDescriptionManager() { - @Override - void update(final ServerDescription candidateDescription) { - assert candidateDescription != null - stateChanged = true - } - - @Override - void handleExceptionBeforeHandshake(final SdamServerDescriptionManager.SdamIssue sdamIssue) { - throw new UnsupportedOperationException() - } - - @Override - void handleExceptionAfterHandshake(final SdamServerDescriptionManager.SdamIssue sdamIssue) { - throw new UnsupportedOperationException() - } - - @Override - SdamServerDescriptionManager.SdamIssue.Context context() { - throw new UnsupportedOperationException() - } - - @Override - SdamServerDescriptionManager.SdamIssue.Context context(final InternalConnection connection) { - throw new UnsupportedOperationException() - } - } - def internalConnectionFactory = Mock(InternalConnectionFactory) { - create(_) >> { - Mock(InternalConnection) { - open(_) >> { sleep(100) } - } - } - } - monitor = new DefaultServerMonitor(new ServerId(new ClusterId(), new ServerAddress()), ServerSettings.builder().build(), - internalConnectionFactory, ClusterConnectionMode.SINGLE, null, false, SameObjectProvider.initialized(sdam), - OPERATION_CONTEXT_FACTORY) - - monitor.start() - - when: - monitor.close() - monitor.monitor.join() - - then: - !stateChanged - } - - def 'should send started and succeeded heartbeat events'() { - given: - def latch = new CountDownLatch(1) - def startedEvent - def succeededEvent - def failedEvent - - def serverMonitorListener = new ServerMonitorListener() { - @Override - void serverHearbeatStarted(final ServerHeartbeatStartedEvent event) { - startedEvent = event - } - - @Override - void serverHeartbeatSucceeded(final ServerHeartbeatSucceededEvent event) { - succeededEvent = event - latch.countDown() - } - - @Override - void serverHeartbeatFailed(final ServerHeartbeatFailedEvent event) { - failedEvent = event - latch.countDown() - } - } - - def connectionDescription = new ConnectionDescription(new ServerId(new ClusterId(''), new ServerAddress())) - def initialServerDescription = ServerDescription.builder() - .ok(true) - .address(new ServerAddress()) - .type(ServerType.STANDALONE) - .state(ServerConnectionState.CONNECTED) - .build() - - def helloResponse = '{' + - "$LEGACY_HELLO_LOWER: true," + - 'maxBsonObjectSize : 16777216, ' + - 'maxMessageSizeBytes : 48000000, ' + - 'maxWriteBatchSize : 1000, ' + - 'localTime : ISODate("2016-04-05T20:36:36.082Z"), ' + - 'maxWireVersion : 4, ' + - 'minWireVersion : 0, ' + - 'ok : 1 ' + - '}' - - def internalConnectionFactory = Mock(InternalConnectionFactory) { - create(_) >> { - Mock(InternalConnection) { - open(_) >> { } - - getBuffer(_) >> { int size -> - new ByteBufNIO(ByteBuffer.allocate(size)) - } - - getDescription() >> { - connectionDescription - } - - getInitialServerDescription() >> { - initialServerDescription - } - - send(_, _, _) >> { } - - receive(_, _) >> { - BsonDocument.parse(helloResponse) - } - } - } - } - monitor = new DefaultServerMonitor(new ServerId(new ClusterId(), new ServerAddress()), - ServerSettings.builder().heartbeatFrequency(1, TimeUnit.SECONDS).addServerMonitorListener(serverMonitorListener).build(), - internalConnectionFactory, ClusterConnectionMode.SINGLE, null, false, mockSdamProvider(), OPERATION_CONTEXT_FACTORY) - - when: - monitor.start() - latch.await(30, TimeUnit.SECONDS) - - then: - failedEvent == null - startedEvent.connectionId == connectionDescription.connectionId - succeededEvent.connectionId == connectionDescription.connectionId - succeededEvent.reply == BsonDocument.parse(helloResponse) - succeededEvent.getElapsedTime(TimeUnit.NANOSECONDS) > 0 - - cleanup: - monitor?.close() - } - - def 'should send started and failed heartbeat events'() { - given: - def latch = new CountDownLatch(1) - def startedEvent - def succeededEvent - def failedEvent - - def serverMonitorListener = new ServerMonitorListener() { - @Override - void serverHearbeatStarted(final ServerHeartbeatStartedEvent event) { - startedEvent = event - } - - @Override - void serverHeartbeatSucceeded(final ServerHeartbeatSucceededEvent event) { - succeededEvent = event - latch.countDown() - } - - @Override - void serverHeartbeatFailed(final ServerHeartbeatFailedEvent event) { - failedEvent = event - latch.countDown() - } - } - - def connectionDescription = new ConnectionDescription(new ServerId(new ClusterId(''), new ServerAddress())) - def initialServerDescription = ServerDescription.builder() - .ok(true) - .address(new ServerAddress()) - .type(ServerType.STANDALONE) - .state(ServerConnectionState.CONNECTED) - .build() - def exception = new MongoSocketReadTimeoutException('read timeout', new ServerAddress(), new IOException()) - - def internalConnectionFactory = Mock(InternalConnectionFactory) { - create(_) >> { - Mock(InternalConnection) { - open(_) >> { } - - getBuffer(_) >> { int size -> - new ByteBufNIO(ByteBuffer.allocate(size)) - } - - getDescription() >> { - connectionDescription - } - - getInitialServerDescription() >> { - initialServerDescription - } - - send(_, _, _) >> { } - - receive(_, _) >> { - throw exception - } - } - } - } - monitor = new DefaultServerMonitor(new ServerId(new ClusterId(), new ServerAddress()), - ServerSettings.builder().heartbeatFrequency(1, TimeUnit.SECONDS).addServerMonitorListener(serverMonitorListener).build(), - internalConnectionFactory, ClusterConnectionMode.SINGLE, null, false, mockSdamProvider(), OPERATION_CONTEXT_FACTORY) - - when: - monitor.start() - latch.await(30, TimeUnit.SECONDS) - - then: - succeededEvent == null - startedEvent.connectionId == connectionDescription.connectionId - failedEvent.connectionId == connectionDescription.connectionId - failedEvent.throwable == exception - failedEvent.getElapsedTime(TimeUnit.NANOSECONDS) > 0 - - cleanup: - monitor?.close() - } - - private mockSdamProvider() { - SameObjectProvider.initialized(Mock(SdamServerDescriptionManager)) - } -} diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerMonitorTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerMonitorTest.java new file mode 100644 index 00000000000..c6bc469cc55 --- /dev/null +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/DefaultServerMonitorTest.java @@ -0,0 +1,300 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.mongodb.internal.connection; + +import com.mongodb.MongoSocketReadTimeoutException; +import com.mongodb.ServerAddress; +import com.mongodb.connection.ClusterConnectionMode; +import com.mongodb.connection.ClusterId; +import com.mongodb.connection.ConnectionDescription; +import com.mongodb.connection.ServerConnectionState; +import com.mongodb.connection.ServerDescription; +import com.mongodb.connection.ServerId; +import com.mongodb.connection.ServerSettings; +import com.mongodb.connection.ServerType; +import com.mongodb.event.ServerHeartbeatFailedEvent; +import com.mongodb.event.ServerHeartbeatStartedEvent; +import com.mongodb.event.ServerHeartbeatSucceededEvent; +import com.mongodb.event.ServerMonitorListener; +import com.mongodb.event.TestServerMonitorListener; +import com.mongodb.internal.inject.SameObjectProvider; +import org.bson.BsonDocument; +import org.bson.ByteBufNIO; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.opentest4j.AssertionFailedError; + +import java.io.IOException; +import java.net.SocketException; +import java.nio.ByteBuffer; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +import static com.mongodb.ClusterFixture.OPERATION_CONTEXT_FACTORY; +import static com.mongodb.assertions.Assertions.assertFalse; +import static com.mongodb.internal.connection.MessageHelper.LEGACY_HELLO_LOWER; +import static java.util.Arrays.asList; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + + +public class DefaultServerMonitorTest { + + private DefaultServerMonitor monitor; + + @AfterEach + void tearDown() throws InterruptedException { + if (monitor != null) { + monitor.close(); + monitor.getServerMonitor().join(); + } + } + + @Test + void closeShouldNotSendStateChangedEvent() throws Exception { + // Given + AtomicBoolean stateChanged = new AtomicBoolean(false); + + SdamServerDescriptionManager sdamManager = new SdamServerDescriptionManager() { + @Override + public void update(final ServerDescription candidateDescription) { + assertNotNull(candidateDescription); + stateChanged.set(true); + } + + @Override + public void handleExceptionBeforeHandshake(final SdamServerDescriptionManager.SdamIssue sdamIssue) { + throw new UnsupportedOperationException(); + } + + @Override + public void handleExceptionAfterHandshake(final SdamServerDescriptionManager.SdamIssue sdamIssue) { + throw new UnsupportedOperationException(); + } + + @Override + public SdamServerDescriptionManager.SdamIssue.Context context() { + throw new UnsupportedOperationException(); + } + + @Override + public SdamServerDescriptionManager.SdamIssue.Context context(final InternalConnection connection) { + throw new UnsupportedOperationException(); + } + }; + + InternalConnection mockConnection = mock(InternalConnection.class); + doAnswer(invocation -> { + Thread.sleep(100); + return null; + }).when(mockConnection).open(any()); + + InternalConnectionFactory factory = createConnectionFactory(mockConnection); + + monitor = new DefaultServerMonitor( + new ServerId(new ClusterId(), new ServerAddress()), + ServerSettings.builder().build(), + factory, + ClusterConnectionMode.SINGLE, + null, + false, + SameObjectProvider.initialized(sdamManager), + OPERATION_CONTEXT_FACTORY); + + // When + monitor.start(); + monitor.close(); + + // Then + assertFalse(stateChanged.get()); + } + + @Test + void shouldSendStartedAndSucceededHeartbeatEvents() throws Exception { + // Given + ConnectionDescription connectionDescription = createDefaultConnectionDescription(); + ServerDescription initialServerDescription = createDefaultServerDescription(); + + String helloResponse = "{" + + LEGACY_HELLO_LOWER + ": true," + + "maxBsonObjectSize : 16777216, " + + "maxMessageSizeBytes : 48000000, " + + "maxWriteBatchSize : 1000, " + + "localTime : ISODate(\"2016-04-05T20:36:36.082Z\"), " + + "maxWireVersion : 4, " + + "minWireVersion : 0, " + + "ok : 1 " + + "}"; + + InternalConnection mockConnection = mock(InternalConnection.class); + when(mockConnection.getDescription()).thenReturn(connectionDescription); + when(mockConnection.getInitialServerDescription()).thenReturn(initialServerDescription); + when(mockConnection.getBuffer(anyInt())).thenReturn(new ByteBufNIO(ByteBuffer.allocate(1024))); + when(mockConnection.receive(any(), any())).thenReturn(BsonDocument.parse(helloResponse)); + + // When + TestServerMonitorListener listener = createTestServerMonitorListener(); + monitor = createAndStartMonitor(createConnectionFactory(mockConnection), listener); + + listener.waitForEvents(ServerHeartbeatSucceededEvent.class, event -> true, 1, Duration.ofSeconds(30)); + ServerHeartbeatStartedEvent startedEvent = getEvent(ServerHeartbeatStartedEvent.class, listener); + ServerHeartbeatSucceededEvent succeededEvent = getEvent(ServerHeartbeatSucceededEvent.class, listener); + + // Then + assertEquals(connectionDescription.getConnectionId(), startedEvent.getConnectionId()); + assertEquals(connectionDescription.getConnectionId(), succeededEvent.getConnectionId()); + assertEquals(BsonDocument.parse(helloResponse), succeededEvent.getReply()); + assertTrue(succeededEvent.getElapsedTime(TimeUnit.NANOSECONDS) > 0); + } + + @Test + void shouldSendStartedAndFailedHeartbeatEvents() throws Exception { + // Given + ConnectionDescription connectionDescription = createDefaultConnectionDescription(); + ServerDescription initialServerDescription = createDefaultServerDescription(); + MongoSocketReadTimeoutException exception = new MongoSocketReadTimeoutException("read timeout", + new ServerAddress(), new IOException()); + + InternalConnection mockConnection = mock(InternalConnection.class); + when(mockConnection.getDescription()).thenReturn(connectionDescription); + when(mockConnection.getInitialServerDescription()).thenReturn(initialServerDescription); + when(mockConnection.getBuffer(anyInt())).thenReturn(new ByteBufNIO(ByteBuffer.allocate(1024))); + when(mockConnection.receive(any(), any())).thenThrow(exception); + + // When + TestServerMonitorListener listener = createTestServerMonitorListener(); + monitor = createAndStartMonitor(createConnectionFactory(mockConnection), listener); + + listener.waitForEvents(ServerHeartbeatFailedEvent.class, event -> true, 1, Duration.ofSeconds(30)); + ServerHeartbeatStartedEvent startedEvent = getEvent(ServerHeartbeatStartedEvent.class, listener); + ServerHeartbeatFailedEvent failedEvent = getEvent(ServerHeartbeatFailedEvent.class, listener); + + // Then + assertEquals(connectionDescription.getConnectionId(), startedEvent.getConnectionId()); + assertEquals(connectionDescription.getConnectionId(), failedEvent.getConnectionId()); + assertEquals(exception, failedEvent.getThrowable()); + assertTrue(failedEvent.getElapsedTime(TimeUnit.NANOSECONDS) > 0); + } + + @Test + void shouldEmitHeartbeatStartedBeforeSocketIsConnected() throws Exception { + // Given + InternalConnection mockConnection = mock(InternalConnection.class); + CountDownLatch latch = new CountDownLatch(1); + List events = new ArrayList<>(); + ServerMonitorListener listener = new ServerMonitorListener() { + @Override + public void serverHearbeatStarted(final ServerHeartbeatStartedEvent event) { + events.add("serverHeartbeatStartedEvent"); + } + + @Override + public void serverHeartbeatSucceeded(final ServerHeartbeatSucceededEvent event) { + events.add("serverHeartbeatSucceededEvent"); + latch.countDown(); + } + + @Override + public void serverHeartbeatFailed(final ServerHeartbeatFailedEvent event) { + events.add("serverHeartbeatFailedEvent"); + latch.countDown(); + } + }; + + doAnswer(invocation -> { + events.add("client connected"); + return null; + }).when(mockConnection).open(any()); + + when(mockConnection.getBuffer(anyInt())).thenReturn(new ByteBufNIO(ByteBuffer.allocate(1024))); + when(mockConnection.getDescription()).thenReturn(createDefaultConnectionDescription()); + when(mockConnection.getInitialServerDescription()).thenReturn(createDefaultServerDescription()); + + doAnswer(invocation -> { + events.add("client hello received"); + throw new SocketException("Socket error"); + }).when(mockConnection).receive(any(), any()); + + // When + monitor = createAndStartMonitor(createConnectionFactory(mockConnection), listener); + assertTrue(latch.await(5, TimeUnit.SECONDS), "Timed out waiting for heartbeat"); + + // Then + List expectedEvents = asList("serverHeartbeatStartedEvent", "client connected", "client hello received", "serverHeartbeatFailedEvent"); + assertEquals(expectedEvents, events); + } + + + private InternalConnectionFactory createConnectionFactory(final InternalConnection connection) { + InternalConnectionFactory factory = mock(InternalConnectionFactory.class); + when(factory.create(any())).thenReturn(connection); + return factory; + } + + private ServerDescription createDefaultServerDescription() { + return ServerDescription.builder() + .ok(true) + .address(new ServerAddress()) + .type(ServerType.STANDALONE) + .state(ServerConnectionState.CONNECTED) + .build(); + } + + private ConnectionDescription createDefaultConnectionDescription() { + return new ConnectionDescription(new ServerId(new ClusterId(""), new ServerAddress())); + } + + private DefaultServerMonitor createAndStartMonitor(final InternalConnectionFactory factory, final ServerMonitorListener listener) { + DefaultServerMonitor monitor = new DefaultServerMonitor( + new ServerId(new ClusterId(), new ServerAddress()), + ServerSettings.builder() + .heartbeatFrequency(500, TimeUnit.MILLISECONDS) + .addServerMonitorListener(listener) + .build(), + factory, + ClusterConnectionMode.SINGLE, + null, + false, + SameObjectProvider.initialized(mock(SdamServerDescriptionManager.class)), + OPERATION_CONTEXT_FACTORY); + monitor.start(); + return monitor; + } + + private T getEvent(final Class clazz, final TestServerMonitorListener listener) { + return listener.getEvents() + .stream() + .filter(clazz::isInstance) + .map(clazz::cast) + .findFirst() + .orElseThrow(AssertionFailedError::new); + } + + private TestServerMonitorListener createTestServerMonitorListener() { + return new TestServerMonitorListener(asList("serverHeartbeatStartedEvent", "serverHeartbeatSucceededEvent", + "serverHeartbeatFailedEvent")); + } +} diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionInitializerSpecification.groovy b/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionInitializerSpecification.groovy index 93bc656226a..156499797c2 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionInitializerSpecification.groovy +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/InternalStreamConnectionInitializerSpecification.groovy @@ -61,14 +61,14 @@ class InternalStreamConnectionInitializerSpecification extends Specification { def initializer = new InternalStreamConnectionInitializer(SINGLE, null, null, [], null) when: - enqueueSuccessfulReplies(false, null) + enqueueSuccessfulReplies(false, 123) def description = initializer.startHandshake(internalConnection, operationContext) description = initializer.finishHandshake(internalConnection, description, operationContext) def connectionDescription = description.connectionDescription def serverDescription = description.serverDescription then: - connectionDescription == getExpectedConnectionDescription(connectionDescription.connectionId.localValue, null) + connectionDescription == getExpectedConnectionDescription(connectionDescription.connectionId.localValue, 123) serverDescription == getExpectedServerDescription(serverDescription) } @@ -77,7 +77,7 @@ class InternalStreamConnectionInitializerSpecification extends Specification { def initializer = new InternalStreamConnectionInitializer(SINGLE, null, null, [], null) when: - enqueueSuccessfulReplies(false, null) + enqueueSuccessfulReplies(false, 123) def futureCallback = new FutureResultCallback() initializer.startHandshakeAsync(internalConnection, operationContext, futureCallback) def description = futureCallback.get() @@ -88,7 +88,7 @@ class InternalStreamConnectionInitializerSpecification extends Specification { def serverDescription = description.serverDescription then: - connectionDescription == getExpectedConnectionDescription(connectionDescription.connectionId.localValue, null) + connectionDescription == getExpectedConnectionDescription(connectionDescription.connectionId.localValue, 123) serverDescription == getExpectedServerDescription(serverDescription) } @@ -106,20 +106,6 @@ class InternalStreamConnectionInitializerSpecification extends Specification { connectionDescription == getExpectedConnectionDescription(connectionDescription.connectionId.localValue, 123) } - def 'should create correct description with server connection id from hello'() { - given: - def initializer = new InternalStreamConnectionInitializer(SINGLE, null, null, [], null) - - when: - enqueueSuccessfulRepliesWithConnectionIdIsHelloResponse(false, 123) - def internalDescription = initializer.startHandshake(internalConnection, operationContext) - def connectionDescription = initializer.finishHandshake(internalConnection, internalDescription, operationContext) - .connectionDescription - - then: - connectionDescription == getExpectedConnectionDescription(connectionDescription.connectionId.localValue, 123) - } - def 'should create correct description with server connection id asynchronously'() { given: def initializer = new InternalStreamConnectionInitializer(SINGLE, null, null, [], null) @@ -137,31 +123,13 @@ class InternalStreamConnectionInitializerSpecification extends Specification { connectionDescription == getExpectedConnectionDescription(connectionDescription.connectionId.localValue, 123) } - def 'should create correct description with server connection id from hello asynchronously'() { - given: - def initializer = new InternalStreamConnectionInitializer(SINGLE, null, null, [], null) - - when: - enqueueSuccessfulRepliesWithConnectionIdIsHelloResponse(false, 123) - def futureCallback = new FutureResultCallback() - initializer.startHandshakeAsync(internalConnection, operationContext, futureCallback) - def description = futureCallback.get() - futureCallback = new FutureResultCallback() - initializer.finishHandshakeAsync(internalConnection, description, operationContext, futureCallback) - description = futureCallback.get() - def connectionDescription = description.connectionDescription - - then: - connectionDescription == getExpectedConnectionDescription(connectionDescription.connectionId.localValue, 123) - } - def 'should authenticate'() { given: def firstAuthenticator = Mock(Authenticator) def initializer = new InternalStreamConnectionInitializer(SINGLE, firstAuthenticator, null, [], null) when: - enqueueSuccessfulReplies(false, null) + enqueueSuccessfulReplies(false, 123) def internalDescription = initializer.startHandshake(internalConnection, operationContext) def connectionDescription = initializer.finishHandshake(internalConnection, internalDescription, operationContext) @@ -178,7 +146,7 @@ class InternalStreamConnectionInitializerSpecification extends Specification { def initializer = new InternalStreamConnectionInitializer(SINGLE, authenticator, null, [], null) when: - enqueueSuccessfulReplies(false, null) + enqueueSuccessfulReplies(false, 123) def futureCallback = new FutureResultCallback() initializer.startHandshakeAsync(internalConnection, operationContext, futureCallback) @@ -198,7 +166,7 @@ class InternalStreamConnectionInitializerSpecification extends Specification { def initializer = new InternalStreamConnectionInitializer(SINGLE, authenticator, null, [], null) when: - enqueueSuccessfulReplies(true, null) + enqueueSuccessfulReplies(true, 123) def internalDescription = initializer.startHandshake(internalConnection, operationContext) def connectionDescription = initializer.finishHandshake(internalConnection, internalDescription, operationContext) @@ -215,7 +183,7 @@ class InternalStreamConnectionInitializerSpecification extends Specification { def initializer = new InternalStreamConnectionInitializer(SINGLE, authenticator, null, [], null) when: - enqueueSuccessfulReplies(true, null) + enqueueSuccessfulReplies(true, 123) def futureCallback = new FutureResultCallback() initializer.startHandshakeAsync(internalConnection, operationContext, futureCallback) @@ -240,7 +208,7 @@ class InternalStreamConnectionInitializerSpecification extends Specification { } when: - enqueueSuccessfulReplies(false, null) + enqueueSuccessfulReplies(false, 123) if (async) { def callback = new FutureResultCallback() initializer.startHandshakeAsync(internalConnection, operationContext, callback) @@ -277,7 +245,7 @@ class InternalStreamConnectionInitializerSpecification extends Specification { } when: - enqueueSuccessfulReplies(false, null) + enqueueSuccessfulReplies(false, 123) if (async) { def callback = new FutureResultCallback() initializer.startHandshakeAsync(internalConnection, operationContext, callback) @@ -477,25 +445,12 @@ class InternalStreamConnectionInitializerSpecification extends Specification { } def enqueueSuccessfulReplies(final boolean isArbiter, final Integer serverConnectionId) { - internalConnection.enqueueReply(buildSuccessfulReply( - '{ok: 1, ' + - 'maxWireVersion: 3' + - (isArbiter ? ', isreplicaset: true, arbiterOnly: true' : '') + - '}')) - internalConnection.enqueueReply(buildSuccessfulReply( - '{ok: 1 ' + - (serverConnectionId == null ? '' : ', connectionId: ' + serverConnectionId) + - '}')) - } - - def enqueueSuccessfulRepliesWithConnectionIdIsHelloResponse(final boolean isArbiter, final Integer serverConnectionId) { internalConnection.enqueueReply(buildSuccessfulReply( '{ok: 1, ' + 'maxWireVersion: 3,' + 'connectionId: ' + serverConnectionId + (isArbiter ? ', isreplicaset: true, arbiterOnly: true' : '') + '}')) - internalConnection.enqueueReply(buildSuccessfulReply('{ok: 1, versionArray : [3, 0, 0]}')) } def enqueueSpeculativeAuthenticationResponsesForScramSha256() { diff --git a/driver-sync/src/test/functional/com/mongodb/client/ClusterEventPublishingTest.java b/driver-sync/src/test/functional/com/mongodb/client/ClusterEventPublishingTest.java index 6b10e475249..e390d4c3afc 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/ClusterEventPublishingTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/ClusterEventPublishingTest.java @@ -155,17 +155,18 @@ public void serverDescriptionChanged(final ServerDescriptionChangedEvent event) @Override public void serverHearbeatStarted(final ServerHeartbeatStartedEvent event) { events.add(event); - heartbeatLatch.countDown(); } @Override public void serverHeartbeatSucceeded(final ServerHeartbeatSucceededEvent event) { events.add(event); + heartbeatLatch.countDown(); } @Override public void serverHeartbeatFailed(final ServerHeartbeatFailedEvent event) { events.add(event); + heartbeatLatch.countDown(); } } } diff --git a/driver-sync/src/test/functional/com/mongodb/client/ServerDiscoveryAndMonitoringProseTests.java b/driver-sync/src/test/functional/com/mongodb/client/ServerDiscoveryAndMonitoringProseTests.java index 1887b2006cd..77883b6be73 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/ServerDiscoveryAndMonitoringProseTests.java +++ b/driver-sync/src/test/functional/com/mongodb/client/ServerDiscoveryAndMonitoringProseTests.java @@ -166,7 +166,7 @@ public void serverDescriptionChanged(final ServerDescriptionChangedEvent event) * Connection Pool Management. */ @Test - @Ignore + @Ignore("JAVA-4484 - events are not guaranteed to be delivered in order") @SuppressWarnings("try") public void testConnectionPoolManagement() throws InterruptedException { assumeTrue(serverVersionAtLeast(4, 3)); @@ -232,7 +232,7 @@ public void connectionPoolCleared(final ConnectionPoolClearedEvent event) { */ @Test @SuppressWarnings("try") - public void monitorsSleepAtLeastMinHeartbeatFreqencyMSBetweenChecks() { + public void monitorsSleepAtLeastMinHeartbeatFrequencyMSBetweenChecks() { assumeTrue(serverVersionAtLeast(4, 3)); assumeFalse(isServerlessTest()); long defaultMinHeartbeatIntervalMillis = MongoClientSettings.builder().build().getServerSettings() @@ -267,6 +267,13 @@ public void monitorsSleepAtLeastMinHeartbeatFreqencyMSBetweenChecks() { } } + @Test + @Ignore("Run as part of DefaultServerMonitorTest") + public void shouldEmitHeartbeatStartedBeforeSocketIsConnected() { + // The implementation of this test is in DefaultServerMonitorTest.shouldEmitHeartbeatStartedBeforeSocketIsConnected + // As it requires mocking and package access to `com.mongodb.internal.connection` + } + private static void assertPoll(final BlockingQueue queue, @Nullable final Class allowed, final Set> required) throws InterruptedException { assertPoll(queue, allowed, required, Timeout.expiresIn(TEST_WAIT_TIMEOUT_MILLIS, MILLISECONDS, ZERO_DURATION_MEANS_EXPIRED)); diff --git a/driver-sync/src/test/functional/com/mongodb/internal/connection/OidcAuthenticationProseTests.java b/driver-sync/src/test/functional/com/mongodb/internal/connection/OidcAuthenticationProseTests.java index b6a23a576ce..2b0544f0c5a 100644 --- a/driver-sync/src/test/functional/com/mongodb/internal/connection/OidcAuthenticationProseTests.java +++ b/driver-sync/src/test/functional/com/mongodb/internal/connection/OidcAuthenticationProseTests.java @@ -24,9 +24,12 @@ import com.mongodb.MongoSecurityException; import com.mongodb.MongoSocketException; import com.mongodb.assertions.Assertions; +import com.mongodb.client.ClientSession; +import com.mongodb.client.FindIterable; import com.mongodb.client.Fixture; import com.mongodb.client.MongoClient; import com.mongodb.client.MongoClients; +import com.mongodb.client.MongoCollection; import com.mongodb.client.TestListener; import com.mongodb.event.CommandListener; import com.mongodb.lang.Nullable; @@ -334,12 +337,17 @@ public void test3p3UnexpectedErrorDoesNotClearCache() { @Test public void test4p1Reauthentication() { + testReauthentication(false); + } + + private void testReauthentication(final boolean inSession) { TestCallback callback = createCallback(); MongoClientSettings clientSettings = createSettings(callback); - try (MongoClient mongoClient = createMongoClient(clientSettings)) { + try (MongoClient mongoClient = createMongoClient(clientSettings); + ClientSession session = inSession ? mongoClient.startSession() : null) { failCommand(391, 1, "find"); // #. Perform a find operation that succeeds. - performFind(mongoClient); + performFind(mongoClient, session); } assertEquals(2, callback.invocations.get()); } @@ -392,6 +400,11 @@ private static void performInsert(final MongoClient mongoClient) { .insertOne(Document.parse("{ x: 1 }")); } + @Test + public void test4p5ReauthenticationInSession() { + testReauthentication(true); + } + @Test public void test5p1AzureSucceedsWithNoUsername() { assumeAzure(); @@ -914,12 +927,14 @@ private void assertFindFails( } } - private void performFind(final MongoClient mongoClient) { - mongoClient - .getDatabase("test") - .getCollection("test") - .find() - .first(); + private static void performFind(final MongoClient mongoClient) { + performFind(mongoClient, null); + } + + private static void performFind(final MongoClient mongoClient, @Nullable final ClientSession session) { + MongoCollection collection = mongoClient.getDatabase("test").getCollection("test"); + FindIterable findIterable = session == null ? collection.find() : collection.find(session); + findIterable.first(); } protected void delayNextFind() {