Skip to content

Commit 3f2e960

Browse files
authored
Added dispatch_mode=8/9 (swoole#4318)
* dispatch_mode=8/9 * docker * fix * fix 2 * fix 3
1 parent 0c948ee commit 3f2e960

File tree

12 files changed

+281
-8
lines changed

12 files changed

+281
-8
lines changed

ext-src/swoole_server.cc

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3628,6 +3628,9 @@ static PHP_METHOD(swoole_server, getClientInfo) {
36283628
if (conn->uid > 0 || serv->dispatch_mode == SW_DISPATCH_UIDMOD) {
36293629
add_assoc_long(return_value, "uid", conn->uid);
36303630
}
3631+
if (conn->worker_id > 0 || serv->dispatch_mode == SW_DISPATCH_CO_CONN_LB) {
3632+
add_assoc_long(return_value, "worker_id", conn->worker_id);
3633+
}
36313634

36323635
ListenPort *port = serv->get_port_by_fd(conn->fd);
36333636
if (port && port->open_websocket_protocol) {

include/swoole_process_pool.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -130,6 +130,7 @@ struct Worker {
130130

131131
long dispatch_count;
132132
long request_count;
133+
size_t coroutine_num;
133134

134135
/**
135136
* worker id

include/swoole_reactor.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ class Reactor {
8686
PRIORITY_SIGNAL_CALLBACK,
8787
PRIORITY_TRY_EXIT,
8888
PRIORITY_MALLOC_TRIM,
89+
PRIORITY_WORKER_CALLBACK,
8990
};
9091

9192
enum ExitCondition {

include/swoole_server.h

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@ enum swFactory_dispatch_mode {
6161
SW_DISPATCH_UIDMOD = 5,
6262
SW_DISPATCH_USERFUNC = 6,
6363
SW_DISPATCH_STREAM = 7,
64+
SW_DISPATCH_CO_CONN_LB,
65+
SW_DISPATCH_CO_REQ_LB,
6466
};
6567

6668
enum swFactory_dispatch_result {
@@ -98,6 +100,7 @@ struct Connection {
98100
uint8_t active;
99101
enum swSocket_type socket_type;
100102
int fd;
103+
int worker_id;
101104
SessionId session_id;
102105
//--------------------------------------------------------------
103106
#ifdef SW_USE_OPENSSL
@@ -998,7 +1001,8 @@ class Server {
9981001
}
9991002

10001003
inline bool is_hash_dispatch_mode() {
1001-
return dispatch_mode == SW_DISPATCH_FDMOD || dispatch_mode == SW_DISPATCH_IPMOD;
1004+
return dispatch_mode == SW_DISPATCH_FDMOD || dispatch_mode == SW_DISPATCH_IPMOD ||
1005+
dispatch_mode == SW_DISPATCH_CO_CONN_LB;
10021006
}
10031007

10041008
inline bool is_support_send_yield() {
@@ -1042,6 +1046,19 @@ class Server {
10421046
return nullptr;
10431047
}
10441048

1049+
int get_lowest_load_worker_id() {
1050+
uint32_t lowest_load_worker_id = 0;
1051+
size_t min_coroutine = workers[0].coroutine_num;
1052+
for (uint32_t i = 1; i < worker_num; i++) {
1053+
if (workers[i].coroutine_num < min_coroutine) {
1054+
min_coroutine = workers[i].coroutine_num;
1055+
lowest_load_worker_id = i;
1056+
continue;
1057+
}
1058+
}
1059+
return lowest_load_worker_id;
1060+
}
1061+
10451062
void stop_async_worker(Worker *worker);
10461063

10471064
inline Pipe *get_pipe_object(int pipe_fd) {

src/server/master.cc

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -964,6 +964,17 @@ int Server::schedule_worker(int fd, SendData *data) {
964964
} else {
965965
key = conn->uid;
966966
}
967+
} else if (dispatch_mode == SW_DISPATCH_CO_CONN_LB) {
968+
Connection *conn = get_connection(fd);
969+
if (conn == nullptr) {
970+
return key % worker_num;
971+
}
972+
if (conn->worker_id < 0) {
973+
conn->worker_id = get_lowest_load_worker_id();
974+
}
975+
return conn->worker_id;
976+
} else if (dispatch_mode == SW_DISPATCH_CO_REQ_LB) {
977+
return get_lowest_load_worker_id();
967978
}
968979
// Preemptive distribution
969980
else {
@@ -982,6 +993,7 @@ int Server::schedule_worker(int fd, SendData *data) {
982993
swTraceLog(SW_TRACE_SERVER, "schedule=%d, round=%d", key, worker_round_id);
983994
return key;
984995
}
996+
985997
return key % worker_num;
986998
}
987999

@@ -1659,6 +1671,7 @@ Connection *Server::add_connection(ListenPort *ls, Socket *_socket, int server_f
16591671
connection->server_fd = (sw_atomic_t) server_fd;
16601672
connection->last_recv_time = connection->connect_time = microtime();
16611673
connection->active = 1;
1674+
connection->worker_id = -1;
16621675
connection->socket_type = ls->type;
16631676
connection->socket = _socket;
16641677

src/server/worker.cc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "swoole_memory.h"
2424
#include "swoole_msg_queue.h"
2525
#include "swoole_client.h"
26+
#include "swoole_coroutine.h"
2627

2728
swoole::WorkerGlobal SwooleWG = {};
2829

@@ -567,6 +568,9 @@ int Server::start_event_worker(Worker *worker) {
567568
stream_protocol.package_max_length = UINT_MAX;
568569
stream_protocol.onPackage = Worker_onStreamPackage;
569570
buffer_pool = new std::queue<String *>;
571+
} else if (dispatch_mode == SW_DISPATCH_CO_CONN_LB || dispatch_mode == SW_DISPATCH_CO_REQ_LB) {
572+
reactor->set_end_callback(Reactor::PRIORITY_WORKER_CALLBACK,
573+
[worker](Reactor *) { worker->coroutine_num = Coroutine::count(); });
570574
}
571575

572576
worker->status = SW_WORKER_IDLE;

tests/include/functions.php

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -775,3 +775,27 @@ function curl_type_assert($ch, $resource_type, $class_type)
775775
Assert::eq(get_resource_type($ch), $resource_type);
776776
}
777777
}
778+
779+
function swoole_get_variance($avg, $array, $is_swatch = false)
780+
{
781+
$count = count($array);
782+
if ($count == 1 && $is_swatch == true) {
783+
return false;
784+
} elseif ($count > 0) {
785+
$total_var = 0;
786+
foreach ($array as $lv) {
787+
$total_var += pow(($lv - $avg), 2);
788+
}
789+
if ($count == 1 && $is_swatch == true) {
790+
return false;
791+
}
792+
return $is_swatch ? sqrt($total_var / (count($array) - 1)) : sqrt($total_var / count($array));
793+
} else {
794+
return false;
795+
}
796+
}
797+
798+
function swoole_get_average($array)
799+
{
800+
return array_sum($array) / count($array);
801+
}

tests/swoole_server/dispatch_mode_7.phpt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ $pm->parentFunc = function ($pid) use ($pm) {
4444
};
4545

4646
$pm->childFunc = function () use ($pm) {
47-
$serv = new Server('127.0.0.1', $pm->getFreePort(), SWOOLE_BASE);
47+
$serv = new Server('127.0.0.1', $pm->getFreePort(), SWOOLE_PROCESS);
4848
$serv->set(array(
4949
'package_eof' => "\r\n\r\n",
5050
'open_eof_check' => true,
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
--TEST--
2+
swoole_server: dispatch_mode = 8 [co conn lb]
3+
--SKIPIF--
4+
<?php require __DIR__ . '/../include/skipif.inc'; ?>
5+
--FILE--
6+
<?php
7+
require __DIR__ . '/../include/bootstrap.php';
8+
9+
use Swoole\Coroutine\Client;
10+
use Swoole\Coroutine\System;
11+
use Swoole\Server;
12+
use Swoole\Table;
13+
14+
use function Swoole\Coroutine\run;
15+
use function Swoole\Coroutine\go;
16+
17+
$table = new Table(64);
18+
$table->column('count', Table::TYPE_INT);
19+
$table->create();
20+
21+
$pm = new SwooleTest\ProcessManager;
22+
$pm->magic_code = rand(10000000, 90000000);
23+
$pm->parentFunc = function ($pid) use ($pm, $table) {
24+
run(function () use ($pm, $table) {
25+
$n = 200;
26+
while ($n--) {
27+
go(function () use ($pm, $table) {
28+
$client = new Client(SWOOLE_SOCK_TCP);
29+
if (!$client->connect('127.0.0.1', $pm->getFreePort(), 0.5, 0)) {
30+
echo "Over flow. errno=" . $client->errCode;
31+
die("\n");
32+
}
33+
34+
$data = array(
35+
'name' => __FILE__,
36+
'sid' => $pm->magic_code,
37+
'content' => str_repeat('A', 8192 * rand(1, 3)),
38+
);
39+
40+
$_serialize_data = serialize($data) . "\r\n\r\n";
41+
$client->send($_serialize_data);
42+
Assert::eq($client->recv(), "SUCCESS\n");
43+
});
44+
}
45+
});
46+
47+
$pm->kill();
48+
49+
$array = array_column(iterator_to_array($table), 'count');
50+
$standard_deviation = sqrt(swoole_get_variance(swoole_get_average($array), $array));
51+
Assert::greaterThan($standard_deviation, 1);
52+
Assert::lessThan($standard_deviation, 5);
53+
echo 'DONE' . PHP_EOL;
54+
};
55+
56+
$pm->childFunc = function () use ($pm, $table) {
57+
$serv = new Server('127.0.0.1', $pm->getFreePort(), SWOOLE_PROCESS);
58+
$serv->set(array(
59+
'package_eof' => "\r\n\r\n",
60+
'open_eof_check' => true,
61+
'open_eof_split' => true,
62+
'dispatch_mode' => 8,
63+
'package_max_length' => 1024 * 1024 * 2,
64+
"worker_num" => 4,
65+
'log_file' => '/dev/null',
66+
"reload_async" => true,
67+
));
68+
$serv->on("WorkerStart", function (Server $serv, $worker_id) use ($pm) {
69+
if ($worker_id == 0) {
70+
$pm->wakeup();
71+
}
72+
});
73+
$serv->on('connect', function (Server $serv, $fd, $rid) use ($table) {
74+
$table->incr($serv->getWorkerId(), 'count');
75+
if (rand(1000, 9999) % 4 == 0) {
76+
System::sleep(0.5);
77+
}
78+
});
79+
$serv->on('receive', function (Server $serv, $fd, $rid, $data) use ($pm) {
80+
Assert::eq($serv->getClientInfo($fd)['worker_id'], $serv->getWorkerId());
81+
$_data = unserialize(rtrim($data));
82+
if ($_data and is_array($_data) and $_data['sid'] == $pm->magic_code) {
83+
$serv->send($fd, "SUCCESS\n");
84+
} else {
85+
$serv->send($fd, "ERROR\n");
86+
}
87+
});
88+
$serv->start();
89+
};
90+
91+
$pm->childFirst();
92+
$pm->run();
93+
?>
94+
--EXPECT--
95+
DONE
Lines changed: 115 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,115 @@
1+
--TEST--
2+
swoole_server: dispatch_mode = 9 [co req lb]
3+
--SKIPIF--
4+
<?php require __DIR__ . '/../include/skipif.inc'; ?>
5+
--FILE--
6+
<?php
7+
require __DIR__ . '/../include/bootstrap.php';
8+
9+
use Swoole\Coroutine\Client;
10+
use Swoole\Coroutine\System;
11+
use Swoole\Server;
12+
use Swoole\Table;
13+
14+
use function Swoole\Coroutine\run;
15+
use function Swoole\Coroutine\go;
16+
17+
$table = new Table(64);
18+
$table->column('count', Table::TYPE_INT);
19+
$table->create();
20+
21+
const N = 1024;
22+
const EOF = "\r\n\r\n";
23+
24+
$pm = new SwooleTest\ProcessManager;
25+
$pm->magic_code = rand(10000000, 90000000);
26+
$pm->parentFunc = function ($pid) use ($pm, $table) {
27+
run(function () use ($pm, $table) {
28+
$client = new Client(SWOOLE_SOCK_TCP);
29+
$client->set(array(
30+
'package_eof' => EOF,
31+
'open_eof_check' => true,
32+
'open_eof_split' => true,
33+
));
34+
if (!$client->connect('127.0.0.1', $pm->getFreePort(), 0.5, 0)) {
35+
echo "Over flow. errno=" . $client->errCode;
36+
die("\n");
37+
}
38+
39+
$rand = rand(1, 4);
40+
41+
$data = array(
42+
'name' => __FILE__,
43+
'sid' => $pm->magic_code,
44+
'content' => str_repeat('A', 1024 * $rand),
45+
);
46+
47+
$_serialize_data = serialize($data) . EOF;
48+
49+
go(function () use ($client) {
50+
$n = N;
51+
while ($n--) {
52+
Assert::eq($client->recv(), "SUCCESS" . EOF);
53+
}
54+
$client->close();
55+
});
56+
57+
$n = N;
58+
while ($n--) {
59+
$client->send($_serialize_data);
60+
if ($n % 10 == 1) {
61+
System::sleep(0.002);
62+
}
63+
}
64+
});
65+
66+
$pm->kill();
67+
68+
$array = array_column(iterator_to_array($table), 'count');
69+
$standard_deviation = sqrt(swoole_get_variance(swoole_get_average($array), $array));
70+
Assert::greaterThan($standard_deviation, 1);
71+
Assert::lessThan($standard_deviation, 5);
72+
echo 'DONE' . PHP_EOL;
73+
};
74+
75+
$pm->childFunc = function () use ($pm, $table) {
76+
$serv = new Server('127.0.0.1', $pm->getFreePort(), SWOOLE_PROCESS);
77+
78+
$serv->set(array(
79+
'package_eof' => "\r\n\r\n",
80+
'open_eof_check' => true,
81+
'open_eof_split' => true,
82+
'dispatch_mode' => 9,
83+
'package_max_length' => 1024 * 1024 * 2,
84+
"worker_num" => 4,
85+
'log_file' => '/dev/null',
86+
"reload_async" => true,
87+
));
88+
89+
$serv->on("WorkerStart", function (Server $serv, $worker_id) use ($pm) {
90+
if ($worker_id == 0) {
91+
$pm->wakeup();
92+
}
93+
});
94+
95+
$serv->on('receive', function (Server $serv, $fd, $rid, $data) use ($pm, $table) {
96+
$table->incr($serv->getWorkerId(), 'count');
97+
if (rand(1000, 9999) % 10 == 0) {
98+
System::sleep(0.5);
99+
}
100+
$_data = unserialize(rtrim($data));
101+
if ($_data and is_array($_data) and $_data['sid'] == $pm->magic_code) {
102+
$serv->send($fd, "SUCCESS".EOF);
103+
} else {
104+
$serv->send($fd, "ERROR".EOF);
105+
}
106+
});
107+
108+
$serv->start();
109+
};
110+
111+
$pm->childFirst();
112+
$pm->run();
113+
?>
114+
--EXPECT--
115+
DONE

0 commit comments

Comments
 (0)