Skip to content

Commit 405a017

Browse files
committed
[Issue #240] WIP
1 parent b16555a commit 405a017

File tree

5 files changed

+221
-18
lines changed

5 files changed

+221
-18
lines changed

src/archive.c

+2-1
Original file line numberDiff line numberDiff line change
@@ -579,7 +579,7 @@ push_file_internal_uncompressed(const char *wal_file_name, const char *pg_xlog_d
579579
from_fullpath, strerror(errno));
580580
}
581581

582-
if (read_len > 0 && fio_write(out, buf, read_len) != read_len)
582+
if (read_len > 0 && fio_write_async(out, buf, read_len) != read_len)
583583
{
584584
fio_unlink(to_fullpath_part, FIO_BACKUP_HOST);
585585
elog(ERROR, "Cannot write to destination temp file \"%s\": %s",
@@ -804,6 +804,7 @@ push_file_internal_gz(const char *wal_file_name, const char *pg_xlog_dir,
804804
}
805805

806806
/* copy content */
807+
/* TODO: move to separate function */
807808
for (;;)
808809
{
809810
size_t read_len = 0;

src/data.c

+5-4
Original file line numberDiff line numberDiff line change
@@ -1084,21 +1084,22 @@ restore_data_file_internal(FILE *in, FILE *out, pgFile *file, uint32 backup_vers
10841084
cur_pos_out = write_pos;
10851085
}
10861086

1087-
/* If page is compressed and restore is in remote mode, send compressed
1088-
* page to the remote side.
1087+
/*
1088+
* If page is compressed and restore is in remote mode,
1089+
* send compressed page to the remote side.
10891090
*/
10901091
if (is_compressed)
10911092
{
10921093
ssize_t rc;
1093-
rc = fio_fwrite_compressed(out, page.data, compressed_size, file->compress_alg);
1094+
rc = fio_fwrite_async_compressed(out, page.data, compressed_size, file->compress_alg);
10941095

10951096
if (!fio_is_remote_file(out) && rc != BLCKSZ)
10961097
elog(ERROR, "Cannot write block %u of \"%s\": %s, size: %u",
10971098
blknum, to_fullpath, strerror(errno), compressed_size);
10981099
}
10991100
else
11001101
{
1101-
if (fio_fwrite(out, page.data, BLCKSZ) != BLCKSZ)
1102+
if (fio_fwrite_async(out, page.data, BLCKSZ) != BLCKSZ)
11021103
elog(ERROR, "Cannot write block %u of \"%s\": %s",
11031104
blknum, to_fullpath, strerror(errno));
11041105
}

src/restore.c

+3
Original file line numberDiff line numberDiff line change
@@ -1190,6 +1190,9 @@ restore_files(void *arg)
11901190
elog(ERROR, "Cannot close file \"%s\": %s", to_fullpath,
11911191
strerror(errno));
11921192

1193+
/* Writing is asynchronous in case of restore in remote mode, so check the agent status */
1194+
fio_check_async_error(out, to_fullpath));
1195+
11931196
/* free pagemap used for restore optimization */
11941197
pg_free(dest_file->pagemap.bitmap);
11951198

src/utils/file.c

+204-11
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ static __thread void* fio_stdin_buffer;
1414
static __thread int fio_stdout = 0;
1515
static __thread int fio_stdin = 0;
1616
static __thread int fio_stderr = 0;
17+
static char *async_errormsg = NULL;
1718

1819
fio_location MyLocation;
1920

@@ -420,6 +421,7 @@ int fio_open(char const* path, int mode, fio_location location)
420421
IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr));
421422
IO_CHECK(fio_write_all(fio_stdout, path, hdr.size), hdr.size);
422423

424+
/* check results */
423425
IO_CHECK(fio_read_all(fio_stdin, &hdr, sizeof(hdr)), sizeof(hdr));
424426

425427
if (hdr.arg != 0)
@@ -653,6 +655,7 @@ int fio_fseek(FILE* f, off_t offs)
653655
}
654656

