97
97
import java .net .InetSocketAddress ;
98
98
import java .net .URI ;
99
99
import java .net .URLEncoder ;
100
- import java .nio .ByteBuffer ;
101
100
import java .security .NoSuchAlgorithmException ;
102
101
import java .util .Collection ;
103
102
import java .util .HashMap ;
@@ -460,11 +459,13 @@ static int getPort(final URI uri, final int p) {
460
459
461
460
462
461
@ SuppressWarnings ({"unchecked" })
463
- void sendRequest (final FilterChainContext ctx ,
462
+ boolean sendRequest (final FilterChainContext ctx ,
464
463
final Request request ,
465
464
final HttpRequestPacket requestPacket )
466
465
throws IOException {
467
466
467
+ boolean isWriteComplete = true ;
468
+
468
469
if (requestHasEntityBody (request )) {
469
470
final HttpTransactionContext context = getHttpTransactionContext (ctx .getConnection ());
470
471
BodyHandler handler = bodyHandlerFactory .getBodyHandler (request );
@@ -473,13 +474,15 @@ void sendRequest(final FilterChainContext ctx,
473
474
handler = new ExpectHandler (handler );
474
475
}
475
476
context .bodyHandler = handler ;
476
- handler .doHandle (ctx , request , requestPacket );
477
+ isWriteComplete = handler .doHandle (ctx , request , requestPacket );
477
478
} else {
478
479
ctx .write (requestPacket , ctx .getTransportContext ().getCompletionHandler ());
479
480
}
480
481
if (LOGGER .isDebugEnabled ()) {
481
482
LOGGER .debug ("REQUEST: " + requestPacket .toString ());
482
483
}
484
+
485
+ return isWriteComplete ;
483
486
}
484
487
485
488
@@ -686,7 +689,9 @@ public NextAction handleWrite(final FilterChainContext ctx)
686
689
Object message = ctx .getMessage ();
687
690
if (message instanceof Request ) {
688
691
ctx .setMessage (null );
689
- sendAsGrizzlyRequest ((Request ) message , ctx );
692
+ if (!sendAsGrizzlyRequest ((Request ) message , ctx )) {
693
+ return ctx .getSuspendAction ();
694
+ }
690
695
}
691
696
692
697
return ctx .getStopAction ();
@@ -713,7 +718,7 @@ public NextAction handleEvent(final FilterChainContext ctx,
713
718
714
719
715
720
716
- private void sendAsGrizzlyRequest (final Request request ,
721
+ private boolean sendAsGrizzlyRequest (final Request request ,
717
722
final FilterChainContext ctx )
718
723
throws IOException {
719
724
@@ -781,7 +786,7 @@ private void sendAsGrizzlyRequest(final Request request,
781
786
new FluentCaseInsensitiveStringsMap (request .getHeaders ());
782
787
TransferCompletionHandler .class .cast (h ).transferAdapter (new GrizzlyTransferAdapter (map ));
783
788
}
784
- sendRequest (ctx , request , requestPacket );
789
+ return sendRequest (ctx , request , requestPacket );
785
790
786
791
}
787
792
@@ -1513,7 +1518,7 @@ private static interface BodyHandler {
1513
1518
1514
1519
boolean handlesBodyType (final Request request );
1515
1520
1516
- void doHandle (final FilterChainContext ctx ,
1521
+ boolean doHandle (final FilterChainContext ctx ,
1517
1522
final Request request ,
1518
1523
final HttpRequestPacket requestPacket ) throws IOException ;
1519
1524
@@ -1569,10 +1574,11 @@ public boolean handlesBodyType(Request request) {
1569
1574
}
1570
1575
1571
1576
@ SuppressWarnings ({"unchecked" })
1572
- public void doHandle (FilterChainContext ctx , Request request , HttpRequestPacket requestPacket ) throws IOException {
1577
+ public boolean doHandle (FilterChainContext ctx , Request request , HttpRequestPacket requestPacket ) throws IOException {
1573
1578
this .request = request ;
1574
1579
this .requestPacket = requestPacket ;
1575
1580
ctx .write (requestPacket , ((!requestPacket .isCommitted ()) ? ctx .getTransportContext ().getCompletionHandler () : null ));
1581
+ return true ;
1576
1582
}
1577
1583
1578
1584
public void finish (final FilterChainContext ctx ) throws IOException {
@@ -1592,7 +1598,7 @@ public boolean handlesBodyType(final Request request) {
1592
1598
}
1593
1599
1594
1600
@ SuppressWarnings ({"unchecked" })
1595
- public void doHandle (final FilterChainContext ctx ,
1601
+ public boolean doHandle (final FilterChainContext ctx ,
1596
1602
final Request request ,
1597
1603
final HttpRequestPacket requestPacket )
1598
1604
throws IOException {
@@ -1612,6 +1618,7 @@ public void doHandle(final FilterChainContext ctx,
1612
1618
final HttpContent content = requestPacket .httpContentBuilder ().content (gBuffer ).build ();
1613
1619
content .setLast (true );
1614
1620
ctx .write (content , ((!requestPacket .isCommitted ()) ? ctx .getTransportContext ().getCompletionHandler () : null ));
1621
+ return true ;
1615
1622
}
1616
1623
}
1617
1624
@@ -1627,7 +1634,7 @@ public boolean handlesBodyType(final Request request) {
1627
1634
}
1628
1635
1629
1636
@ SuppressWarnings ({"unchecked" })
1630
- public void doHandle (final FilterChainContext ctx ,
1637
+ public boolean doHandle (final FilterChainContext ctx ,
1631
1638
final Request request ,
1632
1639
final HttpRequestPacket requestPacket )
1633
1640
throws IOException {
@@ -1647,6 +1654,7 @@ public void doHandle(final FilterChainContext ctx,
1647
1654
final HttpContent content = requestPacket .httpContentBuilder ().content (gBuffer ).build ();
1648
1655
content .setLast (true );
1649
1656
ctx .write (content , ((!requestPacket .isCommitted ()) ? ctx .getTransportContext ().getCompletionHandler () : null ));
1657
+ return true ;
1650
1658
}
1651
1659
1652
1660
} // END StringBodyHandler
@@ -1663,14 +1671,15 @@ public boolean handlesBodyType(final Request request) {
1663
1671
}
1664
1672
1665
1673
@ SuppressWarnings ({"unchecked" })
1666
- public void doHandle (final FilterChainContext ctx ,
1674
+ public boolean doHandle (final FilterChainContext ctx ,
1667
1675
final Request request ,
1668
1676
final HttpRequestPacket requestPacket )
1669
1677
throws IOException {
1670
1678
1671
1679
final HttpContent content = requestPacket .httpContentBuilder ().content (Buffers .EMPTY_BUFFER ).build ();
1672
1680
content .setLast (true );
1673
1681
ctx .write (content , ((!requestPacket .isCommitted ()) ? ctx .getTransportContext ().getCompletionHandler () : null ));
1682
+ return true ;
1674
1683
}
1675
1684
1676
1685
} // END NoBodyHandler
@@ -1688,7 +1697,7 @@ public boolean handlesBodyType(final Request request) {
1688
1697
}
1689
1698
1690
1699
@ SuppressWarnings ({"unchecked" })
1691
- public void doHandle (final FilterChainContext ctx ,
1700
+ public boolean doHandle (final FilterChainContext ctx ,
1692
1701
final Request request ,
1693
1702
final HttpRequestPacket requestPacket )
1694
1703
throws IOException {
@@ -1733,6 +1742,7 @@ public void doHandle(final FilterChainContext ctx,
1733
1742
content .setLast (true );
1734
1743
ctx .write (content , ((!requestPacket .isCommitted ()) ? ctx .getTransportContext ().getCompletionHandler () : null ));
1735
1744
}
1745
+ return true ;
1736
1746
}
1737
1747
1738
1748
} // END ParamsBodyHandler
@@ -1748,7 +1758,7 @@ public boolean handlesBodyType(final Request request) {
1748
1758
}
1749
1759
1750
1760
@ SuppressWarnings ({"unchecked" })
1751
- public void doHandle (final FilterChainContext ctx ,
1761
+ public boolean doHandle (final FilterChainContext ctx ,
1752
1762
final Request request ,
1753
1763
final HttpRequestPacket requestPacket )
1754
1764
throws IOException {
@@ -1766,6 +1776,7 @@ public void doHandle(final FilterChainContext ctx,
1766
1776
ctx .write (content , ((!requestPacket .isCommitted ()) ? ctx .getTransportContext ().getCompletionHandler () : null ));
1767
1777
}
1768
1778
1779
+ return true ;
1769
1780
}
1770
1781
1771
1782
} // END EntityWriterBodyHandler
@@ -1781,7 +1792,7 @@ public boolean handlesBodyType(final Request request) {
1781
1792
}
1782
1793
1783
1794
@ SuppressWarnings ({"unchecked" })
1784
- public void doHandle (final FilterChainContext ctx ,
1795
+ public boolean doHandle (final FilterChainContext ctx ,
1785
1796
final Request request ,
1786
1797
final HttpRequestPacket requestPacket )
1787
1798
throws IOException {
@@ -1815,6 +1826,8 @@ public void doHandle(final FilterChainContext ctx,
1815
1826
content .setLast (true );
1816
1827
ctx .write (content , ((!requestPacket .isCommitted ()) ? ctx .getTransportContext ().getCompletionHandler () : null ));
1817
1828
}
1829
+
1830
+ return true ;
1818
1831
}
1819
1832
1820
1833
} // END StreamDataBodyHandler
@@ -1831,7 +1844,7 @@ public boolean handlesBodyType(final Request request) {
1831
1844
}
1832
1845
1833
1846
@ SuppressWarnings ({"unchecked" })
1834
- public void doHandle (final FilterChainContext ctx ,
1847
+ public boolean doHandle (final FilterChainContext ctx ,
1835
1848
final Request request ,
1836
1849
final HttpRequestPacket requestPacket )
1837
1850
throws IOException {
@@ -1854,6 +1867,7 @@ public void doHandle(final FilterChainContext ctx,
1854
1867
ctx .write (content , ((!requestPacket .isCommitted ()) ? ctx .getTransportContext ().getCompletionHandler () : null ));
1855
1868
}
1856
1869
1870
+ return true ;
1857
1871
}
1858
1872
1859
1873
} // END PartsBodyHandler
@@ -1869,7 +1883,7 @@ public boolean handlesBodyType(final Request request) {
1869
1883
}
1870
1884
1871
1885
@ SuppressWarnings ({"unchecked" })
1872
- public void doHandle (final FilterChainContext ctx ,
1886
+ public boolean doHandle (final FilterChainContext ctx ,
1873
1887
final Request request ,
1874
1888
final HttpRequestPacket requestPacket )
1875
1889
throws IOException {
@@ -1897,6 +1911,8 @@ public void doHandle(final FilterChainContext ctx,
1897
1911
last (last ).build ();
1898
1912
ctx .write (content , ((!requestPacket .isCommitted ()) ? ctx .getTransportContext ().getCompletionHandler () : null ));
1899
1913
}
1914
+
1915
+ return true ;
1900
1916
}
1901
1917
1902
1918
} // END FileBodyHandler
@@ -1912,7 +1928,7 @@ public boolean handlesBodyType(final Request request) {
1912
1928
}
1913
1929
1914
1930
@ SuppressWarnings ({"unchecked" })
1915
- public void doHandle (final FilterChainContext ctx ,
1931
+ public boolean doHandle (final FilterChainContext ctx ,
1916
1932
final Request request ,
1917
1933
final HttpRequestPacket requestPacket )
1918
1934
throws IOException {
@@ -1926,21 +1942,37 @@ public void doHandle(final FilterChainContext ctx,
1926
1942
requestPacket .setChunked (true );
1927
1943
}
1928
1944
1945
+ final MemoryManager mm = ctx .getMemoryManager ();
1929
1946
boolean last = false ;
1930
- for (ByteBuffer buffer = ByteBuffer .allocate (MAX_CHUNK_SIZE ); !last ; ) {
1931
- buffer .clear ();
1932
- if (bodyLocal .read (buffer ) < 0 ) {
1933
- last = true ;
1934
- buffer = ByteBuffer .allocate (0 );
1947
+
1948
+ while (!last ) {
1949
+ Buffer buffer = mm .allocate (MAX_CHUNK_SIZE );
1950
+ buffer .allowBufferDispose (true );
1951
+
1952
+ final long readBytes = bodyLocal .read (buffer .toByteBuffer ());
1953
+ if (readBytes > 0 ) {
1954
+ buffer .position ((int ) readBytes );
1955
+ buffer .trim ();
1956
+ } else {
1957
+ buffer .dispose ();
1958
+
1959
+ if (readBytes < 0 ) {
1960
+ last = true ;
1961
+ buffer = Buffers .EMPTY_BUFFER ;
1962
+ } else {
1963
+ // @TODO pass the context to bodyLocal to be able to
1964
+ // continue body transferring once more data is available
1965
+ return false ;
1966
+ }
1935
1967
}
1936
- final MemoryManager mm = ctx .getMemoryManager ();
1937
- buffer .flip ();
1938
- Buffer b = Buffers .wrap (mm , buffer );
1968
+
1939
1969
final HttpContent content =
1940
- requestPacket .httpContentBuilder ().content (b ).
1970
+ requestPacket .httpContentBuilder ().content (buffer ).
1941
1971
last (last ).build ();
1942
1972
ctx .write (content , ((!requestPacket .isCommitted ()) ? ctx .getTransportContext ().getCompletionHandler () : null ));
1943
1973
}
1974
+
1975
+ return true ;
1944
1976
}
1945
1977
1946
1978
} // END BodyGeneratorBodyHandler
0 commit comments