Skip to content

Commit 8c5e3a2

Browse files
Add field expiration options to reactive API.
1 parent f5239b8 commit 8c5e3a2

File tree

8 files changed

+206
-255
lines changed

8 files changed

+206
-255
lines changed

src/main/java/org/springframework/data/redis/connection/ReactiveHashCommands.java

Lines changed: 119 additions & 202 deletions
Large diffs are not rendered by default.

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceHashCommands.java

Lines changed: 5 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -222,22 +222,14 @@ public Cursor<Entry<byte[], byte[]>> hScan(byte[] key, ScanOptions options) {
222222
return hPersist(key, fields);
223223
}
224224

225-
if (ObjectUtils.nullSafeEquals(FieldExpirationOptions.none(), options)) {
226-
if (ObjectUtils.nullSafeEquals(TimeUnit.MILLISECONDS, expiration.getTimeUnit())) {
227-
if (expiration.isUnixTimestamp()) {
228-
return hpExpireAt(key, expiration.getExpirationTimeInMilliseconds(), fields);
229-
}
230-
return hpExpire(key, expiration.getExpirationTimeInMilliseconds(), fields);
231-
}
232-
if (expiration.isUnixTimestamp()) {
233-
return hExpireAt(key, expiration.getExpirationTimeInSeconds(), fields);
234-
}
235-
return hExpire(key, expiration.getExpirationTimeInSeconds(), fields);
236-
}
237-
238225
ExpireArgs option = new ExpireArgs() {
239226
@Override
240227
public <K, V> void build(CommandArgs<K, V> args) {
228+
229+
if(ObjectUtils.nullSafeEquals(options, FieldExpirationOptions.none())) {
230+
return;
231+
}
232+
241233
args.add(options.getCondition().name());
242234
}
243235
};

src/main/java/org/springframework/data/redis/connection/lettuce/LettuceReactiveHashCommands.java

Lines changed: 34 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,10 @@
1515
*/
1616
package org.springframework.data.redis.connection.lettuce;
1717

18+
import io.lettuce.core.ExpireArgs;
1819
import io.lettuce.core.KeyValue;
1920
import io.lettuce.core.ScanStream;
21+
import io.lettuce.core.protocol.CommandArgs;
2022
import reactor.core.publisher.Flux;
2123
import reactor.core.publisher.Mono;
2224

@@ -25,10 +27,11 @@
2527
import java.util.List;
2628
import java.util.Map;
2729
import java.util.Map.Entry;
30+
import java.util.concurrent.TimeUnit;
2831
import java.util.stream.Collectors;
2932

3033
import org.reactivestreams.Publisher;
31-
34+
import org.springframework.data.redis.connection.Hash.FieldExpirationOptions;
3235
import org.springframework.data.redis.connection.ReactiveHashCommands;
3336
import org.springframework.data.redis.connection.ReactiveRedisConnection.BooleanResponse;
3437
import org.springframework.data.redis.connection.ReactiveRedisConnection.CommandResponse;
@@ -38,6 +41,7 @@
3841
import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse;
3942
import org.springframework.data.redis.connection.convert.Converters;
4043
import org.springframework.util.Assert;
44+
import org.springframework.util.ObjectUtils;
4145

