Skip to content

Commit 405f5dc

Browse files
committed
BUG#35403452 PAR Dump Enhancements: File Buffering and Worker Session Pool
When dumping using PAR, the workers split the data dump operation into 2 parts: - Buffering data - Transferring data to Bucket This patch replaces the memory buffering with file buffering. The dump files will first be written to local disk and later transferred to the target bucket. The files will be located in the system default temporary folder. Additionally, a new session pool is introduced on the dumper to enable the workers to release the database session as soon as the buffering step is complete. This way, the worker thread can continue with the data transfer while another worker can pick up the next dump operation. Also, the number of workers when dumping with PAR is internally doubled, so there can be up to 'threads'*2 workers doing dump/transfer operations while there are at most 'threads' workers doing dump operations. Notes on System Temporary Directory ----------------------------------- The following order of precedence is used to determine the value of the temporary directory: POSIX: TMPDIR, TMP, TEMP, TEMPDIR. If none is defined, /tmp is used. Windows: TMP, TEMP, USERPROFILE. If none is defined, the Windows directory is used. Change-Id: I99c534c6b24857ceb7544a14f847cf5a4c7bb9c9
1 parent 3ed372d commit 405f5dc

File tree

18 files changed

+458
-115
lines changed

18 files changed

+458
-115
lines changed

modules/util/dump/ddl_dumper_options.cc

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ const shcore::Option_pack_def<Ddl_dumper_options>
6565
.include<Dump_options>()
6666
.optional("chunking", &Ddl_dumper_options::m_split)
6767
.optional("bytesPerChunk", &Ddl_dumper_options::set_bytes_per_chunk)
68-
.optional("threads", &Ddl_dumper_options::m_threads)
68+
.optional("threads", &Ddl_dumper_options::set_threads)
6969
.optional("triggers", &Ddl_dumper_options::m_dump_triggers)
7070
.optional("tzUtc", &Ddl_dumper_options::m_timezone_utc)
7171
.optional("ddlOnly", &Ddl_dumper_options::m_ddl_only)
@@ -174,6 +174,13 @@ void Ddl_dumper_options::set_dry_run(bool dry_run) {
174174
set_dry_run_mode(dry_run ? Dry_run::DONT_WRITE_ANY_FILES : Dry_run::DISABLED);
175175
}
176176

