diff --git a/src/backup.c b/src/backup.c index 46e3ba482..21df1d95e 100644 --- a/src/backup.c +++ b/src/backup.c @@ -27,7 +27,7 @@ //const char *progname = "pg_probackup"; /* list of files contained in backup */ -static parray *backup_files_list = NULL; +parray *backup_files_list = NULL; /* We need critical section for datapagemap_add() in case of using threads */ static pthread_mutex_t backup_pagemap_mutex = PTHREAD_MUTEX_INITIALIZER; @@ -36,21 +36,7 @@ static pthread_mutex_t backup_pagemap_mutex = PTHREAD_MUTEX_INITIALIZER; bool exclusive_backup = false; /* Is pg_start_backup() was executed */ -static bool backup_in_progress = false; - -struct pg_stop_backup_result { - /* - * We will use values of snapshot_xid and invocation_time if there are - * no transactions between start_lsn and stop_lsn. - */ - TransactionId snapshot_xid; - time_t invocation_time; - XLogRecPtr lsn; - size_t backup_label_content_len; - char *backup_label_content; - size_t tablespace_map_content_len; - char *tablespace_map_content; -}; +bool backup_in_progress = false; /* * Backup routines @@ -62,18 +48,9 @@ static void *backup_files(void *arg); static void do_backup_pg(InstanceState *instanceState, PGconn *backup_conn, PGNodeInfo *nodeInfo, bool no_sync, bool backup_logs); -static void pg_start_backup(InstanceState *instanceState, const char *label, bool smooth, pgBackup *backup, - PGNodeInfo *nodeInfo, PGconn *conn); static void pg_switch_wal(PGconn *conn); -static void pg_silent_client_messages(PGconn *conn); -static void pg_create_restore_point(PGconn *conn, time_t backup_start_time); static void pg_stop_backup(InstanceState *instanceState, pgBackup *backup, PGconn *pg_startbackup_conn, PGNodeInfo *nodeInfo); -static void pg_stop_backup_send(PGconn *conn, int server_version, bool is_started_on_replica, bool is_exclusive, char **query_text); - -static XLogRecPtr wait_wal_lsn(InstanceState *instanceState, XLogRecPtr lsn, bool is_start_lsn, TimeLineID tli, - bool in_prev_segment, bool segment_only, - int timeout_elevel, bool in_stream_dir, pgBackup *backup); static void check_external_for_tablespaces(parray *external_list, PGconn *backup_conn); @@ -83,19 +60,19 @@ static parray *get_database_map(PGconn *pg_startbackup_conn); static bool pgpro_support(PGconn *conn); /* Check functions */ -static bool pg_checksum_enable(PGconn *conn); +static bool pg_is_checksum_enabled(PGconn *conn); static bool pg_is_in_recovery(PGconn *conn); static bool pg_is_superuser(PGconn *conn); static void check_server_version(PGconn *conn, PGNodeInfo *nodeInfo); static void confirm_block_size(PGconn *conn, const char *name, int blcksz); static void set_cfs_datafiles(parray *files, const char *root, char *relative, size_t i); -static StopBackupCallbackState stop_callback_state; +static StopBackupCallbackParams stop_callback_params; static void backup_stopbackup_callback(bool fatal, void *userdata) { - StopBackupCallbackState *st = (StopBackupCallbackState *) userdata; + StopBackupCallbackParams *st = (StopBackupCallbackParams *) userdata; /* * If backup is in progress, notify stop of backup to PostgreSQL */ @@ -158,7 +135,7 @@ do_backup_pg(InstanceState *instanceState, PGconn *backup_conn, strlen(" with pg_probackup")); /* Call pg_start_backup function in PostgreSQL connect */ - pg_start_backup(instanceState, label, smooth_checkpoint, ¤t, nodeInfo, backup_conn); + pg_start_backup(label, smooth_checkpoint, ¤t, nodeInfo, backup_conn); /* Obtain current timeline */ #if PG_VERSION_NUM >= 90600 @@ -214,11 +191,11 @@ do_backup_pg(InstanceState *instanceState, PGconn *backup_conn, if (prev_backup) { - if (parse_program_version(prev_backup->program_version) > parse_program_version(PROGRAM_VERSION)) - elog(ERROR, "pg_probackup binary version is %s, but backup %s version is %s. " - "pg_probackup do not guarantee to be forward compatible. " - "Please upgrade pg_probackup binary.", - PROGRAM_VERSION, base36enc(prev_backup->start_time), prev_backup->program_version); + if (parse_program_version(prev_backup->program_version) > parse_program_version(PROGRAM_VERSION)) + elog(ERROR, "pg_probackup binary version is %s, but backup %s version is %s. " + "pg_probackup do not guarantee to be forward compatible. " + "Please upgrade pg_probackup binary.", + PROGRAM_VERSION, base36enc(prev_backup->start_time), prev_backup->program_version); elog(INFO, "Parent backup: %s", base36enc(prev_backup->start_time)); @@ -282,7 +259,7 @@ do_backup_pg(InstanceState *instanceState, PGconn *backup_conn, write_backup(¤t, true); /* In PAGE mode or in ARCHIVE wal-mode wait for current segment */ - if (current.backup_mode == BACKUP_MODE_DIFF_PAGE || !stream_wal) + if (current.backup_mode == BACKUP_MODE_DIFF_PAGE || !current.stream) { /* Check that archive_dir can be reached */ if (fio_access(instanceState->instance_wal_subdir_path, F_OK, FIO_BACKUP_HOST) != 0) @@ -294,11 +271,11 @@ do_backup_pg(InstanceState *instanceState, PGconn *backup_conn, * Because WAL streaming will start after pg_start_backup() in stream * mode. */ - wait_wal_lsn(instanceState, current.start_lsn, true, current.tli, false, true, ERROR, false, ¤t); + wait_wal_lsn(instanceState->instance_wal_subdir_path, current.start_lsn, true, current.tli, false, true, ERROR, false); } /* start stream replication */ - if (stream_wal) + if (current.stream) { join_path_components(dst_backup_path, current.database_dir, PG_XLOG_DIR); fio_mkdir(dst_backup_path, DIR_PERMISSION, FIO_BACKUP_HOST); @@ -310,7 +287,7 @@ do_backup_pg(InstanceState *instanceState, PGconn *backup_conn, * PAGE backup in stream mode is waited twice, first for * segment in WAL archive and then for streamed segment */ - wait_wal_lsn(instanceState, current.start_lsn, true, current.tli, false, true, ERROR, true, ¤t); + wait_wal_lsn(dst_backup_path, current.start_lsn, true, current.tli, false, true, ERROR, true); } /* initialize backup's file list */ @@ -453,7 +430,7 @@ do_backup_pg(InstanceState *instanceState, PGconn *backup_conn, } /* - * Make directories before backup and setup threads at the same time + * Make directories before backup */ for (i = 0; i < parray_num(backup_files_list); i++) { @@ -478,10 +455,11 @@ do_backup_pg(InstanceState *instanceState, PGconn *backup_conn, fio_mkdir(dirpath, DIR_PERMISSION, FIO_BACKUP_HOST); } - /* setup threads */ - pg_atomic_clear_flag(&file->lock); } + /* setup thread locks */ + pfilearray_clear_locks(backup_files_list); + /* Sort by size for load balancing */ parray_qsort(backup_files_list, pgFileCompareSize); /* Sort the array for binary search */ @@ -728,7 +706,7 @@ pgdata_basic_setup(ConnectionOptions conn_opt, PGNodeInfo *nodeInfo) /* Confirm that this server version is supported */ check_server_version(cur_conn, nodeInfo); - if (pg_checksum_enable(cur_conn)) + if (pg_is_checksum_enabled(cur_conn)) current.checksum_version = 1; else current.checksum_version = 0; @@ -1058,8 +1036,8 @@ confirm_block_size(PGconn *conn, const char *name, int blcksz) /* * Notify start of backup to PostgreSQL server. */ -static void -pg_start_backup(InstanceState *instanceState, const char *label, bool smooth, pgBackup *backup, +void +pg_start_backup(const char *label, bool smooth, pgBackup *backup, PGNodeInfo *nodeInfo, PGconn *conn) { PGresult *res; @@ -1088,9 +1066,9 @@ pg_start_backup(InstanceState *instanceState, const char *label, bool smooth, pg * is necessary to call pg_stop_backup() in backup_cleanup(). */ backup_in_progress = true; - stop_callback_state.conn = conn; - stop_callback_state.server_version = nodeInfo->server_version; - pgut_atexit_push(backup_stopbackup_callback, &stop_callback_state); + stop_callback_params.conn = conn; + stop_callback_params.server_version = nodeInfo->server_version; + pgut_atexit_push(backup_stopbackup_callback, &stop_callback_params); /* Extract timeline and LSN from results of pg_start_backup() */ XLogDataFromLSN(PQgetvalue(res, 0, 0), &lsn_hi, &lsn_lo); @@ -1099,7 +1077,7 @@ pg_start_backup(InstanceState *instanceState, const char *label, bool smooth, pg PQclear(res); - if ((!stream_wal || current.backup_mode == BACKUP_MODE_DIFF_PAGE) && + if ((!backup->stream || backup->backup_mode == BACKUP_MODE_DIFF_PAGE) && !backup->from_replica && !(nodeInfo->server_version < 90600 && !nodeInfo->is_superuser)) @@ -1218,7 +1196,7 @@ get_database_map(PGconn *conn) /* Check if ptrack is enabled in target instance */ static bool -pg_checksum_enable(PGconn *conn) +pg_is_checksum_enabled(PGconn *conn) { PGresult *res_db; @@ -1284,7 +1262,7 @@ pg_is_superuser(PGconn *conn) * previous segment. * * Flag 'in_stream_dir' determine whether we looking for WAL in 'pg_wal' directory or - * in archive. Do note, that we cannot rely sorely on global variable 'stream_wal' because, + * in archive. Do note, that we cannot rely sorely on global variable 'stream_wal' (current.stream) because, * for example, PAGE backup must(!) look for start_lsn in archive regardless of wal_mode. * * 'timeout_elevel' determine the elevel for timeout elog message. If elevel lighter than @@ -1293,15 +1271,13 @@ pg_is_superuser(PGconn *conn) * Returns target LSN if such is found, failing that returns LSN of record prior to target LSN. * Returns InvalidXLogRecPtr if 'segment_only' flag is used. */ -static XLogRecPtr -wait_wal_lsn(InstanceState *instanceState, XLogRecPtr target_lsn, bool is_start_lsn, TimeLineID tli, +XLogRecPtr +wait_wal_lsn(const char *wal_segment_dir, XLogRecPtr target_lsn, bool is_start_lsn, TimeLineID tli, bool in_prev_segment, bool segment_only, - int timeout_elevel, bool in_stream_dir, pgBackup *backup) + int timeout_elevel, bool in_stream_dir) { XLogSegNo targetSegNo; - char pg_wal_dir[MAXPGPATH]; char wal_segment_path[MAXPGPATH], - *wal_segment_dir, wal_segment[MAXFNAMELEN]; bool file_exists = false; uint32 try_count = 0, @@ -1319,6 +1295,7 @@ wait_wal_lsn(InstanceState *instanceState, XLogRecPtr target_lsn, bool is_start_ GetXLogFileName(wal_segment, tli, targetSegNo, instance_config.xlog_seg_size); + join_path_components(wal_segment_path, wal_segment_dir, wal_segment); /* * In pg_start_backup we wait for 'target_lsn' in 'pg_wal' directory if it is * stream and non-page backup. Page backup needs archived WAL files, so we @@ -1326,17 +1303,6 @@ wait_wal_lsn(InstanceState *instanceState, XLogRecPtr target_lsn, bool is_start_ * * In pg_stop_backup it depends only on stream_wal. */ - if (in_stream_dir) - { - join_path_components(pg_wal_dir, backup->database_dir, PG_XLOG_DIR); - join_path_components(wal_segment_path, pg_wal_dir, wal_segment); - wal_segment_dir = pg_wal_dir; - } - else - { - join_path_components(wal_segment_path, instanceState->instance_wal_subdir_path, wal_segment); - wal_segment_dir = instanceState->instance_wal_subdir_path; - } /* TODO: remove this in 3.0 (it is a cludge against some old bug with archive_timeout) */ if (instance_config.archive_timeout > 0) @@ -1442,7 +1408,7 @@ wait_wal_lsn(InstanceState *instanceState, XLogRecPtr target_lsn, bool is_start_ wal_delivery_str, wal_segment_path); } - if (!stream_wal && is_start_lsn && try_count == 30) + if (!current.stream && is_start_lsn && try_count == 30) elog(WARNING, "By default pg_probackup assume WAL delivery method to be ARCHIVE. " "If continuous archiving is not set up, use '--stream' option to make autonomous backup. " "Otherwise check that continuous archiving works correctly."); @@ -1466,8 +1432,144 @@ wait_wal_lsn(InstanceState *instanceState, XLogRecPtr target_lsn, bool is_start_ } } +/* + * Check stop_lsn (returned from pg_stop_backup()) and update backup->stop_lsn + */ +void +wait_wal_and_calculate_stop_lsn(const char *xlog_path, XLogRecPtr stop_lsn, pgBackup *backup) +{ + bool stop_lsn_exists = false; + + /* It is ok for replica to return invalid STOP LSN + * UPD: Apparently it is ok even for a master. + */ + if (!XRecOffIsValid(stop_lsn)) + { + XLogSegNo segno = 0; + XLogRecPtr lsn_tmp = InvalidXLogRecPtr; + + /* + * Even though the value is invalid, it's expected postgres behaviour + * and we're trying to fix it below. + */ + elog(LOG, "Invalid offset in stop_lsn value %X/%X, trying to fix", + (uint32) (stop_lsn >> 32), (uint32) (stop_lsn)); + + /* + * Note: even with gdb it is very hard to produce automated tests for + * contrecord + invalid LSN, so emulate it for manual testing. + */ + //lsn = lsn - XLOG_SEG_SIZE; + //elog(WARNING, "New Invalid stop_backup_lsn value %X/%X", + // (uint32) (stop_lsn >> 32), (uint32) (stop_lsn)); + + GetXLogSegNo(stop_lsn, segno, instance_config.xlog_seg_size); + + /* + * Note, that there is no guarantee that corresponding WAL file even exists. + * Replica may return LSN from future and keep staying in present. + * Or it can return invalid LSN. + * + * That's bad, since we want to get real LSN to save it in backup label file + * and to use it in WAL validation. + * + * So we try to do the following: + * 1. Wait 'archive_timeout' seconds for segment containing stop_lsn and + * look for the first valid record in it. + * It solves the problem of occasional invalid LSN on write-busy system. + * 2. Failing that, look for record in previous segment with endpoint + * equal or greater than stop_lsn. It may(!) solve the problem of invalid LSN + * on write-idle system. If that fails too, error out. + */ + + /* stop_lsn is pointing to a 0 byte of xlog segment */ + if (stop_lsn % instance_config.xlog_seg_size == 0) + { + /* Wait for segment with current stop_lsn, it is ok for it to never arrive */ + wait_wal_lsn(xlog_path, stop_lsn, false, backup->tli, + false, true, WARNING, backup->stream); + + /* Get the first record in segment with current stop_lsn */ + lsn_tmp = get_first_record_lsn(xlog_path, segno, backup->tli, + instance_config.xlog_seg_size, + instance_config.archive_timeout); + + /* Check that returned LSN is valid and greater than stop_lsn */ + if (XLogRecPtrIsInvalid(lsn_tmp) || + !XRecOffIsValid(lsn_tmp) || + lsn_tmp < stop_lsn) + { + /* Backup from master should error out here */ + if (!backup->from_replica) + elog(ERROR, "Failed to get next WAL record after %X/%X", + (uint32) (stop_lsn >> 32), + (uint32) (stop_lsn)); + + /* No luck, falling back to looking up for previous record */ + elog(WARNING, "Failed to get next WAL record after %X/%X, " + "looking for previous WAL record", + (uint32) (stop_lsn >> 32), + (uint32) (stop_lsn)); + + /* Despite looking for previous record there is not guarantee of success + * because previous record can be the contrecord. + */ + lsn_tmp = wait_wal_lsn(xlog_path, stop_lsn, false, backup->tli, + true, false, ERROR, backup->stream); + + /* sanity */ + if (!XRecOffIsValid(lsn_tmp) || XLogRecPtrIsInvalid(lsn_tmp)) + elog(ERROR, "Failed to get WAL record prior to %X/%X", + (uint32) (stop_lsn >> 32), + (uint32) (stop_lsn)); + } + } + /* stop lsn is aligned to xlog block size, just find next lsn */ + else if (stop_lsn % XLOG_BLCKSZ == 0) + { + /* Wait for segment with current stop_lsn */ + wait_wal_lsn(xlog_path, stop_lsn, false, backup->tli, + false, true, ERROR, backup->stream); + + /* Get the next closest record in segment with current stop_lsn */ + lsn_tmp = get_next_record_lsn(xlog_path, segno, backup->tli, + instance_config.xlog_seg_size, + instance_config.archive_timeout, + stop_lsn); + + /* sanity */ + if (!XRecOffIsValid(lsn_tmp) || XLogRecPtrIsInvalid(lsn_tmp)) + elog(ERROR, "Failed to get WAL record next to %X/%X", + (uint32) (stop_lsn >> 32), + (uint32) (stop_lsn)); + } + /* PostgreSQL returned something very illegal as STOP_LSN, error out */ + else + elog(ERROR, "Invalid stop_backup_lsn value %X/%X", + (uint32) (stop_lsn >> 32), (uint32) (stop_lsn)); + + /* Setting stop_backup_lsn will set stop point for streaming */ + stop_backup_lsn = lsn_tmp; + stop_lsn_exists = true; + } + + elog(LOG, "stop_lsn: %X/%X", + (uint32) (stop_lsn >> 32), (uint32) (stop_lsn)); + + /* + * Wait for stop_lsn to be archived or streamed. + * If replica returned valid STOP_LSN of not actually existing record, + * look for previous record with endpoint >= STOP_LSN. + */ + if (!stop_lsn_exists) + stop_backup_lsn = wait_wal_lsn(xlog_path, stop_lsn, false, backup->tli, + false, false, ERROR, backup->stream); + + backup->stop_lsn = stop_backup_lsn; +} + /* Remove annoying NOTICE messages generated by backend */ -static void +void pg_silent_client_messages(PGconn *conn) { PGresult *res; @@ -1476,7 +1578,7 @@ pg_silent_client_messages(PGconn *conn) PQclear(res); } -static void +void pg_create_restore_point(PGconn *conn, time_t backup_start_time) { PGresult *res; @@ -1573,7 +1675,7 @@ pg_stop_backup_send(PGconn *conn, int server_version, bool is_started_on_replica elog(ERROR, "Failed to send pg_stop_backup query"); /* After we have sent pg_stop_backup, we don't need this callback anymore */ - pgut_atexit_pop(backup_stopbackup_callback, &stop_callback_state); + pgut_atexit_pop(backup_stopbackup_callback, &stop_callback_params); if (query_text) *query_text = pgut_strdup(stop_backup_query); @@ -1586,10 +1688,10 @@ pg_stop_backup_send(PGconn *conn, int server_version, bool is_started_on_replica * parameters: * - */ -static void +void pg_stop_backup_consume(PGconn *conn, int server_version, bool is_exclusive, uint32 timeout, const char *query_text, - struct pg_stop_backup_result *result) + PGStopBackupResult *result) { PGresult *query_result; uint32 pg_stop_backup_timeout = 0; @@ -1717,7 +1819,7 @@ pg_stop_backup_consume(PGconn *conn, int server_version, /* * helper routine used to write backup_label and tablespace_map in pg_stop_backup() */ -static void +void pg_stop_backup_write_file_helper(const char *path, const char *filename, const char *error_msg_filename, const void *data, size_t len, parray *file_list) { @@ -1738,7 +1840,7 @@ pg_stop_backup_write_file_helper(const char *path, const char *filename, const c error_msg_filename, full_filename, strerror(errno)); /* - * It's vital to check if backup_files_list is initialized, + * It's vital to check if files_list is initialized, * because we could get here because the backup was interrupted */ if (file_list) @@ -1764,10 +1866,8 @@ static void pg_stop_backup(InstanceState *instanceState, pgBackup *backup, PGconn *pg_startbackup_conn, PGNodeInfo *nodeInfo) { - PGconn *conn; - bool stop_lsn_exists = false; - struct pg_stop_backup_result stop_backup_result; - char *xlog_path,stream_xlog_path[MAXPGPATH]; + PGStopBackupResult stop_backup_result; + char *xlog_path, stream_xlog_path[MAXPGPATH]; /* kludge against some old bug in archive_timeout. TODO: remove in 3.0.0 */ int timeout = (instance_config.archive_timeout > 0) ? instance_config.archive_timeout : ARCHIVE_TIMEOUT_DEFAULT; @@ -1777,9 +1877,7 @@ pg_stop_backup(InstanceState *instanceState, pgBackup *backup, PGconn *pg_startb if (!backup_in_progress) elog(ERROR, "backup is not in progress"); - conn = pg_startbackup_conn; - - pg_silent_client_messages(conn); + pg_silent_client_messages(pg_startbackup_conn); /* Create restore point * Only if backup is from master. @@ -1788,157 +1886,34 @@ pg_stop_backup(InstanceState *instanceState, pgBackup *backup, PGconn *pg_startb if (!backup->from_replica && !(nodeInfo->server_version < 90600 && !nodeInfo->is_superuser)) //TODO: check correctness - pg_create_restore_point(conn, backup->start_time); + pg_create_restore_point(pg_startbackup_conn, backup->start_time); /* Execute pg_stop_backup using PostgreSQL connection */ - pg_stop_backup_send(conn, nodeInfo->server_version, current.from_replica, exclusive_backup, &query_text); + pg_stop_backup_send(pg_startbackup_conn, nodeInfo->server_version, backup->from_replica, exclusive_backup, &query_text); /* * Wait for the result of pg_stop_backup(), but no longer than * archive_timeout seconds */ - pg_stop_backup_consume(conn, nodeInfo->server_version, exclusive_backup, timeout, query_text, &stop_backup_result); + pg_stop_backup_consume(pg_startbackup_conn, nodeInfo->server_version, exclusive_backup, timeout, query_text, &stop_backup_result); - /* It is ok for replica to return invalid STOP LSN - * UPD: Apparently it is ok even for a master. - */ - if (!XRecOffIsValid(stop_backup_result.lsn)) + if (backup->stream) { - char *xlog_path, - stream_xlog_path[MAXPGPATH]; - XLogSegNo segno = 0; - XLogRecPtr lsn_tmp = InvalidXLogRecPtr; - - /* - * Even though the value is invalid, it's expected postgres behaviour - * and we're trying to fix it below. - */ - elog(LOG, "Invalid offset in stop_lsn value %X/%X, trying to fix", - (uint32) (stop_backup_result.lsn >> 32), (uint32) (stop_backup_result.lsn)); - - /* - * Note: even with gdb it is very hard to produce automated tests for - * contrecord + invalid LSN, so emulate it for manual testing. - */ - //stop_backup_result.lsn = stop_backup_result.lsn - XLOG_SEG_SIZE; - //elog(WARNING, "New Invalid stop_backup_lsn value %X/%X", - // (uint32) (stop_backup_result.lsn >> 32), (uint32) (stop_backup_result.lsn)); - - if (stream_wal) - { - snprintf(stream_xlog_path, lengthof(stream_xlog_path), - "%s/%s/%s/%s", instanceState->instance_backup_subdir_path, - base36enc(backup->start_time), - DATABASE_DIR, PG_XLOG_DIR); - xlog_path = stream_xlog_path; - } - else - xlog_path = instanceState->instance_wal_subdir_path; - - GetXLogSegNo(stop_backup_result.lsn, segno, instance_config.xlog_seg_size); - - /* - * Note, that there is no guarantee that corresponding WAL file even exists. - * Replica may return LSN from future and keep staying in present. - * Or it can return invalid LSN. - * - * That's bad, since we want to get real LSN to save it in backup label file - * and to use it in WAL validation. - * - * So we try to do the following: - * 1. Wait 'archive_timeout' seconds for segment containing stop_lsn and - * look for the first valid record in it. - * It solves the problem of occasional invalid LSN on write-busy system. - * 2. Failing that, look for record in previous segment with endpoint - * equal or greater than stop_lsn. It may(!) solve the problem of invalid LSN - * on write-idle system. If that fails too, error out. - */ - - /* stop_lsn is pointing to a 0 byte of xlog segment */ - if (stop_backup_result.lsn % instance_config.xlog_seg_size == 0) - { - /* Wait for segment with current stop_lsn, it is ok for it to never arrive */ - wait_wal_lsn(instanceState, stop_backup_result.lsn, false, backup->tli, - false, true, WARNING, stream_wal, backup); - - /* Get the first record in segment with current stop_lsn */ - lsn_tmp = get_first_record_lsn(xlog_path, segno, backup->tli, - instance_config.xlog_seg_size, - instance_config.archive_timeout); - - /* Check that returned LSN is valid and greater than stop_lsn */ - if (XLogRecPtrIsInvalid(lsn_tmp) || - !XRecOffIsValid(lsn_tmp) || - lsn_tmp < stop_backup_result.lsn) - { - /* Backup from master should error out here */ - if (!backup->from_replica) - elog(ERROR, "Failed to get next WAL record after %X/%X", - (uint32) (stop_backup_result.lsn >> 32), - (uint32) (stop_backup_result.lsn)); - - /* No luck, falling back to looking up for previous record */ - elog(WARNING, "Failed to get next WAL record after %X/%X, " - "looking for previous WAL record", - (uint32) (stop_backup_result.lsn >> 32), - (uint32) (stop_backup_result.lsn)); - - /* Despite looking for previous record there is not guarantee of success - * because previous record can be the contrecord. - */ - lsn_tmp = wait_wal_lsn(instanceState, stop_backup_result.lsn, false, backup->tli, - true, false, ERROR, stream_wal, backup); - - /* sanity */ - if (!XRecOffIsValid(lsn_tmp) || XLogRecPtrIsInvalid(lsn_tmp)) - elog(ERROR, "Failed to get WAL record prior to %X/%X", - (uint32) (stop_backup_result.lsn >> 32), - (uint32) (stop_backup_result.lsn)); - } - } - /* stop lsn is aligned to xlog block size, just find next lsn */ - else if (stop_backup_result.lsn % XLOG_BLCKSZ == 0) - { - /* Wait for segment with current stop_lsn */ - wait_wal_lsn(instanceState, stop_backup_result.lsn, false, backup->tli, - false, true, ERROR, stream_wal, backup); - - /* Get the next closest record in segment with current stop_lsn */ - lsn_tmp = get_next_record_lsn(xlog_path, segno, backup->tli, - instance_config.xlog_seg_size, - instance_config.archive_timeout, - stop_backup_result.lsn); - - /* sanity */ - if (!XRecOffIsValid(lsn_tmp) || XLogRecPtrIsInvalid(lsn_tmp)) - elog(ERROR, "Failed to get WAL record next to %X/%X", - (uint32) (stop_backup_result.lsn >> 32), - (uint32) (stop_backup_result.lsn)); - } - /* PostgreSQL returned something very illegal as STOP_LSN, error out */ - else - elog(ERROR, "Invalid stop_backup_lsn value %X/%X", - (uint32) (stop_backup_result.lsn >> 32), (uint32) (stop_backup_result.lsn)); - - /* Setting stop_backup_lsn will set stop point for streaming */ - stop_backup_lsn = lsn_tmp; - stop_lsn_exists = true; + join_path_components(stream_xlog_path, backup->database_dir, PG_XLOG_DIR); + xlog_path = stream_xlog_path; } + else + xlog_path = instanceState->instance_wal_subdir_path; - elog(LOG, "stop_lsn: %X/%X", - (uint32) (stop_backup_result.lsn >> 32), (uint32) (stop_backup_result.lsn)); + wait_wal_and_calculate_stop_lsn(xlog_path, stop_backup_result.lsn, backup); /* Write backup_label and tablespace_map */ if (!exclusive_backup) { - char path[MAXPGPATH]; - Assert(stop_backup_result.backup_label_content != NULL); - snprintf(path, lengthof(path), "%s/%s/%s", instanceState->instance_backup_subdir_path, - base36enc(backup->start_time), DATABASE_DIR); /* Write backup_label */ - pg_stop_backup_write_file_helper(path, PG_BACKUP_LABEL_FILE, "backup label", + pg_stop_backup_write_file_helper(backup->database_dir, PG_BACKUP_LABEL_FILE, "backup label", stop_backup_result.backup_label_content, stop_backup_result.backup_label_content_len, backup_files_list); free(stop_backup_result.backup_label_content); @@ -1948,7 +1923,7 @@ pg_stop_backup(InstanceState *instanceState, pgBackup *backup, PGconn *pg_startb /* Write tablespace_map */ if (stop_backup_result.tablespace_map_content != NULL) { - pg_stop_backup_write_file_helper(path, PG_TABLESPACE_MAP_FILE, "tablespace map", + pg_stop_backup_write_file_helper(backup->database_dir, PG_TABLESPACE_MAP_FILE, "tablespace map", stop_backup_result.tablespace_map_content, stop_backup_result.tablespace_map_content_len, backup_files_list); free(stop_backup_result.tablespace_map_content); @@ -1957,32 +1932,14 @@ pg_stop_backup(InstanceState *instanceState, pgBackup *backup, PGconn *pg_startb } } - /* - * Wait for stop_lsn to be archived or streamed. - * If replica returned valid STOP_LSN of not actually existing record, - * look for previous record with endpoint >= STOP_LSN. - */ - if (!stop_lsn_exists) - stop_backup_lsn = wait_wal_lsn(instanceState, stop_backup_result.lsn, false, backup->tli, - false, false, ERROR, stream_wal, backup); - - if (stream_wal) + if (backup->stream) { /* This function will also add list of xlog files * to the passed filelist */ if(wait_WAL_streaming_end(backup_files_list)) elog(ERROR, "WAL streaming failed"); - - snprintf(stream_xlog_path, lengthof(stream_xlog_path), "%s/%s/%s/%s", - instanceState->instance_backup_subdir_path, base36enc(backup->start_time), - DATABASE_DIR, PG_XLOG_DIR); - - xlog_path = stream_xlog_path; } - else - xlog_path = instanceState->instance_wal_subdir_path; - backup->stop_lsn = stop_backup_lsn; backup->recovery_xid = stop_backup_result.snapshot_xid; elog(LOG, "Getting the Recovery Time from WAL"); diff --git a/src/checkdb.c b/src/checkdb.c index 4ea1d0800..5d7d6652b 100644 --- a/src/checkdb.c +++ b/src/checkdb.c @@ -455,7 +455,6 @@ get_index_list(const char *dbname, bool first_db_with_amcheck, ind->heapallindexed_is_supported = heapallindexed_is_supported; ind->amcheck_nspname = pgut_malloc(strlen(amcheck_nspname) + 1); strcpy(ind->amcheck_nspname, amcheck_nspname); - pg_atomic_clear_flag(&ind->lock); if (index_list == NULL) index_list = parray_new(); @@ -463,6 +462,8 @@ get_index_list(const char *dbname, bool first_db_with_amcheck, parray_append(index_list, ind); } + pfilearray_clear_locks(index_list); + PQclear(res); return index_list; diff --git a/src/dir.c b/src/dir.c index 86848d8d5..dfcddd7d0 100644 --- a/src/dir.c +++ b/src/dir.c @@ -222,6 +222,8 @@ pgFileInit(const char *rel_path) /* Number of blocks backed up during backup */ file->n_headers = 0; + // May be add? + // pg_atomic_clear_flag(file->lock); return file; } @@ -1859,3 +1861,17 @@ cleanup_tablespace(const char *path) parray_walk(files, pgFileFree); parray_free(files); } + +/* + * Clear the synchronisation locks in a parray of (pgFile *)'s + */ +void +pfilearray_clear_locks(parray *file_list) +{ + int i; + for (i = 0; i < parray_num(file_list); i++) + { + pgFile *file = (pgFile *) parray_get(file_list, i); + pg_atomic_clear_flag(&file->lock); + } +} diff --git a/src/pg_probackup.c b/src/pg_probackup.c index 1b2e7f751..3150900b6 100644 --- a/src/pg_probackup.c +++ b/src/pg_probackup.c @@ -68,8 +68,6 @@ static char *backup_path = NULL; static CatalogState *catalogState = NULL; /* ================ catalogState (END) =========== */ -/* colon separated external directories list ("/path1:/path2") */ -char *externaldir = NULL; /* common options */ int num_threads = 1; bool stream_wal = false; diff --git a/src/pg_probackup.h b/src/pg_probackup.h index d02bbb033..a7979ed27 100644 --- a/src/pg_probackup.h +++ b/src/pg_probackup.h @@ -600,7 +600,6 @@ typedef struct int ret; } backup_files_arg; - typedef struct timelineInfo timelineInfo; /* struct to collect info about timelines in WAL archive */ @@ -679,10 +678,11 @@ typedef struct BackupPageHeader2 uint16 checksum; } BackupPageHeader2; -typedef struct StopBackupCallbackState { +typedef struct StopBackupCallbackParams +{ PGconn *conn; int server_version; -} StopBackupCallbackState; +} StopBackupCallbackParams; /* Special value for compressed_size field */ #define PageIsOk 0 @@ -1061,6 +1061,7 @@ extern int pgFileCompareRelPathWithExternalDesc(const void *f1, const void *f2); extern int pgFileCompareLinked(const void *f1, const void *f2); extern int pgFileCompareSize(const void *f1, const void *f2); extern int pgCompareOid(const void *f1, const void *f2); +extern void pfilearray_clear_locks(parray *file_list); /* in data.c */ extern bool check_data_file(ConnectionArgs *arguments, pgFile *file, @@ -1259,4 +1260,42 @@ extern void start_WAL_streaming(PGconn *backup_conn, char *stream_dst_path, ConnectionOptions *conn_opt, XLogRecPtr startpos, TimeLineID starttli); extern int wait_WAL_streaming_end(parray *backup_files_list); + +/* external variables and functions, implemented in backup.c */ +typedef struct PGStopBackupResult +{ + /* + * We will use values of snapshot_xid and invocation_time if there are + * no transactions between start_lsn and stop_lsn. + */ + TransactionId snapshot_xid; + time_t invocation_time; + /* + * Fields that store pg_catalog.pg_stop_backup() result + */ + XLogRecPtr lsn; + size_t backup_label_content_len; + char *backup_label_content; + size_t tablespace_map_content_len; + char *tablespace_map_content; +} PGStopBackupResult; + +extern bool backup_in_progress; +extern parray *backup_files_list; + +extern void pg_start_backup(const char *label, bool smooth, pgBackup *backup, + PGNodeInfo *nodeInfo, PGconn *conn); +extern void pg_silent_client_messages(PGconn *conn); +extern void pg_create_restore_point(PGconn *conn, time_t backup_start_time); +extern void pg_stop_backup_send(PGconn *conn, int server_version, bool is_started_on_replica, bool is_exclusive, char **query_text); +extern void pg_stop_backup_consume(PGconn *conn, int server_version, + bool is_exclusive, uint32 timeout, const char *query_text, + PGStopBackupResult *result); +extern void pg_stop_backup_write_file_helper(const char *path, const char *filename, const char *error_msg_filename, + const void *data, size_t len, parray *file_list); +extern XLogRecPtr wait_wal_lsn(const char *wal_segment_dir, XLogRecPtr lsn, bool is_start_lsn, TimeLineID tli, + bool in_prev_segment, bool segment_only, + int timeout_elevel, bool in_stream_dir); +extern void wait_wal_and_calculate_stop_lsn(const char *xlog_path, XLogRecPtr stop_lsn, pgBackup *backup); + #endif /* PG_PROBACKUP_H */ diff --git a/src/restore.c b/src/restore.c index b7071234a..4cef60005 100644 --- a/src/restore.c +++ b/src/restore.c @@ -824,7 +824,7 @@ restore_chain(pgBackup *dest_backup, parray *parent_chain, } /* - * Setup directory structure for external directories and file locks + * Setup directory structure for external directories */ for (i = 0; i < parray_num(dest_files); i++) { @@ -848,11 +848,11 @@ restore_chain(pgBackup *dest_backup, parray *parent_chain, elog(VERBOSE, "Create external directory \"%s\"", dirpath); fio_mkdir(dirpath, file->mode, FIO_DB_HOST); } - - /* setup threads */ - pg_atomic_clear_flag(&file->lock); } + /* setup threads */ + pfilearray_clear_locks(dest_files); + /* Get list of files in destination directory and remove redundant files */ if (params->incremental_mode != INCR_NONE || cleanup_pgdata) { diff --git a/src/utils/parray.c b/src/utils/parray.c index 31148ee9a..95b83365d 100644 --- a/src/utils/parray.c +++ b/src/utils/parray.c @@ -175,7 +175,7 @@ parray_rm(parray *array, const void *key, int(*compare)(const void *, const void size_t parray_num(const parray *array) { - return array->used; + return array!= NULL ? array->used : (size_t) 0; } void diff --git a/src/validate.c b/src/validate.c index f000698d0..4044ac158 100644 --- a/src/validate.c +++ b/src/validate.c @@ -130,11 +130,7 @@ pgBackupValidate(pgBackup *backup, pgRestoreParams *params) // params->partial_restore_type); /* setup threads */ - for (i = 0; i < parray_num(files); i++) - { - pgFile *file = (pgFile *) parray_get(files, i); - pg_atomic_clear_flag(&file->lock); - } + pfilearray_clear_locks(files); /* init thread args with own file lists */ threads = (pthread_t *) palloc(sizeof(pthread_t) * num_threads);