Skip to content

Commit 9d8a6c9

Browse files
rennoxdahlerlend
authored andcommitted
BUG#37676790 Scripting REST end point returning stalled results
There was a major issue with stalled results on a released context. i.e. in the case processing ended with a time-out, even the request was considered complete, because the context remains alive for the execution of additional requests, the responses associated to the first request were still generated and consumed by the next request handled by the context. To address this, logic was needed to ensure stalled responses after a context is released are discardedi before putting back the context in the pool, this change made evident a second problem: the release logic is synchronous, so the client did not get any response but until the context was really returned to the pool. To address this second problem, the a release queue and a release thread were added to the pool, once a context is done processing a request, it is added to the queue, and the pool release thread waits for the context to be available to return it to the pool. Change-Id: I8973b202265f862f824dc378b07d546235ad266b
1 parent 830fac7 commit 9d8a6c9

11 files changed

+217
-119
lines changed

router/src/jit_executor/include/mysqlrouter/jit_executor_context.h

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,9 @@ class IContext {
4646
const std::vector<shcore::Value> &parameters,
4747
int timeout, ResultType result_type,
4848
const GlobalCallbacks &global_callbacks) = 0;
49-
virtual bool got_resources_error() const = 0;
49+
50+
virtual bool wait_for_idle() = 0;
51+
virtual size_t id() = 0;
5052
};
5153

5254
} // namespace jit_executor