655657
/* Set position in file */
658+
/* TODO: make it synchronous or check async error */
656659
int fio_seek(int fd, off_t offs)
657660
{
658661
if (fio_is_remote_fd(fd))
@@ -676,20 +679,81 @@ int fio_seek(int fd, off_t offs)
676679

677680
/* Write data to stdio file */
678681
size_t fio_fwrite(FILE* f, void const* buf, size_t size)
682+
{
683+
if (fio_is_remote_file(f))
684+
return fio_write(fio_fileno(f), buf, size)
685+
else
686+
return fwrite(buf, 1, size, f);
687+
}
688+
689+
/* Write data to the file synchronously */
690+
ssize_t fio_write(int fd, void const* buf, size_t size)
691+
{
692+
if (fio_is_remote_fd(fd))
693+
{
694+
fio_header hdr;
695+
696+
hdr.cop = FIO_WRITE;
697+
hdr.handle = fd & ~FIO_PIPE_MARKER;
698+
hdr.size = size;
699+
700+
IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr));
701+
IO_CHECK(fio_write_all(fio_stdout, buf, size), size);
702+
703+
/* check results */
704+
IO_CHECK(fio_read_all(fio_stdin, &hdr, sizeof(hdr)), sizeof(hdr));
705+
706+
/* set errno */
707+
if (hdr.arg > 0)
708+
{
709+
errno = hdr.arg;
710+
return -1;
711+
}
712+
713+
return size;
714+
}
715+
else
716+
{
717+
return write(fd, buf, size);
718+
}
719+
}
720+
721+
static void
722+
fio_write_impl(int fd, void const* buf, size_t size, int out)
723+
{
724+
int rc;
725+
fio_header hdr;
726+
727+
rc = write(fd, buf, size);
728+
729+
hdr.arg = 0;
730+
hdr.size = 0;
731+
732+
if (rc < 0)
733+
hdr.arg = errno;
734+
735+
/* send header */
736+
IO_CHECK(fio_write_all(out, &hdr, sizeof(hdr)), sizeof(hdr));
737+
738+
return;
739+
}
740+
741+
size_t fio_fwrite_async(FILE* f, void const* buf, size_t size)
679742
{
680743
return fio_is_remote_file(f)
681-
? fio_write(fio_fileno(f), buf, size)
744+
? fio_write_async(fio_fileno(f), buf, size)
682745
: fwrite(buf, 1, size, f);
683746
}
684747

685748
/* Write data to the file */
686-
ssize_t fio_write(int fd, void const* buf, size_t size)
749+
/* TODO: support async report error */
750+
ssize_t fio_write_async(int fd, void const* buf, size_t size)
687751
{
688752
if (fio_is_remote_fd(fd))
689753
{
690754
fio_header hdr;
691755

692-
hdr.cop = FIO_WRITE;
756+
hdr.cop = FIO_WRITE_ASYNC;
693757
hdr.handle = fd & ~FIO_PIPE_MARKER;
694758
hdr.size = size;
695759

@@ -704,6 +768,25 @@ ssize_t fio_write(int fd, void const* buf, size_t size)
704768
}
705769
}
706770

771+
static void
772+
fio_write_async_impl(int fd, void const* buf, size_t size, int out)
773+
{
774+
int rc;
775+
776+
/* Quick exit for tainted agent */
777+
if (async_errormsg)
778+
return;
779+
780+
rc = write(fd, buf, size);
781+
782+
if (rc <= 0)
783+
{
784+
async_errormsg = pgut_malloc(ERRMSG_MAX_LEN);
785+
snprintf(async_errormsg, ERRMSG_MAX_LEN, "%s", strerror(errno));
786+
}
787+
return;
788+
}
789+
707790
int32
708791
fio_decompress(void* dst, void const* src, size_t size, int compress_alg)
709792
{
@@ -727,14 +810,40 @@ fio_decompress(void* dst, void const* src, size_t size, int compress_alg)
727810
return uncompressed_size;
728811
}
729812

