Skip to content

Commit bc2f392

Browse files
committed
catchup update #4 (fix removed files)
1 parent 50cd84d commit bc2f392

File tree

2 files changed

+102
-10
lines changed

2 files changed

+102
-10
lines changed

src/catchup.c

+47-4
Original file line numberDiff line numberDiff line change
@@ -311,15 +311,12 @@ do_catchup_instance(const char *source_pgdata, const char *dest_pgdata, PGconn *
311311
pgFile *file = (pgFile *) parray_get(source_filelist, i);
312312
char parent_dir[MAXPGPATH];
313313

314-
/* setup threads */
315-
pg_atomic_clear_flag(&file->lock);
316-
317314
if (!S_ISDIR(file->mode))
318315
continue;
319316

320317
/*
321318
* check if it is fake "directory" and is a tablespace link
322-
* это происходить потому что мы передали follow_symlink при построении списка
319+
* это происходит потому что мы передали follow_symlink при построении списка
323320
*/
324321
/* get parent dir of rel_path */
325322
strncpy(parent_dir, file->rel_path, MAXPGPATH);
@@ -342,12 +339,14 @@ do_catchup_instance(const char *source_pgdata, const char *dest_pgdata, PGconn *
342339
const char *linked_path = NULL;
343340
char to_path[MAXPGPATH];
344341

342+
// perform additional check that this is actually synlink?
345343
{ /* get full symlink path and map this path to new location */
346344
char source_full_path[MAXPGPATH];
347345
char symlink_content[MAXPGPATH];
348346
join_path_components(source_full_path, source_pgdata, file->rel_path);
349347
fio_readlink(source_full_path, symlink_content, sizeof(symlink_content), FIO_DB_HOST);
350348
linked_path = leaked_abstraction_get_tablespace_mapping(symlink_content);
349+
// TODO: check that linked_path != symlink_content in case of local catchup?
351350
elog(WARNING, "Map tablespace full_path: \"%s\" old_symlink_content: \"%s\" old_symlink_content: \"%s\"\n",
352351
source_full_path,
353352
symlink_content,
@@ -373,6 +372,50 @@ do_catchup_instance(const char *source_pgdata, const char *dest_pgdata, PGconn *
373372
}
374373
}
375374

375+
if (!dest_pgdata_is_empty &&
376+
(current.backup_mode == BACKUP_MODE_DIFF_PTRACK ||
377+
current.backup_mode == BACKUP_MODE_DIFF_DELTA))
378+
{
379+
elog(INFO, "Removing redundant files in destination directory");
380+
parray_qsort(dest_filelist, pgFileCompareRelPathWithExternalDesc);
381+
for (i = 0; i < parray_num(dest_filelist); i++)
382+
{
383+
bool redundant = true;
384+
pgFile *file = (pgFile *) parray_get(dest_filelist, i);
385+
386+
if (parray_bsearch(source_filelist, file, pgFileCompareRelPathWithExternal))
387+
redundant = false;
388+
389+
/* pg_filenode.map are always restored, because it's crc cannot be trusted */
390+
if (file->external_dir_num == 0 &&
391+
pg_strcasecmp(file->name, RELMAPPER_FILENAME) == 0)
392+
redundant = true;
393+
394+
/* do not delete the useful internal directories */
395+
if (S_ISDIR(file->mode) && !redundant)
396+
continue;
397+
398+
/* if file does not exists in destination list, then we can safely unlink it */
399+
if (redundant)
400+
{
401+
char fullpath[MAXPGPATH];
402+
403+
join_path_components(fullpath, dest_pgdata, file->rel_path);
404+
405+
fio_delete(file->mode, fullpath, FIO_DB_HOST);
406+
elog(VERBOSE, "Deleted file \"%s\"", fullpath);
407+
408+
/* shrink pgdata list */
409+
pgFileFree(file);
410+
parray_remove(dest_filelist, i);
411+
i--;
412+
}
413+
}
414+
}
415+
416+
/* clear file locks */
417+
pfilearray_clear_locks(source_filelist);
418+
376419
/* Sort by size for load balancing */
377420
parray_qsort(source_filelist, pgFileCompareSize);
378421

tests/catchup.py

+55-6
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,6 @@ def test_multithread_local_transfer(self):
4242

4343
# @unittest.skip("skip")
4444
def test_local_simple_transfer_with_tablespace(self):
45-
"""
46-
Test 'multithreaded basebackup' mode
47-
create node, insert some test data, catchup into other dir, start, select test data
48-
"""
4945
fname = self.id().split('.')[3]
5046

5147
source_pg = self.make_simple_node(
@@ -203,10 +199,8 @@ def test_remote_delta_catchup(self):
203199
base_dir = os.path.join(module_name, fname, 'src'),
204200
set_replication = True,
205201
ptrack_enable = True,
206-
initdb_params = ['--data-checksums']
207202
)
208203
source_pg.slow_start()
209-
source_pg.safe_psql("postgres", "CREATE EXTENSION ptrack")
210204
source_pg.safe_psql("postgres", "CREATE TABLE ultimate_question(answer int)")
211205

212206
# make clean shutdowned lagging behind replica
@@ -249,3 +243,58 @@ def test_remote_delta_catchup(self):
249243
# Clean after yourself
250244
self.del_test_dir(module_name, fname)
251245

246+
# @unittest.skip("skip")
247+
def test_table_drop(self):
248+
"""
249+
Test 'multithreaded basebackup' mode
250+
create node, insert some test data, catchup into other dir, start, select test data
251+
"""
252+
fname = self.id().split('.')[3]
253+
254+
source_pg = self.make_simple_node(
255+
base_dir = os.path.join(module_name, fname, 'src'),
256+
ptrack_enable = True,
257+
initdb_params = ['--data-checksums'])
258+
source_pg.slow_start()
259+
260+
source_pg.safe_psql("postgres", "CREATE EXTENSION ptrack")
261+
source_pg.safe_psql(
262+
"postgres",
263+
"CREATE TABLE ultimate_question AS SELECT 42 AS answer")
264+
265+
dest_pg = self.make_empty_node(os.path.join(module_name, fname, 'dst'))
266+
dest_pg = self.catchup_node(
267+
backup_mode = 'FULL',
268+
source_pgdata = source_pg.data_dir,
269+
destination_node = dest_pg,
270+
options = [
271+
'-d', 'postgres',
272+
'-p', str(source_pg.port),
273+
'--stream'
274+
]
275+
)
276+
277+
dest_pg.slow_start()
278+
dest_pg.stop()
279+
280+
source_pg.safe_psql("postgres", "DROP TABLE ultimate_question")
281+
source_pg.safe_psql("postgres", "CHECKPOINT")
282+
source_pg.safe_psql("postgres", "CHECKPOINT")
283+
284+
# catchup
285+
self.catchup_node(
286+
backup_mode = 'PTRACK',
287+
source_pgdata = source_pg.data_dir,
288+
destination_node = dest_pg,
289+
options = ['-d', 'postgres', '-p', str(source_pg.port), '--stream'])
290+
291+
source_pg.stop()
292+
dest_pg.slow_start()
293+
dest_pg.stop()
294+
295+
source_pgdata = self.pgdata_content(source_pg.data_dir)
296+
dest_pgdata = self.pgdata_content(dest_pg.data_dir)
297+
self.compare_pgdata(source_pgdata, dest_pgdata)
298+
299+
# Clean after yourself
300+
self.del_test_dir(module_name, fname)

0 commit comments

Comments
 (0)