Skip to content

Commit 86b1fa1

Browse files
committed
ENH nicer task queue implementation with threads continueing to live
1 parent f219ea6 commit 86b1fa1

File tree

1 file changed

+41
-15
lines changed

1 file changed

+41
-15
lines changed

src/tests/alloc.cpp

Lines changed: 41 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,15 @@ template <int NWorkers = 0>
1515
class work_queue
1616
{
1717
public:
18+
struct task_t{
19+
boost::shared_ptr<boost::shared_lock<boost::shared_mutex> > m_lock;
20+
boost::function<void()> m_task;
21+
task_t(const boost::function<void()>& task, boost::shared_mutex& m)
22+
: m_lock(new boost::shared_lock<boost::shared_mutex>(m)), m_task(task){}
23+
void operator()(){
24+
m_task();
25+
}
26+
};
1827
work_queue()
1928
{
2029
work_ctrl_ = new boost::asio::io_service::work(io_service_);
@@ -26,6 +35,22 @@ class work_queue
2635
}
2736
}
2837

38+
/**
39+
* @warning that this will block the concurrent addition of new tasks.
40+
* In this case, you will have a deadlock: the new task won't get its shared
41+
* lock, and this function won't get its unique lock.
42+
* It would probably be wiser to have something here that tries to get a
43+
* lock for /some/ time, then instead sleeps for a few milliseconds,
44+
* enabling other tasks to get into the queue.
45+
* On the other hand, this would still waste the time when everyone is
46+
* waiting. So currently the use case is the simple one: Put all tasks in
47+
* the queue, and /then/ call block_until_tasks_done. Don't generate tasks
48+
* asynchronously.
49+
*/
50+
void block_until_tasks_done(){
51+
boost::unique_lock<boost::shared_mutex> foo(access_);
52+
}
53+
2954
virtual ~work_queue() {
3055
delete work_ctrl_;
3156

@@ -37,10 +62,11 @@ class work_queue
3762
void post(const TTask& task) {
3863
// c++11
3964
// io_service_.dispatch(std::move(task));
40-
io_service_.dispatch(task);
65+
io_service_.dispatch(task_t(task, access_));
4166
}
4267

4368
private:
69+
boost::shared_mutex access_;
4470
boost::asio::io_service io_service_;
4571
boost::thread_group threads_;
4672
boost::asio::io_service::work *work_ctrl_;
@@ -146,6 +172,7 @@ struct pool_alloc_tester{
146172
BOOST_REQUIRE_GE(allocator->pool_size(M()), pool_size);
147173
BOOST_REQUIRE_GE(allocator->pool_free_count(M()), 0lu);
148174
}
175+
std::cout << "i:" << i << " ptr:" << ptr << std::endl;
149176
*done = true;
150177
}
151178
};
@@ -165,30 +192,29 @@ static void test_pooled_allocator_multi_threaded() {
165192
pool_alloc_tester<memory_space> tester(allocator, boost_mutex, ALLOC_SIZE);
166193

167194
//tbb::parallel_for_each(pointers.begin(), pointers.end(), tester);
168-
{ work_queue<> q;
195+
{ work_queue<> Q;
169196
for (size_t i = 0; i < pointers.size(); i++) {
170-
q.post(boost::bind(&pool_alloc_tester<memory_space>::operator(),
197+
Q.post(boost::bind(&pool_alloc_tester<memory_space>::operator(),
171198
&tester, &pointers[i], i, &done[i]));
172199
}
173-
}
200+
Q.block_until_tasks_done();
174201

175-
for (size_t i = 0; i < pointers.size(); i++) {
176-
BOOST_REQUIRE(done[i]);
177-
//std::cout << "i:" << i << " pointers[i]:" << pointers[i] << std::endl;
178-
BOOST_REQUIRE(pointers[i] != NULL);
179-
}
202+
for (size_t i = 0; i < pointers.size(); i++) {
203+
BOOST_REQUIRE(done[i]);
204+
//std::cout << "i:" << i << " pointers[i]:" << pointers[i] << std::endl;
205+
BOOST_REQUIRE(pointers[i] != NULL);
206+
}
180207

181-
BOOST_CHECK_GE(allocator.pool_size(m), pointers.size() * ALLOC_SIZE);
182-
BOOST_CHECK_LE(allocator.pool_count(m), 10 * pointers.size());
208+
BOOST_CHECK_GE(allocator.pool_size(m), pointers.size() * ALLOC_SIZE);
209+
BOOST_CHECK_LE(allocator.pool_count(m), 10 * pointers.size());
183210

184-
size_t count = allocator.pool_count(m);
185-
BOOST_CHECK_GE(count, pointers.size());
211+
size_t count = allocator.pool_count(m);
212+
BOOST_CHECK_GE(count, pointers.size());
186213

187-
{ work_queue<> q;
188214
pool_destroy_tester<memory_space> tester2(allocator, boost_mutex, ALLOC_SIZE);
189215
//tbb::parallel_for_each(pointers.begin(), pointers.end(), tester2);
190216
for (size_t i = 0; i < pointers.size(); i++) {
191-
q.post(boost::bind( &pool_destroy_tester<memory_space>::operator(), &tester2, &pointers[i]));
217+
Q.post(boost::bind( &pool_destroy_tester<memory_space>::operator(), &tester2, &pointers[i]));
192218
}
193219
}
194220

0 commit comments

Comments
 (0)