813+
int32
814+
fio_decompress_new(void* dst, void const* src, size_t size, int compress_alg, char **errormsg)
815+
{
816+
const char *internal_errormsg = NULL;
817+
int32 uncompressed_size = do_decompress(dst, BLCKSZ,
818+
src,
819+
size,
820+
compress_alg, &internal_errormsg);
821+
if (uncompressed_size < 0 && internal_errormsg != NULL)
822+
{
823+
elog(WARNING, "An error occured during decompressing block: %s", internal_errormsg);
824+
return -1;
825+
}
826+
827+
// async_errormsg = pgut_malloc(ERRMSG_MAX_LEN);
828+
// snprintf(async_errormsg, ERRMSG_MAX_LEN, "%s", strerror(errno));
829+
830+
if (uncompressed_size != BLCKSZ)
831+
{
832+
elog(ERROR, "Page uncompressed to %d bytes != BLCKSZ",
833+
uncompressed_size);
834+
return -1;
835+
}
836+
return uncompressed_size;
837+
}
838+
730839
/* Write data to the file */
731-
ssize_t fio_fwrite_compressed(FILE* f, void const* buf, size_t size, int compress_alg)
840+
ssize_t fio_fwrite_async_compressed(FILE* f, void const* buf, size_t size, int compress_alg)
732841
{
733842
if (fio_is_remote_file(f))
734843
{
735844
fio_header hdr;
736845

737-
hdr.cop = FIO_WRITE_COMPRESSED;
846+
hdr.cop = FIO_WRITE_COMPRESSED_ASYNC;
738847
hdr.handle = fio_fileno(f) & ~FIO_PIPE_MARKER;
739848
hdr.size = size;
740849
hdr.arg = compress_alg;
@@ -755,12 +864,88 @@ ssize_t fio_fwrite_compressed(FILE* f, void const* buf, size_t size, int compres
755864
}
756865
}
757866

758-
static ssize_t
867+
static void
759868
fio_write_compressed_impl(int fd, void const* buf, size_t size, int compress_alg)
760869
{
870+
int rc;
871+
int32 uncompressed_size;
761872
char uncompressed_buf[BLCKSZ];
762-
int32 uncompressed_size = fio_decompress(uncompressed_buf, buf, size, compress_alg);
763-
return fio_write_all(fd, uncompressed_buf, uncompressed_size);
873+
char decompress_msg[128];
874+
875+
/* If the previous command already have failed,
876+
* then there is no point in bashing a head against the wall
877+
*/
878+
if (async_errormsg)
879+
return;
880+
881+
/* TODO: We cannot allow error out in fio_decompress */
882+
uncompressed_size = fio_decompress(uncompressed_buf, buf, size, compress_alg);
883+
884+
rc = write(fd, uncompressed_buf, uncompressed_size);
885+
886+
if (rc <= 0)
887+
{
888+
async_errormsg = pgut_malloc(ERRMSG_MAX_LEN);
889+
snprintf(async_errormsg, ERRMSG_MAX_LEN, "%s", strerror(errno));
890+
}
891+
return;
892+
}
893+
894+
/* check if remote agent encountered any error during execution of async operations */
895+
void
896+
fio_check_async_error(FILE* f, const char *filepath)
897+
{
898+
if (fio_is_remote_file(f))
899+
{
900+
char *errmsg;
901+
fio_header hdr;
902+
903+
hdr.cop = FIO_GET_ASYNC_ERROR;
904+
hdr.size = 0;
905+
906+
IO_CHECK(fio_write_all(fio_stdout, &hdr, sizeof(hdr)), sizeof(hdr));
907+
908+
/* check results */
909+
IO_CHECK(fio_read_all(fio_stdin, &hdr, sizeof(hdr)), sizeof(hdr));
910+
911+
if (hdr.size > 0)
912+
{
913+
*errmsg = pgut_malloc(ERRMSG_MAX_LEN);
914+
IO_CHECK(fio_read_all(fio_stdin, *errmsg, hdr.size), hdr.size);
915+
elog(ERROR, "Cannot write to the file \"%s\": %s", filepath, errmsg);
916+
}
917+
}
918+
}
919+
920+
static void
921+
fio_get_async_error_impl(int out)
922+
{
923+
fio_header hdr;
924+
925+
hdr.cop = FIO_GET_ASYNC_ERROR;
926+
927+
/* send error message */
928+
if (async_errormsg)
929+
{
930+
hdr.size = strlen(async_errormsg) + 1;
931+
932+
/* send header */
933+
IO_CHECK(fio_write_all(out, &hdr, sizeof(hdr)), sizeof(hdr));
934+
935+
/* send message itself */
936+
IO_CHECK(fio_write_all(out, async_errormsg, hdr.size), hdr.size);
937+
938+
pg_free(async_errormsg);
939+
async_errormsg = NULL;
940+
}
941+
else
942+
{
943+
hdr.size = 0;
944+
/* send header */
945+
IO_CHECK(fio_write_all(out, &hdr, sizeof(hdr)), sizeof(hdr));
946+
}
947+
948+
return;
764949
}
765950

766951
/* Read data from stdio file */
@@ -2597,10 +2782,15 @@ void fio_communicate(int in, int out)
25972782
SYS_CHECK(close(fd[hdr.handle]));
25982783
break;
25992784
case FIO_WRITE: /* Write to the current position in file */
2600-
IO_CHECK(fio_write_all(fd[hdr.handle], buf, hdr.size), hdr.size);
2785+
// IO_CHECK(fio_write_all(fd[hdr.handle], buf, hdr.size), hdr.size);
2786+
fio_write_impl(fd[hdr.handle], buf, hdr.size, out);
26012787
break;
2602-
case FIO_WRITE_COMPRESSED: /* Write to the current position in file */
2603-
IO_CHECK(fio_write_compressed_impl(fd[hdr.handle], buf, hdr.size, hdr.arg), BLCKSZ);
2788+
case FIO_WRITE_ASYNC: /* Write to the current position in file */
2789+
fio_write_impl(fd[hdr.handle], buf, hdr.size, out);
2790+
break;
2791+
case FIO_WRITE_COMPRESSED_ASYNC: /* Write to the current position in file */
2792+
//IO_CHECK(fio_write_compressed_impl(fd[hdr.handle], buf, hdr.size, hdr.arg), BLCKSZ);
2793+
fio_write_compressed_impl(fd[hdr.handle], buf, hdr.size, hdr.arg);
26042794
break;
26052795
case FIO_READ: /* Read from the current position in file */
26062796
if ((size_t)hdr.arg > buf_size) {
@@ -2716,6 +2906,9 @@ void fio_communicate(int in, int out)
27162906
hdr.cop = FIO_DISCONNECTED;
27172907
IO_CHECK(fio_write_all(out, &hdr, sizeof(hdr)), sizeof(hdr));
27182908
break;
2909+
case FIO_GET_ASYNC_ERROR:
2910+
fio_get_async_error_impl(out);
2911+
break;
27192912
default:
27202913
Assert(false);
27212914
}

src/utils/file.h

+7-2
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ typedef enum
3636
FIO_READDIR,
3737
FIO_CLOSEDIR,
3838
FIO_PAGE,
39-
FIO_WRITE_COMPRESSED,
39+
FIO_WRITE_COMPRESSED_ASYNC,
4040
FIO_GET_CRC32,
4141
/* used for incremental restore */
4242
FIO_GET_CHECKSUM_MAP,
@@ -53,7 +53,9 @@ typedef enum
5353
FIO_DISCONNECT,
5454
FIO_DISCONNECTED,
5555
FIO_LIST_DIR,
56-
FIO_CHECK_POSTMASTER
56+
FIO_CHECK_POSTMASTER,
57+
FIO_GET_ASYNC_ERROR,
58+
FIO_WRITE_ASYNC
5759
} fio_operations;
5860