177+
void Ddl_dumper_options::set_threads(uint64_t threads) {
178+
m_threads = threads;
179+
180+
// By default, m_worker_threads is equal to m_threads
181+
m_worker_threads = threads;
182+
}
183+
177184
const Object_storage_options *Ddl_dumper_options::object_storage_options()
178185
const {
179186
if (m_dump_manifest_options) {
@@ -201,6 +208,9 @@ void Ddl_dumper_options::set_output_url(const std::string &url) {
201208
} else {
202209
if (par.type() == mysqlshdk::oci::PAR_type::PREFIX) {
203210
set_storage_config(dump::common::get_par_config(par));
211+
212+
// For dumps with PAR prefix, doubles the number of worker threads
213+
m_worker_threads = m_threads * 2;
204214
} else {
205215
throw std::invalid_argument("The given URL is not a prefix PAR.");
206216
}

modules/util/dump/ddl_dumper_options.h

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,8 @@ class Ddl_dumper_options : public Dump_options {
5656

5757
std::size_t threads() const override { return m_threads; }
5858

59+
std::size_t worker_threads() const override { return m_worker_threads; }
60+
5961
bool is_export_only() const override { return false; }
6062

6163
bool use_single_file() const override { return false; }
@@ -96,6 +98,7 @@ class Ddl_dumper_options : public Dump_options {
9698
void set_ocimds(bool value);
9799
void set_compatibility_options(const std::vector<std::string> &options);
98100
void set_dry_run(bool dry_run);
101+
void set_threads(uint64_t threads);
99102
const Object_storage_options *object_storage_options() const;
100103

101104
Dump_manifest_options m_dump_manifest_options;
@@ -106,8 +109,15 @@ class Ddl_dumper_options : public Dump_options {
106109

107110
bool m_split = true;
108111
uint64_t m_bytes_per_chunk;
112+
113+
// Number of threads requested by the user (or default)
114+
// At most this number of database connections will be used in the dump
115+
// operation.
109116
uint64_t m_threads = 4;
110117

118+
// Internal number of threads (to be doubled in the case of prefix PAR dumps)
119+
uint64_t m_worker_threads = 4;
120+
111121
bool m_dump_triggers = true;
112122
bool m_timezone_utc = true;
113123
bool m_ddl_only = false;

modules/util/dump/dump_options.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,8 @@ class Dump_options {
148148

149149
virtual std::size_t threads() const = 0;
150150

151+
virtual std::size_t worker_threads() const { return threads(); }
152+
151153
virtual bool is_export_only() const = 0;
152154

153155
virtual bool use_single_file() const = 0;

modules/util/dump/dumper.cc

Lines changed: 48 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,6 @@
2525

2626
#include <algorithm>
2727
#include <chrono>
28-
#include <condition_variable>
2928
#include <cstdint>
3029
#include <functional>
3130
#include <iterator>
@@ -57,6 +56,7 @@
5756
#include "mysqlshdk/libs/utils/rate_limit.h"
5857
#include "mysqlshdk/libs/utils/std.h"
5958
#include "mysqlshdk/libs/utils/strformat.h"
59+
#include "mysqlshdk/libs/utils/utils_file.h"
6060
#include "mysqlshdk/libs/utils/utils_general.h"
6161
#include "mysqlshdk/libs/utils/utils_lexing.h"
6262
#include "mysqlshdk/libs/utils/utils_mysql_parsing.h"
@@ -345,31 +345,6 @@ bool check_if_transactions_are_ddl_safe(
345345

346346
} // namespace
347347

348-
class Dumper::Synchronize_workers final {
349-
public:
350-
Synchronize_workers() = default;
351-
~Synchronize_workers() = default;
352-
353-
void wait_for(const uint16_t count) {
354-
std::unique_lock<std::mutex> lock(m_mutex);
355-
m_condition.wait(lock, [this, count]() { return m_count >= count; });
356-
m_count -= count;
357-
}
358-
359-
void notify() {
360-
{
361-
std::lock_guard<std::mutex> lock(m_mutex);
362-
++m_count;
363-
}
364-
m_condition.notify_one();
365-
}
366-
367-
private:
368-
std::mutex m_mutex;
369-
std::condition_variable m_condition;
370-
uint16_t m_count = 0;
371-
};
372-
373348
class Dumper::Dump_writer_controller {
374349
public:
375350
using Create_file = std::function<std::unique_ptr<mysqlshdk::storage::IFile>(
@@ -705,6 +680,12 @@ class Dumper::Table_worker final {
705680

706681
~Table_worker() = default;
707682

683+
void release_session() {
684+
if (m_session) {
685+
m_dumper->session_pool().push(std::move(m_session));
686+
}
687+
}
688+
708689
void run() {
709690
std::string context;
710691

@@ -714,14 +695,6 @@ class Dumper::Table_worker final {
714695
{"id", std::to_string(m_id)}}));
715696

716697
mysqlsh::Mysql_thread mysql_thread;
717-
shcore::on_leave_scope close_session([this]() {
718-
if (m_session) {
719-
m_session->close();
720-
}
721-
});
722-
723-
open_session();
724-
725698
m_rate_limit =
726699
mysqlshdk::utils::Rate_limit(m_dumper->m_options.max_rate());
727700

@@ -738,14 +711,20 @@ class Dumper::Table_worker final {
738711

739712
context = std::move(work.info);
740713

714+
m_session = m_dumper->session_pool().pop();
715+
shcore::on_leave_scope session_releaser(
716+
[this]() { release_session(); });
717+
718+
if (m_dumper->m_worker_interrupt) {
719+
return;
720+
}
721+
741722
work.task(this);
742723

743724
if (m_dumper->m_worker_interrupt) {
744725
return;
745726
}
746727
}
747-
748-
m_dumper->assert_transaction_is_open(m_session);
749728
} catch (const mysqlshdk::db::Error &e) {
750729
handle_exception(context, e.format().c_str());
751730
} catch (const shcore::Error &e) {
@@ -765,18 +744,6 @@ class Dumper::Table_worker final {
765744
private:
766745
friend class Dumper;
767746

768-
void open_session() {
769-
// notify dumper that the session has been established
770-
shcore::on_leave_scope notify_dumper(
771-
[this]() { m_dumper->m_worker_synchronization->notify(); });
772-
773-
m_session =
774-
establish_session(m_dumper->session()->get_connection_options(), false);
775-
776-
m_dumper->start_transaction(m_session);
777-
m_dumper->on_init_thread_session(m_session);
778-
}
779-
780747
inline std::shared_ptr<mysqlshdk::db::IResult> query(
781748
const std::string &sql) const {
782749
return Dumper::query(m_session, sql);
@@ -875,6 +842,8 @@ class Dumper::Table_worker final {
875842
throw;
876843
}
877844

845+
release_session();
846+
878847
controller->finish_writing();
879848

880849
duration.finish();
@@ -1579,6 +1548,7 @@ class Dumper::Table_worker final {
15791548

15801549
void handle_exception(const std::string &context, const char *msg) {
15811550
m_dumper->m_worker_exceptions[m_id] = std::current_exception();
1551+
m_dumper->m_worker_exception_thrown = true;
15821552
current_console()->print_error(
15831553
m_log_id + (context.empty() ? "" : "Error while " + context + ": ") +
15841554
msg);
@@ -1828,10 +1798,6 @@ Dumper::Dumper(const Dump_options &options)
18281798
mysqlshdk::storage::get_extension(m_options.compression());
18291799
}
18301800

1831-
// needs to be defined here due to Dumper::Synchronize_workers being
1832-
// incomplete type
1833-
Dumper::~Dumper() = default;
1834-
18351801
void Dumper::run() {
18361802
if (m_options.is_dry_run()) {
18371803
current_console()->print_info(
@@ -1934,15 +1900,15 @@ void Dumper::do_run() {
19341900
return;
19351901
}
19361902

1903+
create_worker_sessions();
1904+
19371905
create_worker_threads();
19381906

19391907
m_current_stage->finish();
19401908

19411909
// initialize cache while threads are starting up
19421910
initialize_instance_cache();
19431911

1944-
wait_for_workers();
1945-
19461912
if (m_options.consistent_dump() && !m_worker_interrupt) {
19471913
current_console()->print_info("All transactions have been started");
19481914
lock_instance();
@@ -1956,6 +1922,20 @@ void Dumper::do_run() {
19561922
}
19571923
}
19581924

1925+
shcore::on_leave_scope cleanup([this]() {
1926+
// Ensures all the worker sessions get closed
1927+
size_t worker_sessions = m_options.threads();
1928+
while (worker_sessions) {
1929+
auto session = m_session_pool.pop();
1930+
1931+
if (!m_worker_exception_thrown) {
1932+
assert_transaction_is_open(session);
1933+
}
1934+
session->close();
1935+
--worker_sessions;
1936+
}
1937+
});
1938+
19591939
create_schema_tasks();
19601940

19611941
validate_privileges();
@@ -2803,23 +2783,30 @@ void Dumper::close_output_directory() {
28032783
m_output_dir.reset();
28042784
}
28052785

2786+
void Dumper::create_worker_sessions() {
2787+
for (std::size_t i = 0; i < m_options.threads(); ++i) {
2788+
auto worker_session =
2789+
establish_session(session()->get_connection_options(), false);
2790+
2791+
start_transaction(worker_session);
2792+
on_init_thread_session(worker_session);
2793+
2794+
m_session_pool.push(std::move(worker_session));
2795+
}
2796+
}
2797+
28062798
void Dumper::create_worker_threads() {
28072799
m_worker_exceptions.clear();
2808-
m_worker_exceptions.resize(m_options.threads());
2809-
m_worker_synchronization = std::make_unique<Synchronize_workers>();
2800+
m_worker_exceptions.resize(m_options.worker_threads());
28102801

2811-
for (std::size_t i = 0; i < m_options.threads(); ++i) {
2802+
for (std::size_t i = 0; i < m_options.worker_threads(); ++i) {
28122803
auto t = mysqlsh::spawn_scoped_thread(
28132804
&Table_worker::run,
28142805
Table_worker{i, this, Table_worker::Exception_strategy::ABORT});
28152806
m_workers.emplace_back(std::move(t));
28162807
}
28172808
}
28182809

2819-
void Dumper::wait_for_workers() {
2820-
m_worker_synchronization->wait_for(m_workers.size());
2821-
}
2822-
28232810
void Dumper::maybe_push_shutdown_tasks() {
28242811
if (0 == m_chunking_tasks &&
28252812
m_main_thread_finished_producing_chunking_tasks) {

modules/util/dump/dumper.h

Lines changed: 14 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,11 @@
2525
#define MODULES_UTIL_DUMP_DUMPER_H_
2626

2727
#include <atomic>
28+
#include <condition_variable>
2829
#include <functional>
2930
#include <memory>
3031
#include <mutex>
32+
#include <queue>
3133
#include <string>
3234
#include <string_view>
3335
#include <thread>
@@ -65,7 +67,7 @@ class Dumper {
6567
Dumper &operator=(const Dumper &) = delete;
6668
Dumper &operator=(Dumper &&) = delete;
6769

68-
virtual ~Dumper();
70+
virtual ~Dumper() = default;
6971

7072
void run();
7173

@@ -161,8 +163,6 @@ class Dumper {
161163
std::function<void(Table_worker *)> task;
162164
};
163165

164-
class Synchronize_workers;
165-
166166
class Dump_info;
167167

168168
class Memory_dumper;
@@ -228,9 +228,9 @@ class Dumper {
228228

229229
void close_output_directory();
230230

231-
void create_worker_threads();
231+
void create_worker_sessions();
232232

233-
void wait_for_workers();
233+
void create_worker_threads();
234234

235235
void maybe_push_shutdown_tasks();
236236

@@ -370,6 +370,11 @@ class Dumper {
370370

371371
void throw_if_cannot_dump_users() const;
372372

373+
shcore::Synchronized_queue<std::shared_ptr<mysqlshdk::db::ISession>>
374+
&session_pool() {
375+
return m_session_pool;
376+
}
377+
373378
// session
374379
std::shared_ptr<mysqlshdk::db::ISession> m_session;
375380
std::vector<std::shared_ptr<mysqlshdk::db::ISession>> m_lock_sessions;
@@ -435,10 +440,10 @@ class Dumper {
435440
// threads
436441
std::vector<std::thread> m_workers;
437442
std::vector<std::exception_ptr> m_worker_exceptions;
443+
std::atomic<bool> m_worker_exception_thrown = false;
438444
shcore::Synchronized_queue<Task_info> m_worker_tasks;
439445
std::atomic<uint64_t> m_chunking_tasks;
440446
std::atomic<bool> m_main_thread_finished_producing_chunking_tasks;
441-
std::unique_ptr<Synchronize_workers> m_worker_synchronization;
442447
std::function<std::unique_ptr<Dump_writer>()> m_writer_creator;
443448
volatile bool m_worker_interrupt = false;
444449

@@ -448,6 +453,9 @@ class Dumper {
448453
mutable Progress_thread m_progress_thread;
449454
mutable Progress_thread::Stage *m_current_stage = nullptr;
450455
Progress_thread::Stage *m_data_dump_stage = nullptr;
456+
457+
shcore::Synchronized_queue<std::shared_ptr<mysqlshdk::db::ISession>>
458+
m_session_pool;
451459
};
452460

453461
} // namespace dump

0 commit comments

Comments
 (0)