diff --git a/src/interfaces/libpq-oauth/Makefile b/src/interfaces/libpq-oauth/Makefile index 682f17413b3a..e73573694b9a 100644 --- a/src/interfaces/libpq-oauth/Makefile +++ b/src/interfaces/libpq-oauth/Makefile @@ -79,5 +79,19 @@ uninstall: rm -f '$(DESTDIR)$(libdir)/$(stlib)' rm -f '$(DESTDIR)$(libdir)/$(shlib)' +.PHONY: all-tests +all-tests: oauth_tests$(X) + +oauth_tests$(X): test-oauth-curl.o oauth-utils.o $(WIN32RES) | submake-libpgport submake-libpq + $(CC) $(CFLAGS) $^ $(LDFLAGS) $(LDFLAGS_EX) $(SHLIB_LINK) -o $@ + +check: all-tests + $(prove_check) + +installcheck: all-tests + $(prove_installcheck) + clean distclean: clean-lib rm -f $(OBJS) $(OBJS_STATIC) $(OBJS_SHLIB) + rm -f test-oauth-curl.o oauth_tests$(X) + rm -rf tmp_check diff --git a/src/interfaces/libpq-oauth/meson.build b/src/interfaces/libpq-oauth/meson.build index df064c59a407..505e1671b863 100644 --- a/src/interfaces/libpq-oauth/meson.build +++ b/src/interfaces/libpq-oauth/meson.build @@ -47,3 +47,38 @@ libpq_oauth_so = shared_module(libpq_oauth_name, link_args: export_fmt.format(export_file.full_path()), kwargs: default_lib_args, ) + +libpq_oauth_test_deps = [] + +oauth_test_sources = files('test-oauth-curl.c') + libpq_oauth_so_sources + +if host_system == 'windows' + oauth_test_sources += rc_bin_gen.process(win32ver_rc, extra_args: [ + '--NAME', 'oauth_tests', + '--FILEDESC', 'OAuth unit test program',]) +endif + +libpq_oauth_test_deps += executable('oauth_tests', + oauth_test_sources, + dependencies: [frontend_shlib_code, libpq, libpq_oauth_deps], + kwargs: default_bin_args + { + 'c_args': default_bin_args.get('c_args', []) + libpq_oauth_so_c_args, + 'c_pch': pch_postgres_fe_h, + 'include_directories': [libpq_inc, postgres_inc], + 'install': false, + } +) + +testprep_targets += libpq_oauth_test_deps + +tests += { + 'name': 'libpq-oauth', + 'sd': meson.current_source_dir(), + 'bd': meson.current_build_dir(), + 'tap': { + 'tests': [ + 't/001_oauth.pl', + ], + 'deps': libpq_oauth_test_deps, + }, +} diff --git a/src/interfaces/libpq-oauth/oauth-curl.c b/src/interfaces/libpq-oauth/oauth-curl.c index dba9a684fa8a..68303106a5d3 100644 --- a/src/interfaces/libpq-oauth/oauth-curl.c +++ b/src/interfaces/libpq-oauth/oauth-curl.c @@ -278,6 +278,11 @@ struct async_ctx bool user_prompted; /* have we already sent the authz prompt? */ bool used_basic_auth; /* did we send a client secret? */ bool debugging; /* can we give unsafe developer assistance? */ + int dbg_num_calls; /* (debug mode) how many times were we called? */ + +#if defined(HAVE_SYS_EVENT_H) + int nevents; /* how many events are we waiting on? */ +#endif }; /* @@ -1291,41 +1296,95 @@ register_socket(CURL *curl, curl_socket_t socket, int what, void *ctx, return 0; #elif defined(HAVE_SYS_EVENT_H) - struct kevent ev[2] = {0}; + struct kevent ev[2]; struct kevent ev_out[2]; struct timespec timeout = {0}; - int nev = 0; + int nev; int res; + /* + * First, any existing registrations for this socket need to be removed, + * both to track the outstanding number of events, and to ensure that + * we're not woken up for things that Curl no longer cares about. + * + * ENOENT is okay, but we have to track how many we get, so use + * EV_RECEIPT. + */ + nev = 0; + EV_SET(&ev[nev], socket, EVFILT_READ, EV_DELETE | EV_RECEIPT, 0, 0, 0); + nev++; + EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_DELETE | EV_RECEIPT, 0, 0, 0); + nev++; + + Assert(nev <= lengthof(ev)); + Assert(nev <= lengthof(ev_out)); + + res = kevent(actx->mux, ev, nev, ev_out, nev, &timeout); + if (res < 0) + { + actx_error(actx, "could not delete from kqueue: %m"); + return -1; + } + + /* + * We can't use the simple errno version of kevent, because we need to + * skip over ENOENT while still allowing a second change to be processed. + * So we need a longer-form error checking loop. + */ + for (int i = 0; i < res; ++i) + { + /* + * EV_RECEIPT should guarantee one EV_ERROR result for every change, + * whether successful or not. Failed entries contain a non-zero errno + * in the data field. + */ + Assert(ev_out[i].flags & EV_ERROR); + + errno = ev_out[i].data; + if (!errno) + { + /* Successfully removed; update the event count. */ + Assert(actx->nevents > 0); + actx->nevents--; + } + else if (errno != ENOENT) + { + actx_error(actx, "could not delete from kqueue: %m"); + return -1; + } + } + + /* If we're only removing registrations, we're done. */ + if (what == CURL_POLL_REMOVE) + return 0; + + /* + * Now add the new filters. This is more straightfoward than deletion. + * + * Combining this kevent() call with the one above seems like it should be + * theoretically possible, but beware that not all BSDs keep the original + * event flags when using EV_RECEIPT, so it's tricky to figure out which + * operations succeeded. For now we keep the deletions and the additions + * separate. + */ + nev = 0; + switch (what) { case CURL_POLL_IN: - EV_SET(&ev[nev], socket, EVFILT_READ, EV_ADD | EV_RECEIPT, 0, 0, 0); + EV_SET(&ev[nev], socket, EVFILT_READ, EV_ADD, 0, 0, 0); nev++; break; case CURL_POLL_OUT: - EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_ADD | EV_RECEIPT, 0, 0, 0); + EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_ADD, 0, 0, 0); nev++; break; case CURL_POLL_INOUT: - EV_SET(&ev[nev], socket, EVFILT_READ, EV_ADD | EV_RECEIPT, 0, 0, 0); + EV_SET(&ev[nev], socket, EVFILT_READ, EV_ADD, 0, 0, 0); nev++; - EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_ADD | EV_RECEIPT, 0, 0, 0); - nev++; - break; - - case CURL_POLL_REMOVE: - - /* - * We don't know which of these is currently registered, perhaps - * both, so we try to remove both. This means we need to tolerate - * ENOENT below. - */ - EV_SET(&ev[nev], socket, EVFILT_READ, EV_DELETE | EV_RECEIPT, 0, 0, 0); - nev++; - EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_DELETE | EV_RECEIPT, 0, 0, 0); + EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_ADD, 0, 0, 0); nev++; break; @@ -1334,45 +1393,91 @@ register_socket(CURL *curl, curl_socket_t socket, int what, void *ctx, return -1; } - res = kevent(actx->mux, ev, nev, ev_out, lengthof(ev_out), &timeout); + Assert(nev <= lengthof(ev)); + + res = kevent(actx->mux, ev, nev, NULL, 0, NULL); if (res < 0) { actx_error(actx, "could not modify kqueue: %m"); return -1; } + /* Update the event count, and we're done. */ + actx->nevents += nev; + + return 0; +#else +#error register_socket is not implemented on this platform +#endif +} + +/*------- + * Drains any stale level-triggered events out of the multiplexer. This is + * necessary only if the mux implementation requires it. + * + * As an example, consider the following sequence of events: + * 1. libcurl tries to write data to the send buffer, but it fills up. + * 2. libcurl registers CURL_POLL_OUT on the socket and returns control to the + * client to wait. + * 3. The kernel partially drains the send buffer. The socket becomes writable, + * and the client wakes up and calls back into the flow. + * 4. libcurl continues writing data to the send buffer, but it fills up again. + * The socket is no longer writable. + * + * At this point, an epoll-based mux no longer signals readiness, so nothing + * further needs to be done. But a kqueue-based mux will continue to signal + * "ready" until either the EVFILT_WRITE registration is dropped for the socket, + * or the old socket-writable event is read from the queue. Since Curl isn't + * guaranteed to do the former, we must do the latter here. + */ +static bool +drain_socket_events(struct async_ctx *actx) +{ +#if defined(HAVE_SYS_EPOLL_H) + /* The epoll implementation doesn't need to drain pending events. */ + return true; +#elif defined(HAVE_SYS_EVENT_H) + struct timespec timeout = {0}; + struct kevent *drain; + int drain_len; + /* - * We can't use the simple errno version of kevent, because we need to - * skip over ENOENT while still allowing a second change to be processed. - * So we need a longer-form error checking loop. + * Drain the events in one call, rather than looping. (We could maybe call + * kevent() drain_len times, instead of allocating space for the maximum + * number of events, but that relies on the events being in FIFO order to + * avoid starvation. The kqueue man pages don't seem to make any + * guarantees about that.) + * + * register_socket() keeps actx->nevents updated with the number of + * outstanding event filters. We don't track the registration of the + * timer; we just assume one could be registered here. */ - for (int i = 0; i < res; ++i) + drain_len = actx->nevents + 1; + + drain = malloc(sizeof(*drain) * drain_len); + if (!drain) { - /* - * EV_RECEIPT should guarantee one EV_ERROR result for every change, - * whether successful or not. Failed entries contain a non-zero errno - * in the data field. - */ - Assert(ev_out[i].flags & EV_ERROR); + actx_error(actx, "out of memory"); + return false; + } - errno = ev_out[i].data; - if (errno && errno != ENOENT) - { - switch (what) - { - case CURL_POLL_REMOVE: - actx_error(actx, "could not delete from kqueue: %m"); - break; - default: - actx_error(actx, "could not add to kqueue: %m"); - } - return -1; - } + /* + * Discard all pending events. Since our registrations are level-triggered + * (even the timer, since we use a chained kqueue for that instead of an + * EVFILT_TIMER on the top-level mux!), any events that we still need will + * remain signalled, and the stale ones will be swept away. + */ + if (kevent(actx->mux, NULL, 0, drain, drain_len, &timeout) < 0) + { + actx_error(actx, "could not drain kqueue: %m"); + free(drain); + return false; } - return 0; + free(drain); + return true; #else -#error register_socket is not implemented on this platform +#error drain_socket_events is not implemented on this platform #endif } @@ -1441,7 +1546,8 @@ set_timer(struct async_ctx *actx, long timeout) * macOS.) * * If there was no previous timer set, the kevent calls will result in - * ENOENT, which is fine. + * ENOENT, which is fine. (We don't track actx->nevents for this case; + * instead, drain_socket_events() just assumes a timer could be set.) */ EV_SET(&ev, 1, EVFILT_TIMER, EV_DELETE, 0, 0, 0); if (kevent(actx->timerfd, &ev, 1, NULL, 0, NULL) < 0 && errno != ENOENT) @@ -1483,40 +1589,20 @@ set_timer(struct async_ctx *actx, long timeout) /* * Returns 1 if the timeout in the multiplexer set has expired since the last - * call to set_timer(), 0 if the timer is still running, or -1 (with an - * actx_error() report) if the timer cannot be queried. + * call to set_timer(), 0 if the timer is either still running or disarmed, or + * -1 (with an actx_error() report) if the timer cannot be queried. */ static int timer_expired(struct async_ctx *actx) { -#if defined(HAVE_SYS_EPOLL_H) - struct itimerspec spec = {0}; - - if (timerfd_gettime(actx->timerfd, &spec) < 0) - { - actx_error(actx, "getting timerfd value: %m"); - return -1; - } - - /* - * This implementation assumes we're using single-shot timers. If you - * change to using intervals, you'll need to reimplement this function - * too, possibly with the read() or select() interfaces for timerfd. - */ - Assert(spec.it_interval.tv_sec == 0 - && spec.it_interval.tv_nsec == 0); - - /* If the remaining time to expiration is zero, we're done. */ - return (spec.it_value.tv_sec == 0 - && spec.it_value.tv_nsec == 0); -#elif defined(HAVE_SYS_EVENT_H) +#if defined(HAVE_SYS_EPOLL_H) || defined(HAVE_SYS_EVENT_H) int res; - /* Is the timer queue ready? */ + /* Is the timer ready? */ res = PQsocketPoll(actx->timerfd, 1 /* forRead */ , 0, 0); if (res < 0) { - actx_error(actx, "checking kqueue for timeout: %m"); + actx_error(actx, "checking timer expiration: %m"); return -1; } @@ -1548,6 +1634,36 @@ register_timer(CURLM *curlm, long timeout, void *ctx) return 0; } +/* + * Removes any expired-timer event from the multiplexer. If was_expired is not + * NULL, it will contain whether or not the timer was expired at time of call. + */ +static bool +drain_timer_events(struct async_ctx *actx, bool *was_expired) +{ + int res; + + res = timer_expired(actx); + if (res < 0) + return false; + + if (res > 0) + { + /* + * Timer is expired. We could drain the event manually from the + * timerfd, but it's easier to simply disable it; that keeps the + * platform-specific code in set_timer(). + */ + if (!set_timer(actx, -1)) + return false; + } + + if (was_expired) + *was_expired = (res > 0); + + return true; +} + /* * Prints Curl request debugging information to stderr. * @@ -2751,38 +2867,65 @@ pg_fe_run_oauth_flow_impl(PGconn *conn) { PostgresPollingStatusType status; + /* + * Clear any expired timeout before calling back into + * Curl. Curl is not guaranteed to do this for us, because + * its API expects us to use single-shot (i.e. + * edge-triggered) timeouts, and ours are level-triggered + * via the mux. + * + * This can't be combined with the drain_socket_events() + * call below: we might accidentally clear a short timeout + * that was both set and expired during the call to + * drive_request(). + */ + if (!drain_timer_events(actx, NULL)) + goto error_return; + + /* Move the request forward. */ status = drive_request(actx); if (status == PGRES_POLLING_FAILED) goto error_return; - else if (status != PGRES_POLLING_OK) - { - /* not done yet */ - return status; - } + else if (status == PGRES_POLLING_OK) + break; /* done! */ - break; + /* + * This request is still running. + * + * Drain any stale socket events from the mux before we + * ask the client to poll. (Currently, this can occur only + * with kqueue.) If this is forgotten, the multiplexer can + * get stuck in a signalled state and we'll burn CPU + * cycles pointlessly. + */ + if (!drain_socket_events(actx)) + goto error_return; + + return status; } case OAUTH_STEP_WAIT_INTERVAL: - - /* - * The client application is supposed to wait until our timer - * expires before calling PQconnectPoll() again, but that - * might not happen. To avoid sending a token request early, - * check the timer before continuing. - */ - if (!timer_expired(actx)) { - set_conn_altsock(conn, actx->timerfd); - return PGRES_POLLING_READING; - } + bool expired; - /* Disable the expired timer. */ - if (!set_timer(actx, -1)) - goto error_return; + /* + * The client application is supposed to wait until our + * timer expires before calling PQconnectPoll() again, but + * that might not happen. To avoid sending a token request + * early, check the timer before continuing. + */ + if (!drain_timer_events(actx, &expired)) + goto error_return; - break; + if (!expired) + { + set_conn_altsock(conn, actx->timerfd); + return PGRES_POLLING_READING; + } + + break; + } } /* @@ -2932,6 +3075,8 @@ PostgresPollingStatusType pg_fe_run_oauth_flow(PGconn *conn) { PostgresPollingStatusType result; + fe_oauth_state *state = conn_sasl_state(conn); + struct async_ctx *actx; #ifndef WIN32 sigset_t osigset; bool sigpipe_pending; @@ -2960,6 +3105,25 @@ pg_fe_run_oauth_flow(PGconn *conn) result = pg_fe_run_oauth_flow_impl(conn); + /* + * To assist with finding bugs in drain_socket_events() and + * drain_timer_events(), when we're in debug mode, track the total number + * of calls to this function and print that at the end of the flow. + * + * Be careful that state->async_ctx could be NULL if early initialization + * fails during the first call. + */ + actx = state->async_ctx; + Assert(actx || result == PGRES_POLLING_FAILED); + + if (actx && actx->debugging) + { + actx->dbg_num_calls++; + if (result == PGRES_POLLING_OK || result == PGRES_POLLING_FAILED) + fprintf(stderr, "[libpq] total number of polls: %d\n", + actx->dbg_num_calls); + } + #ifndef WIN32 if (masked) { diff --git a/src/interfaces/libpq-oauth/t/001_oauth.pl b/src/interfaces/libpq-oauth/t/001_oauth.pl new file mode 100644 index 000000000000..e769856c2c9c --- /dev/null +++ b/src/interfaces/libpq-oauth/t/001_oauth.pl @@ -0,0 +1,24 @@ +# Copyright (c) 2025, PostgreSQL Global Development Group +use strict; +use warnings FATAL => 'all'; + +use PostgreSQL::Test::Utils; +use Test::More; + +# Defer entirely to the oauth_tests executable. stdout/err is routed through +# Test::More so that our logging infrastructure can handle it correctly. Using +# IPC::Run::new_chunker seems to help interleave the two streams a little better +# than without. +# +# TODO: prove can also deal with native executables itself, which we could +# probably make use of via PROVE_TESTS on the Makefile side. But the Meson setup +# calls Perl directly, which would require more code to work around... and +# there's still the matter of logging. +my $builder = Test::More->builder; +my $out = $builder->output; +my $err = $builder->failure_output; + +IPC::Run::run ['oauth_tests'], + '>', IPC::Run::new_chunker, sub { print {$out} $_[0] }, + '2>', IPC::Run::new_chunker, sub { print {$err} $_[0] } + or die "oauth_tests returned $?"; diff --git a/src/interfaces/libpq-oauth/test-oauth-curl.c b/src/interfaces/libpq-oauth/test-oauth-curl.c new file mode 100644 index 000000000000..1a03b0fc552e --- /dev/null +++ b/src/interfaces/libpq-oauth/test-oauth-curl.c @@ -0,0 +1,474 @@ +/* + * test-oauth-curl.c + * + * A unit test driver for libpq-oauth. This #includes oauth-curl.c, which lets + * the tests reference static functions and other internals. + * + * USE_ASSERT_CHECKING is required, to make it easy for tests to wrap + * must-succeed code as part of test setup. + * + * Copyright (c) 2025, PostgreSQL Global Development Group + */ + +#include "oauth-curl.c" + +#include + +#ifdef USE_ASSERT_CHECKING + +/* + * TAP Helpers + */ + +static int num_tests = 0; + +/* + * Reports ok/not ok to the TAP stream on stdout. + */ +#define ok(OK, TEST) \ + ok_impl(OK, TEST, #OK, __FILE__, __LINE__) + +static bool +ok_impl(bool ok, const char *test, const char *teststr, const char *file, int line) +{ + printf("%sok %d - %s\n", ok ? "" : "not ", ++num_tests, test); + + if (!ok) + { + printf("# at %s:%d:\n", file, line); + printf("# expression is false: %s\n", teststr); + } + + return ok; +} + +/* + * Like ok(this == that), but with more diagnostics on failure. + * + * Only works on ints, but luckily that's all we need here. Note that the much + * simpler-looking macro implementation + * + * is_diag(ok(THIS == THAT, TEST), THIS, #THIS, THAT, #THAT) + * + * suffers from multiple evaluation of the macro arguments... + */ +#define is(THIS, THAT, TEST) \ + do { \ + int this_ = (THIS), \ + that_ = (THAT); \ + is_diag( \ + ok_impl(this_ == that_, TEST, #THIS " == " #THAT, __FILE__, __LINE__), \ + this_, #THIS, that_, #THAT \ + ); \ + } while (0) + +static bool +is_diag(bool ok, int this, const char *thisstr, int that, const char *thatstr) +{ + if (!ok) + printf("# %s = %d; %s = %d\n", thisstr, this, thatstr, that); + + return ok; +} + +/* + * Utilities + */ + +/* + * Creates a partially-initialized async_ctx for the purposes of testing. Free + * with free_test_actx(). + */ +static struct async_ctx * +init_test_actx(void) +{ + struct async_ctx *actx; + + actx = calloc(1, sizeof(*actx)); + Assert(actx); + + actx->mux = PGINVALID_SOCKET; + actx->timerfd = -1; + actx->debugging = true; + + initPQExpBuffer(&actx->errbuf); + + Assert(setup_multiplexer(actx)); + + return actx; +} + +static void +free_test_actx(struct async_ctx *actx) +{ + termPQExpBuffer(&actx->errbuf); + + if (actx->mux != PGINVALID_SOCKET) + close(actx->mux); + if (actx->timerfd >= 0) + close(actx->timerfd); + + free(actx); +} + +static char dummy_buf[4 * 1024]; /* for fill_pipe/drain_pipe */ + +/* + * Writes to the write side of a pipe until it won't take any more data. Returns + * the amount written. + */ +static ssize_t +fill_pipe(int fd) +{ + int mode; + ssize_t written = 0; + + /* Don't block. */ + Assert((mode = fcntl(fd, F_GETFL)) != -1); + Assert(fcntl(fd, F_SETFL, mode | O_NONBLOCK) == 0); + + while (true) + { + ssize_t w; + + w = write(fd, dummy_buf, sizeof(dummy_buf)); + if (w < 0) + { + if (errno != EAGAIN && errno != EWOULDBLOCK) + { + perror("write to pipe"); + written = -1; + } + break; + } + + written += w; + } + + /* Reset the descriptor flags. */ + Assert(fcntl(fd, F_SETFD, mode) == 0); + + return written; +} + +/* + * Drains the requested amount of data from the read side of a pipe. + */ +static bool +drain_pipe(int fd, ssize_t n) +{ + Assert(n > 0); + + while (n) + { + size_t to_read = (n <= sizeof(dummy_buf)) ? n : sizeof(dummy_buf); + ssize_t drained; + + drained = read(fd, dummy_buf, to_read); + if (drained < 0) + { + perror("read from pipe"); + return false; + } + + n -= drained; + } + + return true; +} + +/* + * Tests whether the multiplexer is marked ready by the deadline. This is a + * macro so that file/line information makes sense during failures. + * + * NB: our current multiplexer implementations (epoll/kqueue) are *readable* + * when the underlying libcurl sockets are *writable*. This behavior is pinned + * here to record that expectation; PGRES_POLLING_READING is hardcoded + * throughout the flow and would need to be changed if a new multiplexer does + * something different. + */ +#define mux_is_ready(MUX, DEADLINE, TEST) \ + do { \ + int res_ = PQsocketPoll(MUX, 1, 0, DEADLINE); \ + Assert(res_ != -1); \ + ok(res_ > 0, "multiplexer is ready " TEST); \ + } while (0) + +/* + * The opposite of mux_is_ready(). + */ +#define mux_is_not_ready(MUX, TEST) \ + do { \ + int res_ = PQsocketPoll(MUX, 1, 0, 0); \ + Assert(res_ != -1); \ + is(res_, 0, "multiplexer is not ready " TEST); \ + } while (0) + +/* + * Test Suites + */ + +/* Per-suite timeout. Set via the PG_TEST_TIMEOUT_DEFAULT envvar. */ +static pg_usec_time_t timeout_us = 180 * 1000 * 1000; + +static void +test_set_timer(void) +{ + struct async_ctx *actx = init_test_actx(); + const pg_usec_time_t deadline = PQgetCurrentTimeUSec() + timeout_us; + + printf("# test_set_timer\n"); + + /* A zero-duration timer should result in a near-immediate ready signal. */ + Assert(set_timer(actx, 0)); + mux_is_ready(actx->mux, deadline, "when timer expires"); + is(timer_expired(actx), 1, "timer_expired() returns 1 when timer expires"); + + /* Resetting the timer far in the future should unset the ready signal. */ + Assert(set_timer(actx, INT_MAX)); + mux_is_not_ready(actx->mux, "when timer is reset to the future"); + is(timer_expired(actx), 0, "timer_expired() returns 0 with unexpired timer"); + + /* Setting another zero-duration timer should override the previous one. */ + Assert(set_timer(actx, 0)); + mux_is_ready(actx->mux, deadline, "when timer is re-expired"); + is(timer_expired(actx), 1, "timer_expired() returns 1 when timer is re-expired"); + + /* And disabling that timer should once again unset the ready signal. */ + Assert(set_timer(actx, -1)); + mux_is_not_ready(actx->mux, "when timer is unset"); + is(timer_expired(actx), 0, "timer_expired() returns 0 when timer is unset"); + + { + bool expired; + + /* Make sure drain_timer_events() functions correctly as well. */ + Assert(set_timer(actx, 0)); + mux_is_ready(actx->mux, deadline, "when timer is re-expired (drain_timer_events)"); + + Assert(drain_timer_events(actx, &expired)); + mux_is_not_ready(actx->mux, "when timer is drained after expiring"); + is(expired, 1, "drain_timer_events() reports expiration"); + is(timer_expired(actx), 0, "timer_expired() returns 0 after timer is drained"); + + /* A second drain should do nothing. */ + Assert(drain_timer_events(actx, &expired)); + mux_is_not_ready(actx->mux, "when timer is drained a second time"); + is(expired, 0, "drain_timer_events() reports no expiration"); + is(timer_expired(actx), 0, "timer_expired() still returns 0"); + } + + free_test_actx(actx); +} + +static void +test_register_socket(void) +{ + struct async_ctx *actx = init_test_actx(); + int pipefd[2]; + int rfd, + wfd; + bool bidirectional; + + /* Create a local pipe for communication. */ + Assert(pipe(pipefd) == 0); + rfd = pipefd[0]; + wfd = pipefd[1]; + + /* + * Some platforms (FreeBSD) implement bidirectional pipes, affecting the + * behavior of some of these tests. Store that knowledge for later. + */ + bidirectional = PQsocketPoll(rfd /* read */ , 0, 1 /* write */ , 0) > 0; + + /* + * This suite runs twice -- once using CURL_POLL_IN/CURL_POLL_OUT for + * read/write operations, respectively, and once using CURL_POLL_INOUT for + * both sides. + */ + for (int inout = 0; inout < 2; inout++) + { + const int in_event = inout ? CURL_POLL_INOUT : CURL_POLL_IN; + const int out_event = inout ? CURL_POLL_INOUT : CURL_POLL_OUT; + const pg_usec_time_t deadline = PQgetCurrentTimeUSec() + timeout_us; + size_t bidi_pipe_size = 0; /* silence compiler warnings */ + + printf("# test_register_socket %s\n", inout ? "(INOUT)" : ""); + + /* + * At the start of the test, the read side should be blocked and the + * write side should be open. (There's a mistake at the end of this + * loop otherwise.) + */ + Assert(PQsocketPoll(rfd, 1, 0, 0) == 0); + Assert(PQsocketPoll(wfd, 0, 1, 0) > 0); + + /* + * For bidirectional systems, emulate unidirectional behavior here by + * filling up the "read side" of the pipe. + */ + if (bidirectional) + Assert((bidi_pipe_size = fill_pipe(rfd)) > 0); + + /* Listen on the read side. The multiplexer shouldn't be ready yet. */ + Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0); + mux_is_not_ready(actx->mux, "when fd is not readable"); + + /* Writing to the pipe should result in a read-ready multiplexer. */ + Assert(write(wfd, "x", 1) == 1); + mux_is_ready(actx->mux, deadline, "when fd is readable"); + + /* + * Update the registration to wait on write events instead. The + * multiplexer should be unset. + */ + Assert(register_socket(NULL, rfd, CURL_POLL_OUT, actx, NULL) == 0); + mux_is_not_ready(actx->mux, "when waiting for writes on readable fd"); + + /* Re-register for read events. */ + Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0); + mux_is_ready(actx->mux, deadline, "when waiting for reads again"); + + /* Stop listening. The multiplexer should be unset. */ + Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0); + mux_is_not_ready(actx->mux, "when readable fd is removed"); + + /* Listen again. */ + Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0); + mux_is_ready(actx->mux, deadline, "when readable fd is re-added"); + + /* + * Draining the pipe should unset the multiplexer again, once the old + * event is drained. + */ + Assert(drain_pipe(rfd, 1)); + Assert(drain_socket_events(actx)); + mux_is_not_ready(actx->mux, "when fd is drained"); + + /* Undo any unidirectional emulation. */ + if (bidirectional) + Assert(drain_pipe(wfd, bidi_pipe_size)); + + /* Listen on the write side. An empty buffer should be writable. */ + Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0); + Assert(register_socket(NULL, wfd, out_event, actx, NULL) == 0); + mux_is_ready(actx->mux, deadline, "when fd is writable"); + + /* As above, wait on read events instead. */ + Assert(register_socket(NULL, wfd, CURL_POLL_IN, actx, NULL) == 0); + mux_is_not_ready(actx->mux, "when waiting for reads on writable fd"); + + /* Re-register for write events. */ + Assert(register_socket(NULL, wfd, out_event, actx, NULL) == 0); + mux_is_ready(actx->mux, deadline, "when waiting for writes again"); + + { + ssize_t written; + + /* + * Fill the pipe. Once the old writable event is drained, the mux + * should not be ready. + */ + Assert((written = fill_pipe(wfd)) > 0); + printf("# pipe buffer is full at %zd bytes\n", written); + + Assert(drain_socket_events(actx)); + mux_is_not_ready(actx->mux, "when fd buffer is full"); + + /* Drain the pipe again. */ + Assert(drain_pipe(rfd, written)); + mux_is_ready(actx->mux, deadline, "when fd buffer is drained"); + } + + /* Stop listening. */ + Assert(register_socket(NULL, wfd, CURL_POLL_REMOVE, actx, NULL) == 0); + mux_is_not_ready(actx->mux, "when fd is removed"); + + /* Make sure an expired timer doesn't interfere with event draining. */ + { + /* Make the rfd appear unidirectional if necessary. */ + if (bidirectional) + Assert((bidi_pipe_size = fill_pipe(rfd)) > 0); + + /* Set the timer and wait for it to expire. */ + Assert(set_timer(actx, 0)); + Assert(PQsocketPoll(actx->timerfd, 1, 0, deadline) > 0); + is(timer_expired(actx), 1, "timer is expired"); + + /* Register for read events and make the fd readable. */ + Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0); + Assert(write(wfd, "x", 1) == 1); + mux_is_ready(actx->mux, deadline, "when fd is readable and timer expired"); + + /* + * Draining the pipe should unset the multiplexer again, once the + * old event is drained and the timer is reset. + * + * Order matters to avoid false negatives. First drain the socket, + * then unset the timer. We're trying to catch the case where the + * pending timer expiration event takes the place of one of the + * socket events we're attempting to drain. + */ + Assert(drain_pipe(rfd, 1)); + Assert(drain_socket_events(actx)); + Assert(set_timer(actx, -1)); + + is(timer_expired(actx), 0, "timer is no longer expired"); + mux_is_not_ready(actx->mux, "when fd is drained and timer reset"); + + /* Stop listening. */ + Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0); + + /* Undo any unidirectional emulation. */ + if (bidirectional) + Assert(drain_pipe(wfd, bidi_pipe_size)); + } + } + + close(rfd); + close(wfd); + free_test_actx(actx); +} + +int +main(int argc, char *argv[]) +{ + const char *timeout; + + /* Grab the default timeout. */ + timeout = getenv("PG_TEST_TIMEOUT_DEFAULT"); + if (timeout) + { + int timeout_s = atoi(timeout); + + if (timeout_s > 0) + timeout_us = timeout_s * 1000 * 1000; + } + + /* + * Set up line buffering for our output, to let stderr interleave in the + * log files. + */ + setvbuf(stdout, NULL, PG_IOLBF, 0); + + test_set_timer(); + test_register_socket(); + + printf("1..%d\n", num_tests); + return 0; +} + +#else /* !USE_ASSERT_CHECKING */ + +/* + * Skip the test suite when we don't have assertions. + */ +int +main(int argc, char *argv[]) +{ + printf("1..0 # skip: cassert is not enabled\n"); + + return 0; +} + +#endif /* USE_ASSERT_CHECKING */ diff --git a/src/test/modules/oauth_validator/t/001_server.pl b/src/test/modules/oauth_validator/t/001_server.pl index 41672ebd5c6d..c0dafb8be764 100644 --- a/src/test/modules/oauth_validator/t/001_server.pl +++ b/src/test/modules/oauth_validator/t/001_server.pl @@ -418,6 +418,35 @@ sub connstr qr/failed to obtain access token: mutual TLS required for client \(invalid_client\)/ ); +# Count the number of calls to the internal flow when multiple retries are +# triggered. The exact number depends on many things -- the TCP stack, the +# version of Curl in use, random chance -- but a ridiculously high number +# suggests something is wrong with our ability to clear multiplexer events after +# they're no longer applicable. +my ($ret, $stdout, $stderr) = $node->psql( + 'postgres', + "SELECT 'connected for call count'", + extra_params => ['-w'], + connstr => connstr(stage => 'token', retries => 2), + on_error_stop => 0); + +is($ret, 0, "call count connection succeeds"); +like( + $stderr, + qr@Visit https://example\.com/ and enter the code: postgresuser@, + "call count: stderr matches"); + +my $count_pattern = qr/\[libpq\] total number of polls: (\d+)/; +if (like($stderr, $count_pattern, "call count: count is printed")) +{ + # For reference, a typical flow with two retries might take between 5-15 + # calls to the client implementation. And while this will probably continue + # to change across OSes and Curl updates, we're likely in trouble if we see + # hundreds or thousands of calls. + $stderr =~ $count_pattern; + cmp_ok($1, '<', 100, "call count is reasonably small"); +} + # Stress test: make sure our builtin flow operates correctly even if the client # application isn't respecting PGRES_POLLING_READING/WRITING signals returned # from PQconnectPoll(). @@ -428,7 +457,7 @@ sub connstr connstr(stage => 'all', retries => 1, interval => 1)); note "running '" . join("' '", @cmd) . "'"; -my ($stdout, $stderr) = run_command(\@cmd); +($stdout, $stderr) = run_command(\@cmd); like($stdout, qr/connection succeeded/, "stress-async: stdout matches"); unlike(