Skip to content

[PBCKP-146] truncate cfm files #550

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Nov 2, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions src/archive.c
Original file line number Diff line number Diff line change
Expand Up @@ -1375,11 +1375,11 @@ get_wal_file(const char *filename, const char *from_fullpath,
#ifdef HAVE_LIBZ
/* If requested file is regular WAL segment, then try to open it with '.gz' suffix... */
if (IsXLogFileName(filename))
rc = fio_send_file_gz(from_fullpath_gz, to_fullpath, out, &errmsg);
rc = fio_send_file_gz(from_fullpath_gz, out, &errmsg);
if (rc == FILE_MISSING)
#endif
/* ... failing that, use uncompressed */
rc = fio_send_file(from_fullpath, to_fullpath, out, NULL, &errmsg);
rc = fio_send_file(from_fullpath, out, false, NULL, &errmsg);

/* When not in prefetch mode, try to use partial file */
if (rc == FILE_MISSING && !prefetch_mode && IsXLogFileName(filename))
Expand All @@ -1389,13 +1389,13 @@ get_wal_file(const char *filename, const char *from_fullpath,
#ifdef HAVE_LIBZ
/* '.gz.partial' goes first ... */
snprintf(from_partial, sizeof(from_partial), "%s.gz.partial", from_fullpath);
rc = fio_send_file_gz(from_partial, to_fullpath, out, &errmsg);
rc = fio_send_file_gz(from_partial, out, &errmsg);
if (rc == FILE_MISSING)
#endif
{
/* ... failing that, use '.partial' */
snprintf(from_partial, sizeof(from_partial), "%s.partial", from_fullpath);
rc = fio_send_file(from_partial, to_fullpath, out, NULL, &errmsg);
rc = fio_send_file(from_partial, out, false, NULL, &errmsg);
}

if (rc == SEND_OK)
Expand Down
11 changes: 11 additions & 0 deletions src/catalog.c
Original file line number Diff line number Diff line change
Expand Up @@ -1069,6 +1069,7 @@ get_backup_filelist(pgBackup *backup, bool strict)
char linked[MAXPGPATH];
char compress_alg_string[MAXPGPATH];
int64 write_size,
uncompressed_size,
mode, /* bit length of mode_t depends on platforms */
is_datafile,
is_cfs,
Expand Down Expand Up @@ -1132,6 +1133,11 @@ get_backup_filelist(pgBackup *backup, bool strict)
if (get_control_value_int64(buf, "hdr_size", &hdr_size, false))
file->hdr_size = (int) hdr_size;

if (get_control_value_int64(buf, "full_size", &uncompressed_size, false))
file->uncompressed_size = uncompressed_size;
else
file->uncompressed_size = write_size;

if (file->external_dir_num == 0)
set_forkname(file);

Expand Down Expand Up @@ -2561,6 +2567,11 @@ write_backup_filelist(pgBackup *backup, parray *files, const char *root,
file->external_dir_num,
file->dbOid);

if (file->uncompressed_size != 0 &&
file->uncompressed_size != file->write_size)
len += sprintf(line+len, ",\"full_size\":\"" INT64_FORMAT "\"",
file->uncompressed_size);

if (file->is_datafile)
len += sprintf(line+len, ",\"segno\":\"%d\"", file->segno);

Expand Down
127 changes: 36 additions & 91 deletions src/data.c
Original file line number Diff line number Diff line change
Expand Up @@ -799,6 +799,7 @@ backup_non_data_file(pgFile *file, pgFile *prev_file,
* and its mtime is less than parent backup start time ... */
if ((pg_strcasecmp(file->name, RELMAPPER_FILENAME) != 0) &&
(prev_file && file->exists_in_prev &&
file->size == prev_file->size &&
file->mtime <= parent_backup_time))
{
/*
Expand Down Expand Up @@ -1330,7 +1331,12 @@ restore_non_data_file(parray *parent_chain, pgBackup *dest_backup,
if (already_exists)
{
/* compare checksums of already existing file and backup file */
pg_crc32 file_crc = fio_get_crc32(to_fullpath, FIO_DB_HOST, false, false);
pg_crc32 file_crc;
if (tmp_file->forkName == cfm &&
tmp_file->uncompressed_size > tmp_file->write_size)
file_crc = fio_get_crc32_truncated(to_fullpath, FIO_DB_HOST);
else
file_crc = fio_get_crc32(to_fullpath, FIO_DB_HOST, false, false);

if (file_crc == tmp_file->crc)
{
Expand Down Expand Up @@ -1387,10 +1393,12 @@ backup_non_data_file_internal(const char *from_fullpath,
const char *to_fullpath, pgFile *file,
bool missing_ok)
{
FILE *in = NULL;
FILE *out = NULL;
ssize_t read_len = 0;
char *buf = NULL;
char *errmsg = NULL;
int rc;
bool cut_zero_tail;

cut_zero_tail = file->forkName == cfm;

INIT_FILE_CRC32(true, file->crc);

Expand All @@ -1412,107 +1420,44 @@ backup_non_data_file_internal(const char *from_fullpath,

/* backup remote file */
if (fio_is_remote(FIO_DB_HOST))
{
char *errmsg = NULL;
int rc = fio_send_file(from_fullpath, to_fullpath, out, file, &errmsg);
rc = fio_send_file(from_fullpath, out, cut_zero_tail, file, &errmsg);
else
rc = fio_send_file_local(from_fullpath, out, cut_zero_tail, file, &errmsg);

/* handle errors */
if (rc == FILE_MISSING)
{
/* maybe deleted, it's not error in case of backup */
if (missing_ok)
{
elog(LOG, "File \"%s\" is not found", from_fullpath);
file->write_size = FILE_NOT_FOUND;
goto cleanup;
}
else
elog(ERROR, "File \"%s\" is not found", from_fullpath);
}
else if (rc == WRITE_FAILED)
elog(ERROR, "Cannot write to \"%s\": %s", to_fullpath, strerror(errno));
else if (rc != SEND_OK)
/* handle errors */
if (rc == FILE_MISSING)
{
/* maybe deleted, it's not error in case of backup */
if (missing_ok)
{
if (errmsg)
elog(ERROR, "%s", errmsg);
else
elog(ERROR, "Cannot access remote file \"%s\"", from_fullpath);
elog(LOG, "File \"%s\" is not found", from_fullpath);
file->write_size = FILE_NOT_FOUND;
goto cleanup;
}

pg_free(errmsg);
else
elog(ERROR, "File \"%s\" is not found", from_fullpath);
}
/* backup local file */
else
else if (rc == WRITE_FAILED)
elog(ERROR, "Cannot write to \"%s\": %s", to_fullpath, strerror(errno));
else if (rc != SEND_OK)
{
/* open source file for read */
in = fopen(from_fullpath, PG_BINARY_R);
if (in == NULL)
{
/* maybe deleted, it's not error in case of backup */
if (errno == ENOENT)
{
if (missing_ok)
{
elog(LOG, "File \"%s\" is not found", from_fullpath);
file->write_size = FILE_NOT_FOUND;
goto cleanup;
}
else
elog(ERROR, "File \"%s\" is not found", from_fullpath);
}

elog(ERROR, "Cannot open file \"%s\": %s", from_fullpath,
strerror(errno));
}

/* disable stdio buffering for local input/output files to avoid triple buffering */
setvbuf(in, NULL, _IONBF, BUFSIZ);
setvbuf(out, NULL, _IONBF, BUFSIZ);

/* allocate 64kB buffer */
buf = pgut_malloc(CHUNK_SIZE);

/* copy content and calc CRC */
for (;;)
{
read_len = fread(buf, 1, CHUNK_SIZE, in);

if (ferror(in))
elog(ERROR, "Cannot read from file \"%s\": %s",
from_fullpath, strerror(errno));

if (read_len > 0)
{
if (fwrite(buf, 1, read_len, out) != read_len)
elog(ERROR, "Cannot write to file \"%s\": %s", to_fullpath,
strerror(errno));

/* update CRC */
COMP_FILE_CRC32(true, file->crc, buf, read_len);
file->read_size += read_len;
}

if (feof(in))
break;
}
if (errmsg)
elog(ERROR, "%s", errmsg);
else
elog(ERROR, "Cannot access remote file \"%s\"", from_fullpath);
}

file->write_size = (int64) file->read_size;

if (file->write_size > 0)
file->uncompressed_size = file->write_size;
file->uncompressed_size = file->read_size;

cleanup:
if (errmsg != NULL)
pg_free(errmsg);

/* finish CRC calculation and store into pgFile */
FIN_FILE_CRC32(true, file->crc);

if (in && fclose(in))
elog(ERROR, "Cannot close the file \"%s\": %s", from_fullpath, strerror(errno));

if (out && fclose(out))
elog(ERROR, "Cannot close the file \"%s\": %s", to_fullpath, strerror(errno));

pg_free(buf);
}

/*
Expand Down
133 changes: 1 addition & 132 deletions src/dir.c
Original file line number Diff line number Diff line change
Expand Up @@ -262,137 +262,6 @@ pgFileDelete(mode_t mode, const char *full_path)
}
}

/*
 * Compute the CRC of a local file over its bytes exactly as fread() returns
 * them.
 *
 * No attempt is made to decompress the content: the user may ask to back up
 * files that are already compressed, and the checksum has to describe them
 * as they are.
 *
 * file_path   path of the file to read
 * use_crc32c  passed through to the *_FILE_CRC32 macros
 * missing_ok  when true and the file does not exist (ENOENT), return the
 *             CRC of empty input instead of raising an error
 *
 * Every other open failure, a read error, or a pending interrupt is
 * reported with elog(ERROR).
 */
pg_crc32
pgFileGetCRC(const char *file_path, bool use_crc32c, bool missing_ok)
{
	FILE	   *in;
	char	   *chunk;
	size_t		nread = 0;
	pg_crc32	checksum = 0;

	INIT_FILE_CRC32(use_crc32c, checksum);

	/* binary read mode */
	in = fopen(file_path, PG_BINARY_R);
	if (in == NULL)
	{
		/* a missing file is tolerated only when the caller allows it */
		if (errno == ENOENT && missing_ok)
		{
			FIN_FILE_CRC32(use_crc32c, checksum);
			return checksum;
		}

		elog(ERROR, "Cannot open file \"%s\": %s",
			 file_path, strerror(errno));
	}

	/* we read in large chunks ourselves, so turn stdio buffering off */
	setvbuf(in, NULL, _IONBF, BUFSIZ);
	chunk = pgut_malloc(STDIO_BUFSIZE);

	/* feed the file into the CRC chunk by chunk until EOF is reached */
	do
	{
		if (interrupted)
			elog(ERROR, "interrupted during CRC calculation");

		nread = fread(chunk, 1, STDIO_BUFSIZE, in);

		if (ferror(in))
			elog(ERROR, "Cannot read \"%s\": %s", file_path, strerror(errno));

		COMP_FILE_CRC32(use_crc32c, checksum, chunk, nread);
	} while (!feof(in));

	FIN_FILE_CRC32(use_crc32c, checksum);
	fclose(in);
	pg_free(chunk);

	return checksum;
}

/*
 * Compute the CRC of a file read through zlib (gzopen()/gzread()).
 *
 * The checksum covers the bytes gzread() hands back, not the bytes stored
 * on disk.
 *
 * file_path   path of the file to read
 * use_crc32c  passed through to the *_FILE_CRC32 macros
 * missing_ok  when true and the file does not exist (ENOENT), return the
 *             CRC of empty input instead of raising an error
 *
 * Every other open failure, a read/decompression error, or a pending
 * interrupt is reported with elog(ERROR).
 */
pg_crc32
pgFileGetCRCgz(const char *file_path, bool use_crc32c, bool missing_ok)
{
	gzFile		fp;
	pg_crc32	crc = 0;
	int			len = 0;
	int			err;
	char	   *buf;

	INIT_FILE_CRC32(use_crc32c, crc);

	/*
	 * gzopen() may fail without setting errno (for instance when it cannot
	 * allocate its internal state). Clear errno first so that a stale value
	 * left by an earlier call is neither mistaken for ENOENT nor printed as
	 * the reason of the failure.
	 */
	errno = 0;

	/* open file in binary read mode */
	fp = gzopen(file_path, PG_BINARY_R);
	if (fp == NULL)
	{
		if (errno == ENOENT && missing_ok)
		{
			FIN_FILE_CRC32(use_crc32c, crc);
			return crc;
		}

		elog(ERROR, "Cannot open file \"%s\": %s",
			 file_path,
			 errno != 0 ? strerror(errno) : "zlib could not initialize (out of memory?)");
	}

	buf = pgut_malloc(STDIO_BUFSIZE);

	/* calc CRC of file */
	for (;;)
	{
		if (interrupted)
			elog(ERROR, "interrupted during CRC calculation");

		len = gzread(fp, buf, STDIO_BUFSIZE);

		if (len <= 0)
		{
			/* we either run into eof or error */
			if (gzeof(fp))
				break;
			else
			{
				const char *err_str = NULL;

				/* must be queried before gzclose() releases the handle */
				err_str = gzerror(fp, &err);
				elog(ERROR, "Cannot read from compressed file \"%s\": %s",
					 file_path, err_str);
			}
		}

		/* update CRC */
		COMP_FILE_CRC32(use_crc32c, crc, buf, len);
	}

	FIN_FILE_CRC32(use_crc32c, crc);
	gzclose(fp);
	pg_free(buf);

	return crc;
}

void
pgFileFree(void *file)
{
Expand Down Expand Up @@ -1812,7 +1681,7 @@ write_database_map(pgBackup *backup, parray *database_map, parray *backup_files_
FIO_BACKUP_HOST);
file->crc = pgFileGetCRC(database_map_path, true, false);
file->write_size = file->size;
file->uncompressed_size = file->read_size;
file->uncompressed_size = file->size;

parray_append(backup_files_list, file);
}
Expand Down
2 changes: 1 addition & 1 deletion src/merge.c
Original file line number Diff line number Diff line change
Expand Up @@ -1078,7 +1078,7 @@ merge_files(void *arg)
tmp_file->hdr_crc = file->hdr_crc;
}
else
tmp_file->uncompressed_size = tmp_file->write_size;
tmp_file->uncompressed_size = tmp_file->uncompressed_size;

/* Copy header metadata from old map into a new one */
tmp_file->n_headers = file->n_headers;
Expand Down
Loading