Race condition leads to exceptions when the Spring context is shut down while a Redis stream listener is active #2262

Closed
@lrozek

Description

There is a race condition between the following method calls: RedisConnectionFactory(LettuceConnectionFactory)#destroy, StreamPollTask#doLoop, and DefaultStreamMessageListenerContainer#stop.

When DefaultStreamMessageListenerContainer#stop is called as a result of the Spring context shutting down, it does not wait until StreamPollTask#doLoop has finished, so the Redis connection is still in use. Later, RedisConnectionFactory#destroy closes the Redis connection while StreamPollTask#doLoop is still running its last iteration.
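
To illustrate the ordering problem outside of Spring, here is a minimal sketch that uses only the JDK. `FakeConnection`, `PollTask` and `lastReadFails` are hypothetical names made up for this example; they only mimic the shape of the situation (a loop that runs until cancelled, and a connection that is closed by another thread) and are not the Spring Data Redis or Lettuce classes. A latch pauses the task just before its read, much like a debugger breakpoint would, so the interleaving is forced rather than left to chance.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

class ShutdownOrderingSketch {

    /** Hypothetical stand-in for a Redis connection; not a Lettuce or Spring type. */
    static final class FakeConnection {
        private final AtomicBoolean closed = new AtomicBoolean();

        void close() {
            closed.set(true);
        }

        String read() {
            if (closed.get()) {
                throw new IllegalStateException("Connection closed");
            }
            return "record";
        }
    }

    /** Hypothetical stand-in for a poll task: loops until cancelled. */
    static final class PollTask implements Runnable {
        final CountDownLatch iterationStarted = new CountDownLatch(1);
        final CountDownLatch resume = new CountDownLatch(1);
        final CountDownLatch finished = new CountDownLatch(1);
        final AtomicBoolean failed = new AtomicBoolean();
        private final FakeConnection connection;
        private volatile boolean active = true;

        PollTask(FakeConnection connection) {
            this.connection = connection;
        }

        /** Only flips a flag and returns; it does not wait for the loop to exit. */
        void cancel() {
            active = false;
        }

        @Override
        public void run() {
            try {
                while (active) {
                    iterationStarted.countDown();
                    resume.await();       // acts like a breakpoint just before the read
                    connection.read();
                }
            } catch (IllegalStateException e) {
                failed.set(true);         // the read hit a closed connection
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                finished.countDown();     // signals that the loop has exited
            }
        }
    }

    /** Returns true if the task's last read ran against a closed connection. */
    static boolean lastReadFails(boolean awaitBeforeClose) {
        FakeConnection connection = new FakeConnection();
        PollTask task = new PollTask(connection);
        Thread worker = new Thread(task, "poll-task");
        worker.start();
        try {
            task.iterationStarted.await();   // the task is inside its last iteration
            task.cancel();
            if (awaitBeforeClose) {
                task.resume.countDown();     // let the pending read complete
                task.finished.await();       // wait until the loop has exited
                connection.close();
            } else {
                connection.close();          // closed while the read is still pending
                task.resume.countDown();
            }
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted", e);
        }
        return task.failed.get();
    }

    public static void main(String[] args) {
        System.out.println("close without waiting, last read fails: " + lastReadFails(false));
        System.out.println("wait, then close, last read fails: " + lastReadFails(true));
    }
}
```

In this sketch the first scenario reports `true` and the second reports `false`. The only difference between them is whether the stopping thread waits for the loop to exit before the connection is closed.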

This can lead to the following exceptions, depending on which steps of LettuceConnectionFactory#destroy have already completed. The easiest way to reproduce them is to step through LettuceConnectionFactory#destroy in a debugger and then resume StreamPollTask#readRecords.

2022-02-16 10:44:27.782  WARN 25768 --- [cTaskExecutor-1] io.lettuce.core.RedisChannelHandler      : Connection is already closed
2022-02-16 10:44:27.787 ERROR 25768 --- [cTaskExecutor-1] ageListenerContainer$LoggingErrorHandler : Unexpected error occurred in scheduled task.

org.springframework.data.redis.RedisSystemException: Redis exception; nested exception is io.lettuce.core.RedisException: Connection closed
	at org.springframework.data.redis.connection.lettuce.LettuceExceptionConverter.convert(LettuceExceptionConverter.java:74) ~[spring-data-redis-2.6.1.jar!/:2.6.1]
	at org.springframework.data.redis.connection.lettuce.LettuceExceptionConverter.convert(LettuceExceptionConverter.java:41) ~[spring-data-redis-2.6.1.jar!/:2.6.1]
	at org.springframework.data.redis.PassThroughExceptionTranslationStrategy.translate(PassThroughExceptionTranslationStrategy.java:44) ~[spring-data-redis-2.6.1.jar!/:2.6.1]
	at org.springframework.data.redis.FallbackExceptionTranslationStrategy.translate(FallbackExceptionTranslationStrategy.java:42) ~[spring-data-redis-2.6.1.jar!/:2.6.1]
	at org.springframework.data.redis.connection.lettuce.LettuceConnection.convertLettuceAccessException(LettuceConnection.java:272) ~[spring-data-redis-2.6.1.jar!/:2.6.1]
	at org.springframework.data.redis.connection.lettuce.LettuceConnection.await(LettuceConnection.java:1063) ~[spring-data-redis-2.6.1.jar!/:2.6.1]
	at org.springframework.data.redis.connection.lettuce.LettuceConnection.lambda$doInvoke$4(LettuceConnection.java:920) ~[spring-data-redis-2.6.1.jar!/:2.6.1]
	at org.springframework.data.redis.connection.lettuce.LettuceInvoker$Synchronizer.invoke(LettuceInvoker.java:673) ~[spring-data-redis-2.6.1.jar!/:2.6.1]
	at org.springframework.data.redis.connection.lettuce.LettuceInvoker$DefaultManyInvocationSpec.toList(LettuceInvoker.java:618) ~[spring-data-redis-2.6.1.jar!/:2.6.1]
	at org.springframework.data.redis.connection.lettuce.LettuceStreamCommands.xRead(LettuceStreamCommands.java:323) ~[spring-data-redis-2.6.1.jar!/:2.6.1]
	at org.springframework.data.redis.stream.DefaultStreamMessageListenerContainer.lambda$null$5(DefaultStreamMessageListenerContainer.java:263) ~[spring-data-redis-2.6.1.jar!/:2.6.1]
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:223) ~[spring-data-redis-2.6.1.jar!/:2.6.1]
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:190) ~[spring-data-redis-2.6.1.jar!/:2.6.1]
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:177) ~[spring-data-redis-2.6.1.jar!/:2.6.1]
	at org.springframework.data.redis.stream.DefaultStreamMessageListenerContainer.lambda$getReadFunction$6(DefaultStreamMessageListenerContainer.java:262) ~[spring-data-redis-2.6.1.jar!/:2.6.1]
	at org.springframework.data.redis.stream.StreamPollTask.readRecords(StreamPollTask.java:166) ~[spring-data-redis-2.6.1.jar!/:2.6.1]
	at org.springframework.data.redis.stream.StreamPollTask.doLoop(StreamPollTask.java:147) ~[spring-data-redis-2.6.1.jar!/:2.6.1]
	at org.springframework.data.redis.stream.StreamPollTask.run(StreamPollTask.java:132) ~[spring-data-redis-2.6.1.jar!/:2.6.1]
	at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
Caused by: io.lettuce.core.RedisException: Connection closed
	at io.lettuce.core.protocol.DefaultEndpoint.notifyDrainQueuedCommands(DefaultEndpoint.java:679) ~[lettuce-core-6.1.6.RELEASE.jar!/:6.1.6.RELEASE]
	at io.lettuce.core.protocol.CommandHandler.channelInactive(CommandHandler.java:358) ~[lettuce-core-6.1.6.RELEASE.jar!/:6.1.6.RELEASE]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[netty-transport-4.1.73.Final.jar!/:4.1.73.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[netty-transport-4.1.73.Final.jar!/:4.1.73.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) ~[netty-transport-4.1.73.Final.jar!/:4.1.73.Final]
	at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81) ~[netty-transport-4.1.73.Final.jar!/:4.1.73.Final]
	at io.lettuce.core.protocol.RedisHandshakeHandler.channelInactive(RedisHandshakeHandler.java:89) ~[lettuce-core-6.1.6.RELEASE.jar!/:6.1.6.RELEASE]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[netty-transport-4.1.73.Final.jar!/:4.1.73.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[netty-transport-4.1.73.Final.jar!/:4.1.73.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) ~[netty-transport-4.1.73.Final.jar!/:4.1.73.Final]
	at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:81) ~[netty-transport-4.1.73.Final.jar!/:4.1.73.Final]
	at io.lettuce.core.ChannelGroupListener.channelInactive(ChannelGroupListener.java:69) ~[lettuce-core-6.1.6.RELEASE.jar!/:6.1.6.RELEASE]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[netty-transport-4.1.73.Final.jar!/:4.1.73.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[netty-transport-4.1.73.Final.jar!/:4.1.73.Final]
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:241) ~[netty-transport-4.1.73.Final.jar!/:4.1.73.Final]
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelInactive(DefaultChannelPipeline.java:1405) ~[netty-transport-4.1.73.Final.jar!/:4.1.73.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:262) ~[netty-transport-4.1.73.Final.jar!/:4.1.73.Final]
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:248) ~[netty-transport-4.1.73.Final.jar!/:4.1.73.Final]
	at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:901) ~[netty-transport-4.1.73.Final.jar!/:4.1.73.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:813) ~[netty-transport-4.1.73.Final.jar!/:4.1.73.Final]
	at io.netty.util.concurrent.AbstractEventExecutor.safeExecute(AbstractEventExecutor.java:164) ~[netty-common-4.1.73.Final.jar!/:4.1.73.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:469) ~[netty-common-4.1.73.Final.jar!/:4.1.73.Final]
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:503) ~[netty-transport-4.1.73.Final.jar!/:4.1.73.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:986) ~[netty-common-4.1.73.Final.jar!/:4.1.73.Final]
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[netty-common-4.1.73.Final.jar!/:4.1.73.Final]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.73.Final.jar!/:4.1.73.Final]
	... 1 common frames omitted
2022-02-16 10:49:23.331  WARN 26174 --- [cTaskExecutor-1] io.netty.channel.AbstractChannel         : Force-closing a channel whose registration task was not accepted by an event loop: [id: 0x985b4d05]

java.util.concurrent.RejectedExecutionException: event executor terminated
	at io.netty.util.concurrent.SingleThreadEventExecutor.reject(SingleThreadEventExecutor.java:923) ~[netty-common-4.1.73.Final.jar:4.1.73.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor.offerTask(SingleThreadEventExecutor.java:350) ~[netty-common-4.1.73.Final.jar:4.1.73.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor.addTask(SingleThreadEventExecutor.java:343) ~[netty-common-4.1.73.Final.jar:4.1.73.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:825) ~[netty-common-4.1.73.Final.jar:4.1.73.Final]
	at io.netty.util.concurrent.SingleThreadEventExecutor.execute(SingleThreadEventExecutor.java:815) ~[netty-common-4.1.73.Final.jar:4.1.73.Final]
	at io.netty.channel.AbstractChannel$AbstractUnsafe.register(AbstractChannel.java:483) ~[netty-transport-4.1.73.Final.jar:4.1.73.Final]
	at io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:87) ~[netty-transport-4.1.73.Final.jar:4.1.73.Final]
	at io.netty.channel.SingleThreadEventLoop.register(SingleThreadEventLoop.java:81) ~[netty-transport-4.1.73.Final.jar:4.1.73.Final]
	at io.netty.channel.MultithreadEventLoopGroup.register(MultithreadEventLoopGroup.java:86) ~[netty-transport-4.1.73.Final.jar:4.1.73.Final]
	at io.netty.bootstrap.AbstractBootstrap.initAndRegister(AbstractBootstrap.java:323) ~[netty-transport-4.1.73.Final.jar:4.1.73.Final]
	at io.netty.bootstrap.Bootstrap.doResolveAndConnect(Bootstrap.java:155) ~[netty-transport-4.1.73.Final.jar:4.1.73.Final]
	at io.netty.bootstrap.Bootstrap.connect(Bootstrap.java:139) ~[netty-transport-4.1.73.Final.jar:4.1.73.Final]
	at io.lettuce.core.AbstractRedisClient.initializeChannelAsync0(AbstractRedisClient.java:409) ~[lettuce-core-6.1.6.RELEASE.jar:6.1.6.RELEASE]
	at io.lettuce.core.AbstractRedisClient.lambda$initializeChannelAsync$2(AbstractRedisClient.java:391) ~[lettuce-core-6.1.6.RELEASE.jar:6.1.6.RELEASE]
	at reactor.core.publisher.LambdaMonoSubscriber.onNext(LambdaMonoSubscriber.java:171) ~[reactor-core-3.4.14.jar:3.4.14]
	at reactor.core.publisher.FluxPeekFuseable$PeekFuseableSubscriber.onNext(FluxPeekFuseable.java:210) ~[reactor-core-3.4.14.jar:3.4.14]
	at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180) ~[reactor-core-3.4.14.jar:3.4.14]
	at reactor.core.publisher.FluxPeekFuseable$PeekConditionalSubscriber.onNext(FluxPeekFuseable.java:854) ~[reactor-core-3.4.14.jar:3.4.14]
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816) ~[reactor-core-3.4.14.jar:3.4.14]
	at reactor.core.publisher.MonoCallable.subscribe(MonoCallable.java:62) ~[reactor-core-3.4.14.jar:3.4.14]
	at reactor.core.publisher.MonoDefer.subscribe(MonoDefer.java:52) ~[reactor-core-3.4.14.jar:3.4.14]
	at reactor.core.publisher.Mono.subscribe(Mono.java:4400) ~[reactor-core-3.4.14.jar:3.4.14]
	at reactor.core.publisher.Mono.subscribeWith(Mono.java:4515) ~[reactor-core-3.4.14.jar:3.4.14]
	at reactor.core.publisher.Mono.subscribe(Mono.java:4371) ~[reactor-core-3.4.14.jar:3.4.14]
	at reactor.core.publisher.Mono.subscribe(Mono.java:4307) ~[reactor-core-3.4.14.jar:3.4.14]
	at reactor.core.publisher.Mono.subscribe(Mono.java:4279) ~[reactor-core-3.4.14.jar:3.4.14]
	at io.lettuce.core.AbstractRedisClient.initializeChannelAsync(AbstractRedisClient.java:386) ~[lettuce-core-6.1.6.RELEASE.jar:6.1.6.RELEASE]
	at io.lettuce.core.RedisClient.connectStatefulAsync(RedisClient.java:325) ~[lettuce-core-6.1.6.RELEASE.jar:6.1.6.RELEASE]
	at io.lettuce.core.RedisClient.connectStandaloneAsync(RedisClient.java:287) ~[lettuce-core-6.1.6.RELEASE.jar:6.1.6.RELEASE]
	at io.lettuce.core.RedisClient.connect(RedisClient.java:216) ~[lettuce-core-6.1.6.RELEASE.jar:6.1.6.RELEASE]
	at org.springframework.data.redis.connection.lettuce.StandaloneConnectionProvider.lambda$getConnection$1(StandaloneConnectionProvider.java:115) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at java.base/java.util.Optional.orElseGet(Optional.java:362) ~[na:na]
	at org.springframework.data.redis.connection.lettuce.StandaloneConnectionProvider.getConnection(StandaloneConnectionProvider.java:115) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$ExceptionTranslatingConnectionProvider.getConnection(LettuceConnectionFactory.java:1595) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$SharedConnection.getNativeConnection(LettuceConnectionFactory.java:1383) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory$SharedConnection.getConnection(LettuceConnectionFactory.java:1366) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory.getSharedConnection(LettuceConnectionFactory.java:1093) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory.getConnection(LettuceConnectionFactory.java:421) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.core.RedisConnectionUtils.fetchConnection(RedisConnectionUtils.java:193) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.core.RedisConnectionUtils.doGetConnection(RedisConnectionUtils.java:144) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.core.RedisConnectionUtils.getConnection(RedisConnectionUtils.java:105) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:210) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:190) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:177) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.stream.DefaultStreamMessageListenerContainer.lambda$getReadFunction$6(DefaultStreamMessageListenerContainer.java:262) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.stream.StreamPollTask.readRecords(StreamPollTask.java:166) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.stream.StreamPollTask.doLoop(StreamPollTask.java:147) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.stream.StreamPollTask.run(StreamPollTask.java:132) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
2022-02-16 10:54:17.735 ERROR 26812 --- [cTaskExecutor-1] ageListenerContainer$LoggingErrorHandler : Unexpected error occurred in scheduled task.

java.lang.IllegalStateException: LettuceConnectionFactory was destroyed and cannot be used anymore
	at org.springframework.util.Assert.state(Assert.java:76) ~[spring-core-5.3.15.jar:5.3.15]
	at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory.assertInitialized(LettuceConnectionFactory.java:1263) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory.getConnection(LettuceConnectionFactory.java:414) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.core.RedisConnectionUtils.fetchConnection(RedisConnectionUtils.java:193) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.core.RedisConnectionUtils.doGetConnection(RedisConnectionUtils.java:144) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.core.RedisConnectionUtils.getConnection(RedisConnectionUtils.java:105) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:210) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:190) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.core.RedisTemplate.execute(RedisTemplate.java:177) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.stream.DefaultStreamMessageListenerContainer.lambda$getReadFunction$6(DefaultStreamMessageListenerContainer.java:262) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.stream.StreamPollTask.readRecords(StreamPollTask.java:166) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.stream.StreamPollTask.doLoop(StreamPollTask.java:147) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at org.springframework.data.redis.stream.StreamPollTask.run(StreamPollTask.java:132) ~[spring-data-redis-2.6.1.jar:2.6.1]
	at java.base/java.lang.Thread.run(Thread.java:832) ~[na:na]