4246
/**
4347
* @author Christoph Strobl
@@ -265,49 +269,48 @@ public Flux<NumericResponse<HStrLenCommand, Long>> hStrLen(Publisher<HStrLenComm
265269
}
266270

267271
@Override
268-
public Flux<NumericResponse<Expire, Long>> hExpire(Publisher<Expire> commands) {
272+
public Flux<NumericResponse<ExpireCommand, Long>> expireHashField(Publisher<ExpireCommand> commands) {
269273
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
270274

271275
Assert.notNull(command.getKey(), "Key must not be null");
272276
Assert.notNull(command.getFields(), "Fields must not be null");
273277

274-
return cmd.hexpire(command.getKey(), command.getTtl().toSeconds(), command.getFields().toArray(ByteBuffer[]::new))
275-
.map(value -> new NumericResponse<>(command, value));
276-
}));
277-
}
278-
279-
@Override
280-
public Flux<NumericResponse<Expire, Long>> hpExpire(Publisher<Expire> commands) {
281-
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
278+
ByteBuffer[] fields = command.getFields().toArray(ByteBuffer[]::new);
282279

283-
Assert.notNull(command.getKey(), "Key must not be null");
284-
Assert.notNull(command.getFields(), "Fields must not be null");
280+
if (command.getExpiration().isPersistent()) {
281+
return cmd.hpersist(command.getKey(), fields).map(value -> new NumericResponse<>(command, value));
282+
}
285283

286-
return cmd.hpexpire(command.getKey(), command.getTtl().toMillis(), command.getFields().toArray(ByteBuffer[]::new))
287-
.map(value -> new NumericResponse<>(command, value));
288-
}));
289-
}
284+
ExpireArgs args = new ExpireArgs() {
290285

291-
@Override
292-
public Flux<NumericResponse<ExpireAt, Long>> hExpireAt(Publisher<ExpireAt> commands) {
293-
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
286+
@Override
287+
public <K, V> void build(CommandArgs<K, V> args) {
288+
super.build(args);
289+
if (ObjectUtils.nullSafeEquals(command.getOptions(), FieldExpirationOptions.none())) {
290+
return;
291+
}
294292

295-
Assert.notNull(command.getKey(), "Key must not be null");
296-
Assert.notNull(command.getFields(), "Fields must not be null");
293+
args.add(command.getOptions().getCondition().name());
294+
}
295+
};
297296

298-
return cmd.hexpireat(command.getKey(), command.getExpireAt().getEpochSecond(), command.getFields().toArray(ByteBuffer[]::new))
299-
.map(value -> new NumericResponse<>(command, value));
300-
}));
301-
}
297+
if (command.getExpiration().isUnixTimestamp()) {
302298

303-
@Override
304-
public Flux<NumericResponse<ExpireAt, Long>> hpExpireAt(Publisher<ExpireAt> commands) {
305-
return connection.execute(cmd -> Flux.from(commands).concatMap(command -> {
299+
if (command.getExpiration().getTimeUnit().equals(TimeUnit.MILLISECONDS)) {
300+
return cmd
301+
.hpexpireat(command.getKey(), command.getExpiration().getExpirationTimeInMilliseconds(), args, fields)
302+
.map(value -> new NumericResponse<>(command, value));
303+
}
304+
return cmd.hexpireat(command.getKey(), command.getExpiration().getExpirationTimeInSeconds(), args, fields)
305+
.map(value -> new NumericResponse<>(command, value));
306+
}
306307

307-
Assert.notNull(command.getKey(), "Key must not be null");
308-
Assert.notNull(command.getFields(), "Fields must not be null");
308+
if (command.getExpiration().getTimeUnit().equals(TimeUnit.MILLISECONDS)) {
309+
return cmd.hpexpire(command.getKey(), command.getExpiration().getExpirationTimeInMilliseconds(), args, fields)
310+
.map(value -> new NumericResponse<>(command, value));
311+
}
309312

310-
return cmd.hpexpireat(command.getKey(), command.getExpireAt().toEpochMilli(), command.getFields().toArray(ByteBuffer[]::new))
313+
return cmd.hexpire(command.getKey(), command.getExpiration().getExpirationTimeInSeconds(), args, fields)
311314
.map(value -> new NumericResponse<>(command, value));
312315
}));
313316
}

src/main/java/org/springframework/data/redis/core/DefaultReactiveHashOperations.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@
1515
*/
1616
package org.springframework.data.redis.core;
1717

18+
import org.springframework.data.redis.connection.Hash.FieldExpirationOptions;
19+
import org.springframework.data.redis.connection.ReactiveHashCommands.ExpireCommand;
20+
import org.springframework.data.redis.connection.ReactiveRedisConnection.NumericResponse;
21+
import org.springframework.data.redis.core.types.Expiration;
1822
import reactor.core.publisher.Flux;
1923
import reactor.core.publisher.Mono;
2024