router/src/jit_executor/src/jit_executor_common_context.cc

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -187,8 +187,8 @@ void CommonContext::life_cycle_thread() {
187187
m_init_condition.notify_one();
188188

189189
// Now waits for the finalization indication
190-
{
191-
std::unique_lock<std::mutex> lock(m_mutex);
190+
if (m_initialized) {
191+
std::unique_lock<std::mutex> lock(m_finish_mutex);
192192
m_finish_condition.wait(lock, [this]() { return m_terminated; });
193193
}
194194

router/src/jit_executor/src/jit_executor_context_pool.cc

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
#include <string>
3232
#include <vector>
3333

34+
#include "include/my_thread.h"
3435
#include "jit_executor_common_context.h"
3536
#include "jit_executor_javascript_context.h"
3637

@@ -40,10 +41,12 @@ ContextPool::ContextPool(size_t size, CommonContext *common_context)
4041
: m_common_context{common_context} {
4142
m_pool = std::make_unique<Pool<IContext *>>(
4243
size,
43-
[this]() -> JavaScriptContext * {
44-
auto context = std::make_unique<JavaScriptContext>(m_common_context);
45-
if (context->got_initialization_error()) {
46-
return nullptr;
44+
[this](size_t id) -> JavaScriptContext * {
45+
auto context =
46+
std::make_unique<JavaScriptContext>(id, m_common_context);
47+
if (!context->started()) {
48+
// The factory function should throw runtime exception if fails
49+
throw std::runtime_error("Failed initializing JavaScriptContext");
4750
}
4851

4952
return context.release();
@@ -53,21 +56,49 @@ ContextPool::ContextPool(size_t size, CommonContext *common_context)
5356

5457
ContextPool::~ContextPool() { teardown(); }
5558

59+
void ContextPool::teardown() {
60+
m_pool->teardown();
61+
62+
// Tear down the releaser thread
63+
release(nullptr);
64+
65+
if (m_release_thread) {
66+
m_release_thread->join();
67+
m_release_thread.reset();
68+
}
69+
}
70+
5671
std::shared_ptr<PooledContextHandle> ContextPool::get_context() {
5772
auto ctx = m_pool->get();
5873

5974
if (ctx) {
75+
if (!m_release_thread) {
76+
m_release_thread =
77+
std::make_unique<std::thread>(&ContextPool::release_thread, this);
78+
}
79+
6080
return std::make_shared<PooledContextHandle>(this, ctx);
6181
}
6282

6383
return {};
6484
}
6585

66-
void ContextPool::release(IContext *ctx) {
67-
if (ctx->got_resources_error()) {
68-
m_pool->on_resources_error(ctx);
69-
} else {
70-
m_pool->release(ctx);
86+
void ContextPool::release(IContext *ctx) { m_release_queue.push(ctx); }
87+
88+
void ContextPool::release_thread() {
89+
my_thread_self_setname("Jit-CtxDispose");
90+
while (true) {
91+
auto ctx = m_release_queue.pop();
92+
if (ctx) {
93+
if (ctx->wait_for_idle()) {
94+
m_pool->release(ctx);
95+
} else {
96+
m_pool->discard(ctx, true);
97+
}
98+
} else {
99+
// nullptr arrived, meaning we are done
100+
break;
101+
}
71102
}
72103
}
73104

router/src/jit_executor/src/jit_executor_context_pool.h

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -33,22 +33,27 @@
3333
#include <mutex>
3434
#include <stdexcept>
3535
#include <string>
36+
#include <thread>
3637
#include <vector>
3738

39+
#include "mysql/harness/logging/logging.h"
40+
#include "mysql/harness/mpsc_queue.h"
3841
#include "mysqlrouter/jit_executor_context.h"
3942
#include "mysqlrouter/jit_executor_context_handle.h"
4043
#include "mysqlrouter/jit_executor_value.h"
4144
#include "mysqlrouter/polyglot_file_system.h"
4245

4346
namespace jit_executor {
4447

48+
IMPORT_LOG_FUNCTIONS()
49+
4550
/**
4651
* Generic implementation of a pool
4752
*/
4853
template <class T>
4954
class Pool {
5055
public:
51-
explicit Pool(size_t size, const std::function<T()> &factory,
56+
explicit Pool(size_t size, const std::function<T(size_t)> &factory,
5257
const std::function<void(T)> &destructor = {})
5358
: m_pool_size{size},
5459
m_item_factory{factory},
@@ -84,7 +89,7 @@ class Pool {
8489
}
8590

8691
try {
87-
T item = m_item_factory();
92+
T item = m_item_factory(m_created_items);
8893
increase_active_items();
8994
return item;
9095
} catch (const std::runtime_error &err) {
@@ -99,15 +104,12 @@ class Pool {
99104

100105
if (!m_teardown && m_items.size() < m_pool_size) {
101106
m_items.push_back(ctx);
102-
m_item_availability.notify_all();
107+
m_item_availability.notify_one();
103108
return;
104109
}
105110
}
106111

107-
if (m_item_destructor) {
108-
m_item_destructor(ctx);
109-
}
110-
decrease_active_items();
112+
discard(ctx, false);
111113
}
112114

113115
void teardown() {
@@ -120,10 +122,7 @@ class Pool {
120122
auto item = m_items.front();
121123
m_items.pop_front();
122124

123-
if (m_item_destructor) {
124-
m_item_destructor(item);
125-
}
126-
decrease_active_items();
125+
discard(item, false);
127126
}
128127

129128
// Waits until all the contexts created by the pool get released
@@ -139,18 +138,24 @@ class Pool {
139138
/**
140139
* Discards the affected context and turns ON contention mode for the pool
141140
*/
142-
void on_resources_error(T ctx) {
141+
void discard(T ctx, bool set_contention_mode) {
142+
decrease_active_items(set_contention_mode);
143+
143144
if (m_item_destructor) {
144-
m_item_destructor(ctx);
145+
try {
146+
m_item_destructor(ctx);
147+
} catch (const std::exception &e) {
148+
log_error("%s", e.what());
149+
}
145150
}
146-
decrease_active_items(true);
147151
}
148152

149153
private:
150154
void increase_active_items() {
151155
{
152156
std::scoped_lock lock(m_mutex);
153157
m_active_items++;
158+
m_created_items++;
154159
}
155160
}
156161

@@ -171,9 +176,10 @@ class Pool {
171176
bool m_teardown = false;
172177
size_t m_pool_size;
173178
std::deque<T> m_items;
174-
std::function<T()> m_item_factory;
179+
std::function<T(size_t id)> m_item_factory;
175180
std::function<void(T)> m_item_destructor;
176181
size_t m_active_items = 0;
182+
size_t m_created_items = 0;
177183
bool m_contention_mode = false;
178184
};
179185

@@ -187,11 +193,15 @@ class ContextPool final {
187193

188194
std::shared_ptr<PooledContextHandle> get_context();
189195
void release(IContext *ctx);
190-
void teardown() { m_pool->teardown(); }
196+
void teardown();
191197

192198
private:
199+
void release_thread();
200+
193201
CommonContext *m_common_context;
194202
std::unique_ptr<Pool<IContext *>> m_pool;
203+
mysql_harness::WaitingMPSCQueue<IContext *> m_release_queue;
204+
std::unique_ptr<std::thread> m_release_thread;
195205
};
196206

197207
/**

router/src/jit_executor/src/jit_executor_debug_context_handle.cc

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ DebugContextHandle::DebugContextHandle(const std::string &debug_port,
3535
CommonContext *common_context) {
3636
assert(!debug_port.empty());
3737
assert(common_context);
38-
m_ctx = std::make_unique<JavaScriptContext>(common_context, debug_port);
38+
m_ctx = std::make_unique<JavaScriptContext>(0, common_context, debug_port);
3939
}
4040

4141
} // namespace jit_executor

0 commit comments

Comments
 (0)