5961
typedef enum
@@ -92,6 +94,8 @@ extern int fio_get_agent_version(void);
9294
extern FILE* fio_fopen(char const* name, char const* mode, fio_location location);
9395
extern size_t fio_fwrite(FILE* f, void const* buf, size_t size);
9496
extern ssize_t fio_fwrite_compressed(FILE* f, void const* buf, size_t size, int compress_alg);
97+
extern size_t fio_fwrite_async(FILE* f, void const* buf, size_t size);
98+
extern void fio_check_async_error(FILE* f, const char *filepath);
9599
extern ssize_t fio_fread(FILE* f, void* buf, size_t size);
96100
extern int fio_pread(FILE* f, void* buf, off_t offs);
97101
extern int fio_fprintf(FILE* f, char const* arg, ...) pg_attribute_printf(2, 3);
@@ -104,6 +108,7 @@ extern void fio_error(int rc, int size, char const* file, int line);
104108

105109
extern int fio_open(char const* name, int mode, fio_location location);
106110
extern ssize_t fio_write(int fd, void const* buf, size_t size);
111+
extern ssize_t fio_write_async(int fd, void const* buf, size_t size);
107112
extern ssize_t fio_read(int fd, void* buf, size_t size);
108113
extern int fio_flush(int fd);
109114
extern int fio_seek(int fd, off_t offs);

0 commit comments

Comments
 (0)