Skip to content

Commit b1067e6

Browse files
authored
Register the queue cleanup callback to vm_id handle callbacks (proxy-wasm#114)
Signed-off-by: mathetake <[email protected]>
1 parent 75241a6 commit b1067e6

File tree

6 files changed

+131
-25
lines changed

6 files changed

+131
-25
lines changed

src/shared_data.cc

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,10 @@ SharedData &getGlobalSharedData() {
3030
return *ptr;
3131
};
3232

33-
SharedData::SharedData() {
34-
registerVmIdHandleCallback([this](std::string_view vm_id) { this->deleteByVmId(vm_id); });
33+
SharedData::SharedData(bool register_vm_id_callback) {
34+
if (register_vm_id_callback) {
35+
registerVmIdHandleCallback([this](std::string_view vm_id) { this->deleteByVmId(vm_id); });
36+
}
3537
}
3638

3739
void SharedData::deleteByVmId(std::string_view vm_id) {

src/shared_data.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ namespace proxy_wasm {
2222

2323
class SharedData {
2424
public:
25-
SharedData();
25+
SharedData(bool register_vm_id_callback = true);
2626
WasmResult get(std::string_view vm_id, const std::string_view key,
2727
std::pair<std::string, uint32_t> *result);
2828
WasmResult set(std::string_view vm_id, std::string_view key, std::string_view value,

src/shared_queue.cc

Lines changed: 49 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,51 @@
2121
#include <unordered_map>
2222
#include <unordered_set>
2323

24+
#include "include/proxy-wasm/vm_id_handle.h"
25+
2426
namespace proxy_wasm {
2527

2628
SharedQueue &getGlobalSharedQueue() {
2729
static auto *ptr = new SharedQueue;
2830
return *ptr;
2931
}
3032

33+
SharedQueue::SharedQueue(bool register_vm_id_callback) {
34+
if (register_vm_id_callback) {
35+
registerVmIdHandleCallback([this](std::string_view vm_id) { this->deleteByVmId(vm_id); });
36+
}
37+
}
38+
39+
void SharedQueue::deleteByVmId(std::string_view vm_id) {
40+
std::lock_guard<std::mutex> lock(mutex_);
41+
auto queue_keys = vm_queue_keys_.find(std::string(vm_id));
42+
if (queue_keys != vm_queue_keys_.end()) {
43+
for (auto queue_key : queue_keys->second) {
44+
auto token = queue_tokens_.find(queue_key);
45+
if (token != queue_tokens_.end()) {
46+
queues_.erase(token->second);
47+
queue_tokens_.erase(token);
48+
}
49+
}
50+
vm_queue_keys_.erase(queue_keys);
51+
}
52+
}
53+
54+
uint32_t SharedQueue::nextQueueToken() {
55+
// TODO(@mathetake): Should we handle the case where the queue overflows, i.e. the number of used
56+
// tokens exceeds the max of uint32? If it overflows, the following loop never exits.
57+
while (true) {
58+
uint32_t token = next_queue_token_++;
59+
if (token == 0) {
60+
continue; // 0 is an illegal token.
61+
}
62+
63+
if (queues_.find(token) == queues_.end()) {
64+
return token;
65+
}
66+
}
67+
}
68+
3169
uint32_t SharedQueue::registerQueue(std::string_view vm_id, std::string_view queue_name,
3270
uint32_t context_id, CallOnThreadFunction call_on_thread,
3371
std::string_view vm_key) {
@@ -36,8 +74,18 @@ uint32_t SharedQueue::registerQueue(std::string_view vm_id, std::string_view que
3674
auto it = queue_tokens_.insert(std::make_pair(key, static_cast<uint32_t>(0)));
3775
if (it.second) {
3876
it.first->second = nextQueueToken();
39-
queue_token_set_.insert(it.first->second);
77+
78+
auto vid = std::string(vm_id);
79+
QueueKeySet *queue_keys;
80+
auto map_it = vm_queue_keys_.find(vid);
81+
if (map_it == vm_queue_keys_.end()) {
82+
queue_keys = &vm_queue_keys_[vid];
83+
} else {
84+
queue_keys = &map_it->second;
85+
}
86+
queue_keys->insert(key);
4087
}
88+
4189
uint32_t token = it.first->second;
4290
auto &q = queues_[token];
4391
q.vm_key = std::string(vm_key);

src/shared_queue.h

Lines changed: 14 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -22,25 +22,18 @@ namespace proxy_wasm {
2222

2323
class SharedQueue {
2424
public:
25+
SharedQueue(bool register_vm_id_callback = true);
26+
2527
uint32_t registerQueue(std::string_view vm_id, std::string_view queue_name, uint32_t context_id,
2628
CallOnThreadFunction call_on_thread, std::string_view vm_key);
2729
uint32_t resolveQueue(std::string_view vm_id, std::string_view queue_name);
2830
WasmResult dequeue(uint32_t token, std::string *data);
2931
WasmResult enqueue(uint32_t token, std::string_view value);
3032

31-
private:
32-
uint32_t nextQueueToken() {
33-
while (true) {
34-
uint32_t token = next_queue_token_++;
35-
if (token == 0) {
36-
continue; // 0 is an illegal token.
37-
}
38-
if (queue_token_set_.find(token) == queue_token_set_.end()) {
39-
return token;
40-
}
41-
}
42-
}
33+
void deleteByVmId(std::string_view vm_id);
34+
uint32_t nextQueueToken();
4335

36+
private:
4437
struct Queue {
4538
std::string vm_key;
4639
uint32_t context_id;
@@ -50,15 +43,22 @@ class SharedQueue {
5043

5144
// TODO: use std::shared_mutex in C++17.
5245
std::mutex mutex_;
53-
std::map<uint32_t, Queue> queues_;
5446
uint32_t next_queue_token_ = 1;
47+
5548
struct pair_hash {
5649
template <class T1, class T2> std::size_t operator()(const std::pair<T1, T2> &pair) const {
5750
return std::hash<T1>()(pair.first) ^ std::hash<T2>()(pair.second);
5851
}
5952
};
53+
54+
using QueueKeySet = std::unordered_set<std::pair<std::string, std::string>, pair_hash>;
55+
56+
// vm_id -> queue keys
57+
std::unordered_map<std::string, QueueKeySet> vm_queue_keys_;
58+
// queue key -> token
6059
std::unordered_map<std::pair<std::string, std::string>, uint32_t, pair_hash> queue_tokens_;
61-
std::unordered_set<uint32_t> queue_token_set_;
60+
// token -> queue
61+
std::unordered_map<uint32_t, Queue> queues_;
6262
};
6363

6464
SharedQueue &getGlobalSharedQueue();

test/shared_data_test.cc

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
namespace proxy_wasm {
2424

2525
TEST(SharedData, SingleThread) {
26-
SharedData shared_data;
26+
SharedData shared_data(false);
2727
std::pair<std::string, uint32_t> result;
2828
EXPECT_EQ(WasmResult::NotFound, shared_data.get("non-exist", "non-exists", &result));
2929

@@ -63,7 +63,7 @@ void incrementData(SharedData *shared_data, std::string_view vm_id, std::string_
6363
}
6464

6565
TEST(SharedData, Concurrent) {
66-
SharedData shared_data;
66+
SharedData shared_data(false);
6767
std::pair<std::string, uint32_t> result;
6868

6969
std::string_view vm_id = "id";
@@ -80,7 +80,7 @@ TEST(SharedData, Concurrent) {
8080
}
8181

8282
TEST(SharedData, DeleteByVmId) {
83-
SharedData shared_data;
83+
SharedData shared_data(false);
8484
std::string_view vm_id = "id";
8585
std::string_view key = "key";
8686
std::string_view value;
@@ -91,7 +91,7 @@ TEST(SharedData, DeleteByVmId) {
9191
EXPECT_EQ(WasmResult::NotFound, shared_data.get(vm_id, key, &result));
9292
}
9393

94-
TEST(SharedData, VmIdCleanup) {
94+
TEST(SharedData, VmIdHandleCleanup) {
9595
SharedData shared_data;
9696
std::string_view vm_id = "proxy_wasm_shared_data_test";
9797
auto handle = getVmIdHandle(vm_id);

test/shared_queue_test.cc

Lines changed: 59 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,29 @@
1818

1919
#include "gtest/gtest.h"
2020

21+
#include "include/proxy-wasm/vm_id_handle.h"
22+
2123
namespace proxy_wasm {
2224

25+
TEST(SharedQueue, NextQueueToken) {
26+
SharedQueue shared_queue(false);
27+
for (auto i = 1; i < 5; i++) {
28+
EXPECT_EQ(i, shared_queue.nextQueueToken());
29+
}
30+
EXPECT_EQ(5, shared_queue.registerQueue("a", "b", 1, nullptr, "c"));
31+
}
32+
2333
TEST(SharedQueue, SingleThread) {
24-
SharedQueue shared_queue;
34+
SharedQueue shared_queue(false);
2535
std::string_view vm_id = "id";
2636
std::string_view vm_key = "vm_key";
2737
std::string_view queue_name = "name";
2838
uint32_t context_id = 1;
2939

30-
EXPECT_EQ(1, shared_queue.registerQueue(vm_id, queue_name, context_id, nullptr, vm_key));
40+
for (auto i = 0; i < 3; i++) {
41+
// same token
42+
EXPECT_EQ(1, shared_queue.registerQueue(vm_id, queue_name, context_id, nullptr, vm_key));
43+
}
3144
EXPECT_EQ(1, shared_queue.resolveQueue(vm_id, queue_name));
3245
EXPECT_EQ(0, shared_queue.resolveQueue(vm_id, "non-exist"));
3346
EXPECT_EQ(0, shared_queue.resolveQueue("non-exist", queue_name));
@@ -69,7 +82,7 @@ void dequeueData(SharedQueue *shared_queue, uint32_t token, size_t *dequeued_cou
6982
}
7083

7184
TEST(SharedQueue, Concurrent) {
72-
SharedQueue shared_queue;
85+
SharedQueue shared_queue(false);
7386
std::string_view vm_id = "id";
7487
std::string_view vm_key = "vm_key";
7588
std::string_view queue_name = "name";
@@ -100,4 +113,47 @@ TEST(SharedQueue, Concurrent) {
100113
EXPECT_EQ(first_cnt + second_cnt, 200);
101114
}
102115

116+
TEST(SharedQueue, DeleteByVmId) {
117+
SharedQueue shared_queue(false);
118+
auto vm_id_1 = "id_1";
119+
auto vm_id_2 = "id_2";
120+
std::string_view vm_key = "vm_key";
121+
uint32_t context_id = 1;
122+
auto queue_num_per_vm = 3;
123+
124+
for (auto i = 1; i < queue_num_per_vm; i++) {
125+
EXPECT_EQ(i,
126+
shared_queue.registerQueue(vm_id_1, std::to_string(i), context_id, nullptr, vm_key));
127+
EXPECT_EQ(i, shared_queue.resolveQueue(vm_id_1, std::to_string(i)));
128+
}
129+
130+
for (auto i = queue_num_per_vm; i < 2 * queue_num_per_vm; i++) {
131+
EXPECT_EQ(i,
132+
shared_queue.registerQueue(vm_id_2, std::to_string(i), context_id, nullptr, vm_key));
133+
EXPECT_EQ(i, shared_queue.resolveQueue(vm_id_2, std::to_string(i)));
134+
}
135+
136+
shared_queue.deleteByVmId(vm_id_1);
137+
for (auto i = 1; i < queue_num_per_vm; i++) {
138+
EXPECT_EQ(0, shared_queue.resolveQueue(vm_id_1, std::to_string(i)));
139+
}
140+
141+
for (auto i = queue_num_per_vm; i < 2 * queue_num_per_vm; i++) {
142+
EXPECT_EQ(i, shared_queue.resolveQueue(vm_id_2, std::to_string(i)));
143+
}
144+
}
145+
146+
TEST(SharedQueue, VmIdHandleCleanup) {
147+
SharedQueue shared_queue;
148+
std::string_view vm_id = "proxy_wasm_shared_queue_test";
149+
std::string_view queue_name = "name";
150+
151+
auto handle = getVmIdHandle(vm_id);
152+
EXPECT_EQ(1, shared_queue.registerQueue(vm_id, queue_name, 1, nullptr, "vm_key"));
153+
EXPECT_EQ(1, shared_queue.resolveQueue(vm_id, queue_name));
154+
155+
handle.reset();
156+
EXPECT_EQ(0, shared_queue.resolveQueue(vm_id, queue_name));
157+
}
158+
103159
} // namespace proxy_wasm

0 commit comments

Comments
 (0)