Skip to content

Commit a715cdb

Browse files
twz123slandelle
authored andcommitted
Handle onTrailingHeadersReceived in AHC extras for RxJava 1 and 2 (AsyncHttpClient#1398)
Override default implementation and forward it to delegates, add tests.
1 parent f4786f3 commit a715cdb

File tree

4 files changed

+20
-0
lines changed

4 files changed

+20
-0
lines changed

extras/rxjava/src/main/java/org/asynchttpclient/extras/rxjava/single/AbstractSingleSubscriberBridge.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,11 @@ public State onHeadersReceived(HttpHeaders headers) throws Exception {
5656
return subscriber.isUnsubscribed() ? abort() : delegate().onHeadersReceived(headers);
5757
}
5858

59+
@Override
60+
public State onTrailingHeadersReceived(HttpHeaders headers) throws Exception {
61+
return subscriber.isUnsubscribed() ? abort() : delegate().onTrailingHeadersReceived(headers);
62+
}
63+
5964
@Override
6065
public Void onCompleted() {
6166
if (delegateTerminated.getAndSet(true)) {

extras/rxjava/src/test/java/org/asynchttpclient/extras/rxjava/single/AsyncHttpSingleTest.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,9 @@ public void testSuccessfulCompletion() throws Exception {
8282
bridge.onBodyPartReceived(null);
8383
verify(handler).onBodyPartReceived(null);
8484

85+
bridge.onTrailingHeadersReceived(null);
86+
verify(handler).onTrailingHeadersReceived(null);
87+
8588
bridge.onCompleted();
8689
verify(handler).onCompleted();
8790
} catch (final Throwable t) {
@@ -135,6 +138,9 @@ public void testSuccessfulCompletionWithProgress() throws Exception {
135138
progressBridge.onBodyPartReceived(null);
136139
inOrder.verify(handler).onBodyPartReceived(null);
137140

141+
bridge.onTrailingHeadersReceived(null);
142+
verify(handler).onTrailingHeadersReceived(null);
143+
138144
progressBridge.onCompleted();
139145
inOrder.verify(handler).onCompleted();
140146
} catch (final Throwable t) {

extras/rxjava2/src/main/java/org/asynchttpclient/extras/rxjava2/maybe/AbstractMaybeAsyncHandlerBridge.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -81,6 +81,11 @@ public final State onHeadersReceived(HttpHeaders headers) throws Exception {
8181
return emitter.isDisposed() ? disposed() : delegate().onHeadersReceived(headers);
8282
}
8383

84+
@Override
85+
public State onTrailingHeadersReceived(HttpHeaders headers) throws Exception {
86+
return emitter.isDisposed() ? disposed() : delegate().onTrailingHeadersReceived(headers);
87+
}
88+
8489
/**
8590
* {@inheritDoc}
8691
*

extras/rxjava2/src/test/java/org/asynchttpclient/extras/rxjava2/maybe/AbstractMaybeAsyncHandlerBridgeTest.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,9 @@ public void forwardsEvents() throws Exception {
9292
/* when */ underTest.onBodyPartReceived(bodyPart);
9393
then(delegate).should(times(2)).onBodyPartReceived(bodyPart);
9494

95+
/* when */ underTest.onTrailingHeadersReceived(headers);
96+
then(delegate).should().onTrailingHeadersReceived(headers);
97+
9598
/* when */ underTest.onCompleted();
9699
then(delegate).should().onCompleted();
97100
then(emitter).should().onSuccess(this);
@@ -199,6 +202,7 @@ public Object[][] httpEvents() {
199202
{ named("onStatusReceived", () -> underTest.onStatusReceived(status)) }, //
200203
{ named("onHeadersReceived", () -> underTest.onHeadersReceived(headers)) }, //
201204
{ named("onBodyPartReceived", () -> underTest.onBodyPartReceived(bodyPart)) }, //
205+
{ named("onTrailingHeadersReceived", () -> underTest.onTrailingHeadersReceived(headers)) }, //
202206
};
203207
}
204208

0 commit comments

Comments
 (0)