31
31
import java .util .WeakHashMap ;
32
32
import java .util .concurrent .ExecutorService ;
33
33
import java .util .concurrent .Executors ;
34
+ import java .util .concurrent .LinkedBlockingQueue ;
34
35
import java .util .concurrent .Semaphore ;
36
+ import java .util .concurrent .ThreadFactory ;
37
+ import java .util .concurrent .ThreadPoolExecutor ;
38
+ import java .util .concurrent .TimeUnit ;
39
+ import java .util .concurrent .atomic .AtomicInteger ;
35
40
36
41
public class AsyncServer {
37
42
public static final String LOGTAG = "NIO" ;
@@ -85,7 +90,7 @@ public static void post(Handler handler, Runnable runnable) {
85
90
catch (Throwable ex ) {
86
91
}
87
92
}
88
-
93
+
89
94
static AsyncServer mInstance = new AsyncServer ();
90
95
public static AsyncServer getDefault () {
91
96
return mInstance ;
@@ -114,7 +119,7 @@ private void handleSocket(final AsyncNetworkSocket handler) throws ClosedChannel
114
119
ckey .attach (handler );
115
120
handler .setup (this , ckey );
116
121
}
117
-
122
+
118
123
public void removeAllCallbacks (Object scheduled ) {
119
124
synchronized (this ) {
120
125
mQueue .remove (scheduled );
@@ -134,7 +139,7 @@ public void run() {
134
139
}
135
140
});
136
141
}
137
-
142
+
138
143
public Object postDelayed (Runnable runnable , long delay ) {
139
144
Scheduled s ;
140
145
synchronized (this ) {
@@ -161,11 +166,11 @@ public Object postDelayed(Runnable runnable, long delay) {
161
166
}
162
167
return s ;
163
168
}
164
-
169
+
165
170
public Object post (Runnable runnable ) {
166
171
return postDelayed (runnable , 0 );
167
172
}
168
-
173
+
169
174
public Object post (final CompletedCallback callback , final Exception e ) {
170
175
return post (new Runnable () {
171
176
@ Override
@@ -174,7 +179,7 @@ public void run() {
174
179
}
175
180
});
176
181
}
177
-
182
+
178
183
public void run (final Runnable runnable ) {
179
184
if (Thread .currentThread () == mAffinity ) {
180
185
post (runnable );
@@ -263,7 +268,7 @@ public void run() {
263
268
catch (Exception e ) {
264
269
}
265
270
}
266
-
271
+
267
272
protected void onDataReceived (int transmitted ) {
268
273
}
269
274
@@ -335,7 +340,7 @@ protected void cancelCleanup() {
335
340
SocketChannel socket ;
336
341
ConnectCallback callback ;
337
342
}
338
-
343
+
339
344
private ConnectFuture connectResolvedInetSocketAddress (final InetSocketAddress address , final ConnectCallback callback ) {
340
345
final ConnectFuture cancel = new ConnectFuture ();
341
346
assert !address .isUnresolved ();
@@ -396,7 +401,14 @@ public Cancellable connectSocket(final String host, final int port, final Connec
396
401
return connectSocket (InetSocketAddress .createUnresolved (host , port ), callback );
397
402
}
398
403
399
- private static ExecutorService synchronousWorkers = Executors .newFixedThreadPool (4 );
404
+ private static ExecutorService newSynchronousWorkers () {
405
+ ThreadFactory tf = new NamedThreadFactory ("AsyncServer-worker-" );
406
+ ThreadPoolExecutor tpe = new ThreadPoolExecutor (4 , 4 , 0L ,
407
+ TimeUnit .MILLISECONDS , new LinkedBlockingQueue <Runnable >(), tf );
408
+ return tpe ;
409
+ }
410
+
411
+ private static ExecutorService synchronousWorkers = newSynchronousWorkers ();
400
412
public Future <InetAddress []> getAllByName (final String host ) {
401
413
final SimpleFuture <InetAddress []> ret = new SimpleFuture <InetAddress []>();
402
414
synchronousWorkers .execute (new Runnable () {
@@ -487,7 +499,7 @@ public void run() {
487
499
});
488
500
return handler ;
489
501
}
490
-
502
+
491
503
public AsyncDatagramSocket connectDatagram (final SocketAddress remote ) throws IOException {
492
504
final DatagramChannel socket = DatagramChannel .open ();
493
505
final AsyncDatagramSocket handler = new AsyncDatagramSocket ();
@@ -509,7 +521,7 @@ public void run() {
509
521
});
510
522
return handler ;
511
523
}
512
-
524
+
513
525
final static WeakHashMap <Thread , AsyncServer > mServers = new WeakHashMap <Thread , AsyncServer >();
514
526
515
527
private boolean addMe () {
@@ -527,7 +539,7 @@ private boolean addMe() {
527
539
public static AsyncServer getCurrentThreadServer () {
528
540
return mServers .get (Thread .currentThread ());
529
541
}
530
-
542
+
531
543
Thread mAffinity ;
532
544
private void run (boolean newThread ) {
533
545
final SelectorWrapper selector ;
@@ -596,10 +608,10 @@ public void run() {
596
608
}
597
609
return ;
598
610
}
599
-
611
+
600
612
run (this , selector , queue );
601
613
}
602
-
614
+
603
615
private static void run (final AsyncServer server , final SelectorWrapper selector , final PriorityQueue <Scheduled > queue ) {
604
616
// Log.i(LOGTAG, "****AsyncServer is starting.****");
605
617
// at this point, this local queue and selector are owned
@@ -666,11 +678,11 @@ private static void shutdownEverything(SelectorWrapper selector) {
666
678
catch (Exception e ) {
667
679
}
668
680
}
669
-
681
+
670
682
private static final long QUEUE_EMPTY = Long .MAX_VALUE ;
671
683
private static long lockAndRunQueue (final AsyncServer server , final PriorityQueue <Scheduled > queue ) {
672
684
long wait = QUEUE_EMPTY ;
673
-
685
+
674
686
// find the first item we can actually run
675
687
while (true ) {
676
688
Scheduled run = null ;
@@ -689,10 +701,10 @@ private static long lockAndRunQueue(final AsyncServer server, final PriorityQueu
689
701
}
690
702
}
691
703
}
692
-
704
+
693
705
if (run == null )
694
706
break ;
695
-
707
+
696
708
run .runnable .run ();
697
709
}
698
710
@@ -834,11 +846,11 @@ public void run() {
834
846
}
835
847
});
836
848
}
837
-
849
+
838
850
public Thread getAffinity () {
839
851
return mAffinity ;
840
852
}
841
-
853
+
842
854
public boolean isAffinityThread () {
843
855
return mAffinity == Thread .currentThread ();
844
856
}
@@ -847,4 +859,27 @@ public boolean isAffinityThreadOrStopped() {
847
859
Thread affinity = mAffinity ;
848
860
return affinity == null || affinity == Thread .currentThread ();
849
861
}
862
+
863
+ private static class NamedThreadFactory implements ThreadFactory {
864
+ private final ThreadGroup group ;
865
+ private final AtomicInteger threadNumber = new AtomicInteger (1 );
866
+ private final String namePrefix ;
867
+
868
+ NamedThreadFactory (String namePrefix ) {
869
+ SecurityManager s = System .getSecurityManager ();
870
+ group = (s != null ) ? s .getThreadGroup () :
871
+ Thread .currentThread ().getThreadGroup ();
872
+ this .namePrefix = namePrefix ;
873
+ }
874
+
875
+ public Thread newThread (Runnable r ) {
876
+ Thread t = new Thread (group , r ,
877
+ namePrefix + threadNumber .getAndIncrement (), 0 );
878
+ if (t .isDaemon ()) t .setDaemon (false );
879
+ if (t .getPriority () != Thread .NORM_PRIORITY ) {
880
+ t .setPriority (Thread .NORM_PRIORITY );
881
+ }
882
+ return t ;
883
+ }
884
+ }
850
885
}
0 commit comments