diff --git a/Makefile b/Makefile
index 1431be4ef..5173aa38f 100644
--- a/Makefile
+++ b/Makefile
@@ -7,7 +7,7 @@ OBJS = src/utils/configuration.o src/utils/json.o src/utils/logger.o \
OBJS += src/archive.o src/backup.o src/catalog.o src/checkdb.o src/configure.o src/data.o \
src/delete.o src/dir.o src/fetch.o src/help.o src/init.o src/merge.o \
src/parsexlog.o src/ptrack.o src/pg_probackup.o src/restore.o src/show.o src/stream.o \
- src/util.o src/validate.o src/datapagemap.o
+ src/util.o src/validate.o src/datapagemap.o src/catchup.o
# borrowed files
OBJS += src/pg_crc.o src/receivelog.o src/streamutil.o \
diff --git a/doc/pgprobackup.xml b/doc/pgprobackup.xml
index b1ddd0032..f7814c2d2 100644
--- a/doc/pgprobackup.xml
+++ b/doc/pgprobackup.xml
@@ -143,6 +143,14 @@ doc/src/sgml/pgprobackup.sgml
wal_file_name
option
+
+ pg_probackup
+
+ catchup_mode
+ =path_to_pgdata_on_remote_server
+ =path_to_local_dir
+ option
+
@@ -283,6 +291,11 @@ doc/src/sgml/pgprobackup.sgml
Partial restore: restoring only the specified databases.
+
+
+ Catchup: cloning a PostgreSQL instance to let a standby server that has fallen behind catch up
with the master.
+
+
To manage backup data, pg_probackup creates a
@@ -1076,7 +1089,8 @@ GRANT SELECT ON TABLE pg_catalog.pg_database TO backup;
mode: ,
,
,
- ,
+ ,
+ , and
.
@@ -1431,6 +1445,7 @@ pg_probackup backup -B backup_dir --instance
+
Performing Cluster Verification
@@ -1506,6 +1521,7 @@ pg_probackup checkdb --amcheck --skip-block-validation [connection_
higher cost of CPU, memory, and I/O consumption.
+
Validating a Backup
@@ -2073,6 +2089,7 @@ pg_probackup restore -B backup_dir --instance ,
,
,
+ ,
and
processes can be
executed on several parallel threads. This can significantly
@@ -3390,6 +3407,148 @@ pg_probackup delete -B backup_dir --instance
+
+
+ Cloning PostgreSQL Instance
+
+ pg_probackup can create a copy of a PostgreSQL
+ instance directly, without using the backup catalog. This allows you
+ to add a new standby server in parallel mode or to have a standby
+ server that has fallen behind catch up
with the master.
+
+
+
+ Cloning a PostgreSQL instance is different from other pg_probackup
+ operations:
+
+
+
+ The backup catalog is not required.
+
+
+
+
+ Only the STREAM WAL delivery mode is supported.
+
+
+
+
+ Copying external directories
+ is not supported.
+
+
+
+
+ No SQL commands involving tablespaces, such as
+ CREATE TABLESPACE/DROP TABLESPACE,
+ can be run simultaneously with catchup.
+
+
+
+
+ catchup takes configuration files, such as
+ postgresql.conf, postgresql.auto.conf,
+ or pg_hba.conf, from the source server and overwrites them
+ on the target server.
+
+
+
+
+
+
+ Before cloning a PostgreSQL instance, set up the source database server as follows:
+
+
+
+ Configure
+ the database cluster for the instance to copy.
+
+
+
+
+ To copy from a remote server, configure the remote mode.
+
+
+
+
+ To use the PTRACK catchup mode, set up PTRACK backups.
+
+
+
+
+
+
+ To clone a PostgreSQL instance, ensure that the source
+ database server is running and accepting connections. Then,
+ on the server with the destination database, run the following command:
+
+
+pg_probackup catchup -b catchup_mode --source-pgdata=path_to_pgdata_on_remote_server --destination-pgdata=path_to_local_dir --stream [connection_options] [remote_options]
+
+
+ Where catchup_mode can take one of the
+ following values: FULL, DELTA, or PTRACK.
+
+
+
+
+ FULL — creates a full copy of the PostgreSQL instance.
+ The destination directory must be empty for this mode.
+
+
+
+
+ DELTA — reads all data files in the data directory and
+ creates an incremental copy for pages that have changed
+ since the destination database was shut down cleanly.
+ For this mode, the destination directory must contain a previous
+ copy of the database that was shut down cleanly.
+
+
+
+
+ PTRACK — tracks page changes on the fly and
+ copies only the pages that have changed since the point of divergence
+ of the source and destination databases.
+ For this mode, the destination directory must contain a previous
+ copy of the database that was shut down cleanly.
+
+
+
+
+ You can use connection_options to specify
+ the connection to the source database cluster. If it is located on a different server,
+ also specify remote_options.
+ If the source database contains tablespaces that must be located in
+ a different directory, additionally specify the
+ option:
+
+pg_probackup catchup -b catchup_mode --source-pgdata=path_to_pgdata_on_remote_server --destination-pgdata=path_to_local_dir --stream --tablespace-mapping=OLDDIR=NEWDIR
+
+ To run the catchup command on parallel threads, specify the number
+ of threads with the option:
+
+pg_probackup catchup -b catchup_mode --source-pgdata=path_to_pgdata_on_remote_server --destination-pgdata=path_to_local_dir --stream --threads=num_threads
+
+
+
+ For example, assume that a remote standby server with its PostgreSQL instance in the /replica-pgdata data directory has fallen behind. To sync this instance with the one in the /master-pgdata data directory, you can run
+ the catchup command in PTRACK mode on four parallel threads as follows:
+
+pg_probackup catchup --source-pgdata=/master-pgdata --destination-pgdata=/replica-pgdata -p 5432 -d postgres -U remote-postgres-user --stream --backup-mode=PTRACK --remote-host=remote-hostname --remote-user=remote-unix-username -j 4
+
+
+
+ Another example shows how you can add a new remote standby server with the PostgreSQL data directory /replica-pgdata by running the catchup command in FULL mode
+ on four parallel threads:
+
+pg_probackup catchup --source-pgdata=/master-pgdata --destination-pgdata=/replica-pgdata -p 5432 -d postgres -U remote-postgres-user --stream --backup-mode=FULL --remote-host=remote-hostname --remote-user=remote-unix-username -j 4
+
+
+
@@ -4262,6 +4421,121 @@ pg_probackup archive-get -B backup_dir --instance Archiving Options.
+
+
+ catchup
+
+pg_probackup catchup -b catchup_mode
+--source-pgdata=path_to_pgdata_on_remote_server
+--destination-pgdata=path_to_local_dir
+[--help] [--stream] [-j num_threads]
+[-T OLDDIR=NEWDIR]
+[connection_options] [remote_options]
+
+
+ Creates a copy of a PostgreSQL
+ instance without using the backup catalog.
+
+
+
+
+
+
+
+ Specifies the catchup mode to use. Possible values are:
+
+
+
+
+ FULL — creates a full copy of the PostgreSQL instance.
+
+
+
+
+ DELTA — reads all data files in the data directory and
+ creates an incremental copy for pages that have changed
+ since the destination database was shut down cleanly.
+
+
+
+
+ PTRACK — tracks page changes on the fly and
+ copies only the pages that have changed since the point of divergence
+ of the source and destination databases.
+
+
+
+
+
+
+
+
+
+
+
+ Specifies the path to the data directory of the instance to be copied. The path can be local or remote.
+
+
+
+
+
+
+
+
+ Specifies the path to the local data directory to copy to.
+
+
+
+
+
+
+
+
+ Makes a STREAM backup, which
+ includes all the necessary WAL files by streaming them from
+ the database server via replication protocol.
+
+
+
+
+
+
+
+
+
+ Sets the number of parallel threads for the
+ catchup process.
+
+
+
+
+
+
+
+
+
+ Relocates the tablespace from the OLDDIR to the NEWDIR
+ directory at the time of copying. Both OLDDIR and NEWDIR must
+ be absolute paths. If the path contains the equals sign (=),
+ escape it with a backslash. This option can be specified
+ multiple times for multiple tablespaces.
+
+
+
+
+
+
+
+
+ Additionally, connection
+ options and remote
+ mode options can be used.
+
+
+ For details on usage, see the section
+ Cloning PostgreSQL Instance.
+
+
Options
@@ -4651,7 +4925,7 @@ pg_probackup archive-get -B backup_dir --instance
- Disable the coloring for console log messages of warning and error levels.
+ Disable coloring for console log messages of warning and error levels.
@@ -4804,7 +5078,8 @@ pg_probackup archive-get -B backup_dir --instance Connection Options
You can use these options together with
- and
+
+ , , and
commands.
@@ -5095,6 +5370,7 @@ pg_probackup archive-get -B backup_dir --instance ,
,
,
+ ,
,
, and
commands.
diff --git a/src/archive.c b/src/archive.c
index 4058cd0d4..7bb8c1c03 100644
--- a/src/archive.c
+++ b/src/archive.c
@@ -148,7 +148,7 @@ do_archive_push(InstanceState *instanceState, InstanceConfig *instance, char *wa
elog(ERROR, "getcwd() error");
/* verify that archive-push --instance parameter is valid */
- system_id = get_system_identifier(current_dir);
+ system_id = get_system_identifier(current_dir, FIO_DB_HOST);
if (instance->pgdata == NULL)
elog(ERROR, "Cannot read pg_probackup.conf for this instance");
diff --git a/src/backup.c b/src/backup.c
index 688afefca..2d834410a 100644
--- a/src/backup.c
+++ b/src/backup.c
@@ -94,7 +94,6 @@ do_backup_pg(InstanceState *instanceState, PGconn *backup_conn,
{
int i;
char external_prefix[MAXPGPATH]; /* Temp value. Used as template */
- char dst_backup_path[MAXPGPATH];
char label[1024];
XLogRecPtr prev_backup_start_lsn = InvalidXLogRecPtr;
@@ -137,7 +136,7 @@ do_backup_pg(InstanceState *instanceState, PGconn *backup_conn,
#if PG_VERSION_NUM >= 90600
current.tli = get_current_timeline(backup_conn);
#else
- current.tli = get_current_timeline_from_control(false);
+ current.tli = get_current_timeline_from_control(instance_config.pgdata, FIO_DB_HOST, false);
#endif
/*
@@ -258,17 +257,19 @@ do_backup_pg(InstanceState *instanceState, PGconn *backup_conn,
/* start stream replication */
if (current.stream)
{
- join_path_components(dst_backup_path, current.database_dir, PG_XLOG_DIR);
- fio_mkdir(dst_backup_path, DIR_PERMISSION, FIO_BACKUP_HOST);
+ char stream_xlog_path[MAXPGPATH];
- start_WAL_streaming(backup_conn, dst_backup_path, &instance_config.conn_opt,
+ join_path_components(stream_xlog_path, current.database_dir, PG_XLOG_DIR);
+ fio_mkdir(stream_xlog_path, DIR_PERMISSION, FIO_BACKUP_HOST);
+
+ start_WAL_streaming(backup_conn, stream_xlog_path, &instance_config.conn_opt,
current.start_lsn, current.tli);
/* Make sure that WAL streaming is working
* PAGE backup in stream mode is waited twice, first for
* segment in WAL archive and then for streamed segment
*/
- wait_wal_lsn(dst_backup_path, current.start_lsn, true, current.tli, false, true, ERROR, true);
+ wait_wal_lsn(stream_xlog_path, current.start_lsn, true, current.tli, false, true, ERROR, true);
}
/* initialize backup's file list */
@@ -315,23 +316,7 @@ do_backup_pg(InstanceState *instanceState, PGconn *backup_conn,
elog(ERROR, "PGDATA is almost empty. Either it was concurrently deleted or "
"pg_probackup do not possess sufficient permissions to list PGDATA content");
- /* Calculate pgdata_bytes */
- for (i = 0; i < parray_num(backup_files_list); i++)
- {
- pgFile *file = (pgFile *) parray_get(backup_files_list, i);
-
- if (file->external_dir_num != 0)
- continue;
-
- if (S_ISDIR(file->mode))
- {
- current.pgdata_bytes += 4096;
- continue;
- }
-
- current.pgdata_bytes += file->size;
- }
-
+ current.pgdata_bytes += calculate_datasize_of_filelist(backup_files_list);
pretty_size(current.pgdata_bytes, pretty_bytes, lengthof(pretty_bytes));
elog(INFO, "PGDATA size: %s", pretty_bytes);
@@ -697,7 +682,7 @@ pgdata_basic_setup(ConnectionOptions conn_opt, PGNodeInfo *nodeInfo)
if (nodeInfo->is_superuser)
elog(WARNING, "Current PostgreSQL role is superuser. "
- "It is not recommended to run backup or checkdb as superuser.");
+ "It is not recommended to run pg_probackup under superuser.");
strlcpy(current.server_version, nodeInfo->server_version_str,
sizeof(current.server_version));
@@ -786,7 +771,7 @@ do_backup(InstanceState *instanceState, pgSetBackupParams *set_backup_params,
// elog(WARNING, "ptrack_version_num %d", ptrack_version_num);
if (nodeInfo.ptrack_version_num > 0)
- nodeInfo.is_ptrack_enable = pg_ptrack_enable(backup_conn, nodeInfo.ptrack_version_num);
+ nodeInfo.is_ptrack_enabled = pg_is_ptrack_enabled(backup_conn, nodeInfo.ptrack_version_num);
if (current.backup_mode == BACKUP_MODE_DIFF_PTRACK)
{
@@ -795,7 +780,7 @@ do_backup(InstanceState *instanceState, pgSetBackupParams *set_backup_params,
elog(ERROR, "This PostgreSQL instance does not support ptrack");
else
{
- if (!nodeInfo.is_ptrack_enable)
+ if (!nodeInfo.is_ptrack_enabled)
elog(ERROR, "Ptrack is disabled");
}
}
@@ -953,12 +938,12 @@ check_server_version(PGconn *conn, PGNodeInfo *nodeInfo)
* All system identifiers must be equal.
*/
void
-check_system_identifiers(PGconn *conn, char *pgdata)
+check_system_identifiers(PGconn *conn, const char *pgdata)
{
uint64 system_id_conn;
uint64 system_id_pgdata;
- system_id_pgdata = get_system_identifier(pgdata);
+ system_id_pgdata = get_system_identifier(pgdata, FIO_DB_HOST);
system_id_conn = get_remote_system_identifier(conn);
/* for checkdb check only system_id_pgdata and system_id_conn */
@@ -1069,7 +1054,7 @@ pg_start_backup(const char *label, bool smooth, pgBackup *backup,
* Switch to a new WAL segment. It should be called only for master.
* For PG 9.5 it should be called only if pguser is superuser.
*/
-static void
+void
pg_switch_wal(PGconn *conn)
{
PGresult *res;
@@ -2282,7 +2267,7 @@ process_block_change(ForkNumber forknum, RelFileNode rnode, BlockNumber blkno)
}
-static void
+void
check_external_for_tablespaces(parray *external_list, PGconn *backup_conn)
{
PGresult *res;
@@ -2346,3 +2331,36 @@ check_external_for_tablespaces(parray *external_list, PGconn *backup_conn)
}
}
}
+
+/*
+ * Calculate pgdata_bytes
+ * accepts (parray *) of (pgFile *)
+ */
+int64
+calculate_datasize_of_filelist(parray *filelist)
+{
+ int64 bytes = 0;
+ int i;
+
+ /* parray_num() does not check for NULL */
+ if (filelist == NULL)
+ return 0;
+
+ for (i = 0; i < parray_num(filelist); i++)
+ {
+ pgFile *file = (pgFile *) parray_get(filelist, i);
+
+ if (file->external_dir_num != 0)
+ continue;
+
+ if (S_ISDIR(file->mode))
+ {
+ // TODO is a dir always 4K?
+ bytes += 4096;
+ continue;
+ }
+
+ bytes += file->size;
+ }
+ return bytes;
+}
diff --git a/src/catalog.c b/src/catalog.c
index f8af2b72e..9775968b8 100644
--- a/src/catalog.c
+++ b/src/catalog.c
@@ -2883,7 +2883,7 @@ pgNodeInit(PGNodeInfo *node)
node->server_version_str[0] = '\0';
node->ptrack_version_num = 0;
- node->is_ptrack_enable = false;
+ node->is_ptrack_enabled = false;
node->ptrack_schema = NULL;
}
diff --git a/src/catchup.c b/src/catchup.c
new file mode 100644
index 000000000..f80a0f0f9
--- /dev/null
+++ b/src/catchup.c
@@ -0,0 +1,1020 @@
+/*-------------------------------------------------------------------------
+ *
+ * catchup.c: sync DB cluster
+ *
+ * Copyright (c) 2021, Postgres Professional
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "pg_probackup.h"
+
+#if PG_VERSION_NUM < 110000
+#include "catalog/catalog.h"
+#endif
+#include "catalog/pg_tablespace.h"
+#include "access/timeline.h"
+#include "pgtar.h"
+#include "streamutil.h"
+
+#include
+#include
+#include
+
+#include "utils/thread.h"
+#include "utils/file.h"
+
+/*
+ * Catchup routines
+ */
+static PGconn *catchup_collect_info(PGNodeInfo *source_node_info, const char *source_pgdata, const char *dest_pgdata);
+static void catchup_preflight_checks(PGNodeInfo *source_node_info, PGconn *source_conn, const char *source_pgdata,
+ const char *dest_pgdata);
+static void catchup_check_tablespaces_existence_in_tbsmapping(PGconn *conn);
+static parray* catchup_get_tli_history(ConnectionOptions *conn_opt, TimeLineID tli);
+
+//REVIEW The name of this function looks strange to me.
+//Maybe catchup_init_state() or catchup_setup() will do better?
+//I'd also suggest to wrap all these fields into some CatchupState, but it isn't urgent.
+/*
+ * Prepare for work: fill some globals, open connection to source database
+ */
+static PGconn *
+catchup_collect_info(PGNodeInfo *source_node_info, const char *source_pgdata, const char *dest_pgdata)
+{
+ PGconn *source_conn;
+
+ /* Initialize PGNodeInfo */
+ pgNodeInit(source_node_info);
+
+ /* Get WAL segments size and system ID of source PG instance */
+ instance_config.xlog_seg_size = get_xlog_seg_size(source_pgdata);
+ instance_config.system_identifier = get_system_identifier(source_pgdata, FIO_DB_HOST);
+ current.start_time = time(NULL);
+
+ StrNCpy(current.program_version, PROGRAM_VERSION, sizeof(current.program_version));
+
+ /* Do some compatibility checks and fill basic info about PG instance */
+ source_conn = pgdata_basic_setup(instance_config.conn_opt, source_node_info);
+
+#if PG_VERSION_NUM >= 110000
+ if (!RetrieveWalSegSize(source_conn))
+ elog(ERROR, "Failed to retrieve wal_segment_size");
+#endif
+
+ get_ptrack_version(source_conn, source_node_info);
+ if (source_node_info->ptrack_version_num > 0)
+ source_node_info->is_ptrack_enabled = pg_is_ptrack_enabled(source_conn, source_node_info->ptrack_version_num);
+
+ /* Obtain current timeline */
+#if PG_VERSION_NUM >= 90600
+ current.tli = get_current_timeline(source_conn);
+#else
+ instance_config.pgdata = source_pgdata;
+ current.tli = get_current_timeline_from_control(source_pgdata, FIO_DB_HOST, false);
+#endif
+
+ elog(INFO, "Catchup start, pg_probackup version: %s, "
+ "PostgreSQL version: %s, "
+ "remote: %s, source-pgdata: %s, destination-pgdata: %s",
+ PROGRAM_VERSION, source_node_info->server_version_str,
+ IsSshProtocol() ? "true" : "false",
+ source_pgdata, dest_pgdata);
+
+ if (current.from_replica)
+ elog(INFO, "Running catchup from standby");
+
+ return source_conn;
+}
+
+/*
+ * Check that catchup can be performed on source and dest.
+ * This function performs only the checks that do not modify any data on disk.
+ */
+static void
+catchup_preflight_checks(PGNodeInfo *source_node_info, PGconn *source_conn,
+ const char *source_pgdata, const char *dest_pgdata)
+{
+ /* TODO
+ * gsmol - fallback to FULL mode if dest PGDATA is empty
+ * kulaginm -- I think this is a harmful feature. If user requested an incremental catchup, then
+ * he expects that this will be done quickly and efficiently. If, for example, he made a mistake
+ * with dest_dir, then he will receive a second full copy instead of an error message, and I think
+ * that in some cases he would prefer the error.
+ * I propose in future versions to offer a backup_mode auto, in which we will look to the dest_dir
+ * and decide which of the modes will be the most effective.
+ * I.e.:
+ * if(requested_backup_mode == BACKUP_MODE_DIFF_AUTO)
+ * {
+ * if(dest_pgdata_is_empty)
+ * backup_mode = BACKUP_MODE_FULL;
+ * else
+ * if(ptrack supported and applicable)
+ * backup_mode = BACKUP_MODE_DIFF_PTRACK;
+ * else
+ * backup_mode = BACKUP_MODE_DIFF_DELTA;
+ * }
+ */
+
+ if (dir_is_empty(dest_pgdata, FIO_LOCAL_HOST))
+ {
+ if (current.backup_mode == BACKUP_MODE_DIFF_PTRACK ||
+ current.backup_mode == BACKUP_MODE_DIFF_DELTA)
+ elog(ERROR, "\"%s\" is empty, but incremental catchup mode requested.",
+ dest_pgdata);
+ }
+ else /* dest dir not empty */
+ {
+ if (current.backup_mode == BACKUP_MODE_FULL)
+ elog(ERROR, "Can't perform full catchup into non-empty directory \"%s\".",
+ dest_pgdata);
+ }
+
+ /* check that postmaster is not running in destination */
+ if (current.backup_mode != BACKUP_MODE_FULL)
+ {
+ pid_t pid;
+ pid = fio_check_postmaster(dest_pgdata, FIO_LOCAL_HOST);
+ if (pid == 1) /* postmaster.pid is mangled */
+ {
+ char pid_filename[MAXPGPATH];
+ join_path_components(pid_filename, dest_pgdata, "postmaster.pid");
+ elog(ERROR, "Pid file \"%s\" is mangled, cannot determine whether postmaster is running or not",
+ pid_filename);
+ }
+ else if (pid > 1) /* postmaster is up */
+ {
+ elog(ERROR, "Postmaster with pid %u is running in destination directory \"%s\"",
+ pid, dest_pgdata);
+ }
+ }
+
+ /* check backup_label absence in dest */
+ if (current.backup_mode != BACKUP_MODE_FULL)
+ {
+ char backup_label_filename[MAXPGPATH];
+
+ join_path_components(backup_label_filename, dest_pgdata, PG_BACKUP_LABEL_FILE);
+ if (fio_access(backup_label_filename, F_OK, FIO_LOCAL_HOST) == 0)
+ elog(ERROR, "Destination directory contains \"" PG_BACKUP_LABEL_FILE "\" file");
+ }
+
+ /* check that the destination database was shut down cleanly */
+ if (current.backup_mode != BACKUP_MODE_FULL)
+ {
+ DBState state;
+ state = get_system_dbstate(dest_pgdata, FIO_LOCAL_HOST);
+ /* see states in postgres sources (src/include/catalog/pg_control.h) */
+ if (state != DB_SHUTDOWNED && state != DB_SHUTDOWNED_IN_RECOVERY)
+ elog(ERROR, "Postmaster in destination directory \"%s\" must be stopped cleanly",
+ dest_pgdata);
+ }
+
+ /* Check that the connected PG instance and the source and destination PGDATA share the same system identifier */
+ {
+ uint64 source_conn_id, source_id, dest_id;
+
+ source_conn_id = get_remote_system_identifier(source_conn);
+ source_id = get_system_identifier(source_pgdata, FIO_DB_HOST); /* same as instance_config.system_identifier */
+
+ if (source_conn_id != source_id)
+ elog(ERROR, "Database identifiers mismatch: we connected to DB id %lu, but in \"%s\" we found id %lu",
+ source_conn_id, source_pgdata, source_id);
+
+ if (current.backup_mode != BACKUP_MODE_FULL)
+ {
+ dest_id = get_system_identifier(dest_pgdata, FIO_LOCAL_HOST);
+ if (source_conn_id != dest_id)
+ elog(ERROR, "Database identifiers mismatch: we connected to DB id %lu, but in \"%s\" we found id %lu",
+ source_conn_id, dest_pgdata, dest_id);
+ }
+ }
+
+ /* check PTRACK version */
+ if (current.backup_mode == BACKUP_MODE_DIFF_PTRACK)
+ {
+ if (source_node_info->ptrack_version_num == 0)
+ elog(ERROR, "This PostgreSQL instance does not support ptrack");
+ else if (source_node_info->ptrack_version_num < 200)
+ elog(ERROR, "ptrack extension is too old.\n"
+ "Upgrade ptrack to version >= 2");
+ else if (!source_node_info->is_ptrack_enabled)
+ elog(ERROR, "Ptrack is disabled");
+ }
+
+ if (current.from_replica && exclusive_backup)
+ elog(ERROR, "Catchup from standby is only available for PostgreSQL >= 9.6");
+
+ /* check that we don't overwrite tablespaces in the source pgdata */
+ catchup_check_tablespaces_existence_in_tbsmapping(source_conn);
+
+ /* check timelines */
+ if (current.backup_mode != BACKUP_MODE_FULL)
+ {
+ RedoParams dest_redo = { 0, InvalidXLogRecPtr, 0 };
+
+ /* fill dest_redo.lsn and dest_redo.tli */
+ get_redo(dest_pgdata, FIO_LOCAL_HOST, &dest_redo);
+
+ if (current.tli != 1)
+ {
+ parray *source_timelines; /* parray* of TimeLineHistoryEntry* */
+ source_timelines = catchup_get_tli_history(&instance_config.conn_opt, current.tli);
+
+ if (source_timelines == NULL)
+ elog(ERROR, "Cannot get source timeline history");
+
+ if (!satisfy_timeline(source_timelines, dest_redo.tli, dest_redo.lsn))
+ elog(ERROR, "Destination is not in source timeline history");
+
+ parray_walk(source_timelines, pfree);
+ parray_free(source_timelines);
+ }
+ else /* special case -- no history files in source */
+ {
+ if (dest_redo.tli != 1)
+ elog(ERROR, "Source is behind destination in timeline history");
+ }
+ }
+}
+
+/*
+ * Check that all tablespaces exist in the tablespace mapping (--tablespace-mapping option)
+ * Check that all locally mapped directories are empty if this is a local FULL catchup
+ * Emit a fatal error if such a tablespace (absent from the map or not empty) is found
+ */
+static void
+catchup_check_tablespaces_existence_in_tbsmapping(PGconn *conn)
+{
+ PGresult *res;
+ int i;
+ char *tablespace_path = NULL;
+ const char *linked_path = NULL;
+ char *query = "SELECT pg_catalog.pg_tablespace_location(oid) "
+ "FROM pg_catalog.pg_tablespace "
+ "WHERE pg_catalog.pg_tablespace_location(oid) <> '';";
+
+ res = pgut_execute(conn, query, 0, NULL);
+
+ if (!res)
+ elog(ERROR, "Failed to get list of tablespaces");
+
+ for (i = 0; i < res->ntups; i++)
+ {
+ tablespace_path = PQgetvalue(res, i, 0);
+ Assert (strlen(tablespace_path) > 0);
+
+ canonicalize_path(tablespace_path);
+ linked_path = get_tablespace_mapping(tablespace_path);
+
+ if (strcmp(tablespace_path, linked_path) == 0)
+ /* same result -> not found in mapping */
+ {
+ if (!fio_is_remote(FIO_DB_HOST))
+ elog(ERROR, "Local catchup executed, but source database contains "
+ "tablespace (\"%s\"), that is not listed in the map", tablespace_path);
+ else
+ elog(WARNING, "Remote catchup executed and source database contains "
+ "tablespace (\"%s\"), that is not listed in the map", tablespace_path);
+ }
+
+ if (!is_absolute_path(linked_path))
+ elog(ERROR, "Tablespace directory path must be an absolute path: \"%s\"",
+ linked_path);
+
+ if (current.backup_mode == BACKUP_MODE_FULL
+ && !dir_is_empty(linked_path, FIO_LOCAL_HOST))
+ elog(ERROR, "Target mapped tablespace directory (\"%s\") is not empty in FULL catchup",
+ linked_path);
+ }
+ PQclear(res);
+}
+
+/*
+ * Get timeline history via replication connection
+ * returns parray* of TimeLineHistoryEntry*
+ */
+static parray*
+catchup_get_tli_history(ConnectionOptions *conn_opt, TimeLineID tli)
+{
+ PGresult *res;
+ PGconn *conn;
+ char *history;
+ char query[128];
+ parray *result = NULL;
+
+ snprintf(query, sizeof(query), "TIMELINE_HISTORY %u", tli);
+
+ /*
+ * Connect in replication mode to the server.
+ */
+ conn = pgut_connect_replication(conn_opt->pghost,
+ conn_opt->pgport,
+ conn_opt->pgdatabase,
+ conn_opt->pguser,
+ false);
+
+ if (!conn)
+ return NULL;
+
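+ /* run TIMELINE_HISTORY; the PGresult remains valid after PQfinish(), so the connection can be closed right away */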
+ res = PQexec(conn, query);
+ PQfinish(conn);
+
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ elog(WARNING, "Could not send replication command \"%s\": %s",
+ query, PQresultErrorMessage(res));
+ PQclear(res);
+ return NULL;
+ }
+
+ /*
+ * The response to TIMELINE_HISTORY is a single row result set
+ * with two fields: filename and content
+ */
+ if (PQnfields(res) != 2 || PQntuples(res) != 1)
+ {
+ elog(ERROR, "Unexpected response to TIMELINE_HISTORY command: "
+ "got %d rows and %d fields, expected %d rows and %d fields",
+ PQntuples(res), PQnfields(res), 1, 2);
+ PQclear(res);
+ return NULL;
+ }
+
+ history = pgut_strdup(PQgetvalue(res, 0, 1));
+ result = parse_tli_history_buffer(history, tli);
+
+ /* some cleanup */
+ pg_free(history);
+ PQclear(res);
+
+ return result;
+}
+
+/*
+ * catchup multithreaded copy routine with its helper structure and function
+ */
+
+/* parameters for catchup_thread_runner() passed from catchup_multithreaded_copy() */
+typedef struct
+{
+ PGNodeInfo *nodeInfo;
+ const char *from_root;
+ const char *to_root;
+ parray *source_filelist;
+ parray *dest_filelist;
+ XLogRecPtr sync_lsn;
+ BackupMode backup_mode;
+ int thread_num;
+ bool completed;
+} catchup_thread_runner_arg;
+
+/* Catchup file copier executed in a separate thread */
+static void *
+catchup_thread_runner(void *arg)
+{
+ int i;
+ char from_fullpath[MAXPGPATH];
+ char to_fullpath[MAXPGPATH];
+
+ catchup_thread_runner_arg *arguments = (catchup_thread_runner_arg *) arg;
+ int n_files = parray_num(arguments->source_filelist);
+
+ /* catchup a file */
+ for (i = 0; i < n_files; i++)
+ {
+ pgFile *file = (pgFile *) parray_get(arguments->source_filelist, i);
+ pgFile *dest_file = NULL;
+
+ /* We have already copied all directories */
+ if (S_ISDIR(file->mode))
+ continue;
+
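+ /* skip the file if another worker thread has already claimed it via the atomic lock flag */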
+ if (!pg_atomic_test_set_flag(&file->lock))
+ continue;
+
+ /* check for interrupt */
+ if (interrupted || thread_interrupted)
+ elog(ERROR, "Interrupted during catchup");
+
+ if (progress)
+ elog(INFO, "Progress: (%d/%d). Process file \"%s\"",
+ i + 1, n_files, file->rel_path);
+
+ /* construct destination filepath */
+ Assert(file->external_dir_num == 0);
+ join_path_components(from_fullpath, arguments->from_root, file->rel_path);
+ join_path_components(to_fullpath, arguments->to_root, file->rel_path);
+
+ /* Encountered some strange beast */
+ if (!S_ISREG(file->mode))
+ elog(WARNING, "Unexpected type %d of file \"%s\", skipping",
+ file->mode, from_fullpath);
+
+ /* Check that the file exists in the dest pgdata */
+ if (arguments->backup_mode != BACKUP_MODE_FULL)
+ {
+ pgFile **dest_file_tmp = NULL;
+ dest_file_tmp = (pgFile **) parray_bsearch(arguments->dest_filelist,
+ file, pgFileCompareRelPathWithExternal);
+ if (dest_file_tmp)
+ {
+ /* File exists in destination PGDATA */
+ file->exists_in_prev = true;
+ dest_file = *dest_file_tmp;
+ }
+ }
+
+ /* Do actual work */
+ if (file->is_datafile && !file->is_cfs)
+ {
+ catchup_data_file(file, from_fullpath, to_fullpath,
+ arguments->sync_lsn,
+ arguments->backup_mode,
+ NONE_COMPRESS,
+ 0,
+ arguments->nodeInfo->checksum_version,
+ arguments->nodeInfo->ptrack_version_num,
+ arguments->nodeInfo->ptrack_schema,
+ false,
+ dest_file != NULL ? dest_file->size : 0);
+ }
+ else
+ {
+ backup_non_data_file(file, dest_file, from_fullpath, to_fullpath,
+ arguments->backup_mode, current.parent_backup, true);
+ }
+
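+ /* the source file vanished while we were copying it (e.g. dropped by a concurrent transaction), skip it */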
+ if (file->write_size == FILE_NOT_FOUND)
+ continue;
+
+ if (file->write_size == BYTES_INVALID)
+ {
+ elog(VERBOSE, "Skipping the unchanged file: \"%s\", read %li bytes", from_fullpath, file->read_size);
+ continue;
+ }
+
+ elog(VERBOSE, "File \"%s\". Copied "INT64_FORMAT " bytes",
+ from_fullpath, file->write_size);
+ }
+
+ /* ssh connection is no longer needed */
+ fio_disconnect();
+
+ /* Data file transfer was successful */
+ arguments->completed = true;
+
+ return NULL;
+}
+
+/*
+ * main multithreaded copier
+ */
+static bool
+catchup_multithreaded_copy(int num_threads,
+ PGNodeInfo *source_node_info,
+ const char *source_pgdata_path,
+ const char *dest_pgdata_path,
+ parray *source_filelist,
+ parray *dest_filelist,
+ XLogRecPtr sync_lsn,
+ BackupMode backup_mode)
+{
+ /* arrays with meta info for multi threaded catchup */
+ catchup_thread_runner_arg *threads_args;
+ pthread_t *threads;
+
+ bool all_threads_successful = true;
+ int i;
+
+ /* init thread args */
+ threads_args = (catchup_thread_runner_arg *) palloc(sizeof(catchup_thread_runner_arg) * num_threads);
+ for (i = 0; i < num_threads; i++)
+ threads_args[i] = (catchup_thread_runner_arg){
+ .nodeInfo = source_node_info,
+ .from_root = source_pgdata_path,
+ .to_root = dest_pgdata_path,
+ .source_filelist = source_filelist,
+ .dest_filelist = dest_filelist,
+ .sync_lsn = sync_lsn,
+ .backup_mode = backup_mode,
+ .thread_num = i + 1,
+ .completed = false,
+ };
+
+ /* Run threads */
+ thread_interrupted = false;
+ threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads);
+ for (i = 0; i < num_threads; i++)
+ {
+ elog(VERBOSE, "Start thread num: %i", i);
+ pthread_create(&threads[i], NULL, &catchup_thread_runner, &(threads_args[i]));
+ }
+
+ /* Wait threads */
+ for (i = 0; i < num_threads; i++)
+ {
+ pthread_join(threads[i], NULL);
+ all_threads_successful &= threads_args[i].completed;
+ }
+
+ free(threads);
+ free(threads_args);
+ return all_threads_successful;
+}
+
+/*
+ * Sync all files from the given filelist to disk, then sync the pg_control file last.
+ */
+static void
+catchup_sync_destination_files(const char* pgdata_path, fio_location location, parray *filelist, pgFile *pg_control_file)
+{
+ char fullpath[MAXPGPATH];
+ time_t start_time, end_time;
+ char pretty_time[20];
+ int i;
+
+ elog(INFO, "Syncing copied files to disk");
+ time(&start_time);
+
+ for (i = 0; i < parray_num(filelist); i++)
+ {
+ pgFile *file = (pgFile *) parray_get(filelist, i);
+
+ /* TODO: sync directory ? */
+ if (S_ISDIR(file->mode))
+ continue;
+
+ Assert(file->external_dir_num == 0);
+ join_path_components(fullpath, pgdata_path, file->rel_path);
+ if (fio_sync(fullpath, location) != 0)
+ elog(ERROR, "Cannot sync file \"%s\": %s", fullpath, strerror(errno));
+ }
+
+ /*
+ * sync pg_control file
+ */
+ join_path_components(fullpath, pgdata_path, pg_control_file->rel_path);
+ if (fio_sync(fullpath, location) != 0)
+ elog(ERROR, "Cannot sync file \"%s\": %s", fullpath, strerror(errno));
+
+ time(&end_time);
+ pretty_time_interval(difftime(end_time, start_time),
+ pretty_time, lengthof(pretty_time));
+ elog(INFO, "Files are synced, time elapsed: %s", pretty_time);
+}
+
+/*
+ * Entry point of pg_probackup CATCHUP subcommand.
+ */
+int
+do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads, bool sync_dest_files)
+{
+ PGconn *source_conn = NULL;
+ PGNodeInfo source_node_info;
+ bool backup_logs = false;
+ parray *source_filelist = NULL;
+ pgFile *source_pg_control_file = NULL;
+ parray *dest_filelist = NULL;
+ char dest_xlog_path[MAXPGPATH];
+
+ RedoParams dest_redo = { 0, InvalidXLogRecPtr, 0 };
+ PGStopBackupResult stop_backup_result;
+ bool catchup_isok = true;
+
+ int i;
+
+ /* for fancy reporting */
+ time_t start_time, end_time;
+ char pretty_time[20];
+ char pretty_bytes[20];
+
+ source_conn = catchup_collect_info(&source_node_info, source_pgdata, dest_pgdata);
+ catchup_preflight_checks(&source_node_info, source_conn, source_pgdata, dest_pgdata);
+
+ elog(LOG, "Database catchup start");
+
+ {
+ char label[1024];
+ /* notify start of backup to PostgreSQL server */
+ time2iso(label, lengthof(label), current.start_time, false);
+ strncat(label, " with pg_probackup", lengthof(label) -
+ strlen(" with pg_probackup"));
+
+ /* Call pg_start_backup function in PostgreSQL connect */
+ pg_start_backup(label, smooth_checkpoint, ¤t, &source_node_info, source_conn);
+ elog(LOG, "pg_start_backup START LSN %X/%X", (uint32) (current.start_lsn >> 32), (uint32) (current.start_lsn));
+ }
+
+ //REVIEW I wonder, if we can move this piece above and call before pg_start backup()?
+ //It seems to be a part of setup phase.
+ if (current.backup_mode != BACKUP_MODE_FULL)
+ {
+ dest_filelist = parray_new();
+ dir_list_file(dest_filelist, dest_pgdata,
+ true, true, false, backup_logs, true, 0, FIO_LOCAL_HOST);
+
+ // fill dest_redo.lsn and dest_redo.tli
+ get_redo(dest_pgdata, FIO_LOCAL_HOST, &dest_redo);
+ elog(INFO, "syncLSN = %X/%X", (uint32) (dest_redo.lsn >> 32), (uint32) dest_redo.lsn);
+
+ /*
+ * Future improvement to detect partial catchup:
+ * 1. rename dest pg_control into something like pg_control.pbk
+ * (so user can't start partial catchup'ed instance from this point)
+ * 2. try to read by get_redo() pg_control and pg_control.pbk (to detect partial catchup)
+ * 3. at the end (after copy of correct pg_control), remove pg_control.pbk
+ */
+ }
+
+ //REVIEW I wonder, if we can move this piece above and call before pg_start backup()?
+ //It seems to be a part of setup phase.
+ /*
+ * TODO: move to separate function to use in both backup.c and catchup.c
+ */
+ if (current.backup_mode == BACKUP_MODE_DIFF_PTRACK)
+ {
+ XLogRecPtr ptrack_lsn = get_last_ptrack_lsn(source_conn, &source_node_info);
+
+ // new ptrack is more robust and checks Start LSN
+ if (ptrack_lsn > dest_redo.lsn || ptrack_lsn == InvalidXLogRecPtr)
+ elog(ERROR, "LSN from ptrack_control in source %X/%X is greater than checkpoint LSN in destination %X/%X.\n"
+ "You can perform only FULL catchup.",
+ (uint32) (ptrack_lsn >> 32), (uint32) (ptrack_lsn),
+ (uint32) (dest_redo.lsn >> 32),
+ (uint32) (dest_redo.lsn));
+ }
+
+ /* Check that dest_redo.lsn is less than current.start_lsn */
+ if (current.backup_mode != BACKUP_MODE_FULL &&
+ dest_redo.lsn > current.start_lsn)
+ elog(ERROR, "Current START LSN %X/%X is lower than SYNC LSN %X/%X, "
+ "it may indicate that we are trying to catchup with PostgreSQL instance from the past",
+ (uint32) (current.start_lsn >> 32), (uint32) (current.start_lsn),
+ (uint32) (dest_redo.lsn >> 32), (uint32) (dest_redo.lsn));
+
+ /* Start stream replication */
+ join_path_components(dest_xlog_path, dest_pgdata, PG_XLOG_DIR);
+ fio_mkdir(dest_xlog_path, DIR_PERMISSION, FIO_LOCAL_HOST);
+ start_WAL_streaming(source_conn, dest_xlog_path, &instance_config.conn_opt,
+ current.start_lsn, current.tli);
+
+ source_filelist = parray_new();
+
+ /* list files with the logical path. omit $PGDATA */
+ if (fio_is_remote(FIO_DB_HOST))
+ fio_list_dir(source_filelist, source_pgdata,
+ true, true, false, backup_logs, true, 0);
+ else
+ dir_list_file(source_filelist, source_pgdata,
+ true, true, false, backup_logs, true, 0, FIO_LOCAL_HOST);
+
+ //REVIEW FIXME. Let's fix that before release.
+ // TODO filter pg_xlog/wal?
+ // TODO what if wal is not a dir (symlink to a dir)?
+
+ /* close ssh session in main thread */
+ fio_disconnect();
+
+ //REVIEW Do we want to do similar calculation for dest?
+ current.pgdata_bytes += calculate_datasize_of_filelist(source_filelist);
+ pretty_size(current.pgdata_bytes, pretty_bytes, lengthof(pretty_bytes));
+ elog(INFO, "Source PGDATA size: %s", pretty_bytes);
+
+ /*
+ * Sort pathname ascending. It is necessary to create intermediate
+ * directories sequentially.
+ *
+ * For example:
+ * 1 - create 'base'
+ * 2 - create 'base/1'
+ *
+ * Sorted array is used at least in parse_filelist_filenames(),
+ * extractPageMap(), make_pagemap_from_ptrack().
+ */
+ parray_qsort(source_filelist, pgFileCompareRelPathWithExternal);
+
+ /* Extract information about files in source_filelist by parsing their names: */
+ parse_filelist_filenames(source_filelist, source_pgdata);
+
+ elog(LOG, "Start LSN (source): %X/%X, TLI: %X",
+ (uint32) (current.start_lsn >> 32), (uint32) (current.start_lsn),
+ current.tli);
+ if (current.backup_mode != BACKUP_MODE_FULL)
+ elog(LOG, "LSN in destination: %X/%X, TLI: %X",
+ (uint32) (dest_redo.lsn >> 32), (uint32) (dest_redo.lsn),
+ dest_redo.tli);
+
+ /* Build page mapping in PTRACK mode */
+ if (current.backup_mode == BACKUP_MODE_DIFF_PTRACK)
+ {
+ time(&start_time);
+ elog(INFO, "Extracting pagemap of changed blocks");
+
+ /* Build the page map from ptrack information */
+ make_pagemap_from_ptrack_2(source_filelist, source_conn,
+ source_node_info.ptrack_schema,
+ source_node_info.ptrack_version_num,
+ dest_redo.lsn);
+ time(&end_time);
+ elog(INFO, "Pagemap successfully extracted, time elapsed: %.0f sec",
+ difftime(end_time, start_time));
+ }
+
+ /*
+ * Make directories before catchup
+ */
+ /*
+ * We iterate over source_filelist and for every directory with parent 'pg_tblspc'
+ * we must lookup this directory name in tablespace map.
+ * If we got a match, we treat this directory as tablespace.
+ * It means that we create directory specified in tablespace_map and
+ * original directory created as symlink to it.
+ */
+ for (i = 0; i < parray_num(source_filelist); i++)
+ {
+ pgFile *file = (pgFile *) parray_get(source_filelist, i);
+ char parent_dir[MAXPGPATH];
+
+ if (!S_ISDIR(file->mode))
+ continue;
+
+ /*
+ * check whether this is a fake "directory", i.e. a tablespace link;
+ * this is possible because we passed follow_symlink when building the list
+ */
+ /* get parent dir of rel_path */
+ strncpy(parent_dir, file->rel_path, MAXPGPATH);
+ get_parent_directory(parent_dir);
+
+ /* check if directory is actually link to tablespace */
+ if (strcmp(parent_dir, PG_TBLSPC_DIR) != 0)
+ {
+ /* if the entry is a regular directory, create it in the destination */
+ char dirpath[MAXPGPATH];
+
+ join_path_components(dirpath, dest_pgdata, file->rel_path);
+
+ elog(VERBOSE, "Create directory '%s'", dirpath);
+ fio_mkdir(dirpath, DIR_PERMISSION, FIO_LOCAL_HOST);
+ }
+ else
+ {
+ /* this directory located in pg_tblspc */
+ const char *linked_path = NULL;
+ char to_path[MAXPGPATH];
+
+ // TODO perform additional check that this is actually symlink?
+ { /* get full symlink path and map this path to new location */
+ char source_full_path[MAXPGPATH];
+ char symlink_content[MAXPGPATH];
+ join_path_components(source_full_path, source_pgdata, file->rel_path);
+ fio_readlink(source_full_path, symlink_content, sizeof(symlink_content), FIO_DB_HOST);
+ /* we checked that mapping exists in preflight_checks for local catchup */
+ linked_path = get_tablespace_mapping(symlink_content);
+ elog(INFO, "Map tablespace full_path: \"%s\" old_symlink_content: \"%s\" new_symlink_content: \"%s\"\n",
+ source_full_path,
+ symlink_content,
+ linked_path);
+ }
+
+ if (!is_absolute_path(linked_path))
+ elog(ERROR, "Tablespace directory path must be an absolute path: %s\n",
+ linked_path);
+
+ join_path_components(to_path, dest_pgdata, file->rel_path);
+
+ elog(VERBOSE, "Create directory \"%s\" and symbolic link \"%s\"",
+ linked_path, to_path);
+
+ /* create tablespace directory */
+ if (fio_mkdir(linked_path, file->mode, FIO_LOCAL_HOST) != 0)
+ elog(ERROR, "Could not create tablespace directory \"%s\": %s",
+ linked_path, strerror(errno));
+
+ /* create link to linked_path */
+ if (fio_symlink(linked_path, to_path, true, FIO_LOCAL_HOST) < 0)
+ elog(ERROR, "Could not create symbolic link \"%s\" -> \"%s\": %s",
+ linked_path, to_path, strerror(errno));
+ }
+ }
+
+ /*
+ * find pg_control file (in already sorted source_filelist)
+ * and exclude it from list for future special processing
+ */
+ {
+ int control_file_elem_index;
+ pgFile search_key;
+ MemSet(&search_key, 0, sizeof(pgFile));
+ /* pgFileCompareRelPathWithExternal uses only .rel_path and .external_dir_num for comparison */
+ search_key.rel_path = XLOG_CONTROL_FILE;
+ search_key.external_dir_num = 0;
+ control_file_elem_index = parray_bsearch_index(source_filelist, &search_key, pgFileCompareRelPathWithExternal);
+ if(control_file_elem_index < 0)
+ elog(ERROR, "\"%s\" not found in \"%s\"\n", XLOG_CONTROL_FILE, source_pgdata);
+ source_pg_control_file = parray_remove(source_filelist, control_file_elem_index);
+ }
+
+ /*
+ * remove files from dest that are absent in source (dropped tables, etc.)
+ * note: global/pg_control will also be deleted here
+ */
+ if (current.backup_mode != BACKUP_MODE_FULL)
+ {
+ elog(INFO, "Removing redundant files in destination directory");
+ parray_qsort(dest_filelist, pgFileCompareRelPathWithExternalDesc);
+ for (i = 0; i < parray_num(dest_filelist); i++)
+ {
+ bool redundant = true;
+ pgFile *file = (pgFile *) parray_get(dest_filelist, i);
+
+ //TODO optimize it and use some merge-like algorithm
+ //instead of bsearch for each file.
+ if (parray_bsearch(source_filelist, file, pgFileCompareRelPathWithExternal))
+ redundant = false;
+
+ /* pg_filenode.map is always restored, because its CRC cannot be trusted */
+ Assert(file->external_dir_num == 0);
+ if (pg_strcasecmp(file->name, RELMAPPER_FILENAME) == 0)
+ redundant = true;
+
+ //REVIEW This check seems unneeded. Anyway we delete only redundant stuff below.
+ /* do not delete the useful internal directories */
+ if (S_ISDIR(file->mode) && !redundant)
+ continue;
+
+ /* if the file does not exist in the source list, we can safely unlink it */
+ if (redundant)
+ {
+ char fullpath[MAXPGPATH];
+
+ join_path_components(fullpath, dest_pgdata, file->rel_path);
+
+ fio_delete(file->mode, fullpath, FIO_DB_HOST);
+ elog(VERBOSE, "Deleted file \"%s\"", fullpath);
+
+ /* shrink pgdata list */
+ pgFileFree(file);
+ parray_remove(dest_filelist, i);
+ i--;
+ }
+ }
+ }
+
+ /* clear file locks */
+ pfilearray_clear_locks(source_filelist);
+
+ /* Sort by size for load balancing */
+ parray_qsort(source_filelist, pgFileCompareSizeDesc);
+
+ /* Sort the array for binary search */
+ if (dest_filelist)
+ parray_qsort(dest_filelist, pgFileCompareRelPathWithExternal);
+
+ /* run copy threads */
+ elog(INFO, "Start transferring data files");
+ time(&start_time);
+ catchup_isok = catchup_multithreaded_copy(num_threads, &source_node_info,
+ source_pgdata, dest_pgdata,
+ source_filelist, dest_filelist,
+ dest_redo.lsn, current.backup_mode);
+
+ /* at last copy control file */
+ if (catchup_isok)
+ {
+ char from_fullpath[MAXPGPATH];
+ char to_fullpath[MAXPGPATH];
+ join_path_components(from_fullpath, source_pgdata, source_pg_control_file->rel_path);
+ join_path_components(to_fullpath, dest_pgdata, source_pg_control_file->rel_path);
+ copy_pgcontrol_file(from_fullpath, FIO_DB_HOST,
+ to_fullpath, FIO_LOCAL_HOST, source_pg_control_file);
+ }
+
+ time(&end_time);
+ pretty_time_interval(difftime(end_time, start_time),
+ pretty_time, lengthof(pretty_time));
+ if (catchup_isok)
+ elog(INFO, "Data files are transferred, time elapsed: %s",
+ pretty_time);
+ else
+ elog(ERROR, "Data files transferring failed, time elapsed: %s",
+ pretty_time);
+
+ /* Notify end of backup */
+ {
+ //REVIEW Is it relevant to catchup? I suppose it isn't, since catchup is a new code.
+ //If we do need it, please write a comment explaining that.
+ /* kludge against some old bug in archive_timeout. TODO: remove in 3.0.0 */
+ int timeout = (instance_config.archive_timeout > 0) ?
+ instance_config.archive_timeout : ARCHIVE_TIMEOUT_DEFAULT;
+ char *stop_backup_query_text = NULL;
+
+ pg_silent_client_messages(source_conn);
+
+ //REVIEW. Do we want to support pg 9.5? I suppose we never test it...
+ //Maybe check it and error out early?
+ /* Create restore point
+ * Only if backup is from master.
+ * For PG 9.5 create restore point only if pguser is superuser.
+ */
+ if (!current.from_replica &&
+ !(source_node_info.server_version < 90600 &&
+ !source_node_info.is_superuser)) //TODO: check correctness
+ pg_create_restore_point(source_conn, current.start_time);
+
+ /* Execute pg_stop_backup using PostgreSQL connection */
+ pg_stop_backup_send(source_conn, source_node_info.server_version, current.from_replica, exclusive_backup, &stop_backup_query_text);
+
+ /*
+ * Wait for the result of pg_stop_backup(), but no longer than
+ * archive_timeout seconds
+ */
+ pg_stop_backup_consume(source_conn, source_node_info.server_version, exclusive_backup, timeout, stop_backup_query_text, &stop_backup_result);
+
+ /* Cleanup */
+ pg_free(stop_backup_query_text);
+ }
+
+ wait_wal_and_calculate_stop_lsn(dest_xlog_path, stop_backup_result.lsn, ¤t);
+
+#if PG_VERSION_NUM >= 90600
+ /* Write backup_label */
+ Assert(stop_backup_result.backup_label_content != NULL);
+ pg_stop_backup_write_file_helper(dest_pgdata, PG_BACKUP_LABEL_FILE, "backup label",
+ stop_backup_result.backup_label_content, stop_backup_result.backup_label_content_len,
+ NULL);
+ free(stop_backup_result.backup_label_content);
+ stop_backup_result.backup_label_content = NULL;
+ stop_backup_result.backup_label_content_len = 0;
+
+ /* tablespace_map */
+ if (stop_backup_result.tablespace_map_content != NULL)
+ {
+ // TODO what if tablespace is created during catchup?
+ /* Because we have already created symlinks in pg_tblspc earlier,
+ * we do not need to write the tablespace_map file.
+ * So this call is unnecessary:
+ * pg_stop_backup_write_file_helper(dest_pgdata, PG_TABLESPACE_MAP_FILE, "tablespace map",
+ * stop_backup_result.tablespace_map_content, stop_backup_result.tablespace_map_content_len,
+ * NULL);
+ */
+ free(stop_backup_result.tablespace_map_content);
+ stop_backup_result.tablespace_map_content = NULL;
+ stop_backup_result.tablespace_map_content_len = 0;
+ }
+#endif
+
+ if(wait_WAL_streaming_end(NULL))
+ elog(ERROR, "WAL streaming failed");
+
+ //REVIEW Please add a comment about these lsns. It is a crucial part of the algorithm.
+ current.recovery_xid = stop_backup_result.snapshot_xid;
+
+ elog(LOG, "Getting the Recovery Time from WAL");
+
+ /* iterate over WAL from stop_backup lsn to start_backup lsn */
+ if (!read_recovery_info(dest_xlog_path, current.tli,
+ instance_config.xlog_seg_size,
+ current.start_lsn, current.stop_lsn,
+ ¤t.recovery_time))
+ {
+ elog(LOG, "Failed to find Recovery Time in WAL, forced to trust current_timestamp");
+ current.recovery_time = stop_backup_result.invocation_time;
+ }
+
+ /*
+ * In case of backup from replica >= 9.6 we must fix minRecPoint
+ */
+ if (current.from_replica && !exclusive_backup)
+ {
+ set_min_recovery_point(source_pg_control_file, dest_pgdata, current.stop_lsn);
+ }
+
+ /* close ssh session in main thread */
+ fio_disconnect();
+
+ /* Sync all copied files unless '--no-sync' flag is used */
+ if (catchup_isok)
+ {
+ if (sync_dest_files)
+ catchup_sync_destination_files(dest_pgdata, FIO_LOCAL_HOST, source_filelist, source_pg_control_file);
+ else
+ elog(WARNING, "Files are not synced to disk");
+ }
+
+ /* Cleanup */
+ if (dest_filelist)
+ {
+ parray_walk(dest_filelist, pgFileFree);
+ parray_free(dest_filelist);
+ }
+ parray_walk(source_filelist, pgFileFree);
+ parray_free(source_filelist);
+ pgFileFree(source_pg_control_file);
+
+ //REVIEW: Are we going to do that before release?
+ /* TODO: show the amount of transferred data in bytes and calculate the incremental ratio */
+
+ return 0;
+}
diff --git a/src/data.c b/src/data.c
index 314490585..49b696059 100644
--- a/src/data.c
+++ b/src/data.c
@@ -268,7 +268,7 @@ get_checksum_errormsg(Page page, char **errormsg, BlockNumber absolute_blkno)
* PageIsOk(0) if page was successfully retrieved
* PageIsTruncated(-1) if the page was truncated
* SkipCurrentPage(-2) if we need to skip this page,
- * only used for DELTA backup
+ * only used for DELTA and PTRACK backup
* PageIsCorrupted(-3) if the page checksum mismatch
* or header corruption,
* only used for checkdb
@@ -400,7 +400,12 @@ prepare_page(pgFile *file, XLogRecPtr prev_backup_start_lsn,
page_st->lsn > 0 &&
page_st->lsn < prev_backup_start_lsn)
{
- elog(VERBOSE, "Skipping blknum %u in file: \"%s\"", blknum, from_fullpath);
+ elog(VERBOSE, "Skipping blknum %u in file: \"%s\", file->exists_in_prev: %s, page_st->lsn: %X/%X, prev_backup_start_lsn: %X/%X",
+ blknum, from_fullpath,
+ file->exists_in_prev ? "true" : "false",
+ (uint32) (page_st->lsn >> 32), (uint32) page_st->lsn,
+ (uint32) (prev_backup_start_lsn >> 32), (uint32) prev_backup_start_lsn
+ );
return SkipCurrentPage;
}
@@ -458,6 +463,23 @@ compress_and_backup_page(pgFile *file, BlockNumber blknum,
return compressed_size;
}
+/* taken from compress_and_backup_page, but with all the header and compression magic stripped out; just a plain 1-to-1 copy */
+static int
+copy_page(pgFile *file, BlockNumber blknum,
+ FILE *in, FILE *out, Page page,
+ const char *to_fullpath)
+{
+ /* write data page */
+ if (fio_fwrite(out, page, BLCKSZ) != BLCKSZ)
+ elog(ERROR, "File: \"%s\", cannot write at block %u: %s",
+ to_fullpath, blknum, strerror(errno));
+
+ file->write_size += BLCKSZ;
+ file->uncompressed_size += BLCKSZ;
+
+ return BLCKSZ;
+}
+
/*
* Backup data file in the from_root directory to the to_root directory with
* same relative path. If prev_backup_start_lsn is not NULL, only pages with
@@ -623,6 +645,169 @@ backup_data_file(pgFile *file, const char *from_fullpath, const char *to_fullpat
pg_free(headers);
}
+/*
+ * Copy a data file from the from_root directory to the to_root directory with
+ * the same relative path. If prev_backup_start_lsn is not NULL, only pages with
+ * a higher lsn will be copied.
+ * Not just copy the file, but read it block by block (using the bitmap in case of
+ * incremental catchup), validate page checksums, and write the pages to the
+ * destination as-is, without compression or special headers.
+ */
+void
+catchup_data_file(pgFile *file, const char *from_fullpath, const char *to_fullpath,
+ XLogRecPtr prev_backup_start_lsn, BackupMode backup_mode,
+ CompressAlg calg, int clevel, uint32 checksum_version,
+ int ptrack_version_num, const char *ptrack_schema,
+ bool is_merge, size_t prev_size)
+{
+ int rc;
+ bool use_pagemap;
+ char *errmsg = NULL;
+ BlockNumber err_blknum = 0;
+ /* page headers */
+ BackupPageHeader2 *headers = NULL;
+
+ /* sanity */
+ if (file->size % BLCKSZ != 0)
+ elog(WARNING, "File: \"%s\", invalid file size %zu", from_fullpath, file->size);
+
+ /*
+ * Compute expected number of blocks in the file.
+ * NOTE This is a normal situation, if the file size has changed
+ * since the moment we computed it.
+ */
+ file->n_blocks = file->size/BLCKSZ;
+
+ /*
+ * Skip unchanged file only if it exists in previous backup.
+ * This way we can correctly handle null-sized files which are
+ * not tracked by pagemap and thus always marked as unchanged.
+ */
+ if (backup_mode == BACKUP_MODE_DIFF_PTRACK &&
+ file->pagemap.bitmapsize == PageBitmapIsEmpty &&
+ file->exists_in_prev && file->size == prev_size && !file->pagemap_isabsent)
+ {
+ /*
+ * There are no changed blocks since last backup. We want to make
+ * incremental backup, so we should exit.
+ */
+ file->write_size = BYTES_INVALID;
+ return;
+ }
+
+ /* reset size summary */
+ file->read_size = 0;
+ file->write_size = 0;
+ file->uncompressed_size = 0;
+ INIT_FILE_CRC32(true, file->crc);
+
+ /*
+ * Read each page, verify checksum and write it to backup.
+ * If page map is empty or file is not present in previous backup
+ * backup all pages of the relation.
+ *
+ * In PTRACK 1.x there was a problem
+ * of data files with missing _ptrack map.
+ * Such files should be fully copied.
+ */
+
+ if (file->pagemap.bitmapsize == PageBitmapIsEmpty ||
+ file->pagemap_isabsent || !file->exists_in_prev ||
+ !file->pagemap.bitmap)
+ use_pagemap = false;
+ else
+ use_pagemap = true;
+
+ if (use_pagemap)
+ elog(VERBOSE, "Using pagemap for file \"%s\"", file->rel_path);
+
+ /* Remote mode */
+ if (fio_is_remote(FIO_DB_HOST))
+ {
+ rc = fio_copy_pages(to_fullpath, from_fullpath, file,
+ /* send prev backup START_LSN */
+ (backup_mode == BACKUP_MODE_DIFF_DELTA || backup_mode == BACKUP_MODE_DIFF_PTRACK) &&
+ file->exists_in_prev ? prev_backup_start_lsn : InvalidXLogRecPtr,
+ calg, clevel, checksum_version,
+ /* send pagemap if any */
+ use_pagemap,
+ /* variables for error reporting */
+ &err_blknum, &errmsg, &headers);
+ }
+ else
+ {
+ /* TODO: stop handling errors internally */
+ rc = copy_pages(to_fullpath, from_fullpath, file,
+ /* send prev backup START_LSN */
+ (backup_mode == BACKUP_MODE_DIFF_DELTA || backup_mode == BACKUP_MODE_DIFF_PTRACK) &&
+ file->exists_in_prev ? prev_backup_start_lsn : InvalidXLogRecPtr,
+ checksum_version, use_pagemap,
+ backup_mode, ptrack_version_num, ptrack_schema);
+ }
+
+ /* check for errors */
+ if (rc == FILE_MISSING)
+ {
+ elog(is_merge ? ERROR : LOG, "File not found: \"%s\"", from_fullpath);
+ file->write_size = FILE_NOT_FOUND;
+ goto cleanup;
+ }
+
+ else if (rc == WRITE_FAILED)
+ elog(ERROR, "Cannot write block %u of \"%s\": %s",
+ err_blknum, to_fullpath, strerror(errno));
+
+ else if (rc == PAGE_CORRUPTION)
+ {
+ if (errmsg)
+ elog(ERROR, "Corruption detected in file \"%s\", block %u: %s",
+ from_fullpath, err_blknum, errmsg);
+ else
+ elog(ERROR, "Corruption detected in file \"%s\", block %u",
+ from_fullpath, err_blknum);
+ }
+ /* OPEN_FAILED and READ_FAILED */
+ else if (rc == OPEN_FAILED)
+ {
+ if (errmsg)
+ elog(ERROR, "%s", errmsg);
+ else
+ elog(ERROR, "Cannot open file \"%s\"", from_fullpath);
+ }
+ else if (rc == READ_FAILED)
+ {
+ if (errmsg)
+ elog(ERROR, "%s", errmsg);
+ else
+ elog(ERROR, "Cannot read file \"%s\"", from_fullpath);
+ }
+
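+ /* on success, rc holds the number of blocks read from the source file */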
+ file->read_size = rc * BLCKSZ;
+
+ /* refresh n_blocks for FULL and DELTA */
+ if (backup_mode == BACKUP_MODE_FULL ||
+ backup_mode == BACKUP_MODE_DIFF_DELTA)
+ file->n_blocks = file->read_size / BLCKSZ;
+
+ /* Mark the file as unchanged if nothing was written during an incremental catchup */
+ if (backup_mode != BACKUP_MODE_FULL &&
+ file->exists_in_prev &&
+ file->write_size == 0 &&
+ file->n_blocks > 0)
+ {
+ file->write_size = BYTES_INVALID;
+ }
+
+cleanup:
+
+ /* finish CRC calculation */
+ FIN_FILE_CRC32(true, file->crc);
+
+ pg_free(errmsg);
+ pg_free(file->pagemap.bitmap);
+ pg_free(headers);
+}
+
/*
* Backup non data file
* We do not apply compression to this file.
@@ -1992,6 +2177,7 @@ send_pages(const char *to_fullpath, const char *from_fullpath,
true, checksum_version,
ptrack_version_num, ptrack_schema,
from_fullpath, &page_st);
+
if (rc == PageIsTruncated)
break;
@@ -2068,6 +2254,130 @@ send_pages(const char *to_fullpath, const char *from_fullpath,
return n_blocks_read;
}
+/* copy a local file (taken from send_pages, but pages are copied as-is, without adding headers or compression) */
+int
+copy_pages(const char *to_fullpath, const char *from_fullpath,
+ pgFile *file, XLogRecPtr sync_lsn,
+ uint32 checksum_version, bool use_pagemap,
+ BackupMode backup_mode, int ptrack_version_num, const char *ptrack_schema)
+{
+ FILE *in = NULL;
+ FILE *out = NULL;
+ char curr_page[BLCKSZ];
+ int n_blocks_read = 0;
+ BlockNumber blknum = 0;
+ datapagemap_iterator_t *iter = NULL;
+
+ /* stdio buffers */
+ char *in_buf = NULL;
+ char *out_buf = NULL;
+
+ /* open source file for read */
+ in = fopen(from_fullpath, PG_BINARY_R);
+ if (in == NULL)
+ {
+ /*
+ * If the file is not found, this is not an error.
+ * It could have been deleted by a concurrent postgres transaction.
+ */
+ if (errno == ENOENT)
+ return FILE_MISSING;
+
+ elog(ERROR, "Cannot open file \"%s\": %s", from_fullpath, strerror(errno));
+ }
+
+ /*
+ * Enable stdio buffering for local input file,
+ * unless the pagemap is involved, which
+ * implies a lot of random access.
+ */
+
+ if (use_pagemap)
+ {
+ iter = datapagemap_iterate(&file->pagemap);
+ datapagemap_next(iter, &blknum); /* set first block */
+
+ setvbuf(in, NULL, _IONBF, BUFSIZ);
+ }
+ else
+ {
+ in_buf = pgut_malloc(STDIO_BUFSIZE);
+ setvbuf(in, in_buf, _IOFBF, STDIO_BUFSIZE);
+ }
+
+ out = fio_fopen(to_fullpath, PG_BINARY_R "+", FIO_BACKUP_HOST);
+ if (out == NULL)
+ elog(ERROR, "Cannot open destination file \"%s\": %s",
+ to_fullpath, strerror(errno));
+
+ /* update file permission */
+ if (fio_chmod(to_fullpath, file->mode, FIO_BACKUP_HOST) == -1)
+ elog(ERROR, "Cannot change mode of \"%s\": %s", to_fullpath,
+ strerror(errno));
+
+ elog(VERBOSE, "ftruncate file \"%s\" to size %lu",
+ to_fullpath, file->size);
+ if (fio_ftruncate(out, file->size) == -1)
+ elog(ERROR, "Cannot ftruncate file \"%s\" to size %lu: %s",
+ to_fullpath, file->size, strerror(errno));
+
+ if (!fio_is_remote_file(out))
+ {
+ out_buf = pgut_malloc(STDIO_BUFSIZE);
+ setvbuf(out, out_buf, _IOFBF, STDIO_BUFSIZE);
+ }
+
+ while (blknum < file->n_blocks)
+ {
+ PageState page_st;
+ int rc = prepare_page(file, sync_lsn,
+ blknum, in, backup_mode, curr_page,
+ true, checksum_version,
+ ptrack_version_num, ptrack_schema,
+ from_fullpath, &page_st);
+ if (rc == PageIsTruncated)
+ break;
+
+ else if (rc == PageIsOk)
+ {
+ if (fio_fseek(out, blknum * BLCKSZ) < 0)
+ {
+ elog(ERROR, "Cannot seek block %u of \"%s\": %s",
+ blknum, to_fullpath, strerror(errno));
+ }
+ copy_page(file, blknum, in, out, curr_page, to_fullpath);
+ }
+
+ n_blocks_read++;
+
+ /* next block */
+ if (use_pagemap)
+ {
+ /* exit if pagemap is exhausted */
+ if (!datapagemap_next(iter, &blknum))
+ break;
+ }
+ else
+ blknum++;
+ }
+
+ /* cleanup */
+ if (in && fclose(in))
+ elog(ERROR, "Cannot close the source file \"%s\": %s",
+ from_fullpath, strerror(errno));
+
+ /* close local output file */
+ if (out && fio_fclose(out))
+ elog(ERROR, "Cannot close the destination file \"%s\": %s",
+ to_fullpath, strerror(errno));
+
+ pg_free(iter);
+ pg_free(in_buf);
+ pg_free(out_buf);
+
+ return n_blocks_read;
+}
+
/*
* Attempt to open header file, read content and return as
* array of headers.
diff --git a/src/dir.c b/src/dir.c
index ce255d0ad..473534c8b 100644
--- a/src/dir.c
+++ b/src/dir.c
@@ -485,6 +485,13 @@ pgFileCompareSize(const void *f1, const void *f2)
return 0;
}
+/* Compare two pgFile by size, in descending order */
+int
+pgFileCompareSizeDesc(const void *f1, const void *f2)
+{
+ return -1 * pgFileCompareSize(f1, f2);
+}
+
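+/*
+ * A sketch of intended use (presumably from the catchup code path):
+ * sorting the file list largest-first lets parallel workers start on
+ * the biggest files, which tends to balance load across threads:
+ *
+ *     parray_qsort(source_filelist, pgFileCompareSizeDesc);
+ */
+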
static int
pgCompareString(const void *str1, const void *str2)
{
@@ -887,7 +894,7 @@ dir_list_file_internal(parray *files, pgFile *parent, const char *parent_dir,
*
* Copy of function get_tablespace_mapping() from pg_basebackup.c.
*/
-static const char *
+const char *
get_tablespace_mapping(const char *dir)
{
TablespaceListCell *cell;
diff --git a/src/help.c b/src/help.c
index e1c8d6833..921feaec0 100644
--- a/src/help.c
+++ b/src/help.c
@@ -2,7 +2,7 @@
*
* help.c
*
- * Copyright (c) 2017-2019, Postgres Professional
+ * Copyright (c) 2017-2021, Postgres Professional
*
*-------------------------------------------------------------------------
*/
@@ -29,6 +29,7 @@ static void help_archive_get(void);
static void help_checkdb(void);
static void help_help(void);
static void help_version(void);
+static void help_catchup(void);
void
help_print_version(void)
@@ -70,6 +71,7 @@ help_command(ProbackupSubcmd const subcmd)
&help_internal, // AGENT_CMD
&help_help,
&help_version,
+ &help_catchup,
};
Assert((int)subcmd < sizeof(help_functions) / sizeof(help_functions[0]));
@@ -246,6 +248,19 @@ help_pg_probackup(void)
printf(_(" [--ssh-options]\n"));
printf(_(" [--help]\n"));
+ printf(_("\n %s catchup -b catchup-mode\n"), PROGRAM_NAME);
+ printf(_(" --source-pgdata=path_to_pgdata_on_remote_server\n"));
+ printf(_(" --destination-pgdata=path_to_local_dir\n"));
+ printf(_(" [--stream [-S slot-name]] [--temp-slot]\n"));
+ printf(_(" [-j num-threads]\n"));
+ printf(_(" [-T OLDDIR=NEWDIR]\n"));
+ printf(_(" [-d dbname] [-h host] [-p port] [-U username]\n"));
+ printf(_(" [-w --no-password] [-W --password]\n"));
+ printf(_(" [--remote-proto] [--remote-host]\n"));
+ printf(_(" [--remote-port] [--remote-path] [--remote-user]\n"));
+ printf(_(" [--ssh-options]\n"));
+ printf(_(" [--help]\n"));
+
if ((PROGRAM_URL || PROGRAM_EMAIL))
{
printf("\n");
@@ -1009,3 +1024,49 @@ help_version(void)
printf(_("\n%s version\n"), PROGRAM_NAME);
printf(_("%s --version\n\n"), PROGRAM_NAME);
}
+
+static void
+help_catchup(void)
+{
+ printf(_("\n%s catchup -b catchup-mode\n"), PROGRAM_NAME);
+ printf(_(" --source-pgdata=path_to_pgdata_on_remote_server\n"));
+ printf(_(" --destination-pgdata=path_to_local_dir\n"));
+ printf(_(" [--stream [-S slot-name]] [--temp-slot]\n"));
+ printf(_(" [-j num-threads]\n"));
+ printf(_(" [-T OLDDIR=NEWDIR]\n"));
+ printf(_(" [-d dbname] [-h host] [-p port] [-U username]\n"));
+ printf(_(" [-w --no-password] [-W --password]\n"));
+ printf(_(" [--remote-proto] [--remote-host]\n"));
+ printf(_(" [--remote-port] [--remote-path] [--remote-user]\n"));
+ printf(_(" [--ssh-options]\n"));
+ printf(_(" [--help]\n\n"));
+
+ printf(_(" -b, --backup-mode=catchup-mode catchup mode=FULL|DELTA|PTRACK\n"));
+ printf(_(" --stream stream the transaction log (only supported mode)\n"));
+ printf(_(" -S, --slot=SLOTNAME replication slot to use\n"));
+ printf(_(" --temp-slot use temporary replication slot\n"));
+
+ printf(_(" -j, --threads=NUM number of parallel threads\n"));
+
+ printf(_(" -T, --tablespace-mapping=OLDDIR=NEWDIR\n"));
+ printf(_(" relocate the tablespace from directory OLDDIR to NEWDIR\n"));
+
+ printf(_("\n Connection options:\n"));
+ printf(_(" -U, --pguser=USERNAME user name to connect as (default: current local user)\n"));
+ printf(_(" -d, --pgdatabase=DBNAME database to connect (default: username)\n"));
+ printf(_(" -h, --pghost=HOSTNAME database server host or socket directory(default: 'local socket')\n"));
+ printf(_(" -p, --pgport=PORT database server port (default: 5432)\n"));
+ printf(_(" -w, --no-password never prompt for password\n"));
+ printf(_(" -W, --password force password prompt\n\n"));
+
+ printf(_("\n Remote options:\n"));
+ printf(_(" --remote-proto=protocol remote protocol to use\n"));
+ printf(_(" available options: 'ssh', 'none' (default: ssh)\n"));
+ printf(_(" --remote-host=hostname remote host address or hostname\n"));
+ printf(_(" --remote-port=port remote host port (default: 22)\n"));
+ printf(_(" --remote-path=path path to directory with pg_probackup binary on remote host\n"));
+ printf(_(" (default: current binary path)\n"));
+ printf(_(" --remote-user=username user name for ssh connection (default: current user)\n"));
+ printf(_(" --ssh-options=ssh_options additional ssh options (default: none)\n"));
+ printf(_(" (example: --ssh-options='-c cipher_spec -F configfile')\n\n"));
+}
diff --git a/src/init.c b/src/init.c
index dc821325a..a4911cb5c 100644
--- a/src/init.c
+++ b/src/init.c
@@ -57,7 +57,7 @@ do_add_instance(InstanceState *instanceState, InstanceConfig *instance)
"(-D, --pgdata)");
/* Read system_identifier from PGDATA */
- instance->system_identifier = get_system_identifier(instance->pgdata);
+ instance->system_identifier = get_system_identifier(instance->pgdata, FIO_DB_HOST);
/* Starting from PostgreSQL 11 read WAL segment size from PGDATA */
instance->xlog_seg_size = get_xlog_seg_size(instance->pgdata);
diff --git a/src/pg_probackup.c b/src/pg_probackup.c
index 3150900b6..00796be04 100644
--- a/src/pg_probackup.c
+++ b/src/pg_probackup.c
@@ -88,6 +88,9 @@ bool backup_logs = false;
bool smooth_checkpoint;
char *remote_agent;
static char *backup_note = NULL;
+/* catchup options */
+static char *catchup_source_pgdata = NULL;
+static char *catchup_destination_pgdata = NULL;
/* restore options */
static char *target_time = NULL;
static char *target_xid = NULL;
@@ -201,6 +204,9 @@ static ConfigOption cmd_options[] =
{ 'b', 184, "merge-expired", &merge_expired, SOURCE_CMD_STRICT },
{ 'b', 185, "dry-run", &dry_run, SOURCE_CMD_STRICT },
{ 's', 238, "note", &backup_note, SOURCE_CMD_STRICT },
+ /* catchup options */
+ { 's', 239, "source-pgdata", &catchup_source_pgdata, SOURCE_CMD_STRICT },
+ { 's', 240, "destination-pgdata", &catchup_destination_pgdata, SOURCE_CMD_STRICT },
/* restore options */
{ 's', 136, "recovery-target-time", &target_time, SOURCE_CMD_STRICT },
{ 's', 137, "recovery-target-xid", &target_xid, SOURCE_CMD_STRICT },
@@ -445,11 +451,12 @@ main(int argc, char *argv[])
catalogState->catalog_path, WAL_SUBDIR);
}
- /* backup_path is required for all pg_probackup commands except help, version and checkdb */
+ /* backup_path is required for all pg_probackup commands except help, version, checkdb and catchup */
if (backup_path == NULL &&
backup_subcmd != CHECKDB_CMD &&
backup_subcmd != HELP_CMD &&
- backup_subcmd != VERSION_CMD)
+ backup_subcmd != VERSION_CMD &&
+ backup_subcmd != CATCHUP_CMD)
elog(ERROR, "required parameter not specified: BACKUP_PATH (-B, --backup-path)");
/* ===== catalogState (END) ======*/
@@ -458,12 +465,12 @@ main(int argc, char *argv[])
/*
* Option --instance is required for all commands except
- * init, show, checkdb and validate
+ * init, show, checkdb, validate and catchup
*/
if (instance_name == NULL)
{
if (backup_subcmd != INIT_CMD && backup_subcmd != SHOW_CMD &&
- backup_subcmd != VALIDATE_CMD && backup_subcmd != CHECKDB_CMD)
+ backup_subcmd != VALIDATE_CMD && backup_subcmd != CHECKDB_CMD && backup_subcmd != CATCHUP_CMD)
elog(ERROR, "required parameter not specified: --instance");
}
else
@@ -545,6 +552,10 @@ main(int argc, char *argv[])
setMyLocation(backup_subcmd);
}
}
+ else if (backup_subcmd == CATCHUP_CMD)
+ {
+ config_get_opt_env(instance_options);
+ }
/*
* Disable logging into file for archive-push and archive-get.
@@ -587,6 +598,13 @@ main(int argc, char *argv[])
"You must specify --log-directory option when running checkdb with "
"--log-level-file option enabled.");
+ if (backup_subcmd == CATCHUP_CMD &&
+ instance_config.logger.log_level_file != LOG_OFF &&
+ instance_config.logger.log_directory == NULL)
+ elog(ERROR, "Cannot save catchup logs to a file. "
+ "You must specify --log-directory option when running catchup with "
+ "--log-level-file option enabled.");
+
/* Initialize logger */
init_logger(backup_path, &instance_config.logger);
@@ -745,6 +763,25 @@ main(int argc, char *argv[])
}
}
+ /* checking required options */
+ if (backup_subcmd == CATCHUP_CMD)
+ {
+ if (catchup_source_pgdata == NULL)
+ elog(ERROR, "You must specify \"--source-pgdata\" option with the \"%s\" command", get_subcmd_name(backup_subcmd));
+ if (catchup_destination_pgdata == NULL)
+ elog(ERROR, "You must specify \"--destination-pgdata\" option with the \"%s\" command", get_subcmd_name(backup_subcmd));
+ if (current.backup_mode == BACKUP_MODE_INVALID)
+ elog(ERROR, "Required parameter not specified: BACKUP_MODE (-b, --backup-mode)");
+ if (current.backup_mode != BACKUP_MODE_FULL && current.backup_mode != BACKUP_MODE_DIFF_PTRACK && current.backup_mode != BACKUP_MODE_DIFF_DELTA)
+ elog(ERROR, "Only \"FULL\", \"PTRACK\" and \"DELTA\" modes are supported with the \"%s\" command", get_subcmd_name(backup_subcmd));
+ if (!stream_wal)
+ elog(INFO, "--stream is required, forcing stream mode");
+ current.stream = stream_wal = true;
+ if (instance_config.external_dir_str)
+ elog(ERROR, "external directories not supported fom \"%s\" command", get_subcmd_name(backup_subcmd));
+ // TODO: check instance_config.conn_opt
+ }
+
/* sanity */
if (backup_subcmd == VALIDATE_CMD && restore_params->no_validate)
elog(ERROR, "You cannot specify \"--no-validate\" option with the \"%s\" command",
@@ -787,6 +824,8 @@ main(int argc, char *argv[])
return do_backup(instanceState, set_backup_params,
no_validate, no_sync, backup_logs);
}
+ case CATCHUP_CMD:
+ return do_catchup(catchup_source_pgdata, catchup_destination_pgdata, num_threads, !no_sync);
case RESTORE_CMD:
return do_restore_or_validate(instanceState, current.backup_id,
recovery_target_options,
diff --git a/src/pg_probackup.h b/src/pg_probackup.h
index ccbf803fd..1cad526dd 100644
--- a/src/pg_probackup.h
+++ b/src/pg_probackup.h
@@ -17,6 +17,7 @@
#include "access/xlog_internal.h"
#include "utils/pg_crc.h"
+#include "catalog/pg_control.h"
#if PG_VERSION_NUM >= 120000
#include "common/logging.h"
@@ -420,7 +421,7 @@ typedef struct PGNodeInfo
char server_version_str[100];
int ptrack_version_num;
- bool is_ptrack_enable;
+ bool is_ptrack_enabled;
const char *ptrack_schema; /* used only for ptrack 2.x */
} PGNodeInfo;
@@ -840,13 +841,16 @@ extern const char *deparse_backup_mode(BackupMode mode);
extern void process_block_change(ForkNumber forknum, RelFileNode rnode,
BlockNumber blkno);
+/* in catchup.c */
+extern int do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads, bool sync_dest_files);
+
/* in restore.c */
extern int do_restore_or_validate(InstanceState *instanceState,
time_t target_backup_id,
pgRecoveryTarget *rt,
pgRestoreParams *params,
bool no_sync);
-extern bool satisfy_timeline(const parray *timelines, const pgBackup *backup);
+extern bool satisfy_timeline(const parray *timelines, TimeLineID tli, XLogRecPtr lsn);
extern bool satisfy_recovery_target(const pgBackup *backup,
const pgRecoveryTarget *rt);
extern pgRecoveryTarget *parseRecoveryTargetOptions(
@@ -861,6 +865,8 @@ extern parray *get_dbOid_exclude_list(pgBackup *backup, parray *datname_list,
extern parray *get_backup_filelist(pgBackup *backup, bool strict);
extern parray *read_timeline_history(const char *arclog_path, TimeLineID targetTLI, bool strict);
extern bool tliIsPartOfHistory(const parray *timelines, TimeLineID tli);
+extern DestDirIncrCompatibility check_incremental_compatibility(const char *pgdata, uint64 system_identifier,
+ IncrRestoreMode incremental_mode);
/* in merge.c */
extern void do_merge(InstanceState *instanceState, time_t backup_id, bool no_validate, bool no_sync);
@@ -1002,6 +1008,7 @@ extern void dir_list_file(parray *files, const char *root, bool exclude,
bool follow_symlink, bool add_root, bool backup_logs,
bool skip_hidden, int external_dir_num, fio_location location);
+extern const char *get_tablespace_mapping(const char *dir);
extern void create_data_directories(parray *dest_files,
const char *data_dir,
const char *backup_dir,
@@ -1054,6 +1061,7 @@ extern int pgFileCompareRelPathWithExternal(const void *f1, const void *f2);
extern int pgFileCompareRelPathWithExternalDesc(const void *f1, const void *f2);
extern int pgFileCompareLinked(const void *f1, const void *f2);
extern int pgFileCompareSize(const void *f1, const void *f2);
+extern int pgFileCompareSizeDesc(const void *f1, const void *f2);
extern int pgCompareOid(const void *f1, const void *f2);
extern void pfilearray_clear_locks(parray *file_list);
@@ -1061,6 +1069,12 @@ extern void pfilearray_clear_locks(parray *file_list);
extern bool check_data_file(ConnectionArgs *arguments, pgFile *file,
const char *from_fullpath, uint32 checksum_version);
+
+extern void catchup_data_file(pgFile *file, const char *from_fullpath, const char *to_fullpath,
+ XLogRecPtr prev_backup_start_lsn, BackupMode backup_mode,
+ CompressAlg calg, int clevel, uint32 checksum_version,
+ int ptrack_version_num, const char *ptrack_schema,
+ bool is_merge, size_t prev_size);
extern void backup_data_file(pgFile *file, const char *from_fullpath, const char *to_fullpath,
XLogRecPtr prev_backup_start_lsn, BackupMode backup_mode,
CompressAlg calg, int clevel, uint32 checksum_version,
@@ -1129,14 +1143,15 @@ extern XLogRecPtr get_next_record_lsn(const char *archivedir, XLogSegNo segno, T
/* in util.c */
extern TimeLineID get_current_timeline(PGconn *conn);
-extern TimeLineID get_current_timeline_from_control(bool safe);
+extern TimeLineID get_current_timeline_from_control(const char *pgdata_path, fio_location location, bool safe);
extern XLogRecPtr get_checkpoint_location(PGconn *conn);
-extern uint64 get_system_identifier(const char *pgdata_path);
+extern uint64 get_system_identifier(const char *pgdata_path, fio_location location);
extern uint64 get_remote_system_identifier(PGconn *conn);
extern uint32 get_data_checksum_version(bool safe);
extern pg_crc32c get_pgcontrol_checksum(const char *pgdata_path);
-extern uint32 get_xlog_seg_size(char *pgdata_path);
-extern void get_redo(const char *pgdata_path, RedoParams *redo);
+extern DBState get_system_dbstate(const char *pgdata_path, fio_location location);
+extern uint32 get_xlog_seg_size(const char *pgdata_path);
+extern void get_redo(const char *pgdata_path, fio_location pgdata_location, RedoParams *redo);
extern void set_min_recovery_point(pgFile *file, const char *backup_path,
XLogRecPtr stop_backup_lsn);
extern void copy_pgcontrol_file(const char *from_fullpath, fio_location from_location,
@@ -1161,7 +1176,7 @@ extern void pretty_size(int64 size, char *buf, size_t len);
extern void pretty_time_interval(double time, char *buf, size_t len);
extern PGconn *pgdata_basic_setup(ConnectionOptions conn_opt, PGNodeInfo *nodeInfo);
-extern void check_system_identifiers(PGconn *conn, char *pgdata);
+extern void check_system_identifiers(PGconn *conn, const char *pgdata);
extern void parse_filelist_filenames(parray *files, const char *root);
/* in ptrack.c */
@@ -1170,7 +1185,8 @@ extern void make_pagemap_from_ptrack_2(parray* files, PGconn* backup_conn,
int ptrack_version_num,
XLogRecPtr lsn);
extern void get_ptrack_version(PGconn *backup_conn, PGNodeInfo *nodeInfo);
-extern bool pg_ptrack_enable(PGconn *backup_conn, int ptrack_version_num);
+extern bool pg_is_ptrack_enabled(PGconn *backup_conn, int ptrack_version_num);
+
extern XLogRecPtr get_last_ptrack_lsn(PGconn *backup_conn, PGNodeInfo *nodeInfo);
extern parray * pg_ptrack_get_pagemapset(PGconn *backup_conn, const char *ptrack_schema,
int ptrack_version_num, XLogRecPtr lsn);
@@ -1182,6 +1198,10 @@ extern int send_pages(const char *to_fullpath, const char *from_fullpath,
pgFile *file, XLogRecPtr prev_backup_start_lsn, CompressAlg calg, int clevel,
uint32 checksum_version, bool use_pagemap, BackupPageHeader2 **headers,
BackupMode backup_mode, int ptrack_version_num, const char *ptrack_schema);
+extern int copy_pages(const char *to_fullpath, const char *from_fullpath,
+ pgFile *file, XLogRecPtr prev_backup_start_lsn,
+ uint32 checksum_version, bool use_pagemap,
+ BackupMode backup_mode, int ptrack_version_num, const char *ptrack_schema);
/* FIO */
extern void setMyLocation(ProbackupSubcmd const subcmd);
@@ -1190,6 +1210,10 @@ extern int fio_send_pages(const char *to_fullpath, const char *from_fullpath, pg
XLogRecPtr horizonLsn, int calg, int clevel, uint32 checksum_version,
bool use_pagemap, BlockNumber *err_blknum, char **errormsg,
BackupPageHeader2 **headers);
+extern int fio_copy_pages(const char *to_fullpath, const char *from_fullpath, pgFile *file,
+ XLogRecPtr horizonLsn, int calg, int clevel, uint32 checksum_version,
+ bool use_pagemap, BlockNumber *err_blknum, char **errormsg,
+ BackupPageHeader2 **headers);
/* return codes for fio_send_pages */
extern int fio_send_file_gz(const char *from_fullpath, const char *to_fullpath, FILE* out, char **errormsg);
extern int fio_send_file(const char *from_fullpath, const char *to_fullpath, FILE* out,
@@ -1243,6 +1267,7 @@ extern void start_WAL_streaming(PGconn *backup_conn, char *stream_dst_path,
ConnectionOptions *conn_opt,
XLogRecPtr startpos, TimeLineID starttli);
extern int wait_WAL_streaming_end(parray *backup_files_list);
+extern parray* parse_tli_history_buffer(char *history, TimeLineID tli);
/* external variables and functions, implemented in backup.c */
typedef struct PGStopBackupResult
@@ -1280,5 +1305,6 @@ extern XLogRecPtr wait_wal_lsn(const char *wal_segment_dir, XLogRecPtr lsn, bool
bool in_prev_segment, bool segment_only,
int timeout_elevel, bool in_stream_dir);
extern void wait_wal_and_calculate_stop_lsn(const char *xlog_path, XLogRecPtr stop_lsn, pgBackup *backup);
+extern int64 calculate_datasize_of_filelist(parray *filelist);
#endif /* PG_PROBACKUP_H */
diff --git a/src/ptrack.c b/src/ptrack.c
index 6825686c6..c631d7386 100644
--- a/src/ptrack.c
+++ b/src/ptrack.c
@@ -118,7 +118,7 @@ get_ptrack_version(PGconn *backup_conn, PGNodeInfo *nodeInfo)
* Check if ptrack is enabled in target instance
*/
bool
-pg_ptrack_enable(PGconn *backup_conn, int ptrack_version_num)
+pg_is_ptrack_enabled(PGconn *backup_conn, int ptrack_version_num)
{
PGresult *res_db;
bool result = false;
diff --git a/src/restore.c b/src/restore.c
index 7f5df1a00..005984aed 100644
--- a/src/restore.c
+++ b/src/restore.c
@@ -67,8 +67,6 @@ static void restore_chain(pgBackup *dest_backup, parray *parent_chain,
parray *dbOid_exclude_list, pgRestoreParams *params,
const char *pgdata_path, bool no_sync, bool cleanup_pgdata,
bool backup_has_tblspc);
-static DestDirIncrCompatibility check_incremental_compatibility(const char *pgdata, uint64 system_identifier,
- IncrRestoreMode incremental_mode);
/*
* Iterate over backup list to find all ancestors of the broken parent_backup
@@ -293,7 +291,7 @@ do_restore_or_validate(InstanceState *instanceState, time_t target_backup_id, pg
if (!timelines)
elog(ERROR, "Failed to get history file for target timeline %i", rt->target_tli);
- if (!satisfy_timeline(timelines, current_backup))
+ if (!satisfy_timeline(timelines, current_backup->tli, current_backup->stop_lsn))
{
if (target_backup_id != INVALID_BACKUP_ID)
elog(ERROR, "target backup %s does not satisfy target timeline",
@@ -487,7 +485,7 @@ do_restore_or_validate(InstanceState *instanceState, time_t target_backup_id, pg
{
RedoParams redo;
parray *timelines = NULL;
- get_redo(instance_config.pgdata, &redo);
+ get_redo(instance_config.pgdata, FIO_DB_HOST, &redo);
if (redo.checksum_version == 0)
elog(ERROR, "Incremental restore in 'lsn' mode require "
@@ -1819,7 +1817,7 @@ satisfy_recovery_target(const pgBackup *backup, const pgRecoveryTarget *rt)
/* TODO description */
bool
-satisfy_timeline(const parray *timelines, const pgBackup *backup)
+satisfy_timeline(const parray *timelines, TimeLineID tli, XLogRecPtr lsn)
{
int i;
@@ -1828,9 +1826,9 @@ satisfy_timeline(const parray *timelines, const pgBackup *backup)
TimeLineHistoryEntry *timeline;
timeline = (TimeLineHistoryEntry *) parray_get(timelines, i);
- if (backup->tli == timeline->tli &&
+ if (tli == timeline->tli &&
(XLogRecPtrIsInvalid(timeline->end) ||
- backup->stop_lsn <= timeline->end))
+ lsn <= timeline->end))
return true;
}
return false;
@@ -2186,9 +2184,9 @@ check_incremental_compatibility(const char *pgdata, uint64 system_identifier,
* data files content, because based on pg_control information we will
* choose a backup suitable for lsn based incremental restore.
*/
- elog(INFO, "Trying to read pg_control file in destination direstory");
+ elog(INFO, "Trying to read pg_control file in destination directory");
- system_id_pgdata = get_system_identifier(pgdata);
+ system_id_pgdata = get_system_identifier(pgdata, FIO_DB_HOST);
if (system_id_pgdata == instance_config.system_identifier)
system_id_match = true;
diff --git a/src/stream.c b/src/stream.c
index 01161f720..615d25281 100644
--- a/src/stream.c
+++ b/src/stream.c
@@ -70,7 +70,6 @@ static void add_walsegment_to_filelist(parray *filelist, uint32 timeline,
uint32 xlog_seg_size);
static void add_history_file_to_filelist(parray *filelist, uint32 timeline,
char *basedir);
-static parray* parse_tli_history_buffer(char *history, TimeLineID tli);
/*
* Run IDENTIFY_SYSTEM through a given connection and
@@ -173,7 +172,7 @@ StreamLog(void *arg)
*/
stream_arg->startpos -= stream_arg->startpos % instance_config.xlog_seg_size;
- xlog_files_list = parray_new();
+ xlog_files_list = parray_new();
/* Initialize timeout */
stream_stop_begin = 0;
@@ -308,14 +307,14 @@ stop_streaming(XLogRecPtr xlogpos, uint32 timeline, bool segment_finished)
/* we assume that we get called once at the end of each segment */
if (segment_finished)
- {
- elog(VERBOSE, _("finished segment at %X/%X (timeline %u)"),
- (uint32) (xlogpos >> 32), (uint32) xlogpos, timeline);
+ {
+ elog(VERBOSE, _("finished segment at %X/%X (timeline %u)"),
+ (uint32) (xlogpos >> 32), (uint32) xlogpos, timeline);
- add_walsegment_to_filelist(xlog_files_list, timeline, xlogpos,
- (char*) stream_thread_arg.basedir,
- instance_config.xlog_seg_size);
- }
+ add_walsegment_to_filelist(xlog_files_list, timeline, xlogpos,
+ (char*) stream_thread_arg.basedir,
+ instance_config.xlog_seg_size);
+ }
/*
* Note that we report the previous, not current, position here. After a
@@ -588,20 +587,25 @@ start_WAL_streaming(PGconn *backup_conn, char *stream_dst_path, ConnectionOption
/* Set error exit code as default */
stream_thread_arg.ret = 1;
/* we must use startpos as start_lsn from start_backup */
- stream_thread_arg.startpos = current.start_lsn;
- stream_thread_arg.starttli = current.tli;
+ stream_thread_arg.startpos = startpos;
+ stream_thread_arg.starttli = starttli;
thread_interrupted = false;
pthread_create(&stream_thread, NULL, StreamLog, &stream_thread_arg);
}
-/* Wait for the completion of stream */
+/*
+ * Wait for the streaming to complete and append
+ * the list of streamed xlog files to
+ * backup_files_list (if it is not NULL)
+ */
int
wait_WAL_streaming_end(parray *backup_files_list)
{
pthread_join(stream_thread, NULL);
- parray_concat(backup_files_list, xlog_files_list);
+ if (backup_files_list != NULL)
+ parray_concat(backup_files_list, xlog_files_list);
parray_free(xlog_files_list);
return stream_thread_arg.ret;
}
diff --git a/src/util.c b/src/util.c
index c0a1dc9e4..4e32e0639 100644
--- a/src/util.c
+++ b/src/util.c
@@ -10,8 +10,6 @@
#include "pg_probackup.h"
-#include "catalog/pg_control.h"
-
#include
#include
@@ -174,7 +172,7 @@ get_current_timeline(PGconn *conn)
if (PQresultStatus(res) == PGRES_TUPLES_OK)
val = PQgetvalue(res, 0, 0);
else
- return get_current_timeline_from_control(false);
+ return get_current_timeline_from_control(instance_config.pgdata, FIO_DB_HOST, false);
if (!parse_uint32(val, &tli, 0))
{
@@ -182,7 +180,7 @@ get_current_timeline(PGconn *conn)
elog(WARNING, "Invalid value of timeline_id %s", val);
/* TODO 3.0 remove it and just error out */
- return get_current_timeline_from_control(false);
+ return get_current_timeline_from_control(instance_config.pgdata, FIO_DB_HOST, false);
}
return tli;
@@ -190,15 +188,15 @@ get_current_timeline(PGconn *conn)
/* Get timeline from pg_control file */
TimeLineID
-get_current_timeline_from_control(bool safe)
+get_current_timeline_from_control(const char *pgdata_path, fio_location location, bool safe)
{
ControlFileData ControlFile;
char *buffer;
size_t size;
/* First fetch file... */
- buffer = slurpFile(instance_config.pgdata, XLOG_CONTROL_FILE, &size,
- safe, FIO_DB_HOST);
+ buffer = slurpFile(pgdata_path, XLOG_CONTROL_FILE, &size,
+ safe, location);
if (safe && buffer == NULL)
return 0;
@@ -249,14 +247,14 @@ get_checkpoint_location(PGconn *conn)
}
uint64
-get_system_identifier(const char *pgdata_path)
+get_system_identifier(const char *pgdata_path, fio_location location)
{
ControlFileData ControlFile;
char *buffer;
size_t size;
/* First fetch file... */
- buffer = slurpFile(pgdata_path, XLOG_CONTROL_FILE, &size, false, FIO_DB_HOST);
+ buffer = slurpFile(pgdata_path, XLOG_CONTROL_FILE, &size, false, location);
if (buffer == NULL)
return 0;
digestControlFile(&ControlFile, buffer, size);
@@ -299,7 +297,7 @@ get_remote_system_identifier(PGconn *conn)
}
uint32
-get_xlog_seg_size(char *pgdata_path)
+get_xlog_seg_size(const char *pgdata_path)
{
#if PG_VERSION_NUM >= 110000
ControlFileData ControlFile;
@@ -351,15 +349,31 @@ get_pgcontrol_checksum(const char *pgdata_path)
return ControlFile.crc;
}
+DBState
+get_system_dbstate(const char *pgdata_path, fio_location location)
+{
+ ControlFileData ControlFile;
+ char *buffer;
+ size_t size;
+
+ buffer = slurpFile(pgdata_path, XLOG_CONTROL_FILE, &size, false, location);
+ if (buffer == NULL)
+ return 0;
+ digestControlFile(&ControlFile, buffer, size);
+ pg_free(buffer);
+
+ return ControlFile.state;
+}
+
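+/*
+ * A sketch of intended use (assuming a catchup-style caller): refuse to
+ * update a destination cluster that is not cleanly shut down:
+ *
+ *     if (get_system_dbstate(dest_pgdata, FIO_LOCAL_HOST) != DB_SHUTDOWNED)
+ *         elog(ERROR, "Destination directory contains a running or crashed cluster");
+ */
+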
void
-get_redo(const char *pgdata_path, RedoParams *redo)
+get_redo(const char *pgdata_path, fio_location pgdata_location, RedoParams *redo)
{
ControlFileData ControlFile;
char *buffer;
size_t size;
/* First fetch file... */
- buffer = slurpFile(pgdata_path, XLOG_CONTROL_FILE, &size, false, FIO_DB_HOST);
+ buffer = slurpFile(pgdata_path, XLOG_CONTROL_FILE, &size, false, pgdata_location);
digestControlFile(&ControlFile, buffer, size);
pg_free(buffer);
diff --git a/src/utils/configuration.c b/src/utils/configuration.c
index afc1bc056..04bfbbe3b 100644
--- a/src/utils/configuration.c
+++ b/src/utils/configuration.c
@@ -110,6 +110,7 @@ static char const * const subcmd_names[] =
"agent",
"help",
"version",
+ "catchup",
};
ProbackupSubcmd
diff --git a/src/utils/configuration.h b/src/utils/configuration.h
index 4ed4e0e61..3a5de4b83 100644
--- a/src/utils/configuration.h
+++ b/src/utils/configuration.h
@@ -38,7 +38,8 @@ typedef enum ProbackupSubcmd
SSH_CMD,
AGENT_CMD,
HELP_CMD,
- VERSION_CMD
+ VERSION_CMD,
+ CATCHUP_CMD,
} ProbackupSubcmd;
typedef enum OptionSource
diff --git a/src/utils/file.c b/src/utils/file.c
index e9792dd9c..b808d6293 100644
--- a/src/utils/file.c
+++ b/src/utils/file.c
@@ -94,7 +94,7 @@ setMyLocation(ProbackupSubcmd const subcmd)
MyLocation = IsSshProtocol()
? (subcmd == ARCHIVE_PUSH_CMD || subcmd == ARCHIVE_GET_CMD)
? FIO_DB_HOST
- : (subcmd == BACKUP_CMD || subcmd == RESTORE_CMD || subcmd == ADD_INSTANCE_CMD)
+ : (subcmd == BACKUP_CMD || subcmd == RESTORE_CMD || subcmd == ADD_INSTANCE_CMD || subcmd == CATCHUP_CMD)
? FIO_BACKUP_HOST
: FIO_LOCAL_HOST
: FIO_LOCAL_HOST;
@@ -1139,6 +1139,46 @@ fio_stat(char const* path, struct stat* st, bool follow_symlink, fio_location lo
}
}
+/*
+ * Read the value of a symbolic link.
+ * This is a wrapper around the readlink() syscall.
+ * Side effect: the result may be truncated (the caller
+ * can detect this by checking whether the
+ * returned value >= valsiz)
+ */
+ssize_t
+fio_readlink(const char *path, char *value, size_t valsiz, fio_location location)
+{
+ if (!fio_is_remote(location))
+ {
+ /* readlink() does not append a trailing \0 */
+ ssize_t len = readlink(path, value, valsiz);
+ if (len < 0)
+ return len;
+ value[len < (ssize_t) valsiz ? len : (ssize_t) valsiz - 1] = '\0';
+ return len;
+ }
+ else
+ {
+ fio_header hdr;
+ size_t path_len = strlen(path) + 1;
+
+ hdr.cop = FIO_READLINK;
+ hdr.handle = -1;
+ Assert(valsiz <= UINT_MAX); /* max value of fio_header.arg */
+ hdr.arg = valsiz;
+ hdr.size = path_len;
+
+ IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr));
+ IO_CHECK(fio_write_all(fio_stdout, path, path_len), path_len);
+
+ IO_CHECK(fio_read_all(fio_stdin, &hdr, sizeof(hdr)), sizeof(hdr));
+ Assert(hdr.cop == FIO_READLINK);
+ Assert(hdr.size <= valsiz);
+ IO_CHECK(fio_read_all(fio_stdin, value, hdr.size), hdr.size);
+ value[hdr.size < valsiz ? hdr.size : valsiz - 1] = '\0';
+ return hdr.size;
+ }
+}
+
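+/*
+ * A usage sketch: resolve a tablespace symlink under pg_tblspc on the
+ * source host; truncation is detected by comparing the result to valsiz:
+ *
+ *     char link_target[MAXPGPATH];
+ *     if (fio_readlink(path, link_target, sizeof(link_target), FIO_DB_HOST) < sizeof(link_target))
+ *         elog(VERBOSE, "Symlink \"%s\" points at \"%s\"", path, link_target);
+ */
+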
/* Check presence of the file */
int
fio_access(char const* path, int mode, fio_location location)
@@ -1769,7 +1809,7 @@ fio_send_pages(const char *to_fullpath, const char *from_fullpath, pgFile *file,
/* send message with header
- 8bytes 24bytes var var
+ 16bytes 24bytes var var
--------------------------------------------------------------
| fio_header | fio_send_request | FILE PATH | BITMAP(if any) |
--------------------------------------------------------------
@@ -1903,6 +1943,198 @@ fio_send_pages(const char *to_fullpath, const char *from_fullpath, pgFile *file,
return n_blocks_read;
}
+/*
+ * Return the number of blocks actually(!) read; attempted or
+ * half-read blocks are not counted.
+ * Return values in case of error:
+ * FILE_MISSING
+ * OPEN_FAILED
+ * READ_FAILED
+ * PAGE_CORRUPTION
+ * WRITE_FAILED
+ *
+ * If none of the above, this function returns the number of blocks
+ * read by the remote agent.
+ *
+ * In case of DELTA mode horizonLsn must be a valid lsn,
+ * otherwise it should be set to InvalidXLogRecPtr.
+ * Based on fio_send_pages.
+ */
+int
+fio_copy_pages(const char *to_fullpath, const char *from_fullpath, pgFile *file,
+ XLogRecPtr horizonLsn, int calg, int clevel, uint32 checksum_version,
+ bool use_pagemap, BlockNumber* err_blknum, char **errormsg,
+ BackupPageHeader2 **headers)
+{
+ FILE *out = NULL;
+ char *out_buf = NULL;
+ struct {
+ fio_header hdr;
+ fio_send_request arg;
+ } req;
+ BlockNumber n_blocks_read = 0;
+ BlockNumber blknum = 0;
+
+ /* send message with header
+
+ 16bytes 24bytes var var
+ --------------------------------------------------------------
+ | fio_header | fio_send_request | FILE PATH | BITMAP(if any) |
+ --------------------------------------------------------------
+ */
+
+ req.hdr.cop = FIO_SEND_PAGES;
+
+ if (use_pagemap)
+ {
+ req.hdr.size = sizeof(fio_send_request) + (*file).pagemap.bitmapsize + strlen(from_fullpath) + 1;
+ req.arg.bitmapsize = (*file).pagemap.bitmapsize;
+
+ /* TODO: add optimization for the case of pagemap
+ * containing small number of blocks with big serial numbers:
+ * https://github.com/postgrespro/pg_probackup/blob/remote_page_backup/src/utils/file.c#L1211
+ */
+ }
+ else
+ {
+ req.hdr.size = sizeof(fio_send_request) + strlen(from_fullpath) + 1;
+ req.arg.bitmapsize = 0;
+ }
+
+ req.arg.nblocks = file->size/BLCKSZ;
+ req.arg.segmentno = file->segno * RELSEG_SIZE;
+ req.arg.horizonLsn = horizonLsn;
+ req.arg.checksumVersion = checksum_version;
+ req.arg.calg = calg;
+ req.arg.clevel = clevel;
+ req.arg.path_len = strlen(from_fullpath) + 1;
+
+ file->compress_alg = calg; /* TODO: why is compress_alg set here? */
+
+ /* send header */
+ IO_CHECK(fio_write_all(fio_stdout, &req, sizeof(req)), sizeof(req));
+
+ /* send file path */
+ IO_CHECK(fio_write_all(fio_stdout, from_fullpath, req.arg.path_len), req.arg.path_len);
+
+ /* send pagemap if any */
+ if (use_pagemap)
+ IO_CHECK(fio_write_all(fio_stdout, (*file).pagemap.bitmap, (*file).pagemap.bitmapsize), (*file).pagemap.bitmapsize);
+
+ out = fio_fopen(to_fullpath, PG_BINARY_R "+", FIO_BACKUP_HOST);
+ if (out == NULL)
+ elog(ERROR, "Cannot open restore target file \"%s\": %s", to_fullpath, strerror(errno));
+
+ /* update file permission */
+ if (fio_chmod(to_fullpath, file->mode, FIO_BACKUP_HOST) == -1)
+ elog(ERROR, "Cannot change mode of \"%s\": %s", to_fullpath,
+ strerror(errno));
+
+ elog(VERBOSE, "ftruncate file \"%s\" to size %lu",
+ to_fullpath, file->size);
+ if (fio_ftruncate(out, file->size) == -1)
+ elog(ERROR, "Cannot ftruncate file \"%s\" to size %lu: %s",
+ to_fullpath, file->size, strerror(errno));
+
+ if (!fio_is_remote_file(out))
+ {
+ out_buf = pgut_malloc(STDIO_BUFSIZE);
+ setvbuf(out, out_buf, _IOFBF, STDIO_BUFSIZE);
+ }
+
+ while (true)
+ {
+ fio_header hdr;
+ char buf[BLCKSZ + sizeof(BackupPageHeader)];
+ IO_CHECK(fio_read_all(fio_stdin, &hdr, sizeof(hdr)), sizeof(hdr));
+
+ if (interrupted)
+ elog(ERROR, "Interrupted during page reading");
+
+ if (hdr.cop == FIO_ERROR)
+ {
+ /* FILE_MISSING, OPEN_FAILED and READ_FAILED */
+ if (hdr.size > 0)
+ {
+ IO_CHECK(fio_read_all(fio_stdin, buf, hdr.size), hdr.size);
+ *errormsg = pgut_malloc(hdr.size);
+ snprintf(*errormsg, hdr.size, "%s", buf);
+ }
+
+ return hdr.arg;
+ }
+ else if (hdr.cop == FIO_SEND_FILE_CORRUPTION)
+ {
+ *err_blknum = hdr.arg;
+
+ if (hdr.size > 0)
+ {
+ IO_CHECK(fio_read_all(fio_stdin, buf, hdr.size), hdr.size);
+ *errormsg = pgut_malloc(hdr.size);
+ snprintf(*errormsg, hdr.size, "%s", buf);
+ }
+ return PAGE_CORRUPTION;
+ }
+ else if (hdr.cop == FIO_SEND_FILE_EOF)
+ {
+ /* n_blocks_read reported by EOF */
+ n_blocks_read = hdr.arg;
+
+ /* receive headers if any */
+ if (hdr.size > 0)
+ {
+ *headers = pgut_malloc(hdr.size);
+ IO_CHECK(fio_read_all(fio_stdin, *headers, hdr.size), hdr.size);
+ file->n_headers = (hdr.size / sizeof(BackupPageHeader2)) - 1;
+ }
+
+ break;
+ }
+ else if (hdr.cop == FIO_PAGE)
+ {
+ blknum = hdr.arg;
+
+ Assert(hdr.size <= sizeof(buf));
+ IO_CHECK(fio_read_all(fio_stdin, buf, hdr.size), hdr.size);
+
+ COMP_FILE_CRC32(true, file->crc, buf, hdr.size);
+
+ if (fio_fseek(out, blknum * BLCKSZ) < 0)
+ {
+ elog(ERROR, "Cannot seek block %u of \"%s\": %s",
+ blknum, to_fullpath, strerror(errno));
+ }
+ // an uncompressed block with a BackupPageHeader is expected here
+ // TODO: add an assert for that?
+ if (fio_fwrite(out, buf + sizeof(BackupPageHeader), hdr.size - sizeof(BackupPageHeader)) != BLCKSZ)
+ {
+ fio_fclose(out);
+ *err_blknum = blknum;
+ return WRITE_FAILED;
+ }
+ file->write_size += BLCKSZ;
+ file->uncompressed_size += BLCKSZ;
+ }
+ else
+ elog(ERROR, "Remote agent returned message of unexpected type: %i", hdr.cop);
+ }
+
+ if (out)
+ fio_fclose(out);
+ pg_free(out_buf);
+
+ return n_blocks_read;
+}
+
/* TODO: read file using large buffer
* Return codes:
* FIO_ERROR:
@@ -3147,6 +3379,26 @@ fio_communicate(int in, int out)
case FIO_GET_ASYNC_ERROR:
fio_get_async_error_impl(out);
break;
+ case FIO_READLINK: /* Read content of a symbolic link */
+ {
+ /*
+ * We need buf to hold both the argument and the result at the same time:
+ * hdr.size = strlen(symlink_name) + 1
+ * hdr.arg = buffer size for the answer (symlink content)
+ */
+ size_t filename_size = (size_t)hdr.size;
+ if (filename_size + hdr.arg > buf_size) {
+ buf_size = filename_size + hdr.arg;
+ buf = (char*)realloc(buf, buf_size);
+ }
+ rc = readlink(buf, buf + filename_size, hdr.arg);
+ hdr.cop = FIO_READLINK;
+ hdr.size = rc > 0 ? rc : 0;
+ IO_CHECK(fio_write_all(out, &hdr, sizeof(hdr)), sizeof(hdr));
+ if (hdr.size != 0)
+ IO_CHECK(fio_write_all(out, buf + filename_size, hdr.size), hdr.size);
+ }
+ break;
default:
Assert(false);
}
diff --git a/src/utils/file.h b/src/utils/file.h
index ad65b9901..edb5ea0f9 100644
--- a/src/utils/file.h
+++ b/src/utils/file.h
@@ -55,7 +55,8 @@ typedef enum
FIO_LIST_DIR,
FIO_CHECK_POSTMASTER,
FIO_GET_ASYNC_ERROR,
- FIO_WRITE_ASYNC
+ FIO_WRITE_ASYNC,
+ FIO_READLINK
} fio_operations;
typedef enum
@@ -128,6 +129,7 @@ extern int fio_mkdir(char const* path, int mode, fio_location location);
extern int fio_chmod(char const* path, int mode, fio_location location);
extern int fio_access(char const* path, int mode, fio_location location);
extern int fio_stat(char const* path, struct stat* st, bool follow_symlinks, fio_location location);
+extern ssize_t fio_readlink(const char *path, char *value, size_t valsiz, fio_location location);
extern DIR* fio_opendir(char const* path, fio_location location);
extern struct dirent * fio_readdir(DIR *dirp);
extern int fio_closedir(DIR *dirp);
diff --git a/src/utils/parray.c b/src/utils/parray.c
index 95b83365d..792e26907 100644
--- a/src/utils/parray.c
+++ b/src/utils/parray.c
@@ -198,6 +198,13 @@ parray_bsearch(parray *array, const void *key, int(*compare)(const void *, const
return bsearch(&key, array->data, array->used, sizeof(void *), compare);
}
+int
+parray_bsearch_index(parray *array, const void *key, int(*compare)(const void *, const void *))
+{
+ void **elem = parray_bsearch(array, key, compare);
+ return elem != NULL ? elem - array->data : -1;
+}
+
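+/*
+ * A usage sketch: obtain the position of a matching element in a sorted
+ * array, rather than the pointer that parray_bsearch() returns:
+ *
+ *     int idx = parray_bsearch_index(files, &key, pgFileCompareRelPathWithExternal);
+ *     if (idx >= 0)
+ *         elog(VERBOSE, "found at index %i", idx);
+ */
+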
/* checks that parray contains element */
bool parray_contains(parray *array, void *elem)
{
diff --git a/src/utils/parray.h b/src/utils/parray.h
index 85d7383f3..e92ad728c 100644
--- a/src/utils/parray.h
+++ b/src/utils/parray.h
@@ -29,6 +29,7 @@ extern bool parray_rm(parray *array, const void *key, int(*compare)(const void *
extern size_t parray_num(const parray *array);
extern void parray_qsort(parray *array, int(*compare)(const void *, const void *));
extern void *parray_bsearch(parray *array, const void *key, int(*compare)(const void *, const void *));
+extern int parray_bsearch_index(parray *array, const void *key, int(*compare)(const void *, const void *));
extern void parray_walk(parray *array, void (*action)(void *));
extern bool parray_contains(parray *array, void *elem);
diff --git a/tests/__init__.py b/tests/__init__.py
index dbf84feea..080512760 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -6,7 +6,8 @@
retention, pgpro560, pgpro589, pgpro2068, false_positive, replica, \
compression, page, ptrack, archive, exclude, cfs_backup, cfs_restore, \
cfs_validate_backup, auth_test, time_stamp, snapfs, logging, \
- locking, remote, external, config, checkdb, set_backup, incr_restore
+ locking, remote, external, config, checkdb, set_backup, incr_restore, \
+ catchup
def load_tests(loader, tests, pattern):
@@ -23,6 +24,7 @@ def load_tests(loader, tests, pattern):
# suite.addTests(loader.loadTestsFromModule(auth_test))
suite.addTests(loader.loadTestsFromModule(archive))
suite.addTests(loader.loadTestsFromModule(backup))
+ suite.addTests(loader.loadTestsFromModule(catchup))
suite.addTests(loader.loadTestsFromModule(compatibility))
suite.addTests(loader.loadTestsFromModule(checkdb))
suite.addTests(loader.loadTestsFromModule(config))
diff --git a/tests/catchup.py b/tests/catchup.py
new file mode 100644
index 000000000..5df538e42
--- /dev/null
+++ b/tests/catchup.py
@@ -0,0 +1,977 @@
+import os
+import signal
+import unittest
+from .helpers.ptrack_helpers import ProbackupTest, ProbackupException
+
+module_name = 'catchup'
+
+class CatchupTest(ProbackupTest, unittest.TestCase):
+ def setUp(self):
+ self.fname = self.id().split('.')[3]
+
+#########################################
+# Basic tests
+#########################################
+ def test_basic_full_catchup(self):
+ """
+ Test 'multithreaded basebackup' mode (aka FULL catchup)
+ """
+ # preparation
+ src_pg = self.make_simple_node(
+ base_dir = os.path.join(module_name, self.fname, 'src'),
+ set_replication = True
+ )
+ src_pg.slow_start()
+ src_pg.safe_psql(
+ "postgres",
+ "CREATE TABLE ultimate_question AS SELECT 42 AS answer")
+ src_query_result = src_pg.safe_psql("postgres", "SELECT * FROM ultimate_question")
+
+ # do full catchup
+ dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
+ self.catchup_node(
+ backup_mode = 'FULL',
+ source_pgdata = src_pg.data_dir,
+ destination_node = dst_pg,
+ options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
+ )
+
+ # 1st check: compare data directories
+ self.compare_pgdata(
+ self.pgdata_content(src_pg.data_dir),
+ self.pgdata_content(dst_pg.data_dir)
+ )
+
+ # run&recover catchup'ed instance
+ src_pg.stop()
+ dst_options = {}
+ dst_options['port'] = str(dst_pg.port)
+ self.set_auto_conf(dst_pg, dst_options)
+ dst_pg.slow_start()
+
+ # 2nd check: run verification query
+ dst_query_result = dst_pg.safe_psql("postgres", "SELECT * FROM ultimate_question")
+ self.assertEqual(src_query_result, dst_query_result, 'Different answer from copy')
+
+ # Cleanup
+ dst_pg.stop()
+ self.del_test_dir(module_name, self.fname)
+
+ def test_full_catchup_with_tablespace(self):
+ """
+ Test tablespace transfers
+ """
+ # preparation
+ src_pg = self.make_simple_node(
+ base_dir = os.path.join(module_name, self.fname, 'src'),
+ set_replication = True
+ )
+ src_pg.slow_start()
+ tblspace1_old_path = self.get_tblspace_path(src_pg, 'tblspace1_old')
+ self.create_tblspace_in_node(src_pg, 'tblspace1', tblspc_path = tblspace1_old_path)
+ src_pg.safe_psql(
+ "postgres",
+ "CREATE TABLE ultimate_question TABLESPACE tblspace1 AS SELECT 42 AS answer")
+ src_query_result = src_pg.safe_psql("postgres", "SELECT * FROM ultimate_question")
+
+ # do full catchup with tablespace mapping
+ dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
+ tblspace1_new_path = self.get_tblspace_path(dst_pg, 'tblspace1_new')
+ self.catchup_node(
+ backup_mode = 'FULL',
+ source_pgdata = src_pg.data_dir,
+ destination_node = dst_pg,
+ options = [
+ '-d', 'postgres',
+ '-p', str(src_pg.port),
+ '--stream',
+ '-T', '{0}={1}'.format(tblspace1_old_path, tblspace1_new_path)
+ ]
+ )
+
+ # 1st check: compare data directories
+ self.compare_pgdata(
+ self.pgdata_content(src_pg.data_dir),
+ self.pgdata_content(dst_pg.data_dir)
+ )
+
+ # make changes in master tablespace
+ src_pg.safe_psql(
+ "postgres",
+ "UPDATE ultimate_question SET answer = -1")
+ src_pg.stop()
+
+ # run&recover catchup'ed instance
+ dst_options = {}
+ dst_options['port'] = str(dst_pg.port)
+ self.set_auto_conf(dst_pg, dst_options)
+ dst_pg.slow_start()
+
+ # 2nd check: run verification query
+ dst_query_result = dst_pg.safe_psql("postgres", "SELECT * FROM ultimate_question")
+ self.assertEqual(src_query_result, dst_query_result, 'Different answer from copy')
+
+ # Cleanup
+ dst_pg.stop()
+ self.del_test_dir(module_name, self.fname)
+
+ def test_basic_delta_catchup(self):
+ """
+ Test delta catchup
+ """
+ # preparation 1: source
+ src_pg = self.make_simple_node(
+ base_dir = os.path.join(module_name, self.fname, 'src'),
+ set_replication = True,
+ pg_options = { 'wal_log_hints': 'on' }
+ )
+ src_pg.slow_start()
+ src_pg.safe_psql(
+ "postgres",
+ "CREATE TABLE ultimate_question(answer int)")
+
+ # preparation 2: make a cleanly shut down replica lagging behind the source
+ dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
+ self.catchup_node(
+ backup_mode = 'FULL',
+ source_pgdata = src_pg.data_dir,
+ destination_node = dst_pg,
+ options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
+ )
+ self.set_replica(src_pg, dst_pg)
+ dst_options = {}
+ dst_options['port'] = str(dst_pg.port)
+ self.set_auto_conf(dst_pg, dst_options)
+ dst_pg.slow_start(replica = True)
+ dst_pg.stop()
+
+ # preparation 3: make changes on master (source)
+ src_pg.pgbench_init(scale = 10)
+ pgbench = src_pg.pgbench(options=['-T', '10', '--no-vacuum'])
+ pgbench.wait()
+ src_pg.safe_psql("postgres", "INSERT INTO ultimate_question VALUES(42)")
+ src_query_result = src_pg.safe_psql("postgres", "SELECT * FROM ultimate_question")
+
+ # do delta catchup
+ self.catchup_node(
+ backup_mode = 'DELTA',
+ source_pgdata = src_pg.data_dir,
+ destination_node = dst_pg,
+ options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
+ )
+
+ # 1st check: compare data directories
+ self.compare_pgdata(
+ self.pgdata_content(src_pg.data_dir),
+ self.pgdata_content(dst_pg.data_dir)
+ )
+
+ # run&recover catchup'ed instance
+ src_pg.stop()
+ self.set_replica(master = src_pg, replica = dst_pg)
+ dst_options = {}
+ dst_options['port'] = str(dst_pg.port)
+ self.set_auto_conf(dst_pg, dst_options)
+ dst_pg.slow_start(replica = True)
+
+ # 2nd check: run verification query
+ dst_query_result = dst_pg.safe_psql("postgres", "SELECT * FROM ultimate_question")
+ self.assertEqual(src_query_result, dst_query_result, 'Different answer from copy')
+
+ # Cleanup
+ dst_pg.stop()
+ self.del_test_dir(module_name, self.fname)
+
+ def test_basic_ptrack_catchup(self):
+ """
+ Test ptrack catchup
+ """
+ if not self.ptrack:
+ return unittest.skip('Skipped because ptrack support is disabled')
+
+ # preparation 1: source
+ src_pg = self.make_simple_node(
+ base_dir = os.path.join(module_name, self.fname, 'src'),
+ set_replication = True,
+ ptrack_enable = True,
+ initdb_params = ['--data-checksums']
+ )
+ src_pg.slow_start()
+ src_pg.safe_psql("postgres", "CREATE EXTENSION ptrack")
+ src_pg.safe_psql(
+ "postgres",
+ "CREATE TABLE ultimate_question(answer int)")
+
+ # preparation 2: make a cleanly shut down replica lagging behind the source
+ dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
+ self.catchup_node(
+ backup_mode = 'FULL',
+ source_pgdata = src_pg.data_dir,
+ destination_node = dst_pg,
+ options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
+ )
+ self.set_replica(src_pg, dst_pg)
+ dst_options = {}
+ dst_options['port'] = str(dst_pg.port)
+ self.set_auto_conf(dst_pg, dst_options)
+ dst_pg.slow_start(replica = True)
+ dst_pg.stop()
+
+ # preparation 3: make changes on master (source)
+ src_pg.pgbench_init(scale = 10)
+ pgbench = src_pg.pgbench(options=['-T', '10', '--no-vacuum'])
+ pgbench.wait()
+ src_pg.safe_psql("postgres", "INSERT INTO ultimate_question VALUES(42)")
+ src_query_result = src_pg.safe_psql("postgres", "SELECT * FROM ultimate_question")
+
+ # do ptrack catchup
+ self.catchup_node(
+ backup_mode = 'PTRACK',
+ source_pgdata = src_pg.data_dir,
+ destination_node = dst_pg,
+ options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
+ )
+
+ # 1st check: compare data directories
+ self.compare_pgdata(
+ self.pgdata_content(src_pg.data_dir),
+ self.pgdata_content(dst_pg.data_dir)
+ )
+
+ # run&recover catchup'ed instance
+ src_pg.stop()
+ self.set_replica(master = src_pg, replica = dst_pg)
+ dst_options = {}
+ dst_options['port'] = str(dst_pg.port)
+ self.set_auto_conf(dst_pg, dst_options)
+ dst_pg.slow_start(replica = True)
+
+ # 2nd check: run verification query
+ dst_query_result = dst_pg.safe_psql("postgres", "SELECT * FROM ultimate_question")
+ self.assertEqual(src_query_result, dst_query_result, 'Different answer from copy')
+
+ # Cleanup
+ dst_pg.stop()
+ self.del_test_dir(module_name, self.fname)
+
+ def test_tli_delta_catchup(self):
+ """
+ Test that we correctly follow timeline change with delta catchup
+ """
+ # preparation 1: source
+ src_pg = self.make_simple_node(
+ base_dir = os.path.join(module_name, self.fname, 'src'),
+ set_replication = True,
+ pg_options = { 'wal_log_hints': 'on' }
+ )
+ src_pg.slow_start()
+
+ # preparation 2: destination
+ dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
+ self.catchup_node(
+ backup_mode = 'FULL',
+ source_pgdata = src_pg.data_dir,
+ destination_node = dst_pg,
+ options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
+ )
+ dst_options = {}
+ dst_options['port'] = str(dst_pg.port)
+ self.set_auto_conf(dst_pg, dst_options)
+ dst_pg.slow_start()
+ dst_pg.stop()
+
+ # preparation 3: promote source
+ src_pg.stop()
+ self.set_replica(dst_pg, src_pg) # fake replication
+ src_pg.slow_start(replica = True)
+ src_pg.promote()
+ src_pg.safe_psql("postgres", "CREATE TABLE ultimate_question AS SELECT 42 AS answer")
+ src_query_result = src_pg.safe_psql("postgres", "SELECT * FROM ultimate_question")
+
+ # do catchup
+ self.catchup_node(
+ backup_mode = 'DELTA',
+ source_pgdata = src_pg.data_dir,
+ destination_node = dst_pg,
+ options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
+ )
+
+ # 1st check: compare data directories
+ self.compare_pgdata(
+ self.pgdata_content(src_pg.data_dir),
+ self.pgdata_content(dst_pg.data_dir)
+ )
+
+ # run&recover catchup'ed instance
+ dst_options = {}
+ dst_options['port'] = str(dst_pg.port)
+ self.set_auto_conf(dst_pg, dst_options)
+ dst_pg.slow_start()
+
+ # 2nd check: run verification query
+ dst_query_result = dst_pg.safe_psql("postgres", "SELECT * FROM ultimate_question")
+ self.assertEqual(src_query_result, dst_query_result, 'Different answer from copy')
+
+ # Cleanup
+ src_pg.stop()
+ dst_pg.stop()
+ self.del_test_dir(module_name, self.fname)
+
+ def test_tli_ptrack_catchup(self):
+ """
+ Test that we correctly follow timeline change with ptrack catchup
+ """
+ if not self.ptrack:
+ return unittest.skip('Skipped because ptrack support is disabled')
+
+ # preparation 1: source
+ src_pg = self.make_simple_node(
+ base_dir = os.path.join(module_name, self.fname, 'src'),
+ set_replication = True,
+ ptrack_enable = True,
+ initdb_params = ['--data-checksums']
+ )
+ src_pg.slow_start()
+ src_pg.safe_psql("postgres", "CREATE EXTENSION ptrack")
+
+ # preparation 2: destination
+ dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
+ self.catchup_node(
+ backup_mode = 'FULL',
+ source_pgdata = src_pg.data_dir,
+ destination_node = dst_pg,
+ options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
+ )
+ dst_options = {}
+ dst_options['port'] = str(dst_pg.port)
+ self.set_auto_conf(dst_pg, dst_options)
+ dst_pg.slow_start()
+ dst_pg.stop()
+
+ # preparation 3: promote source
+ src_pg.stop()
+ self.set_replica(dst_pg, src_pg) # fake replication
+ src_pg.slow_start(replica = True)
+ src_pg.promote()
+ src_pg.safe_psql("postgres", "CREATE TABLE ultimate_question AS SELECT 42 AS answer")
+ src_query_result = src_pg.safe_psql("postgres", "SELECT * FROM ultimate_question")
+
+ # do catchup
+ self.catchup_node(
+ backup_mode = 'PTRACK',
+ source_pgdata = src_pg.data_dir,
+ destination_node = dst_pg,
+ options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
+ )
+
+ # 1st check: compare data directories
+ self.compare_pgdata(
+ self.pgdata_content(src_pg.data_dir),
+ self.pgdata_content(dst_pg.data_dir)
+ )
+
+ # run&recover catchup'ed instance
+ dst_options = {}
+ dst_options['port'] = str(dst_pg.port)
+ self.set_auto_conf(dst_pg, dst_options)
+ dst_pg.slow_start()
+
+ # 2nd check: run verification query
+ dst_query_result = dst_pg.safe_psql("postgres", "SELECT * FROM ultimate_question")
+ self.assertEqual(src_query_result, dst_query_result, 'Different answer from copy')
+
+ # Cleanup
+ src_pg.stop()
+ dst_pg.stop()
+ self.del_test_dir(module_name, self.fname)
+
+#########################################
+# Test various corner conditions
+#########################################
+ def test_table_drop_with_delta(self):
+ """
+ Test that dropped table in source will be dropped in delta catchup'ed instance too
+ """
+ # preparation 1: source
+ src_pg = self.make_simple_node(
+ base_dir = os.path.join(module_name, self.fname, 'src'),
+ set_replication = True,
+ pg_options = { 'wal_log_hints': 'on' }
+ )
+ src_pg.slow_start()
+ src_pg.safe_psql(
+ "postgres",
+ "CREATE TABLE ultimate_question AS SELECT 42 AS answer")
+
+ # preparation 2: make a cleanly shut down replica lagging behind the source
+ dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
+ self.catchup_node(
+ backup_mode = 'FULL',
+ source_pgdata = src_pg.data_dir,
+ destination_node = dst_pg,
+ options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
+ )
+ dst_options = {}
+ dst_options['port'] = str(dst_pg.port)
+ self.set_auto_conf(dst_pg, dst_options)
+ dst_pg.slow_start()
+ dst_pg.stop()
+
+ # preparation 3: make changes on master (source)
+ # perform checkpoint twice to ensure that the datafile is actually deleted from the filesystem
+ src_pg.safe_psql("postgres", "DROP TABLE ultimate_question")
+ src_pg.safe_psql("postgres", "CHECKPOINT")
+ src_pg.safe_psql("postgres", "CHECKPOINT")
+
+ # do delta catchup
+ self.catchup_node(
+ backup_mode = 'DELTA',
+ source_pgdata = src_pg.data_dir,
+ destination_node = dst_pg,
+ options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
+ )
+
+ # Check: compare data directories
+ self.compare_pgdata(
+ self.pgdata_content(src_pg.data_dir),
+ self.pgdata_content(dst_pg.data_dir)
+ )
+
+ # Cleanup
+ src_pg.stop()
+ self.del_test_dir(module_name, self.fname)
+
+ def test_table_drop_with_ptrack(self):
+ """
+ Test that dropped table in source will be dropped in ptrack catchup'ed instance too
+ """
+ if not self.ptrack:
+ return unittest.skip('Skipped because ptrack support is disabled')
+
+ # preparation 1: source
+ src_pg = self.make_simple_node(
+ base_dir = os.path.join(module_name, self.fname, 'src'),
+ set_replication = True,
+ ptrack_enable = True,
+ initdb_params = ['--data-checksums']
+ )
+ src_pg.slow_start()
+ src_pg.safe_psql("postgres", "CREATE EXTENSION ptrack")
+ src_pg.safe_psql(
+ "postgres",
+ "CREATE TABLE ultimate_question AS SELECT 42 AS answer")
+
+ # preparation 2: make a cleanly shut down replica lagging behind the source
+ dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
+ self.catchup_node(
+ backup_mode = 'FULL',
+ source_pgdata = src_pg.data_dir,
+ destination_node = dst_pg,
+ options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
+ )
+ dst_options = {}
+ dst_options['port'] = str(dst_pg.port)
+ self.set_auto_conf(dst_pg, dst_options)
+ dst_pg.slow_start()
+ dst_pg.stop()
+
+ # preparation 3: make changes on master (source)
+ # perform checkpoint twice to ensure that the datafile is actually deleted from the filesystem
+ src_pg.safe_psql("postgres", "DROP TABLE ultimate_question")
+ src_pg.safe_psql("postgres", "CHECKPOINT")
+ src_pg.safe_psql("postgres", "CHECKPOINT")
+
+ # do ptrack catchup
+ self.catchup_node(
+ backup_mode = 'PTRACK',
+ source_pgdata = src_pg.data_dir,
+ destination_node = dst_pg,
+ options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
+ )
+
+ # Check: compare data directories
+ self.compare_pgdata(
+ self.pgdata_content(src_pg.data_dir),
+ self.pgdata_content(dst_pg.data_dir)
+ )
+
+ # Cleanup
+ src_pg.stop()
+ self.del_test_dir(module_name, self.fname)
+
+ def test_tablefile_truncation_with_delta(self):
+ """
+ Test that truncated table in source will be truncated in delta catchup'ed instance too
+ """
+ # preparation 1: source
+ src_pg = self.make_simple_node(
+ base_dir = os.path.join(module_name, self.fname, 'src'),
+ set_replication = True,
+ pg_options = { 'wal_log_hints': 'on' }
+ )
+ src_pg.slow_start()
+ src_pg.safe_psql(
+ "postgres",
+ "CREATE SEQUENCE t_seq; "
+ "CREATE TABLE t_heap AS SELECT i AS id, "
+ "md5(i::text) AS text, "
+ "md5(repeat(i::text, 10))::tsvector AS tsvector "
+ "FROM generate_series(0, 1024) i")
+ src_pg.safe_psql("postgres", "VACUUM t_heap")
+
+ # preparation 2: make a cleanly shut down replica lagging behind the source
+ dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
+ self.catchup_node(
+ backup_mode = 'FULL',
+ source_pgdata = src_pg.data_dir,
+ destination_node = dst_pg,
+ options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
+ )
+ dst_options = {}
+ dst_options['port'] = str(dst_pg.port)
+ self.set_auto_conf(dst_pg, dst_options)
+ dst_pg.slow_start()
+ dst_pg.stop()
+
+ # preparation 3: make changes on master (source)
+ src_pg.safe_psql("postgres", "DELETE FROM t_heap WHERE ctid >= '(11,0)'")
+ src_pg.safe_psql("postgres", "VACUUM t_heap")
+
+ # do delta catchup
+ self.catchup_node(
+ backup_mode = 'DELTA',
+ source_pgdata = src_pg.data_dir,
+ destination_node = dst_pg,
+ options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
+ )
+
+ # Check: compare data directories
+ self.compare_pgdata(
+ self.pgdata_content(src_pg.data_dir),
+ self.pgdata_content(dst_pg.data_dir)
+ )
+
+ # Cleanup
+ src_pg.stop()
+ self.del_test_dir(module_name, self.fname)
+
+ def test_tablefile_truncation_with_ptrack(self):
+ """
+ Test that a table file truncated in the source is also truncated in the PTRACK catchup'ed instance
+ """
+ if not self.ptrack:
+ self.skipTest('Skipped because ptrack support is disabled')
+
+ # preparation 1: source
+ src_pg = self.make_simple_node(
+ base_dir = os.path.join(module_name, self.fname, 'src'),
+ set_replication = True,
+ ptrack_enable = True,
+ initdb_params = ['--data-checksums']
+ )
+ src_pg.slow_start()
+ src_pg.safe_psql("postgres", "CREATE EXTENSION ptrack")
+ src_pg.safe_psql(
+ "postgres",
+ "CREATE SEQUENCE t_seq; "
+ "CREATE TABLE t_heap AS SELECT i AS id, "
+ "md5(i::text) AS text, "
+ "md5(repeat(i::text, 10))::tsvector AS tsvector "
+ "FROM generate_series(0, 1024) i")
+ src_pg.safe_psql("postgres", "VACUUM t_heap")
+
+ # preparation 2: make a cleanly shut down, lagging-behind replica
+ dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
+ self.catchup_node(
+ backup_mode = 'FULL',
+ source_pgdata = src_pg.data_dir,
+ destination_node = dst_pg,
+ options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
+ )
+ dst_options = {}
+ dst_options['port'] = str(dst_pg.port)
+ self.set_auto_conf(dst_pg, dst_options)
+ dst_pg.slow_start()
+ dst_pg.stop()
+
+ # preparation 3: make changes on master (source)
+ src_pg.safe_psql("postgres", "DELETE FROM t_heap WHERE ctid >= '(11,0)'")
+ src_pg.safe_psql("postgres", "VACUUM t_heap")
+
+ # do ptrack catchup
+ self.catchup_node(
+ backup_mode = 'PTRACK',
+ source_pgdata = src_pg.data_dir,
+ destination_node = dst_pg,
+ options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
+ )
+
+ # Check: compare data directories
+ self.compare_pgdata(
+ self.pgdata_content(src_pg.data_dir),
+ self.pgdata_content(dst_pg.data_dir)
+ )
+
+ # Cleanup
+ src_pg.stop()
+ self.del_test_dir(module_name, self.fname)
+
+#########################################
+# Test reaction to user errors
+#########################################
+ def test_local_tablespace_without_mapping(self):
+ """
+ Test that we detect the absence of the needed --tablespace-mapping option
+ """
+ if self.remote:
+ self.skipTest('Skipped because this test checks local catchup error handling')
+
+ src_pg = self.make_simple_node(base_dir = os.path.join(module_name, self.fname, 'src'))
+ src_pg.slow_start()
+
+ tblspace_path = self.get_tblspace_path(src_pg, 'tblspace')
+ self.create_tblspace_in_node(
+ src_pg, 'tblspace',
+ tblspc_path = tblspace_path)
+
+ src_pg.safe_psql(
+ "postgres",
+ "CREATE TABLE ultimate_question TABLESPACE tblspace AS SELECT 42 AS answer")
+
+ dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
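+ # in local mode, without --tablespace-mapping (-T) the copy would collide with the source tablespace path, so catchup is expected to refuse to run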
+ try:
+ self.catchup_node(
+ backup_mode = 'FULL',
+ source_pgdata = src_pg.data_dir,
+ destination_node = dst_pg,
+ options = [
+ '-d', 'postgres',
+ '-p', str(src_pg.port),
+ '--stream',
+ ]
+ )
+ self.assertEqual(1, 0, "Expecting Error because '-T' parameter is not specified.\n Output: {0} \n CMD: {1}".format(
+ repr(self.output), self.cmd))
+ except ProbackupException as e:
+ self.assertIn(
+ 'ERROR: Local catchup executed, but source database contains tablespace',
+ e.message,
+ '\n Unexpected Error Message: {0}\n CMD: {1}'.format(repr(e.message), self.cmd))
+
+ # Cleanup
+ src_pg.stop()
+ self.del_test_dir(module_name, self.fname)
+
+ def test_running_dest_postmaster(self):
+ """
+ Test that we detect a running postmaster in the destination
+ """
+ # preparation 1: source
+ src_pg = self.make_simple_node(
+ base_dir = os.path.join(module_name, self.fname, 'src'),
+ set_replication = True,
+ pg_options = { 'wal_log_hints': 'on' }
+ )
+ src_pg.slow_start()
+
+ # preparation 2: destination
+ dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
+ self.catchup_node(
+ backup_mode = 'FULL',
+ source_pgdata = src_pg.data_dir,
+ destination_node = dst_pg,
+ options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
+ )
+ dst_options = {}
+ dst_options['port'] = str(dst_pg.port)
+ self.set_auto_conf(dst_pg, dst_options)
+ dst_pg.slow_start()
+ # leave the destination postmaster running,
+ # i.e. do not call dst_pg.stop()
+
+ # try delta catchup
+ try:
+ self.catchup_node(
+ backup_mode = 'DELTA',
+ source_pgdata = src_pg.data_dir,
+ destination_node = dst_pg,
+ options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
+ )
+ self.assertEqual(1, 0, "Expecting Error because postmaster in destination is running.\n Output: {0} \n CMD: {1}".format(
+ repr(self.output), self.cmd))
+ except ProbackupException as e:
+ self.assertIn(
+ 'ERROR: Postmaster with pid ',
+ e.message,
+ '\n Unexpected Error Message: {0}\n CMD: {1}'.format(repr(e.message), self.cmd))
+
+ # Cleanup
+ src_pg.stop()
+ self.del_test_dir(module_name, self.fname)
+
+ def test_same_db_id(self):
+ """
+ Test that we detect mismatched database identifiers of the source and destination
+ """
+ # preparation:
+ # source
+ src_pg = self.make_simple_node(
+ base_dir = os.path.join(module_name, self.fname, 'src'),
+ set_replication = True
+ )
+ src_pg.slow_start()
+ # destination
+ dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
+ self.catchup_node(
+ backup_mode = 'FULL',
+ source_pgdata = src_pg.data_dir,
+ destination_node = dst_pg,
+ options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
+ )
+ dst_options = {}
+ dst_options['port'] = str(dst_pg.port)
+ self.set_auto_conf(dst_pg, dst_options)
+ dst_pg.slow_start()
+ dst_pg.stop()
+ # fake destination
+ fake_dst_pg = self.make_simple_node(base_dir = os.path.join(module_name, self.fname, 'fake_dst'))
+ # fake source
+ fake_src_pg = self.make_simple_node(base_dir = os.path.join(module_name, self.fname, 'fake_src'))
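+ # both fake nodes were created by their own initdb run, so their database system identifiers differ from those of src/dst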
+
+ # try delta catchup (src (with correct src conn), fake_dst)
+ try:
+ self.catchup_node(
+ backup_mode = 'DELTA',
+ source_pgdata = src_pg.data_dir,
+ destination_node = fake_dst_pg,
+ options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
+ )
+ self.assertEqual(1, 0, "Expecting Error because database identifiers mismatch.\n Output: {0} \n CMD: {1}".format(
+ repr(self.output), self.cmd))
+ except ProbackupException as e:
+ self.assertIn(
+ 'ERROR: Database identifiers mismatch: ',
+ e.message,
+ '\n Unexpected Error Message: {0}\n CMD: {1}'.format(repr(e.message), self.cmd))
+
+ # try delta catchup (fake_src (with wrong src conn), dst)
+ try:
+ self.catchup_node(
+ backup_mode = 'DELTA',
+ source_pgdata = fake_src_pg.data_dir,
+ destination_node = dst_pg,
+ options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
+ )
+ self.assertEqual(1, 0, "Expecting Error because database identifiers mismatch.\n Output: {0} \n CMD: {1}".format(
+ repr(self.output), self.cmd))
+ except ProbackupException as e:
+ self.assertIn(
+ 'ERROR: Database identifiers mismatch: ',
+ e.message,
+ '\n Unexpected Error Message: {0}\n CMD: {1}'.format(repr(e.message), self.cmd))
+
+ # Cleanup
+ src_pg.stop()
+ self.del_test_dir(module_name, self.fname)
+
+ def test_destination_dbstate(self):
+ """
+ Test that we detect that the destination instance was not shut down cleanly
+ """
+ # preparation 1: source
+ src_pg = self.make_simple_node(
+ base_dir = os.path.join(module_name, self.fname, 'src'),
+ set_replication = True,
+ pg_options = { 'wal_log_hints': 'on' }
+ )
+ src_pg.slow_start()
+
+ # preparation 2: destination
+ dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
+ self.catchup_node(
+ backup_mode = 'FULL',
+ source_pgdata = src_pg.data_dir,
+ destination_node = dst_pg,
+ options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
+ )
+
+ # try #1
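+ # the destination has never been started after the FULL catchup, so the backup_label written by catchup is still present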
+ try:
+ self.catchup_node(
+ backup_mode = 'DELTA',
+ source_pgdata = src_pg.data_dir,
+ destination_node = dst_pg,
+ options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
+ )
+ self.assertEqual(1, 0, "Expecting Error because destination pg is not cleanly shutdowned.\n Output: {0} \n CMD: {1}".format(
+ repr(self.output), self.cmd))
+ except ProbackupException as e:
+ self.assertIn(
+ 'ERROR: Destination directory contains "backup_label" file',
+ e.message,
+ '\n Unexpected Error Message: {0}\n CMD: {1}'.format(repr(e.message), self.cmd))
+
+ # try #2
+ dst_options = {}
+ dst_options['port'] = str(dst_pg.port)
+ self.set_auto_conf(dst_pg, dst_options)
+ dst_pg.slow_start()
+ self.assertNotEqual(dst_pg.pid, 0, "Cannot detect pid of running postgres")
+ os.kill(dst_pg.pid, signal.SIGKILL)
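+ # SIGKILL leaves the destination in a crashed state, i.e. not shut down cleanly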
+ try:
+ self.catchup_node(
+ backup_mode = 'DELTA',
+ source_pgdata = src_pg.data_dir,
+ destination_node = dst_pg,
+ options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
+ )
+ self.assertEqual(1, 0, "Expecting Error because destination pg is not cleanly shutdowned.\n Output: {0} \n CMD: {1}".format(
+ repr(self.output), self.cmd))
+ except ProbackupException as e:
+ self.assertIn(
+ 'must be stopped cleanly',
+ e.message,
+ '\n Unexpected Error Message: {0}\n CMD: {1}'.format(repr(e.message), self.cmd))
+
+ # Cleanup
+ src_pg.stop()
+ self.del_test_dir(module_name, self.fname)
+
+ def test_tli_destination_mismatch(self):
+ """
+ Test that we detect a TLI mismatch in the destination
+ """
+ # preparation 1: source
+ src_pg = self.make_simple_node(
+ base_dir = os.path.join(module_name, self.fname, 'src'),
+ set_replication = True,
+ pg_options = { 'wal_log_hints': 'on' }
+ )
+ src_pg.slow_start()
+
+ # preparation 2: destination
+ dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
+ self.catchup_node(
+ backup_mode = 'FULL',
+ source_pgdata = src_pg.data_dir,
+ destination_node = dst_pg,
+ options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
+ )
+ dst_options = {}
+ dst_options['port'] = str(dst_pg.port)
+ self.set_auto_conf(dst_pg, dst_options)
+ self.set_replica(src_pg, dst_pg)
+ dst_pg.slow_start(replica = True)
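+ # promotion switches the destination to a new timeline, putting it ahead of the source in timeline history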
+ dst_pg.promote()
+ dst_pg.stop()
+
+ # preparation 3: "useful" changes
+ src_pg.safe_psql("postgres", "CREATE TABLE ultimate_question AS SELECT 42 AS answer")
+ src_query_result = src_pg.safe_psql("postgres", "SELECT * FROM ultimate_question")
+
+ # try catchup
+ try:
+ self.catchup_node(
+ backup_mode = 'DELTA',
+ source_pgdata = src_pg.data_dir,
+ destination_node = dst_pg,
+ options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
+ )
+ dst_options = {}
+ dst_options['port'] = str(dst_pg.port)
+ self.set_auto_conf(dst_pg, dst_options)
+ dst_pg.slow_start()
+ dst_query_result = dst_pg.safe_psql("postgres", "SELECT * FROM ultimate_question")
+ dst_pg.stop()
+ self.assertEqual(src_query_result, dst_query_result, 'Different answer from copy')
+ except ProbackupException as e:
+ self.assertIn(
+ 'ERROR: Source is behind destination in timeline history',
+ e.message,
+ '\n Unexpected Error Message: {0}\n CMD: {1}'.format(repr(e.message), self.cmd))
+
+ # Cleanup
+ src_pg.stop()
+ self.del_test_dir(module_name, self.fname)
+
+ def test_tli_source_mismatch(self):
+ """
+ Test that we detect a TLI mismatch in the source history
+ """
+ # preparation 1: source
+ src_pg = self.make_simple_node(
+ base_dir = os.path.join(module_name, self.fname, 'src'),
+ set_replication = True,
+ pg_options = { 'wal_log_hints': 'on' }
+ )
+ src_pg.slow_start()
+
+ # preparation 2: fake source (promoted copy)
+ fake_src_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'fake_src'))
+ self.catchup_node(
+ backup_mode = 'FULL',
+ source_pgdata = src_pg.data_dir,
+ destination_node = fake_src_pg,
+ options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
+ )
+ fake_src_options = {}
+ fake_src_options['port'] = str(fake_src_pg.port)
+ self.set_auto_conf(fake_src_pg, fake_src_options)
+ self.set_replica(src_pg, fake_src_pg)
+ fake_src_pg.slow_start(replica = True)
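+ # promotion moves the fake source onto its own timeline; dst, cloned from src afterwards, is past the branch point and thus not in fake_src's timeline history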
+ fake_src_pg.promote()
+ self.switch_wal_segment(fake_src_pg)
+ fake_src_pg.safe_psql(
+ "postgres",
+ "CREATE TABLE t_heap AS SELECT i AS id, "
+ "md5(i::text) AS text, "
+ "md5(repeat(i::text, 10))::tsvector AS tsvector "
+ "FROM generate_series(0, 256) i")
+ self.switch_wal_segment(fake_src_pg)
+ fake_src_pg.safe_psql("postgres", "CREATE TABLE ultimate_question AS SELECT 'trash' AS garbage")
+
+ # preparation 3: destination
+ dst_pg = self.make_empty_node(os.path.join(module_name, self.fname, 'dst'))
+ self.catchup_node(
+ backup_mode = 'FULL',
+ source_pgdata = src_pg.data_dir,
+ destination_node = dst_pg,
+ options = ['-d', 'postgres', '-p', str(src_pg.port), '--stream']
+ )
+ dst_options = {}
+ dst_options['port'] = str(dst_pg.port)
+ self.set_auto_conf(dst_pg, dst_options)
+ dst_pg.slow_start()
+ dst_pg.stop()
+
+ # preparation 4: "useful" changes
+ src_pg.safe_psql("postgres", "CREATE TABLE ultimate_question AS SELECT 42 AS answer")
+ src_query_result = src_pg.safe_psql("postgres", "SELECT * FROM ultimate_question")
+
+ # try catchup
+ try:
+ self.catchup_node(
+ backup_mode = 'DELTA',
+ source_pgdata = fake_src_pg.data_dir,
+ destination_node = dst_pg,
+ options = ['-d', 'postgres', '-p', str(fake_src_pg.port), '--stream']
+ )
+ dst_options = {}
+ dst_options['port'] = str(dst_pg.port)
+ self.set_auto_conf(dst_pg, dst_options)
+ dst_pg.slow_start()
+ dst_query_result = dst_pg.safe_psql("postgres", "SELECT * FROM ultimate_question")
+ dst_pg.stop()
+ self.assertEqual(src_query_result, dst_query_result, 'Different answer from copy')
+ except ProbackupException as e:
+ self.assertIn(
+ 'ERROR: Destination is not in source timeline history',
+ e.message,
+ '\n Unexpected Error Message: {0}\n CMD: {1}'.format(repr(e.message), self.cmd))
+
+ # Cleanup
+ src_pg.stop()
+ fake_src_pg.stop()
+ self.del_test_dir(module_name, self.fname)
diff --git a/tests/helpers/ptrack_helpers.py b/tests/helpers/ptrack_helpers.py
index af27669b1..1de004250 100644
--- a/tests/helpers/ptrack_helpers.py
+++ b/tests/helpers/ptrack_helpers.py
@@ -345,14 +345,9 @@ def pg_config_version(self):
# print('PGPROBACKUP_SSH_USER is not set')
# exit(1)
- def make_simple_node(
+ def make_empty_node(
self,
- base_dir=None,
- set_replication=False,
- ptrack_enable=False,
- initdb_params=[],
- pg_options={}):
-
+ base_dir=None):
real_base_dir = os.path.join(self.tmp_path, base_dir)
shutil.rmtree(real_base_dir, ignore_errors=True)
os.makedirs(real_base_dir)
@@ -361,6 +356,17 @@ def make_simple_node(
# bound method slow_start() to 'node' class instance
node.slow_start = slow_start.__get__(node)
node.should_rm_dirs = True
+ return node
+
+ def make_simple_node(
+ self,
+ base_dir=None,
+ set_replication=False,
+ ptrack_enable=False,
+ initdb_params=[],
+ pg_options={}):
+
+ node = self.make_empty_node(base_dir)
node.init(
initdb_params=initdb_params, allow_streaming=set_replication)
@@ -1036,6 +1042,29 @@ def restore_node(
return self.run_pb(cmd_list + options, gdb=gdb, old_binary=old_binary)
+ def catchup_node(
+ self,
+ backup_mode, source_pgdata, destination_node,
+ options = []
+ ):
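+ """Run 'pg_probackup catchup' in the given backup_mode from source_pgdata into destination_node.data_dir and return the command output"""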
+
+ cmd_list = [
+ 'catchup',
+ '--backup-mode={0}'.format(backup_mode),
+ '--source-pgdata={0}'.format(source_pgdata),
+ '--destination-pgdata={0}'.format(destination_node.data_dir)
+ ]
+ if self.remote:
+ cmd_list += ['--remote-proto=ssh', '--remote-host=localhost']
+ if self.verbose:
+ cmd_list += [
+ '--log-level-file=VERBOSE',
+ '--log-directory={0}'.format(destination_node.logs_dir)
+ ]
+
+ return self.run_pb(cmd_list + options)
+
def show_pb(
self, backup_dir, instance=None, backup_id=None,
options=[], as_text=False, as_json=True, old_binary=False,
@@ -1736,10 +1764,10 @@ def compare_pgdata(self, original_pgdata, restored_pgdata):
):
fail = True
error_message += '\nFile permissions mismatch:\n'
- error_message += ' File_old: {0} Permissions: {1}\n'.format(
+ error_message += ' File_old: {0} Permissions: {1:o}\n'.format(
os.path.join(original_pgdata['pgdata'], file),
original_pgdata['files'][file]['mode'])
- error_message += ' File_new: {0} Permissions: {1}\n'.format(
+ error_message += ' File_new: {0} Permissions: {1:o}\n'.format(
os.path.join(restored_pgdata['pgdata'], file),
restored_pgdata['files'][file]['mode'])