@@ -239,18 +243,18 @@ public Flux<Map.Entry<HK, HV>> scan(H key, ScanOptions options) {
239243

240244
@Override
241245
public Mono<ExpireChanges<HK>> expire(H key, Duration timeout, Collection<HK> hashKeys) {
246+
return expire(key, Expiration.from(timeout), FieldExpirationOptions.none(), hashKeys);
247+
}
248+
249+
@Override
250+
public Mono<ExpireChanges<HK>> expire(H key, Expiration expiration, FieldExpirationOptions options, Collection<HK> hashKeys) {
242251

243252
List<HK> orderedKeys = List.copyOf(hashKeys);
244253
ByteBuffer rawKey = rawKey(key);
245254
List<ByteBuffer> rawHashKeys = orderedKeys.stream().map(this::rawHashKey).toList();
246255

247-
Mono<List<Long>> raw = createFlux(connection -> {
248-
249-
if (TimeoutUtils.hasMillis(timeout)) {
250-
return connection.hpExpire(rawKey, timeout, rawHashKeys);
251-
}
252-
253-
return connection.hExpire(rawKey, timeout, rawHashKeys);
256+
Mono<List<Long>> raw =createFlux(connection -> {
257+
return connection.expireHashField(Mono.just(ExpireCommand.expire(rawHashKeys, expiration).from(rawKey).withOptions(options))).map(NumericResponse::getOutput);
254258
}).collectList();
255259

256260
return raw.map(values -> ExpireChanges.of(orderedKeys, values));

src/main/java/org/springframework/data/redis/core/ReactiveHashOperations.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
*/
1616
package org.springframework.data.redis.core;
1717

18+
import org.springframework.data.redis.connection.Hash.FieldExpirationOptions;
19+
import org.springframework.data.redis.core.types.Expiration;
1820
import reactor.core.publisher.Flux;
1921
import reactor.core.publisher.Mono;
2022

@@ -237,6 +239,8 @@ default Flux<Map.Entry<HK, HV>> scan(H key) {
237239

238240
Mono<ExpireChanges<HK>> expire(H key, Duration timeout, Collection<HK> hashKeys);
239241

242+
Mono<ExpireChanges<HK>> expire(H key, Expiration expiration, FieldExpirationOptions options, Collection<HK> hashKeys);
243+
240244
/**
241245
* Set the expiration for given {@code hashKey} (aka field) as a {@literal date} timestamp.
242246
*

src/main/java/org/springframework/data/redis/core/types/Expiration.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.Objects;
2020
import java.util.concurrent.TimeUnit;
2121

22+
import org.springframework.data.redis.core.TimeoutUtils;
2223
import org.springframework.lang.Nullable;
2324
import org.springframework.util.Assert;
2425
import org.springframework.util.ObjectUtils;
@@ -105,8 +106,8 @@ public static Expiration from(Duration duration) {
105106
Assert.notNull(duration, "Duration must not be null");
106107

107108
return duration.isZero() ? Expiration.persistent()
108-
: duration.toMillis() % 1000 == 0 ? new Expiration(duration.getSeconds(), TimeUnit.SECONDS)
109-
: new Expiration(duration.toMillis(), TimeUnit.MILLISECONDS);
109+
: TimeoutUtils.hasMillis(duration) ? new Expiration(duration.toMillis(), TimeUnit.MILLISECONDS)
110+
: new Expiration(duration.getSeconds(), TimeUnit.SECONDS);
110111
}
111112

112113
/**

src/test/java/org/springframework/data/redis/core/DefaultHashOperationsIntegrationTests.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -322,6 +322,7 @@ void testExpireWithOptionsNone() {
322322
}
323323

324324
@ParameterizedRedisTest
325+
@EnabledOnCommand("HEXPIRE")
325326
void testExpireWithOptions() {
326327

327328
K key = keyFactory.instance();

src/test/java/org/springframework/data/redis/core/DefaultReactiveHashOperationsIntegrationTests.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static org.assertj.core.api.Assumptions.assumeThat;
2020
import static org.junit.jupiter.api.condition.OS.MAC;
2121

22+
import org.springframework.data.redis.connection.Hash.FieldExpirationOptions;
2223
import reactor.test.StepVerifier;
2324

2425
import java.time.Duration;
@@ -529,6 +530,34 @@ void testExpireAndGetExpireMillis() {
529530
}).verifyComplete();
530531
}
531532

533+
@ParameterizedRedisTest
534+
@EnabledOnCommand("HEXPIRE")
535+
void testExpireWithOptions() {
536+
537+
K key = keyFactory.instance();
538+
HK key1 = hashKeyFactory.instance();
539+
HV val1 = hashValueFactory.instance();
540+
HK key2 = hashKeyFactory.instance();
541+
HV val2 = hashValueFactory.instance();
542+
543+
putAll(key, key1, val1, key2, val2);
544+
545+
hashOperations.expire(key, org.springframework.data.redis.core.types.Expiration.seconds(20), FieldExpirationOptions.none(), List.of(key1)).as(StepVerifier::create)//
546+
.assertNext(changes -> {
547+
assertThat(changes.allOk()).isTrue();
548+
}).verifyComplete();
549+
hashOperations.expire(key, org.springframework.data.redis.core.types.Expiration.seconds(60), FieldExpirationOptions.none(), List.of(key2)).as(StepVerifier::create)//
550+
.assertNext(changes -> {
551+
assertThat(changes.allOk()).isTrue();
552+
}).verifyComplete();
553+
554+
hashOperations.expire(key, org.springframework.data.redis.core.types.Expiration.seconds(30), FieldExpirationOptions.builder().gt().build(), List.of(key1, key2)).as(StepVerifier::create)//
555+
.assertNext(changes -> {
556+
assertThat(changes.ok()).containsExactly(key1);
557+
assertThat(changes.skipped()).containsExactly(key2);
558+
}).verifyComplete();
559+
}
560+
532561
@ParameterizedRedisTest
533562
@EnabledOnCommand("HEXPIRE")
534563
void testExpireAndGetExpireSeconds() {

0 commit comments

Comments
 (0)