diff --git a/Makefile b/Makefile
index 1431be4ef..5173aa38f 100644
--- a/Makefile
+++ b/Makefile
@@ -7,7 +7,7 @@ OBJS = src/utils/configuration.o src/utils/json.o src/utils/logger.o \
 OBJS += src/archive.o src/backup.o src/catalog.o src/checkdb.o src/configure.o src/data.o \
	src/delete.o src/dir.o src/fetch.o src/help.o src/init.o src/merge.o \
	src/parsexlog.o src/ptrack.o src/pg_probackup.o src/restore.o src/show.o src/stream.o \
-	src/util.o src/validate.o src/datapagemap.o
+	src/util.o src/validate.o src/datapagemap.o src/catchup.o

 # borrowed files
 OBJS += src/pg_crc.o src/receivelog.o src/streamutil.o \
diff --git a/doc/pgprobackup.xml b/doc/pgprobackup.xml
index b1ddd0032..f7814c2d2 100644
--- a/doc/pgprobackup.xml
+++ b/doc/pgprobackup.xml
@@ -143,6 +143,14 @@ doc/src/sgml/pgprobackup.sgml
    wal_file_name
    option
+
+   pg_probackup
+
+   catchup_mode
+   =path_to_pgdata_on_remote_server
+   =path_to_local_dir
+   option
+
@@ -283,6 +291,11 @@ doc/src/sgml/pgprobackup.sgml
      Partial restore: restoring only the specified databases.
+
+
+     Catchup: cloning a PostgreSQL instance for a fallen-behind standby server to catch up with the master.
+
+
     To manage backup data, pg_probackup creates a
@@ -1076,7 +1089,8 @@ GRANT SELECT ON TABLE pg_catalog.pg_database TO backup;
    mode: , , ,
-   ,
+   ,
+   ,
    and .
@@ -1431,6 +1445,7 @@ pg_probackup backup -B backup_dir --instance
+
   Performing Cluster Verification
@@ -1506,6 +1521,7 @@ pg_probackup checkdb --amcheck --skip-block-validation [connection_
    higher cost of CPU, memory, and I/O consumption.
+
   Validating a Backup
@@ -2073,6 +2089,7 @@ pg_probackup restore -B backup_dir --instance
    , , ,
+   ,
    and  processes can be executed on several parallel threads. This can significantly
@@ -3390,6 +3407,148 @@ pg_probackup delete -B backup_dir --instance
+
+
+  Cloning <productname>PostgreSQL</productname> Instance
+
+   pg_probackup can create a copy of a PostgreSQL
+   instance directly, without using the backup catalog. This allows you
+   to add a new standby server in a parallel mode or to have a standby
+   server that has fallen behind catch up with the master.
+
+
+
+   Cloning a PostgreSQL instance is different from other pg_probackup
+   operations:
+
+
+
+     The backup catalog is not required.
+
+
+
+
+     Only the STREAM WAL delivery mode is supported.
+
+
+
+
+     Copying external directories
+     is not supported.
+
+
+
+
+     No SQL commands involving tablespaces, such as
+     CREATE TABLESPACE/DROP TABLESPACE,
+     can be run simultaneously with catchup.
+
+
+
+
+     catchup takes configuration files, such as
+     postgresql.conf, postgresql.auto.conf,
+     or pg_hba.conf, from the source server and overwrites them
+     on the target server.
+
+
+
+
+
+
+   Before cloning a PostgreSQL instance, set up the source database server as follows:
+
+
+
+     Configure
+     the database cluster for the instance to copy.
+
+
+
+
+     To copy from a remote server, configure the remote mode.
+
+
+
+
+     To use the PTRACK catchup mode, set up PTRACK backups.
+
+
+
+
+
+
+   To clone a PostgreSQL instance, ensure that the source
+   database server is running and accepting connections. On the server
+   with the destination database, run the following command:
+
+
+pg_probackup catchup -b catchup-mode --source-pgdata=path_to_pgdata_on_remote_server --destination-pgdata=path_to_local_dir --stream [connection_options] [remote_options]
+
+
+   Where catchup_mode can take one of the
+   following values: FULL, DELTA, or PTRACK.
+
+
+
+
+     FULL — creates a full copy of the PostgreSQL instance.
+     The destination directory must be empty for this mode.
+
+
+
+
+     DELTA — reads all data files in the data directory and
+     creates an incremental copy for pages that have changed
+     since the destination database was shut down cleanly.
+     For this mode, the destination directory must contain a previous
+     copy of the database that was shut down cleanly.
+
+
+
+
+     PTRACK — tracks page changes on the fly and copies only
+     the pages that have changed since the point of divergence
+     of the source and destination databases.
+     For this mode, the destination directory must contain a previous
+     copy of the database that was shut down cleanly.
+
+
+
+
+   You can use connection_options to specify
+   the connection to the source database cluster. If it is located on a different server,
+   also specify remote_options.
+   If the source database contains tablespaces that must be located in
+   a different directory, additionally specify the
+   option:
+
+pg_probackup catchup -b catchup-mode --source-pgdata=path_to_pgdata_on_remote_server --destination-pgdata=path_to_local_dir --stream --tablespace-mapping=OLDDIR=NEWDIR
+
+   To run the catchup command on parallel threads, specify the number
+   of threads with the  option:
+
+pg_probackup catchup -b catchup-mode --source-pgdata=path_to_pgdata_on_remote_server --destination-pgdata=path_to_local_dir --stream --threads=num_threads
+
+
+
+   For example, assume that a remote standby server whose PostgreSQL instance uses the /replica-pgdata data directory has fallen behind. To sync this instance with the one in the /master-pgdata data directory, you can run
+   the catchup command in the PTRACK mode on four parallel threads as follows:
+
+pg_probackup catchup --source-pgdata=/master-pgdata --destination-pgdata=/replica-pgdata -p 5432 -d postgres -U remote-postgres-user --stream --backup-mode=PTRACK --remote-host=remote-hostname --remote-user=remote-unix-username -j 4
+
+
+
+   Another example shows how you can add a new remote standby server with the PostgreSQL data directory /replica-pgdata by running the catchup command in the FULL mode
+   on four parallel threads:
+
+pg_probackup catchup --source-pgdata=/master-pgdata --destination-pgdata=/replica-pgdata -p 5432 -d postgres -U remote-postgres-user --stream --backup-mode=FULL --remote-host=remote-hostname --remote-user=remote-unix-username -j 4
+
+
+
@@ -4262,6 +4421,121 @@ pg_probackup archive-get -B backup_dir --instance
     Archiving Options.
+
+
+  catchup
+
+pg_probackup catchup -b catchup_mode
+--source-pgdata=path_to_pgdata_on_remote_server
+--destination-pgdata=path_to_local_dir
+[--help] [--stream] [-j num_threads]
+[-T OLDDIR=NEWDIR]
+[connection_options] [remote_options]
+
+
+   Creates a copy of a PostgreSQL
+   instance without using the backup catalog.
+
+
+
+
+
+
+
+     Specifies the catchup mode to use. Possible values are:
+
+
+
+
+       FULL — creates a full copy of the PostgreSQL instance.
+
+
+
+
+       DELTA — reads all data files in the data directory and
+       creates an incremental copy for pages that have changed
+       since the destination database was shut down cleanly.
+
+
+
+
+       PTRACK — tracks page changes on the fly and copies only
+       the pages that have changed since the point of divergence
+       of the source and destination databases.
+
+
+
+
+
+
+
+
+
+
+
+     Specifies the path to the data directory of the instance to be copied. The path can be local or remote.
+
+
+
+
+
+
+
+
+     Specifies the path to the local data directory to copy to.
+
+
+
+
+
+
+
+
+     Makes a STREAM backup, which
+     includes all the necessary WAL files by streaming them from
+     the database server via the replication protocol.
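+     For example, to stream WAL through a dedicated replication
+     slot, add the slot options accepted by catchup, as listed in the
+     command's help output (the slot name below is illustrative):
+
+pg_probackup catchup -b catchup-mode --source-pgdata=path_to_pgdata_on_remote_server --destination-pgdata=path_to_local_dir --stream -S catchup_slot
+
+     With --temp-slot, a temporary slot is used instead; it is dropped
+     automatically when the WAL streaming session ends.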
+
+
+
+
+
+
+
+
+
+     Sets the number of parallel threads for
+     the catchup process.
+
+
+
+
+
+
+
+
+
+     Relocates the tablespace from the OLDDIR to the NEWDIR
+     directory when the instance is copied. Both OLDDIR and NEWDIR must
+     be absolute paths. If the path contains the equals sign (=),
+     escape it with a backslash. This option can be specified
+     multiple times for multiple tablespaces.
+
+
+
+
+
+
+
+
+   Additionally, connection
+   options and remote
+   mode options can be used.
+
+
+   For details on usage, see the section
+   Cloning PostgreSQL Instance.
+
+
  Options
@@ -4651,7 +4925,7 @@ pg_probackup archive-get -B backup_dir --instance
-       Disable the coloring for console log messages of warning and error levels.
+       Disable coloring for console log messages of warning and error levels.
@@ -4804,7 +5078,8 @@ pg_probackup archive-get -B backup_dir --instance
   Connection Options
    You can use these options together with
-     and
+
+    , , and
    commands.
@@ -5095,6 +5370,7 @@ pg_probackup archive-get -B backup_dir --instance
    , , ,
+    ,
    , ,
    and  commands.
diff --git a/src/archive.c b/src/archive.c
index 4058cd0d4..7bb8c1c03 100644
--- a/src/archive.c
+++ b/src/archive.c
@@ -148,7 +148,7 @@ do_archive_push(InstanceState *instanceState, InstanceConfig *instance, char *wa
		elog(ERROR, "getcwd() error");

	/* verify that archive-push --instance parameter is valid */
-	system_id = get_system_identifier(current_dir);
+	system_id = get_system_identifier(current_dir, FIO_DB_HOST);

	if (instance->pgdata == NULL)
		elog(ERROR, "Cannot read pg_probackup.conf for this instance");
diff --git a/src/backup.c b/src/backup.c
index 688afefca..2d834410a 100644
--- a/src/backup.c
+++ b/src/backup.c
@@ -94,7 +94,6 @@ do_backup_pg(InstanceState *instanceState, PGconn *backup_conn,
 {
	int			i;
	char		external_prefix[MAXPGPATH]; /* Temp value. Used as template */
-	char		dst_backup_path[MAXPGPATH];
	char		label[1024];
	XLogRecPtr	prev_backup_start_lsn = InvalidXLogRecPtr;

@@ -137,7 +136,7 @@ do_backup_pg(InstanceState *instanceState, PGconn *backup_conn,
 #if PG_VERSION_NUM >= 90600
	current.tli = get_current_timeline(backup_conn);
 #else
-	current.tli = get_current_timeline_from_control(false);
+	current.tli = get_current_timeline_from_control(instance_config.pgdata, FIO_DB_HOST, false);
 #endif

	/*
@@ -258,17 +257,19 @@ do_backup_pg(InstanceState *instanceState, PGconn *backup_conn,
	/* start stream replication */
	if (current.stream)
	{
-		join_path_components(dst_backup_path, current.database_dir, PG_XLOG_DIR);
-		fio_mkdir(dst_backup_path, DIR_PERMISSION, FIO_BACKUP_HOST);
+		char		stream_xlog_path[MAXPGPATH];

-		start_WAL_streaming(backup_conn, dst_backup_path, &instance_config.conn_opt,
+		join_path_components(stream_xlog_path, current.database_dir, PG_XLOG_DIR);
+		fio_mkdir(stream_xlog_path, DIR_PERMISSION, FIO_BACKUP_HOST);
+
+		start_WAL_streaming(backup_conn, stream_xlog_path, &instance_config.conn_opt,
							current.start_lsn, current.tli);

		/* Make sure that WAL streaming is working
		 * PAGE backup in stream mode is waited twice, first for
		 * segment in WAL archive and then for streamed segment
		 */
-		wait_wal_lsn(dst_backup_path, current.start_lsn, true, current.tli, false, true, ERROR, true);
+		wait_wal_lsn(stream_xlog_path, current.start_lsn, true, current.tli, false, true, ERROR, true);
	}

	/* initialize backup's file list */
@@ -315,23 +316,7 @@ do_backup_pg(InstanceState *instanceState, PGconn *backup_conn,
		elog(ERROR, "PGDATA is almost empty. 
Either it was concurrently deleted or " "pg_probackup do not possess sufficient permissions to list PGDATA content"); - /* Calculate pgdata_bytes */ - for (i = 0; i < parray_num(backup_files_list); i++) - { - pgFile *file = (pgFile *) parray_get(backup_files_list, i); - - if (file->external_dir_num != 0) - continue; - - if (S_ISDIR(file->mode)) - { - current.pgdata_bytes += 4096; - continue; - } - - current.pgdata_bytes += file->size; - } - + current.pgdata_bytes += calculate_datasize_of_filelist(backup_files_list); pretty_size(current.pgdata_bytes, pretty_bytes, lengthof(pretty_bytes)); elog(INFO, "PGDATA size: %s", pretty_bytes); @@ -697,7 +682,7 @@ pgdata_basic_setup(ConnectionOptions conn_opt, PGNodeInfo *nodeInfo) if (nodeInfo->is_superuser) elog(WARNING, "Current PostgreSQL role is superuser. " - "It is not recommended to run backup or checkdb as superuser."); + "It is not recommended to run pg_probackup under superuser."); strlcpy(current.server_version, nodeInfo->server_version_str, sizeof(current.server_version)); @@ -786,7 +771,7 @@ do_backup(InstanceState *instanceState, pgSetBackupParams *set_backup_params, // elog(WARNING, "ptrack_version_num %d", ptrack_version_num); if (nodeInfo.ptrack_version_num > 0) - nodeInfo.is_ptrack_enable = pg_ptrack_enable(backup_conn, nodeInfo.ptrack_version_num); + nodeInfo.is_ptrack_enabled = pg_is_ptrack_enabled(backup_conn, nodeInfo.ptrack_version_num); if (current.backup_mode == BACKUP_MODE_DIFF_PTRACK) { @@ -795,7 +780,7 @@ do_backup(InstanceState *instanceState, pgSetBackupParams *set_backup_params, elog(ERROR, "This PostgreSQL instance does not support ptrack"); else { - if (!nodeInfo.is_ptrack_enable) + if (!nodeInfo.is_ptrack_enabled) elog(ERROR, "Ptrack is disabled"); } } @@ -953,12 +938,12 @@ check_server_version(PGconn *conn, PGNodeInfo *nodeInfo) * All system identifiers must be equal. */ void -check_system_identifiers(PGconn *conn, char *pgdata) +check_system_identifiers(PGconn *conn, const char *pgdata) { uint64 system_id_conn; uint64 system_id_pgdata; - system_id_pgdata = get_system_identifier(pgdata); + system_id_pgdata = get_system_identifier(pgdata, FIO_DB_HOST); system_id_conn = get_remote_system_identifier(conn); /* for checkdb check only system_id_pgdata and system_id_conn */ @@ -1069,7 +1054,7 @@ pg_start_backup(const char *label, bool smooth, pgBackup *backup, * Switch to a new WAL segment. It should be called only for master. * For PG 9.5 it should be called only if pguser is superuser. */ -static void +void pg_switch_wal(PGconn *conn) { PGresult *res; @@ -2282,7 +2267,7 @@ process_block_change(ForkNumber forknum, RelFileNode rnode, BlockNumber blkno) } -static void +void check_external_for_tablespaces(parray *external_list, PGconn *backup_conn) { PGresult *res; @@ -2346,3 +2331,36 @@ check_external_for_tablespaces(parray *external_list, PGconn *backup_conn) } } } + +/* + * Calculate pgdata_bytes + * accepts (parray *) of (pgFile *) + */ +int64 +calculate_datasize_of_filelist(parray *filelist) +{ + int64 bytes = 0; + int i; + + /* parray_num don't check for NULL */ + if (filelist == NULL) + return 0; + + for (i = 0; i < parray_num(filelist); i++) + { + pgFile *file = (pgFile *) parray_get(filelist, i); + + if (file->external_dir_num != 0) + continue; + + if (S_ISDIR(file->mode)) + { + // TODO is a dir always 4K? 
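+			// (a directory entry is approximated as one 4 KiB block here; the actual on-disk size is filesystem-dependent)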
+ bytes += 4096; + continue; + } + + bytes += file->size; + } + return bytes; +} diff --git a/src/catalog.c b/src/catalog.c index f8af2b72e..9775968b8 100644 --- a/src/catalog.c +++ b/src/catalog.c @@ -2883,7 +2883,7 @@ pgNodeInit(PGNodeInfo *node) node->server_version_str[0] = '\0'; node->ptrack_version_num = 0; - node->is_ptrack_enable = false; + node->is_ptrack_enabled = false; node->ptrack_schema = NULL; } diff --git a/src/catchup.c b/src/catchup.c new file mode 100644 index 000000000..f80a0f0f9 --- /dev/null +++ b/src/catchup.c @@ -0,0 +1,1020 @@ +/*------------------------------------------------------------------------- + * + * catchup.c: sync DB cluster + * + * Copyright (c) 2021, Postgres Professional + * + *------------------------------------------------------------------------- + */ + +#include "pg_probackup.h" + +#if PG_VERSION_NUM < 110000 +#include "catalog/catalog.h" +#endif +#include "catalog/pg_tablespace.h" +#include "access/timeline.h" +#include "pgtar.h" +#include "streamutil.h" + +#include +#include +#include + +#include "utils/thread.h" +#include "utils/file.h" + +/* + * Catchup routines + */ +static PGconn *catchup_collect_info(PGNodeInfo *source_node_info, const char *source_pgdata, const char *dest_pgdata); +static void catchup_preflight_checks(PGNodeInfo *source_node_info, PGconn *source_conn, const char *source_pgdata, + const char *dest_pgdata); +static void catchup_check_tablespaces_existance_in_tbsmapping(PGconn *conn); +static parray* catchup_get_tli_history(ConnectionOptions *conn_opt, TimeLineID tli); + +//REVIEW The name of this function looks strange to me. +//Maybe catchup_init_state() or catchup_setup() will do better? +//I'd also suggest to wrap all these fields into some CatchupState, but it isn't urgent. +/* + * Prepare for work: fill some globals, open connection to source database + */ +static PGconn * +catchup_collect_info(PGNodeInfo *source_node_info, const char *source_pgdata, const char *dest_pgdata) +{ + PGconn *source_conn; + + /* Initialize PGInfonode */ + pgNodeInit(source_node_info); + + /* Get WAL segments size and system ID of source PG instance */ + instance_config.xlog_seg_size = get_xlog_seg_size(source_pgdata); + instance_config.system_identifier = get_system_identifier(source_pgdata, FIO_DB_HOST); + current.start_time = time(NULL); + + StrNCpy(current.program_version, PROGRAM_VERSION, sizeof(current.program_version)); + + /* Do some compatibility checks and fill basic info about PG instance */ + source_conn = pgdata_basic_setup(instance_config.conn_opt, source_node_info); + +#if PG_VERSION_NUM >= 110000 + if (!RetrieveWalSegSize(source_conn)) + elog(ERROR, "Failed to retrieve wal_segment_size"); +#endif + + get_ptrack_version(source_conn, source_node_info); + if (source_node_info->ptrack_version_num > 0) + source_node_info->is_ptrack_enabled = pg_is_ptrack_enabled(source_conn, source_node_info->ptrack_version_num); + + /* Obtain current timeline */ +#if PG_VERSION_NUM >= 90600 + current.tli = get_current_timeline(source_conn); +#else + instance_config.pgdata = source_pgdata; + current.tli = get_current_timeline_from_control(source_pgdata, FIO_DB_HOST, false); +#endif + + elog(INFO, "Catchup start, pg_probackup version: %s, " + "PostgreSQL version: %s, " + "remote: %s, source-pgdata: %s, destination-pgdata: %s", + PROGRAM_VERSION, source_node_info->server_version_str, + IsSshProtocol() ? 
"true" : "false", + source_pgdata, dest_pgdata); + + if (current.from_replica) + elog(INFO, "Running catchup from standby"); + + return source_conn; +} + +/* + * Check that catchup can be performed on source and dest + * this function is for checks, that can be performed without modification of data on disk + */ +static void +catchup_preflight_checks(PGNodeInfo *source_node_info, PGconn *source_conn, + const char *source_pgdata, const char *dest_pgdata) +{ + /* TODO + * gsmol - fallback to FULL mode if dest PGDATA is empty + * kulaginm -- I think this is a harmful feature. If user requested an incremental catchup, then + * he expects that this will be done quickly and efficiently. If, for example, he made a mistake + * with dest_dir, then he will receive a second full copy instead of an error message, and I think + * that in some cases he would prefer the error. + * I propose in future versions to offer a backup_mode auto, in which we will look to the dest_dir + * and decide which of the modes will be the most effective. + * I.e.: + * if(requested_backup_mode == BACKUP_MODE_DIFF_AUTO) + * { + * if(dest_pgdata_is_empty) + * backup_mode = BACKUP_MODE_FULL; + * else + * if(ptrack supported and applicable) + * backup_mode = BACKUP_MODE_DIFF_PTRACK; + * else + * backup_mode = BACKUP_MODE_DIFF_DELTA; + * } + */ + + if (dir_is_empty(dest_pgdata, FIO_LOCAL_HOST)) + { + if (current.backup_mode == BACKUP_MODE_DIFF_PTRACK || + current.backup_mode == BACKUP_MODE_DIFF_DELTA) + elog(ERROR, "\"%s\" is empty, but incremental catchup mode requested.", + dest_pgdata); + } + else /* dest dir not empty */ + { + if (current.backup_mode == BACKUP_MODE_FULL) + elog(ERROR, "Can't perform full catchup into non-empty directory \"%s\".", + dest_pgdata); + } + + /* check that postmaster is not running in destination */ + if (current.backup_mode != BACKUP_MODE_FULL) + { + pid_t pid; + pid = fio_check_postmaster(dest_pgdata, FIO_LOCAL_HOST); + if (pid == 1) /* postmaster.pid is mangled */ + { + char pid_filename[MAXPGPATH]; + join_path_components(pid_filename, dest_pgdata, "postmaster.pid"); + elog(ERROR, "Pid file \"%s\" is mangled, cannot determine whether postmaster is running or not", + pid_filename); + } + else if (pid > 1) /* postmaster is up */ + { + elog(ERROR, "Postmaster with pid %u is running in destination directory \"%s\"", + pid, dest_pgdata); + } + } + + /* check backup_label absence in dest */ + if (current.backup_mode != BACKUP_MODE_FULL) + { + char backup_label_filename[MAXPGPATH]; + + join_path_components(backup_label_filename, dest_pgdata, PG_BACKUP_LABEL_FILE); + if (fio_access(backup_label_filename, F_OK, FIO_LOCAL_HOST) == 0) + elog(ERROR, "Destination directory contains \"" PG_BACKUP_LABEL_FILE "\" file"); + } + + /* check that destination database is shutdowned cleanly */ + if (current.backup_mode != BACKUP_MODE_FULL) + { + DBState state; + state = get_system_dbstate(dest_pgdata, FIO_LOCAL_HOST); + /* see states in postgres sources (src/include/catalog/pg_control.h) */ + if (state != DB_SHUTDOWNED && state != DB_SHUTDOWNED_IN_RECOVERY) + elog(ERROR, "Postmaster in destination directory \"%s\" must be stopped cleanly", + dest_pgdata); + } + + /* Check that connected PG instance, source and destination PGDATA are the same */ + { + uint64 source_conn_id, source_id, dest_id; + + source_conn_id = get_remote_system_identifier(source_conn); + source_id = get_system_identifier(source_pgdata, FIO_DB_HOST); /* same as instance_config.system_identifier */ + + if (source_conn_id != source_id) + elog(ERROR, 
"Database identifiers mismatch: we connected to DB id %lu, but in \"%s\" we found id %lu", + source_conn_id, source_pgdata, source_id); + + if (current.backup_mode != BACKUP_MODE_FULL) + { + dest_id = get_system_identifier(dest_pgdata, FIO_LOCAL_HOST); + if (source_conn_id != dest_id) + elog(ERROR, "Database identifiers mismatch: we connected to DB id %lu, but in \"%s\" we found id %lu", + source_conn_id, dest_pgdata, dest_id); + } + } + + /* check PTRACK version */ + if (current.backup_mode == BACKUP_MODE_DIFF_PTRACK) + { + if (source_node_info->ptrack_version_num == 0) + elog(ERROR, "This PostgreSQL instance does not support ptrack"); + else if (source_node_info->ptrack_version_num < 200) + elog(ERROR, "ptrack extension is too old.\n" + "Upgrade ptrack to version >= 2"); + else if (!source_node_info->is_ptrack_enabled) + elog(ERROR, "Ptrack is disabled"); + } + + if (current.from_replica && exclusive_backup) + elog(ERROR, "Catchup from standby is only available for PostgreSQL >= 9.6"); + + /* check that we don't overwrite tablespace in source pgdata */ + catchup_check_tablespaces_existance_in_tbsmapping(source_conn); + + /* check timelines */ + if (current.backup_mode != BACKUP_MODE_FULL) + { + RedoParams dest_redo = { 0, InvalidXLogRecPtr, 0 }; + + /* fill dest_redo.lsn and dest_redo.tli */ + get_redo(dest_pgdata, FIO_LOCAL_HOST, &dest_redo); + + if (current.tli != 1) + { + parray *source_timelines; /* parray* of TimeLineHistoryEntry* */ + source_timelines = catchup_get_tli_history(&instance_config.conn_opt, current.tli); + + if (source_timelines == NULL) + elog(ERROR, "Cannot get source timeline history"); + + if (!satisfy_timeline(source_timelines, dest_redo.tli, dest_redo.lsn)) + elog(ERROR, "Destination is not in source timeline history"); + + parray_walk(source_timelines, pfree); + parray_free(source_timelines); + } + else /* special case -- no history files in source */ + { + if (dest_redo.tli != 1) + elog(ERROR, "Source is behind destination in timeline history"); + } + } +} + +/* + * Check that all tablespaces exists in tablespace mapping (--tablespace-mapping option) + * Check that all local mapped directories is empty if it is local FULL catchup + * Emit fatal error if that (not existent in map or not empty) tablespace found + */ +static void +catchup_check_tablespaces_existance_in_tbsmapping(PGconn *conn) +{ + PGresult *res; + int i; + char *tablespace_path = NULL; + const char *linked_path = NULL; + char *query = "SELECT pg_catalog.pg_tablespace_location(oid) " + "FROM pg_catalog.pg_tablespace " + "WHERE pg_catalog.pg_tablespace_location(oid) <> '';"; + + res = pgut_execute(conn, query, 0, NULL); + + if (!res) + elog(ERROR, "Failed to get list of tablespaces"); + + for (i = 0; i < res->ntups; i++) + { + tablespace_path = PQgetvalue(res, i, 0); + Assert (strlen(tablespace_path) > 0); + + canonicalize_path(tablespace_path); + linked_path = get_tablespace_mapping(tablespace_path); + + if (strcmp(tablespace_path, linked_path) == 0) + /* same result -> not found in mapping */ + { + if (!fio_is_remote(FIO_DB_HOST)) + elog(ERROR, "Local catchup executed, but source database contains " + "tablespace (\"%s\"), that is not listed in the map", tablespace_path); + else + elog(WARNING, "Remote catchup executed and source database contains " + "tablespace (\"%s\"), that is not listed in the map", tablespace_path); + } + + if (!is_absolute_path(linked_path)) + elog(ERROR, "Tablespace directory path must be an absolute path: \"%s\"", + linked_path); + + if (current.backup_mode == 
BACKUP_MODE_FULL + && !dir_is_empty(linked_path, FIO_LOCAL_HOST)) + elog(ERROR, "Target mapped tablespace directory (\"%s\") is not empty in FULL catchup", + linked_path); + } + PQclear(res); +} + +/* + * Get timeline history via replication connection + * returns parray* of TimeLineHistoryEntry* + */ +static parray* +catchup_get_tli_history(ConnectionOptions *conn_opt, TimeLineID tli) +{ + PGresult *res; + PGconn *conn; + char *history; + char query[128]; + parray *result = NULL; + + snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", tli); + + /* + * Connect in replication mode to the server. + */ + conn = pgut_connect_replication(conn_opt->pghost, + conn_opt->pgport, + conn_opt->pgdatabase, + conn_opt->pguser, + false); + + if (!conn) + return NULL; + + res = PQexec(conn, query); + PQfinish(conn); + + if (PQresultStatus(res) != PGRES_TUPLES_OK) + { + elog(WARNING, "Could not send replication command \"%s\": %s", + query, PQresultErrorMessage(res)); + PQclear(res); + return NULL; + } + + /* + * The response to TIMELINE_HISTORY is a single row result set + * with two fields: filename and content + */ + if (PQnfields(res) != 2 || PQntuples(res) != 1) + { + elog(ERROR, "Unexpected response to TIMELINE_HISTORY command: " + "got %d rows and %d fields, expected %d rows and %d fields", + PQntuples(res), PQnfields(res), 1, 2); + PQclear(res); + return NULL; + } + + history = pgut_strdup(PQgetvalue(res, 0, 1)); + result = parse_tli_history_buffer(history, tli); + + /* some cleanup */ + pg_free(history); + PQclear(res); + + return result; +} + +/* + * catchup multithreaded copy rountine and helper structure and function + */ + +/* parameters for catchup_thread_runner() passed from catchup_multithreaded_copy() */ +typedef struct +{ + PGNodeInfo *nodeInfo; + const char *from_root; + const char *to_root; + parray *source_filelist; + parray *dest_filelist; + XLogRecPtr sync_lsn; + BackupMode backup_mode; + int thread_num; + bool completed; +} catchup_thread_runner_arg; + +/* Catchup file copier executed in separate thread */ +static void * +catchup_thread_runner(void *arg) +{ + int i; + char from_fullpath[MAXPGPATH]; + char to_fullpath[MAXPGPATH]; + + catchup_thread_runner_arg *arguments = (catchup_thread_runner_arg *) arg; + int n_files = parray_num(arguments->source_filelist); + + /* catchup a file */ + for (i = 0; i < n_files; i++) + { + pgFile *file = (pgFile *) parray_get(arguments->source_filelist, i); + pgFile *dest_file = NULL; + + /* We have already copied all directories */ + if (S_ISDIR(file->mode)) + continue; + + if (!pg_atomic_test_set_flag(&file->lock)) + continue; + + /* check for interrupt */ + if (interrupted || thread_interrupted) + elog(ERROR, "Interrupted during catchup"); + + if (progress) + elog(INFO, "Progress: (%d/%d). 
Process file \"%s\"", + i + 1, n_files, file->rel_path); + + /* construct destination filepath */ + Assert(file->external_dir_num == 0); + join_path_components(from_fullpath, arguments->from_root, file->rel_path); + join_path_components(to_fullpath, arguments->to_root, file->rel_path); + + /* Encountered some strange beast */ + if (!S_ISREG(file->mode)) + elog(WARNING, "Unexpected type %d of file \"%s\", skipping", + file->mode, from_fullpath); + + /* Check that file exist in dest pgdata */ + if (arguments->backup_mode != BACKUP_MODE_FULL) + { + pgFile **dest_file_tmp = NULL; + dest_file_tmp = (pgFile **) parray_bsearch(arguments->dest_filelist, + file, pgFileCompareRelPathWithExternal); + if (dest_file_tmp) + { + /* File exists in destination PGDATA */ + file->exists_in_prev = true; + dest_file = *dest_file_tmp; + } + } + + /* Do actual work */ + if (file->is_datafile && !file->is_cfs) + { + catchup_data_file(file, from_fullpath, to_fullpath, + arguments->sync_lsn, + arguments->backup_mode, + NONE_COMPRESS, + 0, + arguments->nodeInfo->checksum_version, + arguments->nodeInfo->ptrack_version_num, + arguments->nodeInfo->ptrack_schema, + false, + dest_file != NULL ? dest_file->size : 0); + } + else + { + backup_non_data_file(file, dest_file, from_fullpath, to_fullpath, + arguments->backup_mode, current.parent_backup, true); + } + + if (file->write_size == FILE_NOT_FOUND) + continue; + + if (file->write_size == BYTES_INVALID) + { + elog(VERBOSE, "Skipping the unchanged file: \"%s\", read %li bytes", from_fullpath, file->read_size); + continue; + } + + elog(VERBOSE, "File \"%s\". Copied "INT64_FORMAT " bytes", + from_fullpath, file->write_size); + } + + /* ssh connection to longer needed */ + fio_disconnect(); + + /* Data files transferring is successful */ + arguments->completed = true; + + return NULL; +} + +/* + * main multithreaded copier + */ +static bool +catchup_multithreaded_copy(int num_threads, + PGNodeInfo *source_node_info, + const char *source_pgdata_path, + const char *dest_pgdata_path, + parray *source_filelist, + parray *dest_filelist, + XLogRecPtr sync_lsn, + BackupMode backup_mode) +{ + /* arrays with meta info for multi threaded catchup */ + catchup_thread_runner_arg *threads_args; + pthread_t *threads; + + bool all_threads_successful = true; + int i; + + /* init thread args */ + threads_args = (catchup_thread_runner_arg *) palloc(sizeof(catchup_thread_runner_arg) * num_threads); + for (i = 0; i < num_threads; i++) + threads_args[i] = (catchup_thread_runner_arg){ + .nodeInfo = source_node_info, + .from_root = source_pgdata_path, + .to_root = dest_pgdata_path, + .source_filelist = source_filelist, + .dest_filelist = dest_filelist, + .sync_lsn = sync_lsn, + .backup_mode = backup_mode, + .thread_num = i + 1, + .completed = false, + }; + + /* Run threads */ + thread_interrupted = false; + threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads); + for (i = 0; i < num_threads; i++) + { + elog(VERBOSE, "Start thread num: %i", i); + pthread_create(&threads[i], NULL, &catchup_thread_runner, &(threads_args[i])); + } + + /* Wait threads */ + for (i = 0; i < num_threads; i++) + { + pthread_join(threads[i], NULL); + all_threads_successful &= threads_args[i].completed; + } + + free(threads); + free(threads_args); + return all_threads_successful; +} + +/* + * + */ +static void +catchup_sync_destination_files(const char* pgdata_path, fio_location location, parray *filelist, pgFile *pg_control_file) +{ + char fullpath[MAXPGPATH]; + time_t start_time, end_time; + char pretty_time[20]; 
+ int i; + + elog(INFO, "Syncing copied files to disk"); + time(&start_time); + + for (i = 0; i < parray_num(filelist); i++) + { + pgFile *file = (pgFile *) parray_get(filelist, i); + + /* TODO: sync directory ? */ + if (S_ISDIR(file->mode)) + continue; + + Assert(file->external_dir_num == 0); + join_path_components(fullpath, pgdata_path, file->rel_path); + if (fio_sync(fullpath, location) != 0) + elog(ERROR, "Cannot sync file \"%s\": %s", fullpath, strerror(errno)); + } + + /* + * sync pg_control file + */ + join_path_components(fullpath, pgdata_path, pg_control_file->rel_path); + if (fio_sync(fullpath, location) != 0) + elog(ERROR, "Cannot sync file \"%s\": %s", fullpath, strerror(errno)); + + time(&end_time); + pretty_time_interval(difftime(end_time, start_time), + pretty_time, lengthof(pretty_time)); + elog(INFO, "Files are synced, time elapsed: %s", pretty_time); +} + +/* + * Entry point of pg_probackup CATCHUP subcommand. + */ +int +do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads, bool sync_dest_files) +{ + PGconn *source_conn = NULL; + PGNodeInfo source_node_info; + bool backup_logs = false; + parray *source_filelist = NULL; + pgFile *source_pg_control_file = NULL; + parray *dest_filelist = NULL; + char dest_xlog_path[MAXPGPATH]; + + RedoParams dest_redo = { 0, InvalidXLogRecPtr, 0 }; + PGStopBackupResult stop_backup_result; + bool catchup_isok = true; + + int i; + + /* for fancy reporting */ + time_t start_time, end_time; + char pretty_time[20]; + char pretty_bytes[20]; + + source_conn = catchup_collect_info(&source_node_info, source_pgdata, dest_pgdata); + catchup_preflight_checks(&source_node_info, source_conn, source_pgdata, dest_pgdata); + + elog(LOG, "Database catchup start"); + + { + char label[1024]; + /* notify start of backup to PostgreSQL server */ + time2iso(label, lengthof(label), current.start_time, false); + strncat(label, " with pg_probackup", lengthof(label) - + strlen(" with pg_probackup")); + + /* Call pg_start_backup function in PostgreSQL connect */ + pg_start_backup(label, smooth_checkpoint, ¤t, &source_node_info, source_conn); + elog(LOG, "pg_start_backup START LSN %X/%X", (uint32) (current.start_lsn >> 32), (uint32) (current.start_lsn)); + } + + //REVIEW I wonder, if we can move this piece above and call before pg_start backup()? + //It seems to be a part of setup phase. + if (current.backup_mode != BACKUP_MODE_FULL) + { + dest_filelist = parray_new(); + dir_list_file(dest_filelist, dest_pgdata, + true, true, false, backup_logs, true, 0, FIO_LOCAL_HOST); + + // fill dest_redo.lsn and dest_redo.tli + get_redo(dest_pgdata, FIO_LOCAL_HOST, &dest_redo); + elog(INFO, "syncLSN = %X/%X", (uint32) (dest_redo.lsn >> 32), (uint32) dest_redo.lsn); + + /* + * Future improvement to catch partial catchup: + * 1. rename dest pg_control into something like pg_control.pbk + * (so user can't start partial catchup'ed instance from this point) + * 2. try to read by get_redo() pg_control and pg_control.pbk (to detect partial catchup) + * 3. at the end (after copy of correct pg_control), remove pg_control.pbk + */ + } + + //REVIEW I wonder, if we can move this piece above and call before pg_start backup()? + //It seems to be a part of setup phase. 
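+	/*
+	 * A minimal sketch of the partial-catchup guard outlined above (the
+	 * ".pbk" suffix and the use of the existing fio_rename()/fio_unlink()
+	 * helpers are assumptions, not part of this patch):
+	 *
+	 *   char control_path[MAXPGPATH];
+	 *   char control_path_pbk[MAXPGPATH];
+	 *
+	 *   join_path_components(control_path, dest_pgdata, XLOG_CONTROL_FILE);
+	 *   snprintf(control_path_pbk, MAXPGPATH, "%s.pbk", control_path);
+	 *   // 1. hide pg_control so a partially copied instance cannot be started
+	 *   fio_rename(control_path, control_path_pbk, FIO_LOCAL_HOST);
+	 *   // 2. ... copy files ...
+	 *   // 3. after the correct pg_control has been copied, drop the renamed copy
+	 *   fio_unlink(control_path_pbk, FIO_LOCAL_HOST);
+	 */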
+ /* + * TODO: move to separate function to use in both backup.c and catchup.c + */ + if (current.backup_mode == BACKUP_MODE_DIFF_PTRACK) + { + XLogRecPtr ptrack_lsn = get_last_ptrack_lsn(source_conn, &source_node_info); + + // new ptrack is more robust and checks Start LSN + if (ptrack_lsn > dest_redo.lsn || ptrack_lsn == InvalidXLogRecPtr) + elog(ERROR, "LSN from ptrack_control in source %X/%X is greater than checkpoint LSN in destination %X/%X.\n" + "You can perform only FULL catchup.", + (uint32) (ptrack_lsn >> 32), (uint32) (ptrack_lsn), + (uint32) (dest_redo.lsn >> 32), + (uint32) (dest_redo.lsn)); + } + + /* Check that dest_redo.lsn is less than current.start_lsn */ + if (current.backup_mode != BACKUP_MODE_FULL && + dest_redo.lsn > current.start_lsn) + elog(ERROR, "Current START LSN %X/%X is lower than SYNC LSN %X/%X, " + "it may indicate that we are trying to catchup with PostgreSQL instance from the past", + (uint32) (current.start_lsn >> 32), (uint32) (current.start_lsn), + (uint32) (dest_redo.lsn >> 32), (uint32) (dest_redo.lsn)); + + /* Start stream replication */ + join_path_components(dest_xlog_path, dest_pgdata, PG_XLOG_DIR); + fio_mkdir(dest_xlog_path, DIR_PERMISSION, FIO_LOCAL_HOST); + start_WAL_streaming(source_conn, dest_xlog_path, &instance_config.conn_opt, + current.start_lsn, current.tli); + + source_filelist = parray_new(); + + /* list files with the logical path. omit $PGDATA */ + if (fio_is_remote(FIO_DB_HOST)) + fio_list_dir(source_filelist, source_pgdata, + true, true, false, backup_logs, true, 0); + else + dir_list_file(source_filelist, source_pgdata, + true, true, false, backup_logs, true, 0, FIO_LOCAL_HOST); + + //REVIEW FIXME. Let's fix that before release. + // TODO filter pg_xlog/wal? + // TODO what if wal is not a dir (symlink to a dir)? + + /* close ssh session in main thread */ + fio_disconnect(); + + //REVIEW Do we want to do similar calculation for dest? + current.pgdata_bytes += calculate_datasize_of_filelist(source_filelist); + pretty_size(current.pgdata_bytes, pretty_bytes, lengthof(pretty_bytes)); + elog(INFO, "Source PGDATA size: %s", pretty_bytes); + + /* + * Sort pathname ascending. It is necessary to create intermediate + * directories sequentially. + * + * For example: + * 1 - create 'base' + * 2 - create 'base/1' + * + * Sorted array is used at least in parse_filelist_filenames(), + * extractPageMap(), make_pagemap_from_ptrack(). 
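+	 * The parray_bsearch_index() lookup of pg_control below also relies on this ordering.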
+ */ + parray_qsort(source_filelist, pgFileCompareRelPathWithExternal); + + /* Extract information about files in source_filelist parsing their names:*/ + parse_filelist_filenames(source_filelist, source_pgdata); + + elog(LOG, "Start LSN (source): %X/%X, TLI: %X", + (uint32) (current.start_lsn >> 32), (uint32) (current.start_lsn), + current.tli); + if (current.backup_mode != BACKUP_MODE_FULL) + elog(LOG, "LSN in destination: %X/%X, TLI: %X", + (uint32) (dest_redo.lsn >> 32), (uint32) (dest_redo.lsn), + dest_redo.tli); + + /* Build page mapping in PTRACK mode */ + if (current.backup_mode == BACKUP_MODE_DIFF_PTRACK) + { + time(&start_time); + elog(INFO, "Extracting pagemap of changed blocks"); + + /* Build the page map from ptrack information */ + make_pagemap_from_ptrack_2(source_filelist, source_conn, + source_node_info.ptrack_schema, + source_node_info.ptrack_version_num, + dest_redo.lsn); + time(&end_time); + elog(INFO, "Pagemap successfully extracted, time elapsed: %.0f sec", + difftime(end_time, start_time)); + } + + /* + * Make directories before catchup + */ + /* + * We iterate over source_filelist and for every directory with parent 'pg_tblspc' + * we must lookup this directory name in tablespace map. + * If we got a match, we treat this directory as tablespace. + * It means that we create directory specified in tablespace_map and + * original directory created as symlink to it. + */ + for (i = 0; i < parray_num(source_filelist); i++) + { + pgFile *file = (pgFile *) parray_get(source_filelist, i); + char parent_dir[MAXPGPATH]; + + if (!S_ISDIR(file->mode)) + continue; + + /* + * check if it is fake "directory" and is a tablespace link + * this is because we passed the follow_symlink when building the list + */ + /* get parent dir of rel_path */ + strncpy(parent_dir, file->rel_path, MAXPGPATH); + get_parent_directory(parent_dir); + + /* check if directory is actually link to tablespace */ + if (strcmp(parent_dir, PG_TBLSPC_DIR) != 0) + { + /* if the entry is a regular directory, create it in the destination */ + char dirpath[MAXPGPATH]; + + join_path_components(dirpath, dest_pgdata, file->rel_path); + + elog(VERBOSE, "Create directory '%s'", dirpath); + fio_mkdir(dirpath, DIR_PERMISSION, FIO_LOCAL_HOST); + } + else + { + /* this directory located in pg_tblspc */ + const char *linked_path = NULL; + char to_path[MAXPGPATH]; + + // TODO perform additional check that this is actually symlink? 
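+			/*
+			 * One possible shape for that check (a sketch; assumes the
+			 * existing fio_stat() helper with follow_symlink = false):
+			 *
+			 *   struct stat st;
+			 *   if (fio_stat(source_full_path, &st, false, FIO_DB_HOST) == 0 &&
+			 *       !S_ISLNK(st.st_mode))
+			 *       elog(ERROR, "\"%s\" is not a symlink", source_full_path);
+			 */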
+ { /* get full symlink path and map this path to new location */ + char source_full_path[MAXPGPATH]; + char symlink_content[MAXPGPATH]; + join_path_components(source_full_path, source_pgdata, file->rel_path); + fio_readlink(source_full_path, symlink_content, sizeof(symlink_content), FIO_DB_HOST); + /* we checked that mapping exists in preflight_checks for local catchup */ + linked_path = get_tablespace_mapping(symlink_content); + elog(INFO, "Map tablespace full_path: \"%s\" old_symlink_content: \"%s\" new_symlink_content: \"%s\"\n", + source_full_path, + symlink_content, + linked_path); + } + + if (!is_absolute_path(linked_path)) + elog(ERROR, "Tablespace directory path must be an absolute path: %s\n", + linked_path); + + join_path_components(to_path, dest_pgdata, file->rel_path); + + elog(VERBOSE, "Create directory \"%s\" and symbolic link \"%s\"", + linked_path, to_path); + + /* create tablespace directory */ + if (fio_mkdir(linked_path, file->mode, FIO_LOCAL_HOST) != 0) + elog(ERROR, "Could not create tablespace directory \"%s\": %s", + linked_path, strerror(errno)); + + /* create link to linked_path */ + if (fio_symlink(linked_path, to_path, true, FIO_LOCAL_HOST) < 0) + elog(ERROR, "Could not create symbolic link \"%s\" -> \"%s\": %s", + linked_path, to_path, strerror(errno)); + } + } + + /* + * find pg_control file (in already sorted source_filelist) + * and exclude it from list for future special processing + */ + { + int control_file_elem_index; + pgFile search_key; + MemSet(&search_key, 0, sizeof(pgFile)); + /* pgFileCompareRelPathWithExternal uses only .rel_path and .external_dir_num for comparision */ + search_key.rel_path = XLOG_CONTROL_FILE; + search_key.external_dir_num = 0; + control_file_elem_index = parray_bsearch_index(source_filelist, &search_key, pgFileCompareRelPathWithExternal); + if(control_file_elem_index < 0) + elog(ERROR, "\"%s\" not found in \"%s\"\n", XLOG_CONTROL_FILE, source_pgdata); + source_pg_control_file = parray_remove(source_filelist, control_file_elem_index); + } + + /* + * remove absent source files in dest (dropped tables, etc...) + * note: global/pg_control will also be deleted here + */ + if (current.backup_mode != BACKUP_MODE_FULL) + { + elog(INFO, "Removing redundant files in destination directory"); + parray_qsort(dest_filelist, pgFileCompareRelPathWithExternalDesc); + for (i = 0; i < parray_num(dest_filelist); i++) + { + bool redundant = true; + pgFile *file = (pgFile *) parray_get(dest_filelist, i); + + //TODO optimize it and use some merge-like algorithm + //instead of bsearch for each file. + if (parray_bsearch(source_filelist, file, pgFileCompareRelPathWithExternal)) + redundant = false; + + /* pg_filenode.map are always restored, because it's crc cannot be trusted */ + Assert(file->external_dir_num == 0); + if (pg_strcasecmp(file->name, RELMAPPER_FILENAME) == 0) + redundant = true; + + //REVIEW This check seems unneded. Anyway we delete only redundant stuff below. 
+ /* do not delete the useful internal directories */ + if (S_ISDIR(file->mode) && !redundant) + continue; + + /* if file does not exists in destination list, then we can safely unlink it */ + if (redundant) + { + char fullpath[MAXPGPATH]; + + join_path_components(fullpath, dest_pgdata, file->rel_path); + + fio_delete(file->mode, fullpath, FIO_DB_HOST); + elog(VERBOSE, "Deleted file \"%s\"", fullpath); + + /* shrink pgdata list */ + pgFileFree(file); + parray_remove(dest_filelist, i); + i--; + } + } + } + + /* clear file locks */ + pfilearray_clear_locks(source_filelist); + + /* Sort by size for load balancing */ + parray_qsort(source_filelist, pgFileCompareSizeDesc); + + /* Sort the array for binary search */ + if (dest_filelist) + parray_qsort(dest_filelist, pgFileCompareRelPathWithExternal); + + /* run copy threads */ + elog(INFO, "Start transferring data files"); + time(&start_time); + catchup_isok = catchup_multithreaded_copy(num_threads, &source_node_info, + source_pgdata, dest_pgdata, + source_filelist, dest_filelist, + dest_redo.lsn, current.backup_mode); + + /* at last copy control file */ + if (catchup_isok) + { + char from_fullpath[MAXPGPATH]; + char to_fullpath[MAXPGPATH]; + join_path_components(from_fullpath, source_pgdata, source_pg_control_file->rel_path); + join_path_components(to_fullpath, dest_pgdata, source_pg_control_file->rel_path); + copy_pgcontrol_file(from_fullpath, FIO_DB_HOST, + to_fullpath, FIO_LOCAL_HOST, source_pg_control_file); + } + + time(&end_time); + pretty_time_interval(difftime(end_time, start_time), + pretty_time, lengthof(pretty_time)); + if (catchup_isok) + elog(INFO, "Data files are transferred, time elapsed: %s", + pretty_time); + else + elog(ERROR, "Data files transferring failed, time elapsed: %s", + pretty_time); + + /* Notify end of backup */ + { + //REVIEW Is it relevant to catchup? I suppose it isn't, since catchup is a new code. + //If we do need it, please write a comment explaining that. + /* kludge against some old bug in archive_timeout. TODO: remove in 3.0.0 */ + int timeout = (instance_config.archive_timeout > 0) ? + instance_config.archive_timeout : ARCHIVE_TIMEOUT_DEFAULT; + char *stop_backup_query_text = NULL; + + pg_silent_client_messages(source_conn); + + //REVIEW. Do we want to support pg 9.5? I suppose we never test it... + //Maybe check it and error out early? + /* Create restore point + * Only if backup is from master. + * For PG 9.5 create restore point only if pguser is superuser. 
+ */ + if (!current.from_replica && + !(source_node_info.server_version < 90600 && + !source_node_info.is_superuser)) //TODO: check correctness + pg_create_restore_point(source_conn, current.start_time); + + /* Execute pg_stop_backup using PostgreSQL connection */ + pg_stop_backup_send(source_conn, source_node_info.server_version, current.from_replica, exclusive_backup, &stop_backup_query_text); + + /* + * Wait for the result of pg_stop_backup(), but no longer than + * archive_timeout seconds + */ + pg_stop_backup_consume(source_conn, source_node_info.server_version, exclusive_backup, timeout, stop_backup_query_text, &stop_backup_result); + + /* Cleanup */ + pg_free(stop_backup_query_text); + } + + wait_wal_and_calculate_stop_lsn(dest_xlog_path, stop_backup_result.lsn, ¤t); + +#if PG_VERSION_NUM >= 90600 + /* Write backup_label */ + Assert(stop_backup_result.backup_label_content != NULL); + pg_stop_backup_write_file_helper(dest_pgdata, PG_BACKUP_LABEL_FILE, "backup label", + stop_backup_result.backup_label_content, stop_backup_result.backup_label_content_len, + NULL); + free(stop_backup_result.backup_label_content); + stop_backup_result.backup_label_content = NULL; + stop_backup_result.backup_label_content_len = 0; + + /* tablespace_map */ + if (stop_backup_result.tablespace_map_content != NULL) + { + // TODO what if tablespace is created during catchup? + /* Because we have already created symlinks in pg_tblspc earlier, + * we do not need to write the tablespace_map file. + * So this call is unnecessary: + * pg_stop_backup_write_file_helper(dest_pgdata, PG_TABLESPACE_MAP_FILE, "tablespace map", + * stop_backup_result.tablespace_map_content, stop_backup_result.tablespace_map_content_len, + * NULL); + */ + free(stop_backup_result.tablespace_map_content); + stop_backup_result.tablespace_map_content = NULL; + stop_backup_result.tablespace_map_content_len = 0; + } +#endif + + if(wait_WAL_streaming_end(NULL)) + elog(ERROR, "WAL streaming failed"); + + //REVIEW Please add a comment about these lsns. It is a crutial part of the algorithm. + current.recovery_xid = stop_backup_result.snapshot_xid; + + elog(LOG, "Getting the Recovery Time from WAL"); + + /* iterate over WAL from stop_backup lsn to start_backup lsn */ + if (!read_recovery_info(dest_xlog_path, current.tli, + instance_config.xlog_seg_size, + current.start_lsn, current.stop_lsn, + ¤t.recovery_time)) + { + elog(LOG, "Failed to find Recovery Time in WAL, forced to trust current_timestamp"); + current.recovery_time = stop_backup_result.invocation_time; + } + + /* + * In case of backup from replica >= 9.6 we must fix minRecPoint + */ + if (current.from_replica && !exclusive_backup) + { + set_min_recovery_point(source_pg_control_file, dest_pgdata, current.stop_lsn); + } + + /* close ssh session in main thread */ + fio_disconnect(); + + /* Sync all copied files unless '--no-sync' flag is used */ + if (catchup_isok) + { + if (sync_dest_files) + catchup_sync_destination_files(dest_pgdata, FIO_LOCAL_HOST, source_filelist, source_pg_control_file); + else + elog(WARNING, "Files are not synced to disk"); + } + + /* Cleanup */ + if (dest_filelist) + { + parray_walk(dest_filelist, pgFileFree); + parray_free(dest_filelist); + } + parray_walk(source_filelist, pgFileFree); + parray_free(source_filelist); + pgFileFree(source_pg_control_file); + + //REVIEW: Are we going to do that before release? 
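+	/*
+	 * For the TODO just below, one possible shape (a sketch): per-file copy
+	 * results are already accumulated in write_size, so, if computed before
+	 * source_filelist is freed above,
+	 *
+	 *   int64 transferred = 0;
+	 *   for (i = 0; i < parray_num(source_filelist); i++)
+	 *   {
+	 *       pgFile *file = (pgFile *) parray_get(source_filelist, i);
+	 *       if (file->write_size > 0)
+	 *           transferred += file->write_size;
+	 *   }
+	 *
+	 * would give the byte count, and transferred / current.pgdata_bytes the
+	 * incremental ratio.
+	 */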
+	/* TODO: show the amount of transferred data in bytes and calculate incremental ratio */
+
+	return 0;
+}
diff --git a/src/data.c b/src/data.c
index 314490585..49b696059 100644
--- a/src/data.c
+++ b/src/data.c
@@ -268,7 +268,7 @@ get_checksum_errormsg(Page page, char **errormsg, BlockNumber absolute_blkno)
  *          PageIsOk(0) if page was successfully retrieved
  *          PageIsTruncated(-1) if the page was truncated
  *          SkipCurrentPage(-2) if we need to skip this page,
- *                              only used for DELTA backup
+ *                              only used for DELTA and PTRACK backup
  *          PageIsCorrupted(-3) if the page checksum mismatch
  *                              or header corruption,
  *                              only used for checkdb
@@ -400,7 +400,12 @@ prepare_page(pgFile *file, XLogRecPtr prev_backup_start_lsn,
			page_st->lsn > 0 &&
			page_st->lsn < prev_backup_start_lsn)
		{
-			elog(VERBOSE, "Skipping blknum %u in file: \"%s\"", blknum, from_fullpath);
+			elog(VERBOSE, "Skipping blknum %u in file: \"%s\", file->exists_in_prev: %s, page_st->lsn: %X/%X, prev_backup_start_lsn: %X/%X",
+				 blknum, from_fullpath,
+				 file->exists_in_prev ? "true" : "false",
+				 (uint32) (page_st->lsn >> 32), (uint32) page_st->lsn,
+				 (uint32) (prev_backup_start_lsn >> 32), (uint32) prev_backup_start_lsn
+				);
			return SkipCurrentPage;
		}
@@ -458,6 +463,23 @@ compress_and_backup_page(pgFile *file, BlockNumber blknum,
	return compressed_size;
 }

+/* Taken from compress_and_backup_page(), but with all the header and compression magic stripped out; a plain 1-to-1 copy */
+static int
+copy_page(pgFile *file, BlockNumber blknum,
+		  FILE *in, FILE *out, Page page,
+		  const char *to_fullpath)
+{
+	/* write data page */
+	if (fio_fwrite(out, page, BLCKSZ) != BLCKSZ)
+		elog(ERROR, "File: \"%s\", cannot write at block %u: %s",
+			 to_fullpath, blknum, strerror(errno));
+
+	file->write_size += BLCKSZ;
+	file->uncompressed_size += BLCKSZ;
+
+	return BLCKSZ;
+}
+
 /*
  * Backup data file in the from_root directory to the to_root directory with
  * same relative path. If prev_backup_start_lsn is not NULL, only pages with
@@ -623,6 +645,169 @@ backup_data_file(pgFile *file, const char *from_fullpath, const char *to_fullpat
	pg_free(headers);
 }

+/*
+ * Copy a data file from the from_root directory to the to_root directory with
+ * the same relative path. If prev_backup_start_lsn is not NULL, only pages
+ * with a higher lsn will be copied.
+ * The file is not just copied: it is read block by block (using the bitmap in
+ * case of incremental catchup) with checksum validation. Unlike
+ * backup_data_file(), pages are written verbatim, without compression or
+ * special headers.
+ */
+void
+catchup_data_file(pgFile *file, const char *from_fullpath, const char *to_fullpath,
+				  XLogRecPtr prev_backup_start_lsn, BackupMode backup_mode,
+				  CompressAlg calg, int clevel, uint32 checksum_version,
+				  int ptrack_version_num, const char *ptrack_schema,
+				  bool is_merge, size_t prev_size)
+{
+	int			rc;
+	bool		use_pagemap;
+	char	   *errmsg = NULL;
+	BlockNumber	err_blknum = 0;
+	/* page headers */
+	BackupPageHeader2 *headers = NULL;
+
+	/* sanity */
+	if (file->size % BLCKSZ != 0)
+		elog(WARNING, "File: \"%s\", invalid file size %zu", from_fullpath, file->size);
+
+	/*
+	 * Compute expected number of blocks in the file.
+	 * NOTE: it is a normal situation if the file size has changed
+	 * since the moment we computed it.
+	 */
+	file->n_blocks = file->size/BLCKSZ;
+
+	/*
+	 * Skip unchanged file only if it exists in previous backup.
+	 * This way we can correctly handle null-sized files which are
+	 * not tracked by pagemap and thus always marked as unchanged.
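+	 * (Hence the prev_size comparison below: a file whose size changed must
+	 * be re-read even though its pagemap is empty.)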
+	 */
+	if (backup_mode == BACKUP_MODE_DIFF_PTRACK &&
		file->pagemap.bitmapsize == PageBitmapIsEmpty &&
		file->exists_in_prev && file->size == prev_size && !file->pagemap_isabsent)
+	{
+		/*
+		 * There are no changed blocks since last backup. We want to make
+		 * incremental backup, so we should exit.
+		 */
+		file->write_size = BYTES_INVALID;
+		return;
+	}
+
+	/* reset size summary */
+	file->read_size = 0;
+	file->write_size = 0;
+	file->uncompressed_size = 0;
+	INIT_FILE_CRC32(true, file->crc);
+
+	/*
+	 * Read each page, verify checksum and write it to backup.
+	 * If the page map is empty or the file is not present in the previous
+	 * backup, back up all pages of the relation.
+	 *
+	 * In PTRACK 1.x there was a problem
+	 * of data files with missing _ptrack map.
+	 * Such files should be fully copied.
+	 */
+
+	if (file->pagemap.bitmapsize == PageBitmapIsEmpty ||
+		file->pagemap_isabsent || !file->exists_in_prev ||
+		!file->pagemap.bitmap)
+		use_pagemap = false;
+	else
+		use_pagemap = true;
+
+	if (use_pagemap)
+		elog(VERBOSE, "Using pagemap for file \"%s\"", file->rel_path);
+
+	/* Remote mode */
+	if (fio_is_remote(FIO_DB_HOST))
+	{
+		rc = fio_copy_pages(to_fullpath, from_fullpath, file,
+							/* send prev backup START_LSN */
+							(backup_mode == BACKUP_MODE_DIFF_DELTA || backup_mode == BACKUP_MODE_DIFF_PTRACK) &&
+							file->exists_in_prev ? prev_backup_start_lsn : InvalidXLogRecPtr,
+							calg, clevel, checksum_version,
+							/* send pagemap if any */
+							use_pagemap,
+							/* variables for error reporting */
+							&err_blknum, &errmsg, &headers);
+	}
+	else
+	{
+		/* TODO: stop handling errors internally */
+		rc = copy_pages(to_fullpath, from_fullpath, file,
+						/* send prev backup START_LSN */
+						(backup_mode == BACKUP_MODE_DIFF_DELTA || backup_mode == BACKUP_MODE_DIFF_PTRACK) &&
+						file->exists_in_prev ? prev_backup_start_lsn : InvalidXLogRecPtr,
+						checksum_version, use_pagemap,
+						backup_mode, ptrack_version_num, ptrack_schema);
+	}
+
+	/* check for errors */
+	if (rc == FILE_MISSING)
+	{
+		elog(is_merge ? ERROR : LOG, "File not found: \"%s\"", from_fullpath);
+		file->write_size = FILE_NOT_FOUND;
+		goto cleanup;
+	}
+
+	else if (rc == WRITE_FAILED)
+		elog(ERROR, "Cannot write block %u of \"%s\": %s",
+			 err_blknum, to_fullpath, strerror(errno));
+
+	else if (rc == PAGE_CORRUPTION)
+	{
+		if (errmsg)
+			elog(ERROR, "Corruption detected in file \"%s\", block %u: %s",
+				 from_fullpath, err_blknum, errmsg);
+		else
+			elog(ERROR, "Corruption detected in file \"%s\", block %u",
+				 from_fullpath, err_blknum);
+	}
+	/* OPEN_FAILED and READ_FAILED */
+	else if (rc == OPEN_FAILED)
+	{
+		if (errmsg)
+			elog(ERROR, "%s", errmsg);
+		else
+			elog(ERROR, "Cannot open file \"%s\"", from_fullpath);
+	}
+	else if (rc == READ_FAILED)
+	{
+		if (errmsg)
+			elog(ERROR, "%s", errmsg);
+		else
+			elog(ERROR, "Cannot read file \"%s\"", from_fullpath);
+	}
+
+	file->read_size = rc * BLCKSZ;
+
+	/* refresh n_blocks for FULL and DELTA */
+	if (backup_mode == BACKUP_MODE_FULL ||
+		backup_mode == BACKUP_MODE_DIFF_DELTA)
+		file->n_blocks = file->read_size / BLCKSZ;
+
+	/* Determine that the file didn't change in case of incremental catchup */
+	if (backup_mode != BACKUP_MODE_FULL &&
+		file->exists_in_prev &&
+		file->write_size == 0 &&
+		file->n_blocks > 0)
+	{
+		file->write_size = BYTES_INVALID;
+	}
+
+cleanup:
+
+	/* finish CRC calculation */
+	FIN_FILE_CRC32(true, file->crc);
+
+	pg_free(errmsg);
+	pg_free(file->pagemap.bitmap);
+	pg_free(headers);
+}
+
 /*
  * Backup non data file
  * We do not apply compression to this file.
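//REVIEW Illustration (not part of the patch): callers interpret the outcome of
//catchup_data_file() through the file's write_size, as catchup_thread_runner() does:
//
//	catchup_data_file(file, from_fullpath, to_fullpath, ...);
//	if (file->write_size == FILE_NOT_FOUND)
//		;	/* file vanished on the source: not an error, skip it */
//	else if (file->write_size == BYTES_INVALID)
//		;	/* unchanged since the sync LSN: nothing was copied */
//	else
//		;	/* copied: write_size bytes were written to the destination */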
@@ -1992,6 +2177,7 @@ send_pages(const char *to_fullpath, const char *from_fullpath,
							 true, checksum_version,
							 ptrack_version_num, ptrack_schema,
							 from_fullpath, &page_st);
+
		if (rc == PageIsTruncated)
			break;

@@ -2068,6 +2254,130 @@ send_pages(const char *to_fullpath, const char *from_fullpath,
	return n_blocks_read;
 }

+/* Copy a local file (taken from send_pages(), but uses plain page copying without adding headers or compression) */
+int
+copy_pages(const char *to_fullpath, const char *from_fullpath,
+		   pgFile *file, XLogRecPtr sync_lsn,
+		   uint32 checksum_version, bool use_pagemap,
+		   BackupMode backup_mode, int ptrack_version_num, const char *ptrack_schema)
+{
+	FILE	   *in = NULL;
+	FILE	   *out = NULL;
+	char		curr_page[BLCKSZ];
+	int			n_blocks_read = 0;
+	BlockNumber	blknum = 0;
+	datapagemap_iterator_t *iter = NULL;
+
+	/* stdio buffers */
+	char	   *in_buf = NULL;
+	char	   *out_buf = NULL;
+
+	/* open source file for read */
+	in = fopen(from_fullpath, PG_BINARY_R);
+	if (in == NULL)
+	{
+		/*
+		 * If the file is not found, this is not an error.
+		 * It could have been deleted by a concurrent postgres transaction.
+		 */
+		if (errno == ENOENT)
+			return FILE_MISSING;
+
+		elog(ERROR, "Cannot open file \"%s\": %s", from_fullpath, strerror(errno));
+	}
+
+	/*
+	 * Enable stdio buffering for the local input file,
+	 * unless the pagemap is involved, which
+	 * implies a lot of random access.
+	 */
+
+	if (use_pagemap)
+	{
+		iter = datapagemap_iterate(&file->pagemap);
+		datapagemap_next(iter, &blknum); /* set first block */
+
+		setvbuf(in, NULL, _IONBF, BUFSIZ);
+	}
+	else
+	{
+		in_buf = pgut_malloc(STDIO_BUFSIZE);
+		setvbuf(in, in_buf, _IOFBF, STDIO_BUFSIZE);
+	}
+
+	out = fio_fopen(to_fullpath, PG_BINARY_R "+", FIO_BACKUP_HOST);
+	if (out == NULL)
+		elog(ERROR, "Cannot open destination file \"%s\": %s",
+			 to_fullpath, strerror(errno));
+
+	/* update file permission */
+	if (fio_chmod(to_fullpath, file->mode, FIO_BACKUP_HOST) == -1)
+		elog(ERROR, "Cannot change mode of \"%s\": %s", to_fullpath,
+			 strerror(errno));
+
+	elog(VERBOSE, "ftruncate file \"%s\" to size %lu",
+		 to_fullpath, file->size);
+	if (fio_ftruncate(out, file->size) == -1)
+		elog(ERROR, "Cannot ftruncate file \"%s\" to size %lu: %s",
+			 to_fullpath, file->size, strerror(errno));
+
+	if (!fio_is_remote_file(out))
+	{
+		out_buf = pgut_malloc(STDIO_BUFSIZE);
+		setvbuf(out, out_buf, _IOFBF, STDIO_BUFSIZE);
+	}
+
+	while (blknum < file->n_blocks)
+	{
+		PageState	page_st;
+		int			rc = prepare_page(file, sync_lsn,
+									  blknum, in, backup_mode, curr_page,
+									  true, checksum_version,
+									  ptrack_version_num, ptrack_schema,
+									  from_fullpath, &page_st);
+		if (rc == PageIsTruncated)
+			break;
+
+		else if (rc == PageIsOk)
+		{
+			if (fio_fseek(out, blknum * BLCKSZ) < 0)
+			{
+				elog(ERROR, "Cannot seek block %u of \"%s\": %s",
+					 blknum, to_fullpath, strerror(errno));
+			}
+			copy_page(file, blknum, in, out, curr_page, to_fullpath);
+		}
+
+		n_blocks_read++;
+
+		/* next block */
+		if (use_pagemap)
+		{
+			/* exit if pagemap is exhausted */
+			if (!datapagemap_next(iter, &blknum))
+				break;
+		}
+		else
+			blknum++;
+	}
+
+	/* cleanup */
+	if (in && fclose(in))
+		elog(ERROR, "Cannot close the source file \"%s\": %s",
+			 from_fullpath, strerror(errno));
+
+	/* close local output file */
+	if (out && fio_fclose(out))
+		elog(ERROR, "Cannot close the destination file \"%s\": %s",
+			 to_fullpath, strerror(errno));
+
+	pg_free(iter);
+	pg_free(in_buf);
+	pg_free(out_buf);
+
+	return n_blocks_read;
+}
+
 /*
  * Attempt to open header file, read content and return as
  * array of headers.
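//REVIEW Illustration (not part of the patch): the block-iteration pattern used by
//copy_pages() above, shown standalone with the existing datapagemap API. When a
//pagemap is present, only the changed blocks are visited, in ascending order:
//
//	#include "datapagemap.h"
//
//	static void
//	walk_changed_blocks(datapagemap_t *pagemap)
//	{
//		BlockNumber blkno = 0;
//		datapagemap_iterator_t *iter = datapagemap_iterate(pagemap);
//
//		while (datapagemap_next(iter, &blkno))	/* yields each set bit once */
//			elog(VERBOSE, "changed block: %u", blkno);
//
//		pg_free(iter);
//	}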
diff --git a/src/dir.c b/src/dir.c index ce255d0ad..473534c8b 100644 --- a/src/dir.c +++ b/src/dir.c @@ -485,6 +485,13 @@ pgFileCompareSize(const void *f1, const void *f2) return 0; } +/* Compare two pgFile with their size in descending order */ +int +pgFileCompareSizeDesc(const void *f1, const void *f2) +{ + return -1 * pgFileCompareSize(f1, f2); +} + static int pgCompareString(const void *str1, const void *str2) { @@ -887,7 +894,7 @@ dir_list_file_internal(parray *files, pgFile *parent, const char *parent_dir, * * Copy of function get_tablespace_mapping() from pg_basebackup.c. */ -static const char * +const char * get_tablespace_mapping(const char *dir) { TablespaceListCell *cell; diff --git a/src/help.c b/src/help.c index e1c8d6833..921feaec0 100644 --- a/src/help.c +++ b/src/help.c @@ -2,7 +2,7 @@ * * help.c * - * Copyright (c) 2017-2019, Postgres Professional + * Copyright (c) 2017-2021, Postgres Professional * *------------------------------------------------------------------------- */ @@ -29,6 +29,7 @@ static void help_archive_get(void); static void help_checkdb(void); static void help_help(void); static void help_version(void); +static void help_catchup(void); void help_print_version(void) @@ -70,6 +71,7 @@ help_command(ProbackupSubcmd const subcmd) &help_internal, // AGENT_CMD &help_help, &help_version, + &help_catchup, }; Assert((int)subcmd < sizeof(help_functions) / sizeof(help_functions[0])); @@ -246,6 +248,19 @@ help_pg_probackup(void) printf(_(" [--ssh-options]\n")); printf(_(" [--help]\n")); + printf(_("\n %s catchup -b catchup-mode\n"), PROGRAM_NAME); + printf(_(" --source-pgdata=path_to_pgdata_on_remote_server\n")); + printf(_(" --destination-pgdata=path_to_local_dir\n")); + printf(_(" [--stream [-S slot-name]] [--temp-slot]\n")); + printf(_(" [-j num-threads]\n")); + printf(_(" [-T OLDDIR=NEWDIR]\n")); + printf(_(" [-d dbname] [-h host] [-p port] [-U username]\n")); + printf(_(" [-w --no-password] [-W --password]\n")); + printf(_(" [--remote-proto] [--remote-host]\n")); + printf(_(" [--remote-port] [--remote-path] [--remote-user]\n")); + printf(_(" [--ssh-options]\n")); + printf(_(" [--help]\n")); + if ((PROGRAM_URL || PROGRAM_EMAIL)) { printf("\n"); @@ -1009,3 +1024,49 @@ help_version(void) printf(_("\n%s version\n"), PROGRAM_NAME); printf(_("%s --version\n\n"), PROGRAM_NAME); } + +static void +help_catchup(void) +{ + printf(_("\n%s catchup -b catchup-mode\n"), PROGRAM_NAME); + printf(_(" --source-pgdata=path_to_pgdata_on_remote_server\n")); + printf(_(" --destination-pgdata=path_to_local_dir\n")); + printf(_(" [--stream [-S slot-name]] [--temp-slot]\n")); + printf(_(" [-j num-threads]\n")); + printf(_(" [-T OLDDIR=NEWDIR]\n")); + printf(_(" [-d dbname] [-h host] [-p port] [-U username]\n")); + printf(_(" [-w --no-password] [-W --password]\n")); + printf(_(" [--remote-proto] [--remote-host]\n")); + printf(_(" [--remote-port] [--remote-path] [--remote-user]\n")); + printf(_(" [--ssh-options]\n")); + printf(_(" [--help]\n\n")); + + printf(_(" -b, --backup-mode=catchup-mode catchup mode=FULL|DELTA|PTRACK\n")); + printf(_(" --stream stream the transaction log (only supported mode)\n")); + printf(_(" -S, --slot=SLOTNAME replication slot to use\n")); + printf(_(" --temp-slot use temporary replication slot\n")); + + printf(_(" -j, --threads=NUM number of parallel threads\n")); + + printf(_(" -T, --tablespace-mapping=OLDDIR=NEWDIR\n")); + printf(_(" relocate the tablespace from directory OLDDIR to NEWDIR\n")); + + printf(_("\n Connection options:\n")); + printf(_(" -U, 
--pguser=USERNAME        user name to connect as (default: current local user)\n"));
+	printf(_("  -d, --pgdatabase=DBNAME   database to connect to (default: username)\n"));
+	printf(_("  -h, --pghost=HOSTNAME     database server host or socket directory (default: 'local socket')\n"));
+	printf(_("  -p, --pgport=PORT         database server port (default: 5432)\n"));
+	printf(_("  -w, --no-password         never prompt for password\n"));
+	printf(_("  -W, --password            force password prompt\n\n"));
+
+	printf(_("\n  Remote options:\n"));
+	printf(_("      --remote-proto=protocol  remote protocol to use\n"));
+	printf(_("                               available options: 'ssh', 'none' (default: ssh)\n"));
+	printf(_("      --remote-host=hostname   remote host address or hostname\n"));
+	printf(_("      --remote-port=port       remote host port (default: 22)\n"));
+	printf(_("      --remote-path=path       path to directory with pg_probackup binary on remote host\n"));
+	printf(_("                               (default: current binary path)\n"));
+	printf(_("      --remote-user=username   user name for ssh connection (default: current user)\n"));
+	printf(_("      --ssh-options=ssh_options  additional ssh options (default: none)\n"));
+	printf(_("                               (example: --ssh-options='-c cipher_spec -F configfile')\n\n"));
+}
diff --git a/src/init.c b/src/init.c
index dc821325a..a4911cb5c 100644
--- a/src/init.c
+++ b/src/init.c
@@ -57,7 +57,7 @@ do_add_instance(InstanceState *instanceState, InstanceConfig *instance)
 				   "(-D, --pgdata)");
 
 	/* Read system_identifier from PGDATA */
-	instance->system_identifier = get_system_identifier(instance->pgdata);
+	instance->system_identifier = get_system_identifier(instance->pgdata, FIO_DB_HOST);
 	/* Starting from PostgreSQL 11 read WAL segment size from PGDATA */
 	instance->xlog_seg_size = get_xlog_seg_size(instance->pgdata);
 
diff --git a/src/pg_probackup.c b/src/pg_probackup.c
index 3150900b6..00796be04 100644
--- a/src/pg_probackup.c
+++ b/src/pg_probackup.c
@@ -88,6 +88,9 @@ bool backup_logs = false;
 bool	smooth_checkpoint;
 char	*remote_agent;
 static char *backup_note = NULL;
+/* catchup options */
+static char *catchup_source_pgdata = NULL;
+static char *catchup_destination_pgdata = NULL;
 /* restore options */
 static char		   *target_time = NULL;
 static char		   *target_xid = NULL;
@@ -201,6 +204,9 @@ static ConfigOption cmd_options[] =
 	{ 'b', 184, "merge-expired",	&merge_expired,	SOURCE_CMD_STRICT },
 	{ 'b', 185, "dry-run",			&dry_run,		SOURCE_CMD_STRICT },
 	{ 's', 238, "note",				&backup_note,	SOURCE_CMD_STRICT },
+	/* catchup options */
+	{ 's', 239, "source-pgdata",		&catchup_source_pgdata,			SOURCE_CMD_STRICT },
+	{ 's', 240, "destination-pgdata",	&catchup_destination_pgdata,	SOURCE_CMD_STRICT },
 	/* restore options */
 	{ 's', 136, "recovery-target-time",	&target_time,	SOURCE_CMD_STRICT },
 	{ 's', 137, "recovery-target-xid",	&target_xid,	SOURCE_CMD_STRICT },
@@ -445,11 +451,12 @@ main(int argc, char *argv[])
 									catalogState->catalog_path, WAL_SUBDIR);
 	}
 
-	/* backup_path is required for all pg_probackup commands except help, version and checkdb */
+	/* backup_path is required for all pg_probackup commands except help, version, checkdb and catchup */
 	if (backup_path == NULL &&
 		backup_subcmd != CHECKDB_CMD &&
 		backup_subcmd != HELP_CMD &&
-		backup_subcmd != VERSION_CMD)
+		backup_subcmd != VERSION_CMD &&
+		backup_subcmd != CATCHUP_CMD)
 		elog(ERROR, "required parameter not specified: BACKUP_PATH (-B, --backup-path)");
 
 	/* ===== catalogState (END) ======*/
 
@@ -458,12 +465,12 @@ main(int argc, char *argv[])
 
 	/*
 	 * Option --instance is required for all commands except
-	 * init, show, checkdb and validate
+	 * init, show, checkdb, validate and catchup
 	 */
 	if (instance_name == NULL)
 	{
 		if (backup_subcmd != INIT_CMD && backup_subcmd != SHOW_CMD &&
-			backup_subcmd != VALIDATE_CMD && backup_subcmd != CHECKDB_CMD)
+			backup_subcmd != VALIDATE_CMD && backup_subcmd != CHECKDB_CMD && backup_subcmd != CATCHUP_CMD)
 			elog(ERROR, "required parameter not specified: --instance");
 	}
 	else
@@ -545,6 +552,10 @@ main(int argc, char *argv[])
 			setMyLocation(backup_subcmd);
 		}
 	}
+	else if (backup_subcmd == CATCHUP_CMD)
+	{
+		config_get_opt_env(instance_options);
+	}
 
 	/*
 	 * Disable logging into file for archive-push and archive-get.
@@ -587,6 +598,13 @@ main(int argc, char *argv[])
 			 "You must specify --log-directory option when running checkdb with "
 			 "--log-level-file option enabled.");
 
+	if (backup_subcmd == CATCHUP_CMD &&
+		instance_config.logger.log_level_file != LOG_OFF &&
+		instance_config.logger.log_directory == NULL)
+		elog(ERROR, "Cannot save catchup logs to a file. "
+			 "You must specify --log-directory option when running catchup with "
+			 "--log-level-file option enabled.");
+
 	/* Initialize logger */
 	init_logger(backup_path, &instance_config.logger);
 
@@ -745,6 +763,25 @@ main(int argc, char *argv[])
 		}
 	}
 
+	/* checking required options */
+	if (backup_subcmd == CATCHUP_CMD)
+	{
+		if (catchup_source_pgdata == NULL)
+			elog(ERROR, "You must specify \"--source-pgdata\" option with the \"%s\" command", get_subcmd_name(backup_subcmd));
+		if (catchup_destination_pgdata == NULL)
+			elog(ERROR, "You must specify \"--destination-pgdata\" option with the \"%s\" command", get_subcmd_name(backup_subcmd));
+		if (current.backup_mode == BACKUP_MODE_INVALID)
+			elog(ERROR, "Required parameter not specified: BACKUP_MODE (-b, --backup-mode)");
+		if (current.backup_mode != BACKUP_MODE_FULL && current.backup_mode != BACKUP_MODE_DIFF_PTRACK && current.backup_mode != BACKUP_MODE_DIFF_DELTA)
+			elog(ERROR, "Only \"FULL\", \"PTRACK\" and \"DELTA\" modes are supported with the \"%s\" command", get_subcmd_name(backup_subcmd));
+		if (!stream_wal)
+			elog(INFO, "--stream is required, forcing stream mode");
+		current.stream = stream_wal = true;
+		if (instance_config.external_dir_str)
+			elog(ERROR, "external directories not supported for \"%s\" command", get_subcmd_name(backup_subcmd));
+		// TODO: check instance_config.conn_opt
+	}
+
 	/* sanity */
 	if (backup_subcmd == VALIDATE_CMD && restore_params->no_validate)
 		elog(ERROR, "You cannot specify \"--no-validate\" option with the \"%s\" command",
@@ -787,6 +824,8 @@ main(int argc, char *argv[])
 			return do_backup(instanceState, set_backup_params,
 							 no_validate, no_sync, backup_logs);
 		}
+		case CATCHUP_CMD:
+			return do_catchup(catchup_source_pgdata, catchup_destination_pgdata, num_threads, !no_sync);
 		case RESTORE_CMD:
 			return do_restore_or_validate(instanceState, current.backup_id,
 						  recovery_target_options,
diff --git a/src/pg_probackup.h b/src/pg_probackup.h
index ccbf803fd..1cad526dd 100644
--- a/src/pg_probackup.h
+++ b/src/pg_probackup.h
@@ -17,6 +17,7 @@
 
 #include "access/xlog_internal.h"
 #include "utils/pg_crc.h"
+#include "catalog/pg_control.h"
 
 #if PG_VERSION_NUM >= 120000
 #include "common/logging.h"
@@ -420,7 +421,7 @@ typedef struct PGNodeInfo
 	char			server_version_str[100];
 
 	int				ptrack_version_num;
-	bool			is_ptrack_enable;
+	bool			is_ptrack_enabled;
 	const char		*ptrack_schema; /* used only for ptrack 2.x */
 } PGNodeInfo;
 
@@ -840,13 +841,16 @@ extern const char *deparse_backup_mode(BackupMode mode);
 extern void process_block_change(ForkNumber forknum, RelFileNode rnode,
 								 BlockNumber blkno);
 
+/* in catchup.c */
+extern int do_catchup(const char
*source_pgdata, const char *dest_pgdata, int num_threads, bool sync_dest_files); + /* in restore.c */ extern int do_restore_or_validate(InstanceState *instanceState, time_t target_backup_id, pgRecoveryTarget *rt, pgRestoreParams *params, bool no_sync); -extern bool satisfy_timeline(const parray *timelines, const pgBackup *backup); +extern bool satisfy_timeline(const parray *timelines, TimeLineID tli, XLogRecPtr lsn); extern bool satisfy_recovery_target(const pgBackup *backup, const pgRecoveryTarget *rt); extern pgRecoveryTarget *parseRecoveryTargetOptions( @@ -861,6 +865,8 @@ extern parray *get_dbOid_exclude_list(pgBackup *backup, parray *datname_list, extern parray *get_backup_filelist(pgBackup *backup, bool strict); extern parray *read_timeline_history(const char *arclog_path, TimeLineID targetTLI, bool strict); extern bool tliIsPartOfHistory(const parray *timelines, TimeLineID tli); +extern DestDirIncrCompatibility check_incremental_compatibility(const char *pgdata, uint64 system_identifier, + IncrRestoreMode incremental_mode); /* in merge.c */ extern void do_merge(InstanceState *instanceState, time_t backup_id, bool no_validate, bool no_sync); @@ -1002,6 +1008,7 @@ extern void dir_list_file(parray *files, const char *root, bool exclude, bool follow_symlink, bool add_root, bool backup_logs, bool skip_hidden, int external_dir_num, fio_location location); +extern const char *get_tablespace_mapping(const char *dir); extern void create_data_directories(parray *dest_files, const char *data_dir, const char *backup_dir, @@ -1054,6 +1061,7 @@ extern int pgFileCompareRelPathWithExternal(const void *f1, const void *f2); extern int pgFileCompareRelPathWithExternalDesc(const void *f1, const void *f2); extern int pgFileCompareLinked(const void *f1, const void *f2); extern int pgFileCompareSize(const void *f1, const void *f2); +extern int pgFileCompareSizeDesc(const void *f1, const void *f2); extern int pgCompareOid(const void *f1, const void *f2); extern void pfilearray_clear_locks(parray *file_list); @@ -1061,6 +1069,12 @@ extern void pfilearray_clear_locks(parray *file_list); extern bool check_data_file(ConnectionArgs *arguments, pgFile *file, const char *from_fullpath, uint32 checksum_version); + +extern void catchup_data_file(pgFile *file, const char *from_fullpath, const char *to_fullpath, + XLogRecPtr prev_backup_start_lsn, BackupMode backup_mode, + CompressAlg calg, int clevel, uint32 checksum_version, + int ptrack_version_num, const char *ptrack_schema, + bool is_merge, size_t prev_size); extern void backup_data_file(pgFile *file, const char *from_fullpath, const char *to_fullpath, XLogRecPtr prev_backup_start_lsn, BackupMode backup_mode, CompressAlg calg, int clevel, uint32 checksum_version, @@ -1129,14 +1143,15 @@ extern XLogRecPtr get_next_record_lsn(const char *archivedir, XLogSegNo segno, T /* in util.c */ extern TimeLineID get_current_timeline(PGconn *conn); -extern TimeLineID get_current_timeline_from_control(bool safe); +extern TimeLineID get_current_timeline_from_control(const char *pgdata_path, fio_location location, bool safe); extern XLogRecPtr get_checkpoint_location(PGconn *conn); -extern uint64 get_system_identifier(const char *pgdata_path); +extern uint64 get_system_identifier(const char *pgdata_path, fio_location location); extern uint64 get_remote_system_identifier(PGconn *conn); extern uint32 get_data_checksum_version(bool safe); extern pg_crc32c get_pgcontrol_checksum(const char *pgdata_path); -extern uint32 get_xlog_seg_size(char *pgdata_path); -extern void 
get_redo(const char *pgdata_path, RedoParams *redo); +extern DBState get_system_dbstate(const char *pgdata_path, fio_location location); +extern uint32 get_xlog_seg_size(const char *pgdata_path); +extern void get_redo(const char *pgdata_path, fio_location pgdata_location, RedoParams *redo); extern void set_min_recovery_point(pgFile *file, const char *backup_path, XLogRecPtr stop_backup_lsn); extern void copy_pgcontrol_file(const char *from_fullpath, fio_location from_location, @@ -1161,7 +1176,7 @@ extern void pretty_size(int64 size, char *buf, size_t len); extern void pretty_time_interval(double time, char *buf, size_t len); extern PGconn *pgdata_basic_setup(ConnectionOptions conn_opt, PGNodeInfo *nodeInfo); -extern void check_system_identifiers(PGconn *conn, char *pgdata); +extern void check_system_identifiers(PGconn *conn, const char *pgdata); extern void parse_filelist_filenames(parray *files, const char *root); /* in ptrack.c */ @@ -1170,7 +1185,8 @@ extern void make_pagemap_from_ptrack_2(parray* files, PGconn* backup_conn, int ptrack_version_num, XLogRecPtr lsn); extern void get_ptrack_version(PGconn *backup_conn, PGNodeInfo *nodeInfo); -extern bool pg_ptrack_enable(PGconn *backup_conn, int ptrack_version_num); +extern bool pg_is_ptrack_enabled(PGconn *backup_conn, int ptrack_version_num); + extern XLogRecPtr get_last_ptrack_lsn(PGconn *backup_conn, PGNodeInfo *nodeInfo); extern parray * pg_ptrack_get_pagemapset(PGconn *backup_conn, const char *ptrack_schema, int ptrack_version_num, XLogRecPtr lsn); @@ -1182,6 +1198,10 @@ extern int send_pages(const char *to_fullpath, const char *from_fullpath, pgFile *file, XLogRecPtr prev_backup_start_lsn, CompressAlg calg, int clevel, uint32 checksum_version, bool use_pagemap, BackupPageHeader2 **headers, BackupMode backup_mode, int ptrack_version_num, const char *ptrack_schema); +extern int copy_pages(const char *to_fullpath, const char *from_fullpath, + pgFile *file, XLogRecPtr prev_backup_start_lsn, + uint32 checksum_version, bool use_pagemap, + BackupMode backup_mode, int ptrack_version_num, const char *ptrack_schema); /* FIO */ extern void setMyLocation(ProbackupSubcmd const subcmd); @@ -1190,6 +1210,10 @@ extern int fio_send_pages(const char *to_fullpath, const char *from_fullpath, pg XLogRecPtr horizonLsn, int calg, int clevel, uint32 checksum_version, bool use_pagemap, BlockNumber *err_blknum, char **errormsg, BackupPageHeader2 **headers); +extern int fio_copy_pages(const char *to_fullpath, const char *from_fullpath, pgFile *file, + XLogRecPtr horizonLsn, int calg, int clevel, uint32 checksum_version, + bool use_pagemap, BlockNumber *err_blknum, char **errormsg, + BackupPageHeader2 **headers); /* return codes for fio_send_pages */ extern int fio_send_file_gz(const char *from_fullpath, const char *to_fullpath, FILE* out, char **errormsg); extern int fio_send_file(const char *from_fullpath, const char *to_fullpath, FILE* out, @@ -1243,6 +1267,7 @@ extern void start_WAL_streaming(PGconn *backup_conn, char *stream_dst_path, ConnectionOptions *conn_opt, XLogRecPtr startpos, TimeLineID starttli); extern int wait_WAL_streaming_end(parray *backup_files_list); +extern parray* parse_tli_history_buffer(char *history, TimeLineID tli); /* external variables and functions, implemented in backup.c */ typedef struct PGStopBackupResult @@ -1280,5 +1305,6 @@ extern XLogRecPtr wait_wal_lsn(const char *wal_segment_dir, XLogRecPtr lsn, bool bool in_prev_segment, bool segment_only, int timeout_elevel, bool in_stream_dir); extern void 
wait_wal_and_calculate_stop_lsn(const char *xlog_path, XLogRecPtr stop_lsn, pgBackup *backup); +extern int64 calculate_datasize_of_filelist(parray *filelist); #endif /* PG_PROBACKUP_H */ diff --git a/src/ptrack.c b/src/ptrack.c index 6825686c6..c631d7386 100644 --- a/src/ptrack.c +++ b/src/ptrack.c @@ -118,7 +118,7 @@ get_ptrack_version(PGconn *backup_conn, PGNodeInfo *nodeInfo) * Check if ptrack is enabled in target instance */ bool -pg_ptrack_enable(PGconn *backup_conn, int ptrack_version_num) +pg_is_ptrack_enabled(PGconn *backup_conn, int ptrack_version_num) { PGresult *res_db; bool result = false; diff --git a/src/restore.c b/src/restore.c index 7f5df1a00..005984aed 100644 --- a/src/restore.c +++ b/src/restore.c @@ -67,8 +67,6 @@ static void restore_chain(pgBackup *dest_backup, parray *parent_chain, parray *dbOid_exclude_list, pgRestoreParams *params, const char *pgdata_path, bool no_sync, bool cleanup_pgdata, bool backup_has_tblspc); -static DestDirIncrCompatibility check_incremental_compatibility(const char *pgdata, uint64 system_identifier, - IncrRestoreMode incremental_mode); /* * Iterate over backup list to find all ancestors of the broken parent_backup @@ -293,7 +291,7 @@ do_restore_or_validate(InstanceState *instanceState, time_t target_backup_id, pg if (!timelines) elog(ERROR, "Failed to get history file for target timeline %i", rt->target_tli); - if (!satisfy_timeline(timelines, current_backup)) + if (!satisfy_timeline(timelines, current_backup->tli, current_backup->stop_lsn)) { if (target_backup_id != INVALID_BACKUP_ID) elog(ERROR, "target backup %s does not satisfy target timeline", @@ -487,7 +485,7 @@ do_restore_or_validate(InstanceState *instanceState, time_t target_backup_id, pg { RedoParams redo; parray *timelines = NULL; - get_redo(instance_config.pgdata, &redo); + get_redo(instance_config.pgdata, FIO_DB_HOST, &redo); if (redo.checksum_version == 0) elog(ERROR, "Incremental restore in 'lsn' mode require " @@ -1819,7 +1817,7 @@ satisfy_recovery_target(const pgBackup *backup, const pgRecoveryTarget *rt) /* TODO description */ bool -satisfy_timeline(const parray *timelines, const pgBackup *backup) +satisfy_timeline(const parray *timelines, TimeLineID tli, XLogRecPtr lsn) { int i; @@ -1828,9 +1826,9 @@ satisfy_timeline(const parray *timelines, const pgBackup *backup) TimeLineHistoryEntry *timeline; timeline = (TimeLineHistoryEntry *) parray_get(timelines, i); - if (backup->tli == timeline->tli && + if (tli == timeline->tli && (XLogRecPtrIsInvalid(timeline->end) || - backup->stop_lsn <= timeline->end)) + lsn <= timeline->end)) return true; } return false; @@ -2186,9 +2184,9 @@ check_incremental_compatibility(const char *pgdata, uint64 system_identifier, * data files content, because based on pg_control information we will * choose a backup suitable for lsn based incremental restore. 
*/ - elog(INFO, "Trying to read pg_control file in destination direstory"); + elog(INFO, "Trying to read pg_control file in destination directory"); - system_id_pgdata = get_system_identifier(pgdata); + system_id_pgdata = get_system_identifier(pgdata, FIO_DB_HOST); if (system_id_pgdata == instance_config.system_identifier) system_id_match = true; diff --git a/src/stream.c b/src/stream.c index 01161f720..615d25281 100644 --- a/src/stream.c +++ b/src/stream.c @@ -70,7 +70,6 @@ static void add_walsegment_to_filelist(parray *filelist, uint32 timeline, uint32 xlog_seg_size); static void add_history_file_to_filelist(parray *filelist, uint32 timeline, char *basedir); -static parray* parse_tli_history_buffer(char *history, TimeLineID tli); /* * Run IDENTIFY_SYSTEM through a given connection and @@ -173,7 +172,7 @@ StreamLog(void *arg) */ stream_arg->startpos -= stream_arg->startpos % instance_config.xlog_seg_size; - xlog_files_list = parray_new(); + xlog_files_list = parray_new(); /* Initialize timeout */ stream_stop_begin = 0; @@ -308,14 +307,14 @@ stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished) /* we assume that we get called once at the end of each segment */ if (segment_finished) - { - elog(VERBOSE, _("finished segment at %X/%X (timeline %u)"), - (uint32) (xlogpos >> 32), (uint32) xlogpos, timeline); + { + elog(VERBOSE, _("finished segment at %X/%X (timeline %u)"), + (uint32) (xlogpos >> 32), (uint32) xlogpos, timeline); - add_walsegment_to_filelist(xlog_files_list, timeline, xlogpos, - (char*) stream_thread_arg.basedir, - instance_config.xlog_seg_size); - } + add_walsegment_to_filelist(xlog_files_list, timeline, xlogpos, + (char*) stream_thread_arg.basedir, + instance_config.xlog_seg_size); + } /* * Note that we report the previous, not current, position here. 
After a
@@ -588,20 +587,25 @@ start_WAL_streaming(PGconn *backup_conn, char *stream_dst_path, ConnectionOption
 	/* Set error exit code as default */
 	stream_thread_arg.ret = 1;
 	/* we must use startpos as start_lsn from start_backup */
-	stream_thread_arg.startpos = current.start_lsn;
-	stream_thread_arg.starttli = current.tli;
+	stream_thread_arg.startpos = startpos;
+	stream_thread_arg.starttli = starttli;
 
 	thread_interrupted = false;
 	pthread_create(&stream_thread, NULL, StreamLog, &stream_thread_arg);
 }
 
-/* Wait for the completion of stream */
+/*
+ * Wait for the streaming to complete and append the list of
+ * streamed xlog files to backup_files_list (if it is not NULL).
+ */
 int
 wait_WAL_streaming_end(parray *backup_files_list)
 {
 	pthread_join(stream_thread, NULL);
 
-	parray_concat(backup_files_list, xlog_files_list);
+	if (backup_files_list != NULL)
+		parray_concat(backup_files_list, xlog_files_list);
+
 	parray_free(xlog_files_list);
 	return stream_thread_arg.ret;
 }
diff --git a/src/util.c b/src/util.c
index c0a1dc9e4..4e32e0639 100644
--- a/src/util.c
+++ b/src/util.c
@@ -10,8 +10,6 @@
 
 #include "pg_probackup.h"
 
-#include "catalog/pg_control.h"
-
 #include <time.h>
 #include <unistd.h>
 
@@ -174,7 +172,7 @@ get_current_timeline(PGconn *conn)
 	if (PQresultStatus(res) == PGRES_TUPLES_OK)
 		val = PQgetvalue(res, 0, 0);
 	else
-		return get_current_timeline_from_control(false);
+		return get_current_timeline_from_control(instance_config.pgdata, FIO_DB_HOST, false);
 
 	if (!parse_uint32(val, &tli, 0))
 	{
@@ -182,7 +180,7 @@ get_current_timeline(PGconn *conn)
 		elog(WARNING, "Invalid value of timeline_id %s", val);
 
 		/* TODO 3.0 remove it and just error out */
-		return get_current_timeline_from_control(false);
+		return get_current_timeline_from_control(instance_config.pgdata, FIO_DB_HOST, false);
 	}
 
 	return tli;
@@ -190,15 +188,15 @@ get_current_timeline(PGconn *conn)
 
 /* Get timeline from pg_control file */
 TimeLineID
-get_current_timeline_from_control(bool safe)
+get_current_timeline_from_control(const char *pgdata_path, fio_location location, bool safe)
 {
 	ControlFileData ControlFile;
 	char	   *buffer;
 	size_t		size;
 
 	/* First fetch file... */
-	buffer = slurpFile(instance_config.pgdata, XLOG_CONTROL_FILE, &size,
-					   safe, FIO_DB_HOST);
+	buffer = slurpFile(pgdata_path, XLOG_CONTROL_FILE, &size,
+					   safe, location);
 	if (safe && buffer == NULL)
 		return 0;
@@ -249,14 +247,14 @@ get_checkpoint_location(PGconn *conn)
 }
 
 uint64
-get_system_identifier(const char *pgdata_path)
+get_system_identifier(const char *pgdata_path, fio_location location)
 {
 	ControlFileData ControlFile;
 	char	   *buffer;
 	size_t		size;
 
 	/* First fetch file... */
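+	/* location selects whether pg_control is read locally or through
+	 * the remote agent (e.g. a remote source PGDATA during catchup) */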
-	buffer = slurpFile(pgdata_path, XLOG_CONTROL_FILE, &size, false, FIO_DB_HOST);
+	buffer = slurpFile(pgdata_path, XLOG_CONTROL_FILE, &size, false, location);
 	if (buffer == NULL)
 		return 0;
 	digestControlFile(&ControlFile, buffer, size);
@@ -299,7 +297,7 @@ get_remote_system_identifier(PGconn *conn)
 }
 
 uint32
-get_xlog_seg_size(char *pgdata_path)
+get_xlog_seg_size(const char *pgdata_path)
 {
 #if PG_VERSION_NUM >= 110000
 	ControlFileData ControlFile;
@@ -351,15 +349,31 @@ get_pgcontrol_checksum(const char *pgdata_path)
 	return ControlFile.crc;
 }
 
+DBState
+get_system_dbstate(const char *pgdata_path, fio_location location)
+{
+	ControlFileData ControlFile;
+	char	   *buffer;
+	size_t		size;
+
+	buffer = slurpFile(pgdata_path, XLOG_CONTROL_FILE, &size, false, location);
+	if (buffer == NULL)
+		return 0;
+	digestControlFile(&ControlFile, buffer, size);
+	pg_free(buffer);
+
+	return ControlFile.state;
+}
+
 void
-get_redo(const char *pgdata_path, RedoParams *redo)
+get_redo(const char *pgdata_path, fio_location pgdata_location, RedoParams *redo)
 {
 	ControlFileData ControlFile;
 	char	   *buffer;
 	size_t		size;
 
 	/* First fetch file... */
-	buffer = slurpFile(pgdata_path, XLOG_CONTROL_FILE, &size, false, FIO_DB_HOST);
+	buffer = slurpFile(pgdata_path, XLOG_CONTROL_FILE, &size, false, pgdata_location);
 
 	digestControlFile(&ControlFile, buffer, size);
 	pg_free(buffer);
diff --git a/src/utils/configuration.c b/src/utils/configuration.c
index afc1bc056..04bfbbe3b 100644
--- a/src/utils/configuration.c
+++ b/src/utils/configuration.c
@@ -110,6 +110,7 @@ static char const * const subcmd_names[] =
 	"agent",
 	"help",
 	"version",
+	"catchup",
 };
 
 ProbackupSubcmd
diff --git a/src/utils/configuration.h b/src/utils/configuration.h
index 4ed4e0e61..3a5de4b83 100644
--- a/src/utils/configuration.h
+++ b/src/utils/configuration.h
@@ -38,7 +38,8 @@ typedef enum ProbackupSubcmd
 	SSH_CMD,
 	AGENT_CMD,
 	HELP_CMD,
-	VERSION_CMD
+	VERSION_CMD,
+	CATCHUP_CMD,
 } ProbackupSubcmd;
 
 typedef enum OptionSource
diff --git a/src/utils/file.c b/src/utils/file.c
index e9792dd9c..b808d6293 100644
--- a/src/utils/file.c
+++ b/src/utils/file.c
@@ -94,7 +94,7 @@ setMyLocation(ProbackupSubcmd const subcmd)
 	MyLocation = IsSshProtocol()
 		? (subcmd == ARCHIVE_PUSH_CMD || subcmd == ARCHIVE_GET_CMD)
 		   ? FIO_DB_HOST
-		   : (subcmd == BACKUP_CMD || subcmd == RESTORE_CMD || subcmd == ADD_INSTANCE_CMD)
+		   : (subcmd == BACKUP_CMD || subcmd == RESTORE_CMD || subcmd == ADD_INSTANCE_CMD || subcmd == CATCHUP_CMD)
 			  ? FIO_BACKUP_HOST
 			  : FIO_LOCAL_HOST
 		: FIO_LOCAL_HOST;
@@ -1139,6 +1139,46 @@ fio_stat(char const* path, struct stat* st, bool follow_symlink, fio_location lo
 	}
 }
 
+/*
+ * Read the value of a symbolic link.
+ * This is a wrapper around the readlink() syscall.
+ * Side effect: the result may be truncated (the caller can detect
+ * this by checking whether the returned value >= valsiz).
+ */
+ssize_t
+fio_readlink(const char *path, char *value, size_t valsiz, fio_location location)
+{
+	if (!fio_is_remote(location))
+	{
+		/* readlink() does not append a trailing \0 */
+		ssize_t len = readlink(path, value, valsiz);
+		value[len < valsiz ? len : valsiz] = '\0';
+		return len;
+	}
+	else
+	{
+		fio_header hdr;
+		size_t path_len = strlen(path) + 1;
+
+		hdr.cop = FIO_READLINK;
+		hdr.handle = -1;
+		Assert(valsiz <= UINT_MAX); /* max value of fio_header.arg */
+		hdr.arg = valsiz;
+		hdr.size = path_len;
+
+		IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr));
+		IO_CHECK(fio_write_all(fio_stdout, path, path_len), path_len);
+
+		IO_CHECK(fio_read_all(fio_stdin, &hdr, sizeof(hdr)), sizeof(hdr));
+		Assert(hdr.cop == FIO_READLINK);
+		Assert(hdr.size <= valsiz);
+		IO_CHECK(fio_read_all(fio_stdin, value, hdr.size), hdr.size);
+		value[hdr.size < valsiz ? hdr.size : valsiz] = '\0';
+		return hdr.size;
+	}
+}
+
 /* Check presence of the file */
 int
 fio_access(char const* path, int mode, fio_location location)
@@ -1769,7 +1809,7 @@ fio_send_pages(const char *to_fullpath, const char *from_fullpath, pgFile *file,
 
 	/* send message with header
 
-	8bytes       24bytes             var        var
+	16bytes      24bytes             var        var
 	--------------------------------------------------------------
 	| fio_header | fio_send_request | FILE PATH | BITMAP(if any) |
 	--------------------------------------------------------------
@@ -1903,6 +1943,198 @@ fio_send_pages(const char *to_fullpath, const char *from_fullpath, pgFile *file,
 	return n_blocks_read;
 }
 
+/*
+ * Return the number of blocks actually(!) read; attempts and
+ * half-read blocks are not counted.
+ * Return values in case of error:
+ *  FILE_MISSING
+ *  OPEN_FAILED
+ *  READ_FAILED
+ *  PAGE_CORRUPTION
+ *  WRITE_FAILED
+ *
+ * If none of the above, this function returns the number of blocks
+ * read by the remote agent.
+ *
+ * In case of DELTA mode horizonLsn must be a valid lsn,
+ * otherwise it should be set to InvalidXLogRecPtr.
+ * Based on fio_send_pages.
+ */
+int
+fio_copy_pages(const char *to_fullpath, const char *from_fullpath, pgFile *file,
+			   XLogRecPtr horizonLsn, int calg, int clevel, uint32 checksum_version,
+			   bool use_pagemap, BlockNumber* err_blknum, char **errormsg,
+			   BackupPageHeader2 **headers)
+{
+	FILE *out = NULL;
+	char *out_buf = NULL;
+	struct {
+		fio_header hdr;
+		fio_send_request arg;
+	} req;
+	BlockNumber	n_blocks_read = 0;
+	BlockNumber blknum = 0;
+
+	/* send message with header
+
+	16bytes      24bytes             var        var
+	--------------------------------------------------------------
+	| fio_header | fio_send_request | FILE PATH | BITMAP(if any) |
+	--------------------------------------------------------------
+	*/
+
+	req.hdr.cop = FIO_SEND_PAGES;
+
+	if (use_pagemap)
+	{
+		req.hdr.size = sizeof(fio_send_request) + (*file).pagemap.bitmapsize + strlen(from_fullpath) + 1;
+		req.arg.bitmapsize = (*file).pagemap.bitmapsize;
+
+		/* TODO: add optimization for the case of pagemap
+		 * containing small number of blocks with big serial numbers:
+		 * https://github.com/postgrespro/pg_probackup/blob/remote_page_backup/src/utils/file.c#L1211
+		 */
+	}
+	else
+	{
+		req.hdr.size = sizeof(fio_send_request) + strlen(from_fullpath) + 1;
+		req.arg.bitmapsize = 0;
+	}
+
+	req.arg.nblocks = file->size/BLCKSZ;
+	req.arg.segmentno = file->segno * RELSEG_SIZE;
+	req.arg.horizonLsn = horizonLsn;
+	req.arg.checksumVersion = checksum_version;
+	req.arg.calg = calg;
+	req.arg.clevel = clevel;
+	req.arg.path_len = strlen(from_fullpath) + 1;
+
+	file->compress_alg = calg; /* TODO: why is compress_alg set here? */
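+
+	/*
+	 * Note: this reuses the remote agent's FIO_SEND_PAGES machinery; only
+	 * the local side differs from fio_send_pages(): received pages are
+	 * written in place at blknum * BLCKSZ instead of being appended to a
+	 * backup file.
+	 */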
+
+//<-----
+//	datapagemap_iterator_t *iter;
+//	BlockNumber blkno;
+//	iter = datapagemap_iterate(pagemap);
+//	while (datapagemap_next(iter, &blkno))
+//		elog(INFO, "block %u", blkno);
+//	pg_free(iter);
+//<-----
+
+	/* send header */
+	IO_CHECK(fio_write_all(fio_stdout, &req, sizeof(req)), sizeof(req));
+
+	/* send file path */
+	IO_CHECK(fio_write_all(fio_stdout, from_fullpath, req.arg.path_len), req.arg.path_len);
+
+	/* send pagemap if any */
+	if (use_pagemap)
+		IO_CHECK(fio_write_all(fio_stdout, (*file).pagemap.bitmap, (*file).pagemap.bitmapsize), (*file).pagemap.bitmapsize);
+
+	out = fio_fopen(to_fullpath, PG_BINARY_R "+", FIO_BACKUP_HOST);
+	if (out == NULL)
+		elog(ERROR, "Cannot open destination file \"%s\": %s", to_fullpath, strerror(errno));
+
+	/* update file permission */
+	if (fio_chmod(to_fullpath, file->mode, FIO_BACKUP_HOST) == -1)
+		elog(ERROR, "Cannot change mode of \"%s\": %s", to_fullpath,
+			 strerror(errno));
+
+	elog(VERBOSE, "ftruncate file \"%s\" to size %lu",
+		 to_fullpath, file->size);
+	if (fio_ftruncate(out, file->size) == -1)
+		elog(ERROR, "Cannot ftruncate file \"%s\" to size %lu: %s",
+			 to_fullpath, file->size, strerror(errno));
+
+	if (!fio_is_remote_file(out))
+	{
+		out_buf = pgut_malloc(STDIO_BUFSIZE);
+		setvbuf(out, out_buf, _IOFBF, STDIO_BUFSIZE);
+	}
+
+	while (true)
+	{
+		fio_header hdr;
+		char		buf[BLCKSZ + sizeof(BackupPageHeader)];
+		IO_CHECK(fio_read_all(fio_stdin, &hdr, sizeof(hdr)), sizeof(hdr));
+
+		if (interrupted)
+			elog(ERROR, "Interrupted during page reading");
+
+		if (hdr.cop == FIO_ERROR)
+		{
+			/* FILE_MISSING, OPEN_FAILED and READ_FAILED */
+			if (hdr.size > 0)
+			{
+				IO_CHECK(fio_read_all(fio_stdin, buf, hdr.size), hdr.size);
+				*errormsg = pgut_malloc(hdr.size);
+				snprintf(*errormsg, hdr.size, "%s", buf);
+			}
+
+			return hdr.arg;
+		}
+		else if (hdr.cop == FIO_SEND_FILE_CORRUPTION)
+		{
+			*err_blknum = hdr.arg;
+
+			if (hdr.size > 0)
+			{
+				IO_CHECK(fio_read_all(fio_stdin, buf, hdr.size), hdr.size);
+				*errormsg = pgut_malloc(hdr.size);
+				snprintf(*errormsg, hdr.size, "%s", buf);
+			}
+			return PAGE_CORRUPTION;
+		}
+		else if (hdr.cop == FIO_SEND_FILE_EOF)
+		{
+			/* n_blocks_read reported by EOF */
+			n_blocks_read = hdr.arg;
+
+			/* receive headers if any */
+			if (hdr.size > 0)
+			{
+				*headers = pgut_malloc(hdr.size);
+				IO_CHECK(fio_read_all(fio_stdin, *headers, hdr.size), hdr.size);
+				file->n_headers = (hdr.size / sizeof(BackupPageHeader2)) -1;
+			}
+
+			break;
+		}
+		else if (hdr.cop == FIO_PAGE)
+		{
+			blknum = hdr.arg;
+
+			Assert(hdr.size <= sizeof(buf));
+			IO_CHECK(fio_read_all(fio_stdin, buf, hdr.size), hdr.size);
+
+			COMP_FILE_CRC32(true, file->crc, buf, hdr.size);
+
+			if (fio_fseek(out, blknum * BLCKSZ) < 0)
+			{
+				elog(ERROR, "Cannot seek block %u of \"%s\": %s",
+					 blknum, to_fullpath, strerror(errno));
+			}
+			// an uncompressed block prefixed with a BackupPageHeader must arrive here
+			// TODO: add an assert?
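+			// e.g. Assert(hdr.size == BLCKSZ + sizeof(BackupPageHeader));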
+			if (fio_fwrite(out, buf + sizeof(BackupPageHeader), hdr.size - sizeof(BackupPageHeader)) != BLCKSZ)
+			{
+				fio_fclose(out);
+				*err_blknum = blknum;
+				return WRITE_FAILED;
+			}
+			file->write_size += BLCKSZ;
+			file->uncompressed_size += BLCKSZ;
+		}
+		else
+			elog(ERROR, "Remote agent returned message of unexpected type: %i", hdr.cop);
+	}
+
+	if (out)
+		fclose(out);
+	pg_free(out_buf);
+
+	return n_blocks_read;
+}
+
 /* TODO: read file using large buffer
  * Return codes:
  *  FIO_ERROR:
@@ -3147,6 +3379,26 @@ fio_communicate(int in, int out)
 		  case FIO_GET_ASYNC_ERROR:
 			fio_get_async_error_impl(out);
 			break;
+		  case FIO_READLINK: /* Read content of a symbolic link */
+			{
+				/*
+				 * We need one buffer for both the arguments and the result:
+				 *   hdr.size = strlen(symlink_name) + 1
+				 *   hdr.arg  = buffer size for the answer (the symlink content)
+				 */
+				size_t filename_size = (size_t)hdr.size;
+				if (filename_size + hdr.arg > buf_size) {
+					buf_size = filename_size + hdr.arg; /* must hold both the path and the result */
+					buf = (char*)realloc(buf, buf_size);
+				}
+				rc = readlink(buf, buf + filename_size, hdr.arg);
+				hdr.cop = FIO_READLINK;
+				hdr.size = rc > 0 ? rc : 0;
+				IO_CHECK(fio_write_all(out, &hdr, sizeof(hdr)), sizeof(hdr));
+				if (hdr.size != 0)
+					IO_CHECK(fio_write_all(out, buf + filename_size, hdr.size), hdr.size);
+			}
+			break;
 		  default:
 			Assert(false);
 		}
diff --git a/src/utils/file.h b/src/utils/file.h
index ad65b9901..edb5ea0f9 100644
--- a/src/utils/file.h
+++ b/src/utils/file.h
@@ -55,7 +55,8 @@ typedef enum
 	FIO_LIST_DIR,
 	FIO_CHECK_POSTMASTER,
 	FIO_GET_ASYNC_ERROR,
-	FIO_WRITE_ASYNC
+	FIO_WRITE_ASYNC,
+	FIO_READLINK
 } fio_operations;
 
 typedef enum
@@ -128,6 +129,7 @@ extern int	fio_mkdir(char const* path, int mode, fio_location location);
 extern int	fio_chmod(char const* path, int mode, fio_location location);
 extern int	fio_access(char const* path, int mode, fio_location location);
 extern int	fio_stat(char const* path, struct stat* st, bool follow_symlinks, fio_location location);
+extern ssize_t fio_readlink(const char *path, char *value, size_t valsiz, fio_location location);
 extern DIR*	fio_opendir(char const* path, fio_location location);
 extern struct dirent * fio_readdir(DIR *dirp);
 extern int	fio_closedir(DIR *dirp);
diff --git a/src/utils/parray.c b/src/utils/parray.c
index 95b83365d..792e26907 100644
--- a/src/utils/parray.c
+++ b/src/utils/parray.c
@@ -198,6 +198,13 @@ parray_bsearch(parray *array, const void *key, int(*compare)(const void *, const
 	return bsearch(&key, array->data, array->used, sizeof(void *), compare);
 }
 
+int
+parray_bsearch_index(parray *array, const void *key, int(*compare)(const void *, const void *))
+{
+	void **elem = parray_bsearch(array, key, compare);
+	return elem != NULL ?
elem - array->data : -1; +} + /* checks that parray contains element */ bool parray_contains(parray *array, void *elem) { diff --git a/src/utils/parray.h b/src/utils/parray.h index 85d7383f3..e92ad728c 100644 --- a/src/utils/parray.h +++ b/src/utils/parray.h @@ -29,6 +29,7 @@ extern bool parray_rm(parray *array, const void *key, int(*compare)(const void * extern size_t parray_num(const parray *array); extern void parray_qsort(parray *array, int(*compare)(const void *, const void *)); extern void *parray_bsearch(parray *array, const void *key, int(*compare)(const void *, const void *)); +extern int parray_bsearch_index(parray *array, const void *key, int(*compare)(const void *, const void *)); extern void parray_walk(parray *array, void (*action)(void *)); extern bool parray_contains(parray *array, void *elem); diff --git a/tests/__init__.py b/tests/__init__.py index dbf84feea..080512760 100644 --- a/tests/__init__.py +++ b/tests/__init__.py @@ -6,7 +6,8 @@ retention, pgpro560, pgpro589, pgpro2068, false_positive, replica, \ compression, page, ptrack, archive, exclude, cfs_backup, cfs_restore, \ cfs_validate_backup, auth_test, time_stamp, snapfs, logging, \ - locking, remote, external, config, checkdb, set_backup, incr_restore + locking, remote, external, config, checkdb, set_backup, incr_restore, \ + catchup def load_tests(loader, tests, pattern): @@ -23,6 +24,7 @@ def load_tests(loader, tests, pattern): # suite.addTests(loader.loadTestsFromModule(auth_test)) suite.addTests(loader.loadTestsFromModule(archive)) suite.addTests(loader.loadTestsFromModule(backup)) + suite.addTests(loader.loadTestsFromModule(catchup)) suite.addTests(loader.loadTestsFromModule(compatibility)) suite.addTests(loader.loadTestsFromModule(checkdb)) suite.addTests(loader.loadTestsFromModule(config)) diff --git a/tests/catchup.py b/tests/catchup.py new file mode 100644 index 000000000..5df538e42 --- /dev/null +++ b/tests/catchup.py @@ -0,0 +1,977 @@ +import os +import signal +import unittest +from .helpers.ptrack_helpers import ProbackupTest, ProbackupException + +module_name = 'catchup' + +class CatchupTest(ProbackupTest, unittest.TestCase): + def setUp(self): + self.fname = self.id().split('.')[3] + +######################################### +# Basic tests +######################################### + def test_basic_full_catchup(self): + """ + Test 'multithreaded basebackup' mode (aka FULL catchup) + """ + # preparation + src_pg = self.make_simple_node( + base_dir = os.path.join(module_name, self.fname, 'src'), + set_replication = True + ) + src_pg.slow_start() + src_pg.safe_psql( + "postgres", + "CREATE TABLE ultimate_question AS SELECT 42 AS answer") + src_query_result = src_pg.safe_psql("postgres", "SELECT * FROM ultimate_question") + + # do full catchup + dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst')) + self.catchup_node( + backup_mode = 'FULL', + source_pgdata = src_pg.data_dir, + destination_node = dst_pg, + options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream'] + ) + + # 1st check: compare data directories + self.compare_pgdata( + self.pgdata_content(src_pg.data_dir), + self.pgdata_content(dst_pg.data_dir) + ) + + # run&recover catchup'ed instance + src_pg.stop() + dst_options = {} + dst_options['port'] = str(dst_pg.port) + self.set_auto_conf(dst_pg, dst_options) + dst_pg.slow_start() + + # 2nd check: run verification query + dst_query_result = dst_pg.safe_psql("postgres", "SELECT * FROM ultimate_question") + self.assertEqual(src_query_result, dst_query_result, 'Different 
answer from copy') + + # Cleanup + dst_pg.stop() + self.del_test_dir(module_name, self.fname) + + def test_full_catchup_with_tablespace(self): + """ + Test tablespace transfers + """ + # preparation + src_pg = self.make_simple_node( + base_dir = os.path.join(module_name, self.fname, 'src'), + set_replication = True + ) + src_pg.slow_start() + tblspace1_old_path = self.get_tblspace_path(src_pg, 'tblspace1_old') + self.create_tblspace_in_node(src_pg, 'tblspace1', tblspc_path = tblspace1_old_path) + src_pg.safe_psql( + "postgres", + "CREATE TABLE ultimate_question TABLESPACE tblspace1 AS SELECT 42 AS answer") + src_query_result = src_pg.safe_psql("postgres", "SELECT * FROM ultimate_question") + + # do full catchup with tablespace mapping + dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst')) + tblspace1_new_path = self.get_tblspace_path(dst_pg, 'tblspace1_new') + self.catchup_node( + backup_mode = 'FULL', + source_pgdata = src_pg.data_dir, + destination_node = dst_pg, + options = [ + '-d', 'postgres', + '-p', str(src_pg.port), + '--stream', + '-T', '{0}={1}'.format(tblspace1_old_path, tblspace1_new_path) + ] + ) + + # 1st check: compare data directories + self.compare_pgdata( + self.pgdata_content(src_pg.data_dir), + self.pgdata_content(dst_pg.data_dir) + ) + + # make changes in master tablespace + src_pg.safe_psql( + "postgres", + "UPDATE ultimate_question SET answer = -1") + src_pg.stop() + + # run&recover catchup'ed instance + dst_options = {} + dst_options['port'] = str(dst_pg.port) + self.set_auto_conf(dst_pg, dst_options) + dst_pg.slow_start() + + # 2nd check: run verification query + dst_query_result = dst_pg.safe_psql("postgres", "SELECT * FROM ultimate_question") + self.assertEqual(src_query_result, dst_query_result, 'Different answer from copy') + + # Cleanup + dst_pg.stop() + self.del_test_dir(module_name, self.fname) + + def test_basic_delta_catchup(self): + """ + Test delta catchup + """ + # preparation 1: source + src_pg = self.make_simple_node( + base_dir = os.path.join(module_name, self.fname, 'src'), + set_replication = True, + pg_options = { 'wal_log_hints': 'on' } + ) + src_pg.slow_start() + src_pg.safe_psql( + "postgres", + "CREATE TABLE ultimate_question(answer int)") + + # preparation 2: make clean shutdowned lagging behind replica + dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst')) + self.catchup_node( + backup_mode = 'FULL', + source_pgdata = src_pg.data_dir, + destination_node = dst_pg, + options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream'] + ) + self.set_replica(src_pg, dst_pg) + dst_options = {} + dst_options['port'] = str(dst_pg.port) + self.set_auto_conf(dst_pg, dst_options) + dst_pg.slow_start(replica = True) + dst_pg.stop() + + # preparation 3: make changes on master (source) + src_pg.pgbench_init(scale = 10) + pgbench = src_pg.pgbench(options=['-T', '10', '--no-vacuum']) + pgbench.wait() + src_pg.safe_psql("postgres", "INSERT INTO ultimate_question VALUES(42)") + src_query_result = src_pg.safe_psql("postgres", "SELECT * FROM ultimate_question") + + # do delta catchup + self.catchup_node( + backup_mode = 'DELTA', + source_pgdata = src_pg.data_dir, + destination_node = dst_pg, + options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream'] + ) + + # 1st check: compare data directories + self.compare_pgdata( + self.pgdata_content(src_pg.data_dir), + self.pgdata_content(dst_pg.data_dir) + ) + + # run&recover catchup'ed instance + src_pg.stop() + self.set_replica(master = src_pg, replica = dst_pg) + 
dst_options = {} + dst_options['port'] = str(dst_pg.port) + self.set_auto_conf(dst_pg, dst_options) + dst_pg.slow_start(replica = True) + + # 2nd check: run verification query + dst_query_result = dst_pg.safe_psql("postgres", "SELECT * FROM ultimate_question") + self.assertEqual(src_query_result, dst_query_result, 'Different answer from copy') + + # Cleanup + dst_pg.stop() + self.del_test_dir(module_name, self.fname) + + def test_basic_ptrack_catchup(self): + """ + Test ptrack catchup + """ + if not self.ptrack: + return unittest.skip('Skipped because ptrack support is disabled') + + # preparation 1: source + src_pg = self.make_simple_node( + base_dir = os.path.join(module_name, self.fname, 'src'), + set_replication = True, + ptrack_enable = True, + initdb_params = ['--data-checksums'] + ) + src_pg.slow_start() + src_pg.safe_psql("postgres", "CREATE EXTENSION ptrack") + src_pg.safe_psql( + "postgres", + "CREATE TABLE ultimate_question(answer int)") + + # preparation 2: make clean shutdowned lagging behind replica + dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst')) + self.catchup_node( + backup_mode = 'FULL', + source_pgdata = src_pg.data_dir, + destination_node = dst_pg, + options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream'] + ) + self.set_replica(src_pg, dst_pg) + dst_options = {} + dst_options['port'] = str(dst_pg.port) + self.set_auto_conf(dst_pg, dst_options) + dst_pg.slow_start(replica = True) + dst_pg.stop() + + # preparation 3: make changes on master (source) + src_pg.pgbench_init(scale = 10) + pgbench = src_pg.pgbench(options=['-T', '10', '--no-vacuum']) + pgbench.wait() + src_pg.safe_psql("postgres", "INSERT INTO ultimate_question VALUES(42)") + src_query_result = src_pg.safe_psql("postgres", "SELECT * FROM ultimate_question") + + # do ptrack catchup + self.catchup_node( + backup_mode = 'PTRACK', + source_pgdata = src_pg.data_dir, + destination_node = dst_pg, + options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream'] + ) + + # 1st check: compare data directories + self.compare_pgdata( + self.pgdata_content(src_pg.data_dir), + self.pgdata_content(dst_pg.data_dir) + ) + + # run&recover catchup'ed instance + src_pg.stop() + self.set_replica(master = src_pg, replica = dst_pg) + dst_options = {} + dst_options['port'] = str(dst_pg.port) + self.set_auto_conf(dst_pg, dst_options) + dst_pg.slow_start(replica = True) + + # 2nd check: run verification query + dst_query_result = dst_pg.safe_psql("postgres", "SELECT * FROM ultimate_question") + self.assertEqual(src_query_result, dst_query_result, 'Different answer from copy') + + # Cleanup + dst_pg.stop() + self.del_test_dir(module_name, self.fname) + + def test_tli_delta_catchup(self): + """ + Test that we correctly follow timeline change with delta catchup + """ + # preparation 1: source + src_pg = self.make_simple_node( + base_dir = os.path.join(module_name, self.fname, 'src'), + set_replication = True, + pg_options = { 'wal_log_hints': 'on' } + ) + src_pg.slow_start() + + # preparation 2: destination + dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst')) + self.catchup_node( + backup_mode = 'FULL', + source_pgdata = src_pg.data_dir, + destination_node = dst_pg, + options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream'] + ) + dst_options = {} + dst_options['port'] = str(dst_pg.port) + self.set_auto_conf(dst_pg, dst_options) + dst_pg.slow_start() + dst_pg.stop() + + # preparation 3: promote source + src_pg.stop() + self.set_replica(dst_pg, src_pg) # fake replication 
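+        # note: making src a replica of the stopped dst and then promoting it
+        # forces src onto a new timeline without needing a real standby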
+ src_pg.slow_start(replica = True) + src_pg.promote() + src_pg.safe_psql("postgres", "CREATE TABLE ultimate_question AS SELECT 42 AS answer") + src_query_result = src_pg.safe_psql("postgres", "SELECT * FROM ultimate_question") + + # do catchup + self.catchup_node( + backup_mode = 'DELTA', + source_pgdata = src_pg.data_dir, + destination_node = dst_pg, + options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream'] + ) + + # 1st check: compare data directories + self.compare_pgdata( + self.pgdata_content(src_pg.data_dir), + self.pgdata_content(dst_pg.data_dir) + ) + + # run&recover catchup'ed instance + dst_options = {} + dst_options['port'] = str(dst_pg.port) + self.set_auto_conf(dst_pg, dst_options) + dst_pg.slow_start() + + # 2nd check: run verification query + dst_query_result = dst_pg.safe_psql("postgres", "SELECT * FROM ultimate_question") + self.assertEqual(src_query_result, dst_query_result, 'Different answer from copy') + + # Cleanup + src_pg.stop() + dst_pg.stop() + self.del_test_dir(module_name, self.fname) + + def test_tli_ptrack_catchup(self): + """ + Test that we correctly follow timeline change with ptrack catchup + """ + if not self.ptrack: + return unittest.skip('Skipped because ptrack support is disabled') + + # preparation 1: source + src_pg = self.make_simple_node( + base_dir = os.path.join(module_name, self.fname, 'src'), + set_replication = True, + ptrack_enable = True, + initdb_params = ['--data-checksums'] + ) + src_pg.slow_start() + src_pg.safe_psql("postgres", "CREATE EXTENSION ptrack") + + # preparation 2: destination + dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst')) + self.catchup_node( + backup_mode = 'FULL', + source_pgdata = src_pg.data_dir, + destination_node = dst_pg, + options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream'] + ) + dst_options = {} + dst_options['port'] = str(dst_pg.port) + self.set_auto_conf(dst_pg, dst_options) + dst_pg.slow_start() + dst_pg.stop() + + # preparation 3: promote source + src_pg.stop() + self.set_replica(dst_pg, src_pg) # fake replication + src_pg.slow_start(replica = True) + src_pg.promote() + src_pg.safe_psql("postgres", "CREATE TABLE ultimate_question AS SELECT 42 AS answer") + src_query_result = src_pg.safe_psql("postgres", "SELECT * FROM ultimate_question") + + # do catchup + self.catchup_node( + backup_mode = 'PTRACK', + source_pgdata = src_pg.data_dir, + destination_node = dst_pg, + options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream'] + ) + + # 1st check: compare data directories + self.compare_pgdata( + self.pgdata_content(src_pg.data_dir), + self.pgdata_content(dst_pg.data_dir) + ) + + # run&recover catchup'ed instance + dst_options = {} + dst_options['port'] = str(dst_pg.port) + self.set_auto_conf(dst_pg, dst_options) + dst_pg.slow_start() + + # 2nd check: run verification query + dst_query_result = dst_pg.safe_psql("postgres", "SELECT * FROM ultimate_question") + self.assertEqual(src_query_result, dst_query_result, 'Different answer from copy') + + # Cleanup + src_pg.stop() + dst_pg.stop() + self.del_test_dir(module_name, self.fname) + +######################################### +# Test various corner conditions +######################################### + def test_table_drop_with_delta(self): + """ + Test that dropped table in source will be dropped in delta catchup'ed instance too + """ + # preparation 1: source + src_pg = self.make_simple_node( + base_dir = os.path.join(module_name, self.fname, 'src'), + set_replication = True, + pg_options = { 'wal_log_hints': 
'on' } + ) + src_pg.slow_start() + src_pg.safe_psql( + "postgres", + "CREATE TABLE ultimate_question AS SELECT 42 AS answer") + + # preparation 2: make clean shutdowned lagging behind replica + dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst')) + self.catchup_node( + backup_mode = 'FULL', + source_pgdata = src_pg.data_dir, + destination_node = dst_pg, + options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream'] + ) + dst_options = {} + dst_options['port'] = str(dst_pg.port) + self.set_auto_conf(dst_pg, dst_options) + dst_pg.slow_start() + dst_pg.stop() + + # preparation 3: make changes on master (source) + # perform checkpoint twice to ensure, that datafile is actually deleted on filesystem + src_pg.safe_psql("postgres", "DROP TABLE ultimate_question") + src_pg.safe_psql("postgres", "CHECKPOINT") + src_pg.safe_psql("postgres", "CHECKPOINT") + + # do delta catchup + self.catchup_node( + backup_mode = 'DELTA', + source_pgdata = src_pg.data_dir, + destination_node = dst_pg, + options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream'] + ) + + # Check: compare data directories + self.compare_pgdata( + self.pgdata_content(src_pg.data_dir), + self.pgdata_content(dst_pg.data_dir) + ) + + # Cleanup + src_pg.stop() + self.del_test_dir(module_name, self.fname) + + def test_table_drop_with_ptrack(self): + """ + Test that dropped table in source will be dropped in ptrack catchup'ed instance too + """ + if not self.ptrack: + return unittest.skip('Skipped because ptrack support is disabled') + + # preparation 1: source + src_pg = self.make_simple_node( + base_dir = os.path.join(module_name, self.fname, 'src'), + set_replication = True, + ptrack_enable = True, + initdb_params = ['--data-checksums'] + ) + src_pg.slow_start() + src_pg.safe_psql("postgres", "CREATE EXTENSION ptrack") + src_pg.safe_psql( + "postgres", + "CREATE TABLE ultimate_question AS SELECT 42 AS answer") + + # preparation 2: make clean shutdowned lagging behind replica + dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst')) + self.catchup_node( + backup_mode = 'FULL', + source_pgdata = src_pg.data_dir, + destination_node = dst_pg, + options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream'] + ) + dst_options = {} + dst_options['port'] = str(dst_pg.port) + self.set_auto_conf(dst_pg, dst_options) + dst_pg.slow_start() + dst_pg.stop() + + # preparation 3: make changes on master (source) + # perform checkpoint twice to ensure, that datafile is actually deleted on filesystem + src_pg.safe_psql("postgres", "DROP TABLE ultimate_question") + src_pg.safe_psql("postgres", "CHECKPOINT") + src_pg.safe_psql("postgres", "CHECKPOINT") + + # do ptrack catchup + self.catchup_node( + backup_mode = 'PTRACK', + source_pgdata = src_pg.data_dir, + destination_node = dst_pg, + options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream'] + ) + + # Check: compare data directories + self.compare_pgdata( + self.pgdata_content(src_pg.data_dir), + self.pgdata_content(dst_pg.data_dir) + ) + + # Cleanup + src_pg.stop() + self.del_test_dir(module_name, self.fname) + + def test_tablefile_truncation_with_delta(self): + """ + Test that truncated table in source will be truncated in delta catchup'ed instance too + """ + # preparation 1: source + src_pg = self.make_simple_node( + base_dir = os.path.join(module_name, self.fname, 'src'), + set_replication = True, + pg_options = { 'wal_log_hints': 'on' } + ) + src_pg.slow_start() + src_pg.safe_psql( + "postgres", + "CREATE SEQUENCE t_seq; " + "CREATE TABLE 
t_heap AS SELECT i AS id, " + "md5(i::text) AS text, " + "md5(repeat(i::text, 10))::tsvector AS tsvector " + "FROM generate_series(0, 1024) i") + src_pg.safe_psql("postgres", "VACUUM t_heap") + + # preparation 2: make clean shutdowned lagging behind replica + dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst')) + self.catchup_node( + backup_mode = 'FULL', + source_pgdata = src_pg.data_dir, + destination_node = dst_pg, + options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream'] + ) + dest_options = {} + dst_options = {} + dst_options['port'] = str(dst_pg.port) + self.set_auto_conf(dst_pg, dst_options) + dst_pg.slow_start() + dst_pg.stop() + + # preparation 3: make changes on master (source) + src_pg.safe_psql("postgres", "DELETE FROM t_heap WHERE ctid >= '(11,0)'") + src_pg.safe_psql("postgres", "VACUUM t_heap") + + # do delta catchup + self.catchup_node( + backup_mode = 'DELTA', + source_pgdata = src_pg.data_dir, + destination_node = dst_pg, + options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream'] + ) + + # Check: compare data directories + self.compare_pgdata( + self.pgdata_content(src_pg.data_dir), + self.pgdata_content(dst_pg.data_dir) + ) + + # Cleanup + src_pg.stop() + self.del_test_dir(module_name, self.fname) + + def test_tablefile_truncation_with_ptrack(self): + """ + Test that truncated table in source will be truncated in ptrack catchup'ed instance too + """ + if not self.ptrack: + return unittest.skip('Skipped because ptrack support is disabled') + + # preparation 1: source + src_pg = self.make_simple_node( + base_dir = os.path.join(module_name, self.fname, 'src'), + set_replication = True, + ptrack_enable = True, + initdb_params = ['--data-checksums'] + ) + src_pg.slow_start() + src_pg.safe_psql("postgres", "CREATE EXTENSION ptrack") + src_pg.safe_psql( + "postgres", + "CREATE SEQUENCE t_seq; " + "CREATE TABLE t_heap AS SELECT i AS id, " + "md5(i::text) AS text, " + "md5(repeat(i::text, 10))::tsvector AS tsvector " + "FROM generate_series(0, 1024) i") + src_pg.safe_psql("postgres", "VACUUM t_heap") + + # preparation 2: make clean shutdowned lagging behind replica + dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst')) + self.catchup_node( + backup_mode = 'FULL', + source_pgdata = src_pg.data_dir, + destination_node = dst_pg, + options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream'] + ) + dest_options = {} + dst_options = {} + dst_options['port'] = str(dst_pg.port) + self.set_auto_conf(dst_pg, dst_options) + dst_pg.slow_start() + dst_pg.stop() + + # preparation 3: make changes on master (source) + src_pg.safe_psql("postgres", "DELETE FROM t_heap WHERE ctid >= '(11,0)'") + src_pg.safe_psql("postgres", "VACUUM t_heap") + + # do ptrack catchup + self.catchup_node( + backup_mode = 'PTRACK', + source_pgdata = src_pg.data_dir, + destination_node = dst_pg, + options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream'] + ) + + # Check: compare data directories + self.compare_pgdata( + self.pgdata_content(src_pg.data_dir), + self.pgdata_content(dst_pg.data_dir) + ) + + # Cleanup + src_pg.stop() + self.del_test_dir(module_name, self.fname) + +######################################### +# Test reaction on user errors +######################################### + def test_local_tablespace_without_mapping(self): + """ + Test that we detect absence of needed --tablespace-mapping option + """ + if self.remote: + return unittest.skip('Skipped because this test tests local catchup error handling') + + src_pg = 
+
+#########################################
+# Test reaction to user errors
+#########################################
+    def test_local_tablespace_without_mapping(self):
+        """
+        Test that we detect the absence of the required --tablespace-mapping option
+        """
+        if self.remote:
+            return unittest.skip('Skipped because this test checks local catchup error handling')
+
+        src_pg = self.make_simple_node(base_dir = os.path.join(module_name, self.fname, 'src'))
+        src_pg.slow_start()
+
+        tblspace_path = self.get_tblspace_path(src_pg, 'tblspace')
+        self.create_tblspace_in_node(
+            src_pg, 'tblspace',
+            tblspc_path = tblspace_path)
+
+        src_pg.safe_psql(
+            "postgres",
+            "CREATE TABLE ultimate_question TABLESPACE tblspace AS SELECT 42 AS answer")
+
+        dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
+        try:
+            self.catchup_node(
+                backup_mode = 'FULL',
+                source_pgdata = src_pg.data_dir,
+                destination_node = dst_pg,
+                options = [
+                    '-d', 'postgres',
+                    '-p', str(src_pg.port),
+                    '--stream',
+                    ]
+                )
+            self.assertEqual(1, 0, "Expecting Error because '-T' parameter is not specified.\n Output: {0} \n CMD: {1}".format(
+                repr(self.output), self.cmd))
+        except ProbackupException as e:
+            self.assertIn(
+                'ERROR: Local catchup executed, but source database contains tablespace',
+                e.message,
+                '\n Unexpected Error Message: {0}\n CMD: {1}'.format(repr(e.message), self.cmd))
+
+        # Cleanup
+        src_pg.stop()
+        self.del_test_dir(module_name, self.fname)
+
+    def test_running_dest_postmaster(self):
+        """
+        Test that we detect a running postmaster in the destination
+        """
+        # preparation 1: source
+        src_pg = self.make_simple_node(
+            base_dir = os.path.join(module_name, self.fname, 'src'),
+            set_replication = True,
+            pg_options = { 'wal_log_hints': 'on' }
+            )
+        src_pg.slow_start()
+
+        # preparation 2: destination
+        dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
+        self.catchup_node(
+            backup_mode = 'FULL',
+            source_pgdata = src_pg.data_dir,
+            destination_node = dst_pg,
+            options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
+            )
+        dst_options = {}
+        dst_options['port'] = str(dst_pg.port)
+        self.set_auto_conf(dst_pg, dst_options)
+        dst_pg.slow_start()
+        # leave the destination postmaster running,
+        # so don't call dst_pg.stop()
+
+        # try delta catchup
+        try:
+            self.catchup_node(
+                backup_mode = 'DELTA',
+                source_pgdata = src_pg.data_dir,
+                destination_node = dst_pg,
+                options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
+                )
+            self.assertEqual(1, 0, "Expecting Error because postmaster in destination is running.\n Output: {0} \n CMD: {1}".format(
+                repr(self.output), self.cmd))
+        except ProbackupException as e:
+            self.assertIn(
+                'ERROR: Postmaster with pid ',
+                e.message,
+                '\n Unexpected Error Message: {0}\n CMD: {1}'.format(repr(e.message), self.cmd))
+
+        # Cleanup
+        src_pg.stop()
+        self.del_test_dir(module_name, self.fname)
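+
+    # The check below relies on the database system identifier, a 64-bit
+    # value generated by initdb and stored in pg_control, so two
+    # independently created clusters never match. It can be inspected with
+    # (value is illustrative):
+    #
+    #     $ pg_controldata "$PGDATA" | grep 'system identifier'
+    #     Database system identifier:           70...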
+
+    def test_same_db_id(self):
+        """
+        Test that we detect mismatched system identifiers of source and destination
+        """
+        # preparation:
+        # source
+        src_pg = self.make_simple_node(
+            base_dir = os.path.join(module_name, self.fname, 'src'),
+            set_replication = True
+            )
+        src_pg.slow_start()
+        # destination
+        dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
+        self.catchup_node(
+            backup_mode = 'FULL',
+            source_pgdata = src_pg.data_dir,
+            destination_node = dst_pg,
+            options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
+            )
+        dst_options = {}
+        dst_options['port'] = str(dst_pg.port)
+        self.set_auto_conf(dst_pg, dst_options)
+        dst_pg.slow_start()
+        dst_pg.stop()
+        # fake destination
+        fake_dst_pg = self.make_simple_node(base_dir = os.path.join(module_name, self.fname, 'fake_dst'))
+        # fake source
+        fake_src_pg = self.make_simple_node(base_dir = os.path.join(module_name, self.fname, 'fake_src'))
+
+        # try delta catchup (src (with correct src conn), fake_dst)
+        try:
+            self.catchup_node(
+                backup_mode = 'DELTA',
+                source_pgdata = src_pg.data_dir,
+                destination_node = fake_dst_pg,
+                options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
+                )
+            self.assertEqual(1, 0, "Expecting Error because database identifiers mismatch.\n Output: {0} \n CMD: {1}".format(
+                repr(self.output), self.cmd))
+        except ProbackupException as e:
+            self.assertIn(
+                'ERROR: Database identifiers mismatch: ',
+                e.message,
+                '\n Unexpected Error Message: {0}\n CMD: {1}'.format(repr(e.message), self.cmd))
+
+        # try delta catchup (fake_src (with wrong src conn), dst)
+        try:
+            self.catchup_node(
+                backup_mode = 'DELTA',
+                source_pgdata = fake_src_pg.data_dir,
+                destination_node = dst_pg,
+                options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
+                )
+            self.assertEqual(1, 0, "Expecting Error because database identifiers mismatch.\n Output: {0} \n CMD: {1}".format(
+                repr(self.output), self.cmd))
+        except ProbackupException as e:
+            self.assertIn(
+                'ERROR: Database identifiers mismatch: ',
+                e.message,
+                '\n Unexpected Error Message: {0}\n CMD: {1}'.format(repr(e.message), self.cmd))
+
+        # Cleanup
+        src_pg.stop()
+        self.del_test_dir(module_name, self.fname)
+
+    def test_destination_dbstate(self):
+        """
+        Test that we detect that the destination instance was not cleanly shut down
+        """
+        # preparation 1: source
+        src_pg = self.make_simple_node(
+            base_dir = os.path.join(module_name, self.fname, 'src'),
+            set_replication = True,
+            pg_options = { 'wal_log_hints': 'on' }
+            )
+        src_pg.slow_start()
+
+        # preparation 2: destination
+        dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
+        self.catchup_node(
+            backup_mode = 'FULL',
+            source_pgdata = src_pg.data_dir,
+            destination_node = dst_pg,
+            options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
+            )
+
+        # try #1: the destination was never started after the FULL catchup,
+        # so it still contains a backup_label file
+        try:
+            self.catchup_node(
+                backup_mode = 'DELTA',
+                source_pgdata = src_pg.data_dir,
+                destination_node = dst_pg,
+                options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
+                )
+            self.assertEqual(1, 0, "Expecting Error because destination instance is not cleanly shut down.\n Output: {0} \n CMD: {1}".format(
+                repr(self.output), self.cmd))
+        except ProbackupException as e:
+            self.assertIn(
+                'ERROR: Destination directory contains "backup_label" file',
+                e.message,
+                '\n Unexpected Error Message: {0}\n CMD: {1}'.format(repr(e.message), self.cmd))
+
+        # try #2: SIGKILL the destination postmaster, leaving pg_control
+        # in a non-shut-down state
+        dst_options = {}
+        dst_options['port'] = str(dst_pg.port)
+        self.set_auto_conf(dst_pg, dst_options)
+        dst_pg.slow_start()
+        self.assertNotEqual(dst_pg.pid, 0, "Cannot detect pid of running postgres")
+        os.kill(dst_pg.pid, signal.SIGKILL)
+        try:
+            self.catchup_node(
+                backup_mode = 'DELTA',
+                source_pgdata = src_pg.data_dir,
+                destination_node = dst_pg,
+                options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
+                )
+            self.assertEqual(1, 0, "Expecting Error because destination instance is not cleanly shut down.\n Output: {0} \n CMD: {1}".format(
+                repr(self.output), self.cmd))
+        except ProbackupException as e:
+            self.assertIn(
+                'must be stopped cleanly',
+                e.message,
+                '\n Unexpected Error Message: {0}\n CMD: {1}'.format(repr(e.message), self.cmd))
+
+        # Cleanup
+        src_pg.stop()
+        self.del_test_dir(module_name, self.fname)
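+
+    # The two TLI checks below both hinge on promotion: promoting a standby
+    # switches it to a new timeline (e.g. TLI 1 -> 2), and catchup compares
+    # the source's and destination's positions in the timeline history.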
+
+    def test_tli_destination_mismatch(self):
+        """
+        Test that we detect a TLI mismatch in the destination
+        """
+        # preparation 1: source
+        src_pg = self.make_simple_node(
+            base_dir = os.path.join(module_name, self.fname, 'src'),
+            set_replication = True,
+            pg_options = { 'wal_log_hints': 'on' }
+            )
+        src_pg.slow_start()
+
+        # preparation 2: destination
+        dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
+        self.catchup_node(
+            backup_mode = 'FULL',
+            source_pgdata = src_pg.data_dir,
+            destination_node = dst_pg,
+            options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
+            )
+        dst_options = {}
+        dst_options['port'] = str(dst_pg.port)
+        self.set_auto_conf(dst_pg, dst_options)
+        self.set_replica(src_pg, dst_pg)
+        dst_pg.slow_start(replica = True)
+        dst_pg.promote()
+        dst_pg.stop()
+
+        # preparation 3: "useful" changes
+        src_pg.safe_psql("postgres", "CREATE TABLE ultimate_question AS SELECT 42 AS answer")
+        src_query_result = src_pg.safe_psql("postgres", "SELECT * FROM ultimate_question")
+
+        # try catchup
+        try:
+            self.catchup_node(
+                backup_mode = 'DELTA',
+                source_pgdata = src_pg.data_dir,
+                destination_node = dst_pg,
+                options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
+                )
+            dst_options = {}
+            dst_options['port'] = str(dst_pg.port)
+            self.set_auto_conf(dst_pg, dst_options)
+            dst_pg.slow_start()
+            dst_query_result = dst_pg.safe_psql("postgres", "SELECT * FROM ultimate_question")
+            dst_pg.stop()
+            self.assertEqual(src_query_result, dst_query_result, 'Different answer from copy')
+        except ProbackupException as e:
+            self.assertIn(
+                'ERROR: Source is behind destination in timeline history',
+                e.message,
+                '\n Unexpected Error Message: {0}\n CMD: {1}'.format(repr(e.message), self.cmd))
+
+        # Cleanup
+        src_pg.stop()
+        self.del_test_dir(module_name, self.fname)
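+
+    # Illustrative check (not part of this patch): after promote() the
+    # destination's new timeline can be read directly and is expected to
+    # exceed the source's, e.g.:
+    #
+    #     tli = dst_pg.safe_psql("postgres",
+    #         "SELECT timeline_id FROM pg_control_checkpoint()")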
+
+    def test_tli_source_mismatch(self):
+        """
+        Test that we detect a TLI mismatch in the source history
+        """
+        # preparation 1: source
+        src_pg = self.make_simple_node(
+            base_dir = os.path.join(module_name, self.fname, 'src'),
+            set_replication = True,
+            pg_options = { 'wal_log_hints': 'on' }
+            )
+        src_pg.slow_start()
+
+        # preparation 2: fake source (promoted copy)
+        fake_src_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'fake_src'))
+        self.catchup_node(
+            backup_mode = 'FULL',
+            source_pgdata = src_pg.data_dir,
+            destination_node = fake_src_pg,
+            options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
+            )
+        fake_src_options = {}
+        fake_src_options['port'] = str(fake_src_pg.port)
+        self.set_auto_conf(fake_src_pg, fake_src_options)
+        self.set_replica(src_pg, fake_src_pg)
+        fake_src_pg.slow_start(replica = True)
+        fake_src_pg.promote()
+        self.switch_wal_segment(fake_src_pg)
+        fake_src_pg.safe_psql(
+            "postgres",
+            "CREATE TABLE t_heap AS SELECT i AS id, "
+            "md5(i::text) AS text, "
+            "md5(repeat(i::text, 10))::tsvector AS tsvector "
+            "FROM generate_series(0, 256) i")
+        self.switch_wal_segment(fake_src_pg)
+        fake_src_pg.safe_psql("postgres", "CREATE TABLE ultimate_question AS SELECT 'trash' AS garbage")
+
+        # preparation 3: destination
+        dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
+        self.catchup_node(
+            backup_mode = 'FULL',
+            source_pgdata = src_pg.data_dir,
+            destination_node = dst_pg,
+            options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
+            )
+        dst_options = {}
+        dst_options['port'] = str(dst_pg.port)
+        self.set_auto_conf(dst_pg, dst_options)
+        dst_pg.slow_start()
+        dst_pg.stop()
+
+        # preparation 4: "useful" changes
+        src_pg.safe_psql("postgres", "CREATE TABLE ultimate_question AS SELECT 42 AS answer")
+        src_query_result = src_pg.safe_psql("postgres", "SELECT * FROM ultimate_question")
+
+        # try catchup
+        try:
+            self.catchup_node(
+                backup_mode = 'DELTA',
+                source_pgdata = fake_src_pg.data_dir,
+                destination_node = dst_pg,
+                options = ['-d', 'postgres', '-p', str(fake_src_pg.port), '--stream']
+                )
+            dst_options = {}
+            dst_options['port'] = str(dst_pg.port)
+            self.set_auto_conf(dst_pg, dst_options)
+            dst_pg.slow_start()
+            dst_query_result = dst_pg.safe_psql("postgres", "SELECT * FROM ultimate_question")
+            dst_pg.stop()
+            self.assertEqual(src_query_result, dst_query_result, 'Different answer from copy')
+        except ProbackupException as e:
+            self.assertIn(
+                'ERROR: Destination is not in source timeline history',
+                e.message,
+                '\n Unexpected Error Message: {0}\n CMD: {1}'.format(repr(e.message), self.cmd))
+
+        # Cleanup
+        src_pg.stop()
+        fake_src_pg.stop()
+        self.del_test_dir(module_name, self.fname)
diff --git a/tests/helpers/ptrack_helpers.py b/tests/helpers/ptrack_helpers.py
index af27669b1..1de004250 100644
--- a/tests/helpers/ptrack_helpers.py
+++ b/tests/helpers/ptrack_helpers.py
@@ -345,14 +345,9 @@ def pg_config_version(self):
 #            print('PGPROBACKUP_SSH_USER is not set')
 #            exit(1)
 
-    def make_simple_node(
+    def make_empty_node(
             self,
-            base_dir=None,
-            set_replication=False,
-            ptrack_enable=False,
-            initdb_params=[],
-            pg_options={}):
-
+            base_dir=None):
         real_base_dir = os.path.join(self.tmp_path, base_dir)
         shutil.rmtree(real_base_dir, ignore_errors=True)
         os.makedirs(real_base_dir)
@@ -361,6 +356,17 @@ def make_simple_node(
         # bound method slow_start() to 'node' class instance
         node.slow_start = slow_start.__get__(node)
         node.should_rm_dirs = True
+        return node
+
+    def make_simple_node(
+            self,
+            base_dir=None,
+            set_replication=False,
+            ptrack_enable=False,
+            initdb_params=[],
+            pg_options={}):
+
+        node = self.make_empty_node(base_dir)
         node.init(
             initdb_params=initdb_params,
             allow_streaming=set_replication)
@@ -1036,6 +1042,28 @@ def restore_node(
 
         return self.run_pb(cmd_list + options, gdb=gdb, old_binary=old_binary)
 
+    def catchup_node(
+            self,
+            backup_mode, source_pgdata, destination_node,
+            options = []
+            ):
+
+        cmd_list = [
+            'catchup',
+            '--backup-mode={0}'.format(backup_mode),
+            '--source-pgdata={0}'.format(source_pgdata),
+            '--destination-pgdata={0}'.format(destination_node.data_dir)
+            ]
+        if self.remote:
+            cmd_list += ['--remote-proto=ssh', '--remote-host=localhost']
+        if self.verbose:
+            cmd_list += [
+                '--log-level-file=VERBOSE',
+                '--log-directory={0}'.format(destination_node.logs_dir)
+                ]
+
+        return self.run_pb(cmd_list + options)
+
     def show_pb(
             self, backup_dir, instance=None, backup_id=None, options=[],
             as_text=False, as_json=True, old_binary=False,
@@ -1736,10 +1764,10 @@ def compare_pgdata(self, original_pgdata, restored_pgdata):
                 ):
                     fail = True
                     error_message += '\nFile permissions mismatch:\n'
-                    error_message += ' File_old: {0} Permissions: {1}\n'.format(
+                    error_message += ' File_old: {0} Permissions: {1:o}\n'.format(
                         os.path.join(original_pgdata['pgdata'], file),
                         original_pgdata['files'][file]['mode'])
-                    error_message += ' File_new: {0} Permissions: {1}\n'.format(
+                    error_message += ' File_new: {0} Permissions: {1:o}\n'.format(
                         os.path.join(restored_pgdata['pgdata'], file),
                         restored_pgdata['files'][file]['mode'])
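+                    # the '{1:o}' format spec above renders the stat mode in
+                    # octal (e.g. 0o600 prints as 600), the conventional Unix
+                    # permissions notation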