diff --git a/Makefile b/Makefile index e449c9e5..2621ddf3 100644 --- a/Makefile +++ b/Makefile @@ -10,7 +10,7 @@ OBJS = src/utils/configuration.o src/utils/json.o src/utils/logger.o \ OBJS += src/archive.o src/backup.o src/catalog.o src/checkdb.o src/configure.o src/data.o \ src/delete.o src/dir.o src/fetch.o src/help.o src/init.o src/merge.o \ src/parsexlog.o src/ptrack.o src/pg_probackup.o src/restore.o src/show.o src/stream.o \ - src/util.o src/validate.o src/datapagemap.o src/catchup.o + src/util.o src/validate.o src/datapagemap.o src/catchup.o src/walsummary.o # borrowed files OBJS += src/pg_crc.o src/receivelog.o src/streamutil.o \ diff --git a/src/backup.c b/src/backup.c index 3cbd4fbf..d1fe709f 100644 --- a/src/backup.c +++ b/src/backup.c @@ -153,7 +153,8 @@ do_backup_pg(InstanceState *instanceState, PGconn *backup_conn, */ if (current.backup_mode == BACKUP_MODE_DIFF_PAGE || current.backup_mode == BACKUP_MODE_DIFF_PTRACK || - current.backup_mode == BACKUP_MODE_DIFF_DELTA) + current.backup_mode == BACKUP_MODE_DIFF_DELTA || + current.backup_mode == BACKUP_MODE_DIFF_SUMMARIZE) { /* get list of backups already taken */ backup_list = catalog_get_backup_list(instanceState, INVALID_BACKUP_ID); @@ -231,6 +232,34 @@ do_backup_pg(InstanceState *instanceState, PGconn *backup_conn, } } + /* + * For SUMMARIZE backup mode, verify that WAL summarize is enabled + * and wait for the summarizer to catch up to the required LSN. + * + * We need to wait for WAL summary up to prev_backup->start_lsn to be + * available before starting the incremental backup. If the summarizer + * hasn't caught up within the timeout period, the backup will fail. + */ + if (current.backup_mode == BACKUP_MODE_DIFF_SUMMARIZE) + { + if (!pg_is_walsummary_enabled(backup_conn)) + elog(ERROR, "WAL summarize backup mode requires summarize_wal to be enabled"); + + /* + * Wait for WAL summarizer to catch up to the previous backup start LSN. + * This ensures that all WAL summary files needed for this incremental + * backup are available before we start. + * + * If the summarizer hasn't caught up within 60 seconds, the backup + * will fail with an error, preventing a backup that would miss data. + */ + if (!wait_wal_summarization(backup_conn, prev_backup->start_lsn)) + elog(ERROR, "WAL summarizer did not catch up to %X/%X within timeout period. " + "Incremental backup cannot proceed without complete WAL summaries.", + (uint32) (prev_backup->start_lsn >> 32), + (uint32) (prev_backup->start_lsn)); + } + /* For incremental backup check that start_lsn is not from the past * Though it will not save us if PostgreSQL instance is actually * restored STREAM backup. @@ -357,7 +386,8 @@ do_backup_pg(InstanceState *instanceState, PGconn *backup_conn, */ if (current.backup_mode == BACKUP_MODE_DIFF_PAGE || - current.backup_mode == BACKUP_MODE_DIFF_PTRACK) + current.backup_mode == BACKUP_MODE_DIFF_PTRACK || + current.backup_mode == BACKUP_MODE_DIFF_SUMMARIZE) { bool pagemap_isok = true; @@ -386,6 +416,16 @@ do_backup_pg(InstanceState *instanceState, PGconn *backup_conn, nodeInfo->ptrack_version_num, prev_backup_start_lsn); } + else if (current.backup_mode == BACKUP_MODE_DIFF_SUMMARIZE) + { + /* + * Build the page map from WAL summary information. + */ + make_pagemap_from_walsummary(backup_files_list, backup_conn, + prev_backup->start_lsn, + current.start_lsn, + current.tli); + } time(&end_time); diff --git a/src/catalog.c b/src/catalog.c index 409d9141..a8fb1b01 100644 --- a/src/catalog.c +++ b/src/catalog.c @@ -21,7 +21,7 @@ static pgBackup* get_closest_backup(timelineInfo *tlinfo); static pgBackup* get_oldest_backup(timelineInfo *tlinfo); -static const char *backupModes[] = {"", "PAGE", "PTRACK", "DELTA", "FULL"}; +static const char *backupModes[] = {"", "PAGE", "PTRACK", "DELTA", "SUMMARIZE", "FULL"}; static pgBackup *readBackupControlFile(const char *path); static int create_backup_dir(pgBackup *backup, const char *backup_instance_path); @@ -2839,6 +2839,8 @@ parse_backup_mode(const char *value) return BACKUP_MODE_DIFF_PTRACK; else if (len > 0 && pg_strncasecmp("delta", v, len) == 0) return BACKUP_MODE_DIFF_DELTA; + else if (len > 0 && pg_strncasecmp("summarize", v, len) == 0) + return BACKUP_MODE_DIFF_SUMMARIZE; /* Backup mode is invalid, so leave with an error */ elog(ERROR, "Invalid backup-mode \"%s\"", value); @@ -2858,6 +2860,8 @@ deparse_backup_mode(BackupMode mode) return "ptrack"; case BACKUP_MODE_DIFF_DELTA: return "delta"; + case BACKUP_MODE_DIFF_SUMMARIZE: + return "summarize"; case BACKUP_MODE_INVALID: return "invalid"; } diff --git a/src/catchup.c b/src/catchup.c index 39fd37d2..cf589eae 100644 --- a/src/catchup.c +++ b/src/catchup.c @@ -118,7 +118,8 @@ catchup_preflight_checks(PGNodeInfo *source_node_info, PGconn *source_conn, if (dir_is_empty(dest_pgdata, FIO_LOCAL_HOST)) { if (current.backup_mode == BACKUP_MODE_DIFF_PTRACK || - current.backup_mode == BACKUP_MODE_DIFF_DELTA) + current.backup_mode == BACKUP_MODE_DIFF_DELTA || + current.backup_mode == BACKUP_MODE_DIFF_SUMMARIZE) elog(ERROR, "\"%s\" is empty, but incremental catchup mode requested.", dest_pgdata); } @@ -193,6 +194,14 @@ catchup_preflight_checks(PGNodeInfo *source_node_info, PGconn *source_conn, elog(ERROR, "Ptrack is disabled"); } + /* check WAL summarize support */ + if (current.backup_mode == BACKUP_MODE_DIFF_SUMMARIZE) + { + PGconn *conn = pgdata_basic_setup(instance_config.conn_opt, source_node_info); + if (!pg_is_walsummary_enabled(conn)) + elog(ERROR, "WAL summarize backup mode requires summarize_wal to be enabled in PostgreSQL 17+"); + } + if (current.from_replica && exclusive_backup) elog(ERROR, "Catchup from standby is only available for PostgreSQL >= 9.6"); @@ -693,6 +702,22 @@ do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads, (uint32) (dest_redo.lsn)); } + /* + * Make sure that sync point is within WAL summarize tracking range + */ + if (current.backup_mode == BACKUP_MODE_DIFF_SUMMARIZE) + { + XLogRecPtr summarized_lsn = get_walsummary_summarized_lsn(source_conn); + + if (summarized_lsn == InvalidXLogRecPtr) + elog(ERROR, "WAL summarizer state is not available"); + if (summarized_lsn < dest_redo.lsn) + elog(WARNING, "WAL summarizer has only reached %X/%X, which is before destination checkpoint LSN %X/%X. " + "Some changes may be missed.", + (uint32) (summarized_lsn >> 32), (uint32) (summarized_lsn), + (uint32) (dest_redo.lsn >> 32), (uint32) (dest_redo.lsn)); + } + { char label[1024]; /* notify start of backup to PostgreSQL server */ @@ -801,6 +826,22 @@ do_catchup(const char *source_pgdata, const char *dest_pgdata, int num_threads, difftime(end_time, start_time)); } + /* Build page mapping in SUMMARIZE mode */ + if (current.backup_mode == BACKUP_MODE_DIFF_SUMMARIZE) + { + time(&start_time); + elog(INFO, "Extracting pagemap of changed blocks from WAL summary"); + + /* Build the page map from WAL summary information */ + make_pagemap_from_walsummary(source_filelist, source_conn, + dest_redo.lsn, + current.start_lsn, + current.tli); + time(&end_time); + elog(INFO, "Pagemap successfully extracted, time elapsed: %.0f sec", + difftime(end_time, start_time)); + } + /* * Make directories before catchup */ diff --git a/src/data.c b/src/data.c index 544adf18..c74dfb59 100644 --- a/src/data.c +++ b/src/data.c @@ -329,8 +329,10 @@ prepare_page(pgFile *file, XLogRecPtr prev_backup_start_lsn, return PageIsOk; case PAGE_IS_VALID: - /* in DELTA or PTRACK modes we must compare lsn */ - if (backup_mode == BACKUP_MODE_DIFF_DELTA || backup_mode == BACKUP_MODE_DIFF_PTRACK) + /* in DELTA, PTRACK or SUMMARIZE modes we must compare lsn */ + if (backup_mode == BACKUP_MODE_DIFF_DELTA || + backup_mode == BACKUP_MODE_DIFF_PTRACK || + backup_mode == BACKUP_MODE_DIFF_SUMMARIZE) page_is_valid = true; else return PageIsOk; @@ -394,7 +396,9 @@ prepare_page(pgFile *file, XLogRecPtr prev_backup_start_lsn, * Skip page if page lsn is less than START_LSN of parent backup. * Nullified pages must be copied by DELTA backup, just to be safe. */ - if ((backup_mode == BACKUP_MODE_DIFF_DELTA || backup_mode == BACKUP_MODE_DIFF_PTRACK) && + if ((backup_mode == BACKUP_MODE_DIFF_DELTA || + backup_mode == BACKUP_MODE_DIFF_PTRACK || + backup_mode == BACKUP_MODE_DIFF_SUMMARIZE) && file->exists_in_prev && page_st->lsn > 0 && page_st->lsn < prev_backup_start_lsn) @@ -513,7 +517,8 @@ backup_data_file(pgFile *file, const char *from_fullpath, const char *to_fullpat * not tracked by pagemap and thus always marked as unchanged. */ if ((backup_mode == BACKUP_MODE_DIFF_PAGE || - backup_mode == BACKUP_MODE_DIFF_PTRACK) && + backup_mode == BACKUP_MODE_DIFF_PTRACK || + backup_mode == BACKUP_MODE_DIFF_SUMMARIZE) && file->pagemap.bitmapsize == PageBitmapIsEmpty && file->exists_in_prev && !file->pagemap_isabsent) { @@ -553,7 +558,9 @@ backup_data_file(pgFile *file, const char *from_fullpath, const char *to_fullpat { rc = fio_send_pages(to_fullpath, from_fullpath, file, /* send prev backup START_LSN */ - (backup_mode == BACKUP_MODE_DIFF_DELTA || backup_mode == BACKUP_MODE_DIFF_PTRACK) && + (backup_mode == BACKUP_MODE_DIFF_DELTA || + backup_mode == BACKUP_MODE_DIFF_PTRACK || + backup_mode == BACKUP_MODE_DIFF_SUMMARIZE) && file->exists_in_prev ? prev_backup_start_lsn : InvalidXLogRecPtr, calg, clevel, checksum_version, /* send pagemap if any */ @@ -566,7 +573,9 @@ backup_data_file(pgFile *file, const char *from_fullpath, const char *to_fullpat /* TODO: stop handling errors internally */ rc = send_pages(to_fullpath, from_fullpath, file, /* send prev backup START_LSN */ - (backup_mode == BACKUP_MODE_DIFF_DELTA || backup_mode == BACKUP_MODE_DIFF_PTRACK) && + (backup_mode == BACKUP_MODE_DIFF_DELTA || + backup_mode == BACKUP_MODE_DIFF_PTRACK || + backup_mode == BACKUP_MODE_DIFF_SUMMARIZE) && file->exists_in_prev ? prev_backup_start_lsn : InvalidXLogRecPtr, calg, clevel, checksum_version, use_pagemap, &headers, backup_mode); @@ -667,7 +676,8 @@ catchup_data_file(pgFile *file, const char *from_fullpath, const char *to_fullpa * This way we can correctly handle null-sized files which are * not tracked by pagemap and thus always marked as unchanged. */ - if (backup_mode == BACKUP_MODE_DIFF_PTRACK && + if ((backup_mode == BACKUP_MODE_DIFF_PTRACK || + backup_mode == BACKUP_MODE_DIFF_SUMMARIZE) && file->pagemap.bitmapsize == PageBitmapIsEmpty && file->exists_in_prev && file->size == prev_size && !file->pagemap_isabsent) { @@ -703,7 +713,9 @@ catchup_data_file(pgFile *file, const char *from_fullpath, const char *to_fullpa { rc = fio_copy_pages(to_fullpath, from_fullpath, file, /* send prev backup START_LSN */ - ((backup_mode == BACKUP_MODE_DIFF_DELTA || backup_mode == BACKUP_MODE_DIFF_PTRACK) && + ((backup_mode == BACKUP_MODE_DIFF_DELTA || + backup_mode == BACKUP_MODE_DIFF_PTRACK || + backup_mode == BACKUP_MODE_DIFF_SUMMARIZE) && file->exists_in_prev) ? sync_lsn : InvalidXLogRecPtr, NONE_COMPRESS, 1, checksum_version, /* send pagemap if any */ @@ -716,7 +728,9 @@ catchup_data_file(pgFile *file, const char *from_fullpath, const char *to_fullpa /* TODO: stop handling errors internally */ rc = copy_pages(to_fullpath, from_fullpath, file, /* send prev backup START_LSN */ - ((backup_mode == BACKUP_MODE_DIFF_DELTA || backup_mode == BACKUP_MODE_DIFF_PTRACK) && + ((backup_mode == BACKUP_MODE_DIFF_DELTA || + backup_mode == BACKUP_MODE_DIFF_PTRACK || + backup_mode == BACKUP_MODE_DIFF_SUMMARIZE) && file->exists_in_prev) ? sync_lsn : InvalidXLogRecPtr, checksum_version, use_pagemap, backup_mode); } diff --git a/src/help.c b/src/help.c index eacef9a4..1183e60c 100644 --- a/src/help.c +++ b/src/help.c @@ -334,7 +334,7 @@ help_backup(void) printf(_(" [--ttl=interval] [--expire-time=timestamp] [--note=text]\n\n")); printf(_(" -B, --backup-path=backup-dir location of the backup storage area\n")); - printf(_(" -b, --backup-mode=backup-mode backup mode=FULL|PAGE|DELTA|PTRACK\n")); + printf(_(" -b, --backup-mode=backup-mode backup mode=FULL|PAGE|DELTA|PTRACK|SUMMARIZE\n")); printf(_(" --instance=instance-name name of the instance\n")); printf(_(" -D, --pgdata=pgdata-path location of the database storage area\n")); printf(_(" -C, --smooth-checkpoint do smooth checkpoint before backup\n")); @@ -1183,7 +1183,7 @@ help_catchup(void) printf(_(" [--dry-run]\n")); printf(_(" [--help]\n\n")); - printf(_(" -b, --backup-mode=catchup-mode catchup mode=FULL|DELTA|PTRACK\n")); + printf(_(" -b, --backup-mode=catchup-mode catchup mode=FULL|DELTA|PTRACK|SUMMARIZE\n")); printf(_(" --stream stream the transaction log (only supported mode)\n")); printf(_(" -S, --slot=SLOTNAME replication slot to use\n")); printf(_(" --temp-slot use temporary replication slot\n")); diff --git a/src/pg_probackup.c b/src/pg_probackup.c index 030d64b0..21876270 100644 --- a/src/pg_probackup.c +++ b/src/pg_probackup.c @@ -843,8 +843,9 @@ main(int argc, char *argv[]) elog(ERROR, "No backup mode specified.\n" "Please specify it either using environment variable BACKUP_MODE or\n" "command line option --backup-mode (-b)"); - if (current.backup_mode != BACKUP_MODE_FULL && current.backup_mode != BACKUP_MODE_DIFF_PTRACK && current.backup_mode != BACKUP_MODE_DIFF_DELTA) - elog(ERROR, "Only \"FULL\", \"PTRACK\" and \"DELTA\" modes are supported with the \"%s\" command", get_subcmd_name(backup_subcmd)); + if (current.backup_mode != BACKUP_MODE_FULL && current.backup_mode != BACKUP_MODE_DIFF_PTRACK && + current.backup_mode != BACKUP_MODE_DIFF_DELTA && current.backup_mode != BACKUP_MODE_DIFF_SUMMARIZE) + elog(ERROR, "Only \"FULL\", \"PTRACK\", \"DELTA\" and \"SUMMARIZE\" modes are supported with the \"%s\" command", get_subcmd_name(backup_subcmd)); if (!stream_wal) elog(INFO, "--stream is required, forcing stream mode"); current.stream = stream_wal = true; diff --git a/src/pg_probackup.h b/src/pg_probackup.h index e5d03495..6bcc4a04 100644 --- a/src/pg_probackup.h +++ b/src/pg_probackup.h @@ -341,6 +341,7 @@ typedef enum BackupMode BACKUP_MODE_DIFF_PAGE, /* incremental page backup */ BACKUP_MODE_DIFF_PTRACK, /* incremental page backup with ptrack system */ BACKUP_MODE_DIFF_DELTA, /* incremental page backup with lsn comparison */ + BACKUP_MODE_DIFF_SUMMARIZE, /* incremental page backup using PostgreSQL native WAL summarize */ BACKUP_MODE_FULL /* full backup */ } BackupMode; @@ -1270,6 +1271,14 @@ extern XLogRecPtr get_last_ptrack_lsn(PGconn *backup_conn, PGNodeInfo *nodeInfo) extern parray * pg_ptrack_get_pagemapset(PGconn *backup_conn, const char *ptrack_schema, int ptrack_version_num, XLogRecPtr lsn); +/* in walsummary.c */ +extern void make_pagemap_from_walsummary(parray* files, PGconn* backup_conn, + XLogRecPtr start_lsn, XLogRecPtr end_lsn, + TimeLineID tli); +extern bool pg_is_walsummary_enabled(PGconn *backup_conn); +extern XLogRecPtr get_walsummary_summarized_lsn(PGconn *backup_conn); +extern bool wait_wal_summarization(PGconn *backup_conn, XLogRecPtr target_lsn); + /* open local file to writing */ extern FILE* open_local_file_rw(const char *to_fullpath, char **out_buf, uint32 buf_size); diff --git a/src/validate.c b/src/validate.c index 3bff3f75..2982b7f8 100644 --- a/src/validate.c +++ b/src/validate.c @@ -111,7 +111,8 @@ pgBackupValidate(pgBackup *backup, pgRestoreParams *params) if (backup->backup_mode != BACKUP_MODE_FULL && backup->backup_mode != BACKUP_MODE_DIFF_PAGE && backup->backup_mode != BACKUP_MODE_DIFF_PTRACK && - backup->backup_mode != BACKUP_MODE_DIFF_DELTA) + backup->backup_mode != BACKUP_MODE_DIFF_DELTA && + backup->backup_mode != BACKUP_MODE_DIFF_SUMMARIZE) elog(WARNING, "Invalid backup_mode of backup %s", backup_id_of(backup)); join_path_components(external_prefix, backup->root_dir, EXTERNAL_DIR); diff --git a/src/walsummary.c b/src/walsummary.c new file mode 100644 index 00000000..20ff8c6c --- /dev/null +++ b/src/walsummary.c @@ -0,0 +1,526 @@ +/*------------------------------------------------------------------------- + * + * walsummary.c: support functions for WAL summarize backups + * + * Uses PostgreSQL 17+ native WAL summarize feature for incremental backup. + * + * Portions Copyright (c) 2025, Postgres Professional + * + *------------------------------------------------------------------------- + */ + +#include "pg_probackup.h" + +#if PG_VERSION_NUM < 110000 +#include "catalog/catalog.h" +#endif +#include "catalog/pg_tablespace.h" + +/* + * Check if the server has WAL summarize enabled. + * Returns true if summarize_wal is enabled, false otherwise. + */ +bool +pg_is_walsummary_enabled(PGconn *backup_conn) +{ + PGresult *res; + char *setting; + bool enabled = false; + + /* Check PostgreSQL version - WAL summarize is only available in PG 17+ */ + res = pgut_execute(backup_conn, + "SELECT setting::text FROM pg_settings WHERE name = 'server_version_num'", + 0, NULL); + + if (PQntuples(res) == 0) + { + PQclear(res); + return false; + } + + { + int version_num = atoi(PQgetvalue(res, 0, 0)); + PQclear(res); + + if (version_num < 170000) + { + elog(WARNING, "WAL summarize backup mode requires PostgreSQL 17 or higher"); + return false; + } + } + + /* Check if summarize_wal is enabled */ + res = pgut_execute(backup_conn, + "SELECT setting FROM pg_settings WHERE name = 'summarize_wal'", + 0, NULL); + + if (PQntuples(res) == 0) + { + PQclear(res); + return false; + } + + setting = PQgetvalue(res, 0, 0); + + if (strcmp(setting, "on") == 0) + enabled = true; + + PQclear(res); + return enabled; +} + +/* + * Get the current summarized LSN from the WAL summarizer. + * Returns the LSN that has been summarized to disk, or InvalidXLogRecPtr if not available. + */ +XLogRecPtr +get_walsummary_summarized_lsn(PGconn *backup_conn) +{ + PGresult *res; + XLogRecPtr summarized_lsn = InvalidXLogRecPtr; + uint32 hi, + lo; + + /* Check if we can get the summarizer state */ + res = pgut_execute(backup_conn, + "SELECT summarized_lsn FROM pg_get_wal_summarizer_state() " + "WHERE summarized_lsn IS NOT NULL", + 0, NULL); + + if (PQntuples(res) == 0) + { + PQclear(res); + return InvalidXLogRecPtr; + } + + if (sscanf(PQgetvalue(res, 0, 0), "%X/%X", &hi, &lo) == 2) + summarized_lsn = ((uint64) hi) << 32 | lo; + + PQclear(res); + return summarized_lsn; +} + +/* + * Wait for the WAL summarizer to catch up to the specified LSN. + * + * This is important after backup completes to ensure that all WAL up to + * the backup stop_lsn has been summarized, so the next incremental backup + * will have complete information. + * + * Parameters: + * backup_conn - connection to the PostgreSQL server + * target_lsn - the LSN we need the summarizer to reach + * + * Returns true if summarizer caught up, false if it was disabled or failed + */ +bool +wait_wal_summarization(PGconn *backup_conn, XLogRecPtr target_lsn) +{ + int wait_seconds = 0; + int max_wait_seconds = 60; /* Maximum wait time: 60 seconds */ + int check_interval = 1; /* Check every second */ + XLogRecPtr last_summarized_lsn = InvalidXLogRecPtr; + time_t start_time; + char target_lsn_str[17 + 1]; + + if (XLogRecPtrIsInvalid(target_lsn)) + return true; /* Nothing to wait for */ + + snprintf(target_lsn_str, sizeof(target_lsn_str), "%X/%X", + (uint32) (target_lsn >> 32), (uint32) target_lsn); + + time(&start_time); + elog(INFO, "Waiting for WAL summarizer to catch up to %s", target_lsn_str); + + while (wait_seconds < max_wait_seconds) + { + PGresult *res; + XLogRecPtr summarized_lsn = InvalidXLogRecPtr; + uint32 hi, lo; + bool summarizer_running = false; + + /* Check summarizer state */ + res = pgut_execute(backup_conn, + "SELECT summarized_lsn, pending_lsn, summarizer_pid " + "FROM pg_get_wal_summarizer_state()", + 0, NULL); + + if (PQntuples(res) > 0) + { + /* Check if summarizer is running */ + if (!PQgetisnull(res, 0, 2)) + { + int pid = atoi(PQgetvalue(res, 0, 2)); + summarizer_running = (pid > 0); + } + + /* Get summarized LSN */ + if (!PQgetisnull(res, 0, 0)) + { + if (sscanf(PQgetvalue(res, 0, 0), "%X/%X", &hi, &lo) == 2) + summarized_lsn = ((uint64) hi) << 32 | lo; + } + } + + PQclear(res); + + /* Check if summarizer is disabled */ + if (!summarizer_running) + { + elog(WARNING, "WAL summarizer is not running"); + return false; + } + + /* Check if we've caught up */ + if (!XLogRecPtrIsInvalid(summarized_lsn) && summarized_lsn >= target_lsn) + { + elog(INFO, "WAL summarizer has caught up to %s after %d seconds", + target_lsn_str, wait_seconds); + return true; + } + + /* Show progress if LSN has advanced */ + if (!XLogRecPtrIsInvalid(summarized_lsn) && + XLogRecPtrIsInvalid(last_summarized_lsn)) + { + char current_lsn_str[17 + 1]; + snprintf(current_lsn_str, sizeof(current_lsn_str), "%X/%X", + (uint32) (summarized_lsn >> 32), (uint32) summarized_lsn); + elog(INFO, "WAL summarizer at %s, waiting to reach %s...", + current_lsn_str, target_lsn_str); + } + + last_summarized_lsn = summarized_lsn; + + /* Wait before checking again */ + usleep(check_interval * 1000000); /* microseconds */ + wait_seconds++; + } + + /* Timeout */ + elog(WARNING, "Timed out waiting for WAL summarizer to catch up to %s after %d seconds", + target_lsn_str, max_wait_seconds); + return false; +} + +/* + * Structure to hold changed block information for a file + */ +typedef struct BlockMapEntry +{ + Oid dbOid; + Oid tblspcOid; + Oid relOid; + ForkName forkName; + parray *blocknums; /* array of BlockNumber */ +} BlockMapEntry; + +/* + * Compare function for sorting BlockMapEntry by (dbOid, tblspcOid, relOid, forkName) + */ +static int +blockmap_compare(const void *a, const void *b) +{ + const BlockMapEntry *ea = *(const BlockMapEntry **) a; + const BlockMapEntry *eb = *(const BlockMapEntry **) b; + + if (ea->dbOid != eb->dbOid) + return ea->dbOid < eb->dbOid ? -1 : 1; + if (ea->tblspcOid != eb->tblspcOid) + return ea->tblspcOid < eb->tblspcOid ? -1 : 1; + if (ea->relOid != eb->relOid) + return ea->relOid < eb->relOid ? -1 : 1; + return ea->forkName - eb->forkName; +} + +/* + * Build page maps from WAL summary information. + * + * This function queries pg_available_wal_summaries() to get available summary files, + * then queries pg_wal_summary_contents() for each file to get the list of + * changed blocks between start_lsn and end_lsn, and builds pagemap bitmaps for each data file. + * + * IMPORTANT: pg_wal_summary_contents() must run on the server where the data directory + * exists. This function works for local backups. For remote backups, pg_probackup's + * remote connection handling ensures the query runs on the server side. + * + * Parameters: + * files - array of pgFile structures to update + * backup_conn - connection to the PostgreSQL server + * start_lsn - start LSN of the range to query + * end_lsn - end LSN of the range to query + * tli - timeline ID + */ +void +make_pagemap_from_walsummary(parray *files, + PGconn *backup_conn, + XLogRecPtr start_lsn, + XLogRecPtr end_lsn, + TimeLineID tli) +{ + PGresult *res; + int i; + int file_i; + char start_lsn_str[17 + 1]; + char end_lsn_str[17 + 1]; + char query[1024]; + parray *blockmap_list; + int total_blocks = 0; + int num_summaries; + + /* Build LSN strings for the query */ + snprintf(start_lsn_str, sizeof(start_lsn_str), "%X/%X", + (uint32) (start_lsn >> 32), (uint32) start_lsn); + snprintf(end_lsn_str, sizeof(end_lsn_str), "%X/%X", + (uint32) (end_lsn >> 32), (uint32) end_lsn); + + elog(INFO, "Querying WAL summaries from %s to %s on timeline %u", + start_lsn_str, end_lsn_str, tli); + + /* + * First, get all available WAL summary files that overlap with our range + * We query summaries that intersect with [start_lsn, end_lsn] + * + * Note: WAL summary files use half-open interval semantics [start_lsn, end_lsn), + * meaning they include changes from start_lsn up to but NOT including end_lsn. + * Therefore we use > instead of >= for the end_lsn comparison to exclude + * summary files that end exactly at our start LSN. + */ + snprintf(query, sizeof(query), + "SELECT tli, start_lsn::text, end_lsn::text " + "FROM pg_available_wal_summaries() " + "WHERE tli = %u " + "AND end_lsn > '%s'::pg_lsn " + "AND start_lsn < '%s'::pg_lsn " + "ORDER BY start_lsn", + tli, start_lsn_str, end_lsn_str); + + res = pgut_execute(backup_conn, query, 0, NULL); + + if (PQntuples(res) == 0) + { + PQclear(res); + elog(WARNING, "No WAL summaries found for the given range"); + return; + } + + num_summaries = PQntuples(res); + elog(INFO, "Found %d WAL summary files covering the requested range", num_summaries); + + /* For each summary file, get the changed blocks */ + blockmap_list = parray_new(); + + for (i = 0; i < num_summaries; i++) + { + char *summary_start_lsn = PQgetvalue(res, i, 1); + char *summary_end_lsn = PQgetvalue(res, i, 2); + PGresult *block_res; + int j; + + elog(VERBOSE, "Processing summary file: %s to %s", + summary_start_lsn, summary_end_lsn); + + /* + * Calculate the overlap range between: + * - Summary file range: [summary_start_lsn, summary_end_lsn] + * - Requested range: [start_lsn, end_lsn] + * We want to query the intersection of these ranges. + */ + snprintf(query, sizeof(query), + "SELECT " + "reldatabase, " + "reltablespace, " + "relfilenode, " + "relforknumber, " + "relblocknumber " + "FROM pg_wal_summary_contents(" + "%u, GREATEST('%s'::pg_lsn, '%s'::pg_lsn), " + "LEAST('%s'::pg_lsn, '%s'::pg_lsn)) " + "WHERE NOT is_limit_block " + "ORDER BY reldatabase, reltablespace, relfilenode, relforknumber, relblocknumber", + tli, + summary_start_lsn, start_lsn_str, + summary_end_lsn, end_lsn_str); + + block_res = pgut_execute(backup_conn, query, 0, NULL); + + if (PQntuples(block_res) == 0) + continue; + + elog(VERBOSE, "Found %d changed blocks in this summary", PQntuples(block_res)); + + /* Process all blocks from this summary file */ + for (j = 0; j < PQntuples(block_res);) + { + Oid curr_db_oid; + Oid curr_tblspc_oid; + Oid curr_relfilenode; + int curr_fork_number; + ForkName curr_fork_name; + + curr_db_oid = atoi(PQgetvalue(block_res, j, 0)); + curr_tblspc_oid = atoi(PQgetvalue(block_res, j, 1)); + curr_relfilenode = atoi(PQgetvalue(block_res, j, 2)); + curr_fork_number = atoi(PQgetvalue(block_res, j, 3)); + + /* Map fork number to ForkName */ + switch (curr_fork_number) + { + case 0: + curr_fork_name = none; /* main fork */ + break; + case 1: + curr_fork_name = fsm; + break; + case 2: + curr_fork_name = vm; + break; + case 3: + curr_fork_name = init; + break; + default: + elog(WARNING, "Unknown fork number %d for relfilenode %u", + curr_fork_number, curr_relfilenode); + /* Skip to next file */ + while (j < PQntuples(block_res) && + atoi(PQgetvalue(block_res, j, 0)) == curr_db_oid && + atoi(PQgetvalue(block_res, j, 1)) == curr_tblspc_oid && + atoi(PQgetvalue(block_res, j, 2)) == curr_relfilenode && + atoi(PQgetvalue(block_res, j, 3)) == curr_fork_number) + j++; + continue; + } + + /* Check if we already have an entry for this file */ + BlockMapEntry key; + BlockMapEntry **found_entry; + BlockMapEntry *map = NULL; + + key.dbOid = curr_db_oid; + key.tblspcOid = curr_tblspc_oid; + key.relOid = curr_relfilenode; + key.forkName = curr_fork_name; + key.blocknums = NULL; + + found_entry = (BlockMapEntry **) parray_bsearch(blockmap_list, &key, blockmap_compare); + if (found_entry) + map = *found_entry; + + /* Create new entry if not found */ + if (!map) + { + map = pgut_malloc(sizeof(BlockMapEntry)); + map->dbOid = curr_db_oid; + map->tblspcOid = curr_tblspc_oid; + map->relOid = curr_relfilenode; + map->forkName = curr_fork_name; + map->blocknums = parray_new(); + parray_append(blockmap_list, map); + } + + /* Collect all blocks for this file */ + while (j < PQntuples(block_res)) + { + Oid db_oid = atoi(PQgetvalue(block_res, j, 0)); + Oid tblspc_oid = atoi(PQgetvalue(block_res, j, 1)); + Oid relfilenode = atoi(PQgetvalue(block_res, j, 2)); + int fork_number = atoi(PQgetvalue(block_res, j, 3)); + BlockNumber block_number = atoi(PQgetvalue(block_res, j, 4)); + + if (db_oid != curr_db_oid || + tblspc_oid != curr_tblspc_oid || + relfilenode != curr_relfilenode || + fork_number != curr_fork_number) + break; + + BlockNumber *blk = palloc(sizeof(BlockNumber)); + *blk = block_number; + parray_append(map->blocknums, blk); + total_blocks++; + j++; + } + } + + PQclear(block_res); + } + + PQclear(res); + + elog(INFO, "Mapped %d changed blocks to %d files", total_blocks, parray_num(blockmap_list)); + + /* Sort the blockmap list for binary search */ + if (parray_num(blockmap_list) > 0) + parray_qsort(blockmap_list, blockmap_compare); + + /* Iterate over files and match with WAL summary data */ + for (file_i = 0; file_i < parray_num(files); file_i++) + { + pgFile *file = (pgFile *) parray_get(files, file_i); + int j; + + if (!file->is_datafile || file->is_cfs) + continue; + if (file->external_dir_num != 0) + continue; + + /* Binary search for matching blockmap entry */ + BlockMapEntry key; + BlockMapEntry **found_entry; + BlockMapEntry *map = NULL; + + key.dbOid = file->dbOid; + key.tblspcOid = file->tblspcOid; + key.relOid = file->relOid; + key.forkName = file->forkName; + key.blocknums = NULL; + + found_entry = (BlockMapEntry **) parray_bsearch(blockmap_list, &key, blockmap_compare); + if (found_entry) + map = *found_entry; + + /* Found matching entry */ + if (map && parray_num(map->blocknums) > 0) + { + int nblocks; + + elog(VERBOSE, "Building pagemap for file \"%s\" with %d changed blocks", + file->rel_path, parray_num(map->blocknums)); + + /* Determine file size in blocks */ + if (file->size == 0) + nblocks = 0; + else + nblocks = (file->size + BLCKSZ - 1) / BLCKSZ; + + /* Initialize pagemap */ + file->pagemap.bitmapsize = 0; + file->pagemap.bitmap = NULL; + + /* Set bits for changed blocks */ + for (j = 0; j < parray_num(map->blocknums); j++) + { + BlockNumber blknum = *(BlockNumber *) parray_get(map->blocknums, j); + + if (blknum < nblocks) + datapagemap_add(&file->pagemap, blknum); + else + elog(WARNING, "Block number %u exceeds file size (%d blocks) for \"%s\"", + blknum, nblocks, file->rel_path); + } + } + } + + /* Free the blockmap list */ + for (i = 0; i < parray_num(blockmap_list); i++) + { + BlockMapEntry *entry = (BlockMapEntry *) parray_get(blockmap_list, i); + int j; + + for (j = 0; j < parray_num(entry->blocknums); j++) + pfree(parray_get(entry->blocknums, j)); + + parray_free(entry->blocknums); + pfree(entry); + } + parray_free(blockmap_list); +}