3
3
import com .koushikdutta .async .callback .CompletedCallback ;
4
4
import com .koushikdutta .async .callback .WritableCallback ;
5
5
6
- import java .nio .ByteBuffer ;
7
-
8
6
public class BufferedDataSink implements DataSink {
9
7
DataSink mDataSink ;
10
8
public BufferedDataSink (DataSink datasink ) {
@@ -14,7 +12,7 @@ public BufferedDataSink(DataSink datasink) {
14
12
public boolean isBuffering () {
15
13
return mPendingWrites .hasRemaining () || forceBuffering ;
16
14
}
17
-
15
+
18
16
public DataSink getDataSink () {
19
17
return mDataSink ;
20
18
}
@@ -28,58 +26,50 @@ public void forceBuffering(boolean forceBuffering) {
28
26
29
27
public void setDataSink (DataSink datasink ) {
30
28
mDataSink = datasink ;
31
- mDataSink .setWriteableCallback (new WritableCallback () {
32
- @ Override
33
- public void onWriteable () {
34
- writePending ();
35
- }
36
- });
29
+ mDataSink .setWriteableCallback (this ::writePending );
37
30
}
38
31
39
32
private void writePending () {
40
33
if (forceBuffering )
41
34
return ;
42
35
43
36
// Log.i("NIO", "Writing to buffer...");
44
- if (mPendingWrites .hasRemaining ()) {
37
+ boolean empty ;
38
+ synchronized (mPendingWrites ) {
45
39
mDataSink .write (mPendingWrites );
46
- if (mPendingWrites .remaining () == 0 ) {
47
- if (endPending )
48
- mDataSink .end ();
49
- }
40
+ empty = mPendingWrites .isEmpty ();
41
+ }
42
+ if (empty ) {
43
+ if (endPending )
44
+ mDataSink .end ();
50
45
}
51
- if (! mPendingWrites . hasRemaining () && mWritable != null )
46
+ if (empty && mWritable != null )
52
47
mWritable .onWriteable ();
53
48
}
54
49
55
- ByteBufferList mPendingWrites = new ByteBufferList ();
50
+ final ByteBufferList mPendingWrites = new ByteBufferList ();
56
51
57
52
@ Override
58
53
public void write (ByteBufferList bb ) {
59
54
write (bb , false );
60
55
}
61
-
56
+
62
57
protected void write (final ByteBufferList bb , final boolean ignoreBuffer ) {
63
58
if (getServer ().getAffinity () != Thread .currentThread ()) {
64
- getServer (). run ( new Runnable ( ) {
65
- @ Override
66
- public void run () {
67
- write ( bb , ignoreBuffer );
68
- }
69
- } );
59
+ synchronized ( mPendingWrites ) {
60
+ if ( mPendingWrites . remaining () >= mMaxBuffer && ! ignoreBuffer )
61
+ return ;
62
+ bb . get ( mPendingWrites );
63
+ }
64
+ getServer (). post ( this :: writePending );
70
65
return ;
71
66
}
72
67
73
68
if (!isBuffering ())
74
69
mDataSink .write (bb );
75
70
76
- if (bb .remaining () > 0 ) {
77
- int toRead = Math .min (bb .remaining (), mMaxBuffer );
78
- if (ignoreBuffer )
79
- toRead = bb .remaining ();
80
- if (toRead > 0 ) {
81
- bb .get (mPendingWrites , toRead );
82
- }
71
+ synchronized (mPendingWrites ) {
72
+ bb .get (mPendingWrites );
83
73
}
84
74
}
85
75
@@ -102,7 +92,7 @@ public int remaining() {
102
92
public int getMaxBuffer () {
103
93
return mMaxBuffer ;
104
94
}
105
-
95
+
106
96
public void setMaxBuffer (int maxBuffer ) {
107
97
assert maxBuffer >= 0 ;
108
98
mMaxBuffer = maxBuffer ;
@@ -117,18 +107,15 @@ public boolean isOpen() {
117
107
@ Override
118
108
public void end () {
119
109
if (getServer ().getAffinity () != Thread .currentThread ()) {
120
- getServer ().run (new Runnable () {
121
- @ Override
122
- public void run () {
123
- end ();
124
- }
125
- });
110
+ getServer ().post (this ::end );
126
111
return ;
127
112
}
128
113
129
- if (mPendingWrites .hasRemaining ()) {
130
- endPending = true ;
131
- return ;
114
+ synchronized (mPendingWrites ) {
115
+ if (mPendingWrites .hasRemaining ()) {
116
+ endPending = true ;
117
+ return ;
118
+ }
132
119
}
133
120
mDataSink .end ();
134
121
}
0 commit comments