Skip to content

Commit 7f02e43

Browse files
committed
Merged pull request cpp-netlib#38 from vusak/async_server_run_stop_concurrency_0.9-devel.
Fix and tests for concurrent async_server::run/stop calls. Fixes cpp-netlib#36
2 parents 8f8eb7a + 4a1e7c5 commit 7f02e43

File tree

4 files changed

+184
-21
lines changed

4 files changed

+184
-21
lines changed

boost/network/protocol/http/server/async_server.hpp

+42-21
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ namespace boost { namespace network { namespace http {
2323
typedef typename boost::network::http::response_header<Tag>::type response_header;
2424
typedef async_connection<Tag,Handler> connection;
2525
typedef shared_ptr<connection> connection_ptr;
26+
typedef boost::unique_lock<boost::mutex> scoped_mutex_lock;
2627

2728
template <class ArgPack>
2829
async_server_base(ArgPack const & args)
@@ -44,6 +45,7 @@ namespace boost { namespace network { namespace http {
4445
, stopping(false)
4546
, new_connection()
4647
, listening_mutex_()
48+
, stopping_mutex_()
4749
, listening(false)
4850
{}
4951

@@ -55,15 +57,21 @@ namespace boost { namespace network { namespace http {
5557
void stop() {
5658
// stop accepting new requests and let all the existing
5759
// handlers finish.
58-
stopping = true;
59-
system::error_code ignored;
60-
acceptor.close(ignored);
60+
scoped_mutex_lock listening_lock(listening_mutex_);
61+
if (listening) { // we dont bother stopping if we arent currently listening
62+
scoped_mutex_lock stopping_lock(stopping_mutex_);
63+
stopping = true;
64+
system::error_code ignored;
65+
acceptor.close(ignored);
66+
listening = false;
67+
service_.post(boost::bind(&async_server_base::handle_stop, this));
68+
}
6169
}
6270

6371
void listen() {
64-
boost::unique_lock<boost::mutex> listening_lock(listening_mutex_);
72+
scoped_mutex_lock listening_lock(listening_mutex_);
6573
BOOST_NETWORK_MESSAGE("Listening on " << address_ << ':' << port_);
66-
if (!listening) start_listening();
74+
if (!listening) start_listening(); // we only initialize our acceptor/sockets if we arent already listening
6775
if (!listening) {
6876
BOOST_NETWORK_MESSAGE("Error listening on " << address_ << ':' << port_);
6977
boost::throw_exception(std::runtime_error("Error listening on provided port."));
@@ -78,28 +86,36 @@ namespace boost { namespace network { namespace http {
7886
bool stopping;
7987
connection_ptr new_connection;
8088
boost::mutex listening_mutex_;
89+
boost::mutex stopping_mutex_;
8190
bool listening;
91+
92+
void handle_stop() {
93+
scoped_mutex_lock stopping_lock(stopping_mutex_);
94+
if (stopping) service_.stop(); // a user may have started listening again before the stop command is reached
95+
}
8296

8397
void handle_accept(boost::system::error_code const & ec) {
98+
{
99+
scoped_mutex_lock stopping_lock(stopping_mutex_);
100+
if (stopping) return; // we dont want to add another handler instance, and we dont want to know about errors for a socket we dont need anymore
101+
}
84102
if (!ec) {
85103
socket_options_base::socket_options(new_connection->socket());
86104
new_connection->start();
87-
if (!stopping) {
88-
new_connection.reset(
89-
new connection(
90-
service_
91-
, handler
92-
, thread_pool
93-
)
94-
);
95-
acceptor.async_accept(new_connection->socket(),
96-
boost::bind(
97-
&async_server_base<Tag,Handler>::handle_accept
98-
, this
99-
, boost::asio::placeholders::error
100-
)
101-
);
102-
}
105+
new_connection.reset(
106+
new connection(
107+
service_
108+
, handler
109+
, thread_pool
110+
)
111+
);
112+
acceptor.async_accept(new_connection->socket(),
113+
boost::bind(
114+
&async_server_base<Tag,Handler>::handle_accept
115+
, this
116+
, boost::asio::placeholders::error
117+
)
118+
);
103119
} else {
104120
BOOST_NETWORK_MESSAGE("Error accepting connection, reason: " << ec);
105121
}
@@ -109,6 +125,9 @@ namespace boost { namespace network { namespace http {
109125
using boost::asio::ip::tcp;
110126

111127
system::error_code error;
128+
129+
service_.reset(); // this allows repeated cycles of run -> stop -> run
130+
112131
tcp::resolver resolver(service_);
113132
tcp::resolver::query query(address_, port_);
114133
tcp::resolver::iterator endpoint_iterator = resolver.resolve(query, error);
@@ -140,6 +159,8 @@ namespace boost { namespace network { namespace http {
140159
, this
141160
, boost::asio::placeholders::error));
142161
listening = true;
162+
scoped_mutex_lock stopping_lock(stopping_mutex_);
163+
stopping = false; // if we were in the process of stopping, we revoke that command and continue listening
143164
BOOST_NETWORK_MESSAGE("Now listening on socket: '" << address_ << ":" << port_ << "'");
144165
}
145166
};

libs/network/test/http/CMakeLists.txt

+1
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ if (Boost_FOUND)
7070
server_hello_world
7171
server_async
7272
server_async_less_copy
73+
server_async_run_stop_concurrency
7374
)
7475
set ( PORT 8000 )
7576
foreach ( test ${SERVER_TESTS} )

libs/network/test/http/Jamfile.v2

+2
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,12 @@ run url_test.cpp /cpp-netlib//cppnetlib-uri-parsers ;
3434
exe http_async_server : server_async.cpp /cpp-netlib//cppnetlib-server-parsers ;
3535
exe hello_world : server_hello_world.cpp /cpp-netlib//cppnetlib-server-parsers ;
3636
exe http_async_less_copy_server : server_async_less_copy.cpp /cpp-netlib//cppnetlib-server-parsers ;
37+
exe http_async_run_stop_concurrency_server : server_async_run_stop_concurrency.cpp /cpp-netlib//cppnetlib-server-parsers ;
3738

3839
make httplib_acceptance.passed : ../httplib_acceptance.py hello_world : @httplib_acceptance ;
3940
make httplib_async_acceptance.passed : ../httplib_acceptance.py http_async_server : @httplib_acceptance ;
4041
make httplib_async_less_copy_acceptance.passed : ../httplib_acceptance.py http_async_less_copy_server : @httplib_acceptance ;
42+
make httplib_async_run_stop_concurrency_acceptance.passed : ../httplib_acceptance.py http_async_run_stop_concurrency_server : @httplib_acceptance ;
4143

4244
actions httplib_acceptance {
4345
export TEST_SCRIPT=`echo "$(>)" | awk '{print $1}'`
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,139 @@
1+
#define BOOST_TEST_MODULE HTTP Asynchronous Server Tests
2+
3+
#include <boost/network/include/http/server.hpp>
4+
#include <boost/test/unit_test.hpp>
5+
#include <boost/thread.hpp>
6+
7+
namespace http = boost::network::http;
8+
namespace util = boost::network::utils;
9+
10+
struct dummy_async_handler;
11+
typedef http::async_server<dummy_async_handler> async_server;
12+
13+
struct dummy_async_handler {
14+
void operator()(async_server::request const & req,
15+
async_server::connection_ptr conn) {
16+
// Really, this is just for testing purposes
17+
}
18+
};
19+
20+
// In this batch of tests we ensure that calling run and stop on an async_server, in any sequence, is thread safe.
21+
22+
int main(int argc, char * argv[]) {
23+
dummy_async_handler async_handler;
24+
25+
#define ASYNC_SERVER_TEST_CONFIG \
26+
http::_address = "127.0.0.1", \
27+
http::_port = "80", \
28+
http::_handler = async_handler, \
29+
http::_thread_pool = pool, \
30+
http::_reuse_address = true
31+
32+
#define ASYNC_SERVER_SLEEP_TIME \
33+
boost::posix_time::milliseconds(100)
34+
35+
// stop from main thread
36+
{
37+
BOOST_NETWORK_MESSAGE("TEST: stop without running");
38+
util::thread_pool pool;
39+
async_server server_instance(ASYNC_SERVER_TEST_CONFIG);
40+
server_instance.stop();
41+
}
42+
43+
// run-stop from main thread
44+
{
45+
BOOST_NETWORK_MESSAGE("TEST: stop from main thread");
46+
util::thread_pool pool;
47+
async_server server_instance(ASYNC_SERVER_TEST_CONFIG);
48+
49+
boost::thread running_thread(boost::bind(&async_server::run, &server_instance));
50+
boost::this_thread::sleep(ASYNC_SERVER_SLEEP_TIME);
51+
52+
server_instance.stop();
53+
running_thread.join();
54+
}
55+
56+
// run-stop from another thread
57+
{
58+
BOOST_NETWORK_MESSAGE("TEST: stop from another thread");
59+
util::thread_pool pool;
60+
async_server server_instance(ASYNC_SERVER_TEST_CONFIG);
61+
62+
boost::thread running_thread(boost::bind(&async_server::run, &server_instance));
63+
boost::this_thread::sleep(ASYNC_SERVER_SLEEP_TIME);
64+
65+
boost::thread stopping_thread(boost::bind(&async_server::stop, &server_instance));
66+
boost::this_thread::sleep(ASYNC_SERVER_SLEEP_TIME);
67+
68+
stopping_thread.join();
69+
running_thread.join();
70+
}
71+
72+
// run-stop-run-stop from another thread
73+
{
74+
BOOST_NETWORK_MESSAGE("TEST: run-stop-run-stop from another thread");
75+
util::thread_pool pool;
76+
async_server server_instance(ASYNC_SERVER_TEST_CONFIG);
77+
78+
boost::thread running_thread(boost::bind(&async_server::run, &server_instance));
79+
boost::this_thread::sleep(ASYNC_SERVER_SLEEP_TIME);
80+
81+
boost::thread stopping_thread(boost::bind(&async_server::stop, &server_instance));
82+
boost::this_thread::sleep(ASYNC_SERVER_SLEEP_TIME);
83+
84+
boost::thread second_running_thread(boost::bind(&async_server::run, &server_instance));
85+
boost::this_thread::sleep(ASYNC_SERVER_SLEEP_TIME);
86+
87+
boost::thread second_stopping_thread(boost::bind(&async_server::stop, &server_instance));
88+
boost::this_thread::sleep(ASYNC_SERVER_SLEEP_TIME);
89+
90+
stopping_thread.join();
91+
running_thread.join();
92+
second_stopping_thread.join();
93+
second_running_thread.join();
94+
}
95+
96+
// run-run-stop from another thread
97+
{
98+
BOOST_NETWORK_MESSAGE("TEST: run-run-stop from another thread");
99+
util::thread_pool pool;
100+
async_server server_instance(ASYNC_SERVER_TEST_CONFIG);
101+
102+
boost::thread running_thread(boost::bind(&async_server::run, &server_instance));
103+
boost::this_thread::sleep(ASYNC_SERVER_SLEEP_TIME);
104+
105+
boost::thread second_running_thread(boost::bind(&async_server::run, &server_instance));
106+
boost::this_thread::sleep(ASYNC_SERVER_SLEEP_TIME);
107+
108+
boost::thread stopping_thread(boost::bind(&async_server::stop, &server_instance));
109+
boost::this_thread::sleep(ASYNC_SERVER_SLEEP_TIME);
110+
111+
stopping_thread.join();
112+
running_thread.join();
113+
second_running_thread.join();
114+
}
115+
116+
// run-stop-stop from another thread
117+
{
118+
BOOST_NETWORK_MESSAGE("TEST: run-stop-stop from another thread");
119+
util::thread_pool pool;
120+
async_server server_instance(ASYNC_SERVER_TEST_CONFIG);
121+
122+
boost::thread running_thread(boost::bind(&async_server::run, &server_instance));
123+
boost::this_thread::sleep(ASYNC_SERVER_SLEEP_TIME);
124+
125+
boost::thread stopping_thread(boost::bind(&async_server::stop, &server_instance));
126+
boost::this_thread::sleep(ASYNC_SERVER_SLEEP_TIME);
127+
128+
boost::thread second_stopping_thread(boost::bind(&async_server::stop, &server_instance));
129+
boost::this_thread::sleep(ASYNC_SERVER_SLEEP_TIME);
130+
131+
stopping_thread.join();
132+
second_stopping_thread.join();
133+
running_thread.join();
134+
}
135+
136+
#undef ASYNC_SERVER_TEST_CONFIG
137+
138+
return 0;
139+
}

0 commit comments

Comments
 (0)