From 5ecf4d0425d97cd23292f5a62c1d5c382e8103e4 Mon Sep 17 00:00:00 2001 From: Jacob Champion Date: Tue, 10 Jun 2025 16:38:59 -0700 Subject: [PATCH 1/4] oauth: Remove stale events from the kqueue multiplexer If a socket is added to the kqueue, becomes readable/writable, and subsequently becomes non-readable/writable again, the kqueue itself will remain readable until either the socket registration is removed, or the stale event is cleared via a call to kevent(). In many simple cases, Curl itself will remove the socket registration quickly, but in real-world usage, this is not guaranteed to happen. The kqueue can then remain stuck in a permanently readable state until the request ends, which results in pointless wakeups for the client and wasted CPU time. Implement drain_socket_events() to call kevent() and unstick any stale events. This is called right after drive_request(), before we return control to the client to wait. To make sure we've taken a look at the entire queue, register_socket() now tracks the number of outstanding registrations. Suggested-by: Thomas Munro --- src/interfaces/libpq-oauth/oauth-curl.c | 218 ++++++++++++++++++------ 1 file changed, 166 insertions(+), 52 deletions(-) diff --git a/src/interfaces/libpq-oauth/oauth-curl.c b/src/interfaces/libpq-oauth/oauth-curl.c index dba9a684fa8a..8430356cfb5e 100644 --- a/src/interfaces/libpq-oauth/oauth-curl.c +++ b/src/interfaces/libpq-oauth/oauth-curl.c @@ -278,6 +278,10 @@ struct async_ctx bool user_prompted; /* have we already sent the authz prompt? */ bool used_basic_auth; /* did we send a client secret? */ bool debugging; /* can we give unsafe developer assistance? */ + +#if defined(HAVE_SYS_EVENT_H) + int nevents; /* how many events are we waiting on? */ +#endif }; /* @@ -1291,41 +1295,95 @@ register_socket(CURL *curl, curl_socket_t socket, int what, void *ctx, return 0; #elif defined(HAVE_SYS_EVENT_H) - struct kevent ev[2] = {0}; + struct kevent ev[2]; struct kevent ev_out[2]; struct timespec timeout = {0}; - int nev = 0; + int nev; int res; + /* + * First, any existing registrations for this socket need to be removed, + * both to track the outstanding number of events, and to ensure that + * we're not woken up for things that Curl no longer cares about. + * + * ENOENT is okay, but we have to track how many we get, so use + * EV_RECEIPT. + */ + nev = 0; + EV_SET(&ev[nev], socket, EVFILT_READ, EV_DELETE | EV_RECEIPT, 0, 0, 0); + nev++; + EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_DELETE | EV_RECEIPT, 0, 0, 0); + nev++; + + Assert(nev <= lengthof(ev)); + Assert(nev <= lengthof(ev_out)); + + res = kevent(actx->mux, ev, nev, ev_out, nev, &timeout); + if (res < 0) + { + actx_error(actx, "could not delete from kqueue: %m"); + return -1; + } + + /* + * We can't use the simple errno version of kevent, because we need to + * skip over ENOENT while still allowing a second change to be processed. + * So we need a longer-form error checking loop. + */ + for (int i = 0; i < res; ++i) + { + /* + * EV_RECEIPT should guarantee one EV_ERROR result for every change, + * whether successful or not. Failed entries contain a non-zero errno + * in the data field. + */ + Assert(ev_out[i].flags & EV_ERROR); + + errno = ev_out[i].data; + if (!errno) + { + /* Successfully removed; update the event count. */ + Assert(actx->nevents > 0); + actx->nevents--; + } + else if (errno != ENOENT) + { + actx_error(actx, "could not delete from kqueue: %m"); + return -1; + } + } + + /* If we're only removing registrations, we're done. */ + if (what == CURL_POLL_REMOVE) + return 0; + + /* + * Now add the new filters. This is more straightfoward than deletion. + * + * Combining this kevent() call with the one above seems like it should be + * theoretically possible, but beware that not all BSDs keep the original + * event flags when using EV_RECEIPT, so it's tricky to figure out which + * operations succeeded. For now we keep the deletions and the additions + * separate. + */ + nev = 0; + switch (what) { case CURL_POLL_IN: - EV_SET(&ev[nev], socket, EVFILT_READ, EV_ADD | EV_RECEIPT, 0, 0, 0); + EV_SET(&ev[nev], socket, EVFILT_READ, EV_ADD, 0, 0, 0); nev++; break; case CURL_POLL_OUT: - EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_ADD | EV_RECEIPT, 0, 0, 0); + EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_ADD, 0, 0, 0); nev++; break; case CURL_POLL_INOUT: - EV_SET(&ev[nev], socket, EVFILT_READ, EV_ADD | EV_RECEIPT, 0, 0, 0); - nev++; - EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_ADD | EV_RECEIPT, 0, 0, 0); - nev++; - break; - - case CURL_POLL_REMOVE: - - /* - * We don't know which of these is currently registered, perhaps - * both, so we try to remove both. This means we need to tolerate - * ENOENT below. - */ - EV_SET(&ev[nev], socket, EVFILT_READ, EV_DELETE | EV_RECEIPT, 0, 0, 0); + EV_SET(&ev[nev], socket, EVFILT_READ, EV_ADD, 0, 0, 0); nev++; - EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_DELETE | EV_RECEIPT, 0, 0, 0); + EV_SET(&ev[nev], socket, EVFILT_WRITE, EV_ADD, 0, 0, 0); nev++; break; @@ -1334,45 +1392,91 @@ register_socket(CURL *curl, curl_socket_t socket, int what, void *ctx, return -1; } - res = kevent(actx->mux, ev, nev, ev_out, lengthof(ev_out), &timeout); + Assert(nev <= lengthof(ev)); + + res = kevent(actx->mux, ev, nev, NULL, 0, NULL); if (res < 0) { actx_error(actx, "could not modify kqueue: %m"); return -1; } + /* Update the event count, and we're done. */ + actx->nevents += nev; + + return 0; +#else +#error register_socket is not implemented on this platform +#endif +} + +/*------- + * Drains any stale level-triggered events out of the multiplexer. This is + * necessary only if the mux implementation requires it. + * + * As an example, consider the following sequence of events: + * 1. libcurl tries to write data to the send buffer, but it fills up. + * 2. libcurl registers CURL_POLL_OUT on the socket and returns control to the + * client to wait. + * 3. The kernel partially drains the send buffer. The socket becomes writable, + * and the client wakes up and calls back into the flow. + * 4. libcurl continues writing data to the send buffer, but it fills up again. + * The socket is no longer writable. + * + * At this point, an epoll-based mux no longer signals readiness, so nothing + * further needs to be done. But a kqueue-based mux will continue to signal + * "ready" until either the EVFILT_WRITE registration is dropped for the socket, + * or the old socket-writable event is read from the queue. Since Curl isn't + * guaranteed to do the former, we must do the latter here. + */ +static bool +drain_socket_events(struct async_ctx *actx) +{ +#if defined(HAVE_SYS_EPOLL_H) + /* The epoll implementation doesn't need to drain pending events. */ + return true; +#elif defined(HAVE_SYS_EVENT_H) + struct timespec timeout = {0}; + struct kevent *drain; + int drain_len; + /* - * We can't use the simple errno version of kevent, because we need to - * skip over ENOENT while still allowing a second change to be processed. - * So we need a longer-form error checking loop. + * Drain the events in one call, rather than looping. (We could maybe call + * kevent() drain_len times, instead of allocating space for the maximum + * number of events, but that relies on the events being in FIFO order to + * avoid starvation. The kqueue man pages don't seem to make any + * guarantees about that.) + * + * register_socket() keeps actx->nevents updated with the number of + * outstanding event filters. We don't track the registration of the + * timer; we just assume one could be registered here. */ - for (int i = 0; i < res; ++i) + drain_len = actx->nevents + 1; + + drain = malloc(sizeof(*drain) * drain_len); + if (!drain) { - /* - * EV_RECEIPT should guarantee one EV_ERROR result for every change, - * whether successful or not. Failed entries contain a non-zero errno - * in the data field. - */ - Assert(ev_out[i].flags & EV_ERROR); + actx_error(actx, "out of memory"); + return false; + } - errno = ev_out[i].data; - if (errno && errno != ENOENT) - { - switch (what) - { - case CURL_POLL_REMOVE: - actx_error(actx, "could not delete from kqueue: %m"); - break; - default: - actx_error(actx, "could not add to kqueue: %m"); - } - return -1; - } + /* + * Discard all pending events. Since our registrations are level-triggered + * (even the timer, since we use a chained kqueue for that instead of an + * EVFILT_TIMER on the top-level mux!), any events that we still need will + * remain signalled, and the stale ones will be swept away. + */ + if (kevent(actx->mux, NULL, 0, drain, drain_len, &timeout) < 0) + { + actx_error(actx, "could not drain kqueue: %m"); + free(drain); + return false; } - return 0; + free(drain); + return true; #else -#error register_socket is not implemented on this platform +#error drain_socket_events is not implemented on this platform #endif } @@ -1441,7 +1545,8 @@ set_timer(struct async_ctx *actx, long timeout) * macOS.) * * If there was no previous timer set, the kevent calls will result in - * ENOENT, which is fine. + * ENOENT, which is fine. (We don't track actx->nevents for this case; + * instead, drain_socket_events() just assumes a timer could be set.) */ EV_SET(&ev, 1, EVFILT_TIMER, EV_DELETE, 0, 0, 0); if (kevent(actx->timerfd, &ev, 1, NULL, 0, NULL) < 0 && errno != ENOENT) @@ -2755,13 +2860,22 @@ pg_fe_run_oauth_flow_impl(PGconn *conn) if (status == PGRES_POLLING_FAILED) goto error_return; - else if (status != PGRES_POLLING_OK) - { - /* not done yet */ - return status; - } + else if (status == PGRES_POLLING_OK) + break; /* done! */ + + /* + * This request is still running. + * + * Drain any stale socket events from the mux before we + * ask the client to poll. (Currently, this can occur only + * with kqueue.) If this is forgotten, the multiplexer can + * get stuck in a signalled state and we'll burn CPU + * cycles pointlessly. + */ + if (!drain_socket_events(actx)) + goto error_return; - break; + return status; } case OAUTH_STEP_WAIT_INTERVAL: From ad61c53340ef36e9612f5134d77a972aeb8d775e Mon Sep 17 00:00:00 2001 From: Jacob Champion Date: Wed, 5 Mar 2025 13:16:48 -0800 Subject: [PATCH 2/4] oauth: Remove expired timers from the multiplexer In a case similar to the previous commit, an expired timer can remain permanently readable if Curl does not remove the timeout itself. Since that removal isn't guaranteed to happen in real-world situations, implement drain_timer_events() to reset the timer before calling into drive_request(). Moving to drain_timer_events() happens to fix a logic bug in the previous caller of timer_expired(), which treated an error condition as if the timer were expired instead of bailing out. The previous implementation of timer_expired() gave differing results for epoll and kqueue if the timer was reset. (For epoll, a reset timer was considered to be expired, and for kqueue it was not.) This didn't previously cause problems, since timer_expired() was only called while the timer was known to be set, but both implementations now use the kqueue logic. --- src/interfaces/libpq-oauth/oauth-curl.c | 108 +++++++++++++++--------- 1 file changed, 68 insertions(+), 40 deletions(-) diff --git a/src/interfaces/libpq-oauth/oauth-curl.c b/src/interfaces/libpq-oauth/oauth-curl.c index 8430356cfb5e..78ba3399495c 100644 --- a/src/interfaces/libpq-oauth/oauth-curl.c +++ b/src/interfaces/libpq-oauth/oauth-curl.c @@ -1588,40 +1588,20 @@ set_timer(struct async_ctx *actx, long timeout) /* * Returns 1 if the timeout in the multiplexer set has expired since the last - * call to set_timer(), 0 if the timer is still running, or -1 (with an - * actx_error() report) if the timer cannot be queried. + * call to set_timer(), 0 if the timer is either still running or disarmed, or + * -1 (with an actx_error() report) if the timer cannot be queried. */ static int timer_expired(struct async_ctx *actx) { -#if defined(HAVE_SYS_EPOLL_H) - struct itimerspec spec = {0}; - - if (timerfd_gettime(actx->timerfd, &spec) < 0) - { - actx_error(actx, "getting timerfd value: %m"); - return -1; - } - - /* - * This implementation assumes we're using single-shot timers. If you - * change to using intervals, you'll need to reimplement this function - * too, possibly with the read() or select() interfaces for timerfd. - */ - Assert(spec.it_interval.tv_sec == 0 - && spec.it_interval.tv_nsec == 0); - - /* If the remaining time to expiration is zero, we're done. */ - return (spec.it_value.tv_sec == 0 - && spec.it_value.tv_nsec == 0); -#elif defined(HAVE_SYS_EVENT_H) +#if defined(HAVE_SYS_EPOLL_H) || defined(HAVE_SYS_EVENT_H) int res; - /* Is the timer queue ready? */ + /* Is the timer ready? */ res = PQsocketPoll(actx->timerfd, 1 /* forRead */ , 0, 0); if (res < 0) { - actx_error(actx, "checking kqueue for timeout: %m"); + actx_error(actx, "checking timer expiration: %m"); return -1; } @@ -1653,6 +1633,36 @@ register_timer(CURLM *curlm, long timeout, void *ctx) return 0; } +/* + * Removes any expired-timer event from the multiplexer. If was_expired is not + * NULL, it will contain whether or not the timer was expired at time of call. + */ +static bool +drain_timer_events(struct async_ctx *actx, bool *was_expired) +{ + int res; + + res = timer_expired(actx); + if (res < 0) + return false; + + if (res > 0) + { + /* + * Timer is expired. We could drain the event manually from the + * timerfd, but it's easier to simply disable it; that keeps the + * platform-specific code in set_timer(). + */ + if (!set_timer(actx, -1)) + return false; + } + + if (was_expired) + *was_expired = (res > 0); + + return true; +} + /* * Prints Curl request debugging information to stderr. * @@ -2856,6 +2866,22 @@ pg_fe_run_oauth_flow_impl(PGconn *conn) { PostgresPollingStatusType status; + /* + * Clear any expired timeout before calling back into + * Curl. Curl is not guaranteed to do this for us, because + * its API expects us to use single-shot (i.e. + * edge-triggered) timeouts, and ours are level-triggered + * via the mux. + * + * This can't be combined with the drain_socket_events() + * call below: we might accidentally clear a short timeout + * that was both set and expired during the call to + * drive_request(). + */ + if (!drain_timer_events(actx, NULL)) + goto error_return; + + /* Move the request forward. */ status = drive_request(actx); if (status == PGRES_POLLING_FAILED) @@ -2879,24 +2905,26 @@ pg_fe_run_oauth_flow_impl(PGconn *conn) } case OAUTH_STEP_WAIT_INTERVAL: - - /* - * The client application is supposed to wait until our timer - * expires before calling PQconnectPoll() again, but that - * might not happen. To avoid sending a token request early, - * check the timer before continuing. - */ - if (!timer_expired(actx)) { - set_conn_altsock(conn, actx->timerfd); - return PGRES_POLLING_READING; - } + bool expired; - /* Disable the expired timer. */ - if (!set_timer(actx, -1)) - goto error_return; + /* + * The client application is supposed to wait until our + * timer expires before calling PQconnectPoll() again, but + * that might not happen. To avoid sending a token request + * early, check the timer before continuing. + */ + if (!drain_timer_events(actx, &expired)) + goto error_return; - break; + if (!expired) + { + set_conn_altsock(conn, actx->timerfd); + return PGRES_POLLING_READING; + } + + break; + } } /* From 711cd29f7372ca0ca17adf3a43f5bef291a20a90 Mon Sep 17 00:00:00 2001 From: Jacob Champion Date: Fri, 6 Jun 2025 15:22:41 -0700 Subject: [PATCH 3/4] oauth: Track total call count during a client flow Tracking down the bugs that led to the addition of drain_socket_events() and drain_timer_events() was difficult, because an inefficient flow is not visibly different from one that is working properly. To help maintainers notice when something has gone wrong, track the number of calls into the flow as part of debug mode, and print the total when the flow finishes. A new test makes sure the total count is less than 100. (We expect something on the order of 10.) This isn't foolproof, but it is able to catch several regressions in the logic of the prior two commits, and future work to add TLS support to the oauth_validator test server should strengthen it as well. --- src/interfaces/libpq-oauth/oauth-curl.c | 22 +++++++++++++ .../modules/oauth_validator/t/001_server.pl | 31 ++++++++++++++++++- 2 files changed, 52 insertions(+), 1 deletion(-) diff --git a/src/interfaces/libpq-oauth/oauth-curl.c b/src/interfaces/libpq-oauth/oauth-curl.c index 78ba3399495c..68303106a5d3 100644 --- a/src/interfaces/libpq-oauth/oauth-curl.c +++ b/src/interfaces/libpq-oauth/oauth-curl.c @@ -278,6 +278,7 @@ struct async_ctx bool user_prompted; /* have we already sent the authz prompt? */ bool used_basic_auth; /* did we send a client secret? */ bool debugging; /* can we give unsafe developer assistance? */ + int dbg_num_calls; /* (debug mode) how many times were we called? */ #if defined(HAVE_SYS_EVENT_H) int nevents; /* how many events are we waiting on? */ @@ -3074,6 +3075,8 @@ PostgresPollingStatusType pg_fe_run_oauth_flow(PGconn *conn) { PostgresPollingStatusType result; + fe_oauth_state *state = conn_sasl_state(conn); + struct async_ctx *actx; #ifndef WIN32 sigset_t osigset; bool sigpipe_pending; @@ -3102,6 +3105,25 @@ pg_fe_run_oauth_flow(PGconn *conn) result = pg_fe_run_oauth_flow_impl(conn); + /* + * To assist with finding bugs in drain_socket_events() and + * drain_timer_events(), when we're in debug mode, track the total number + * of calls to this function and print that at the end of the flow. + * + * Be careful that state->async_ctx could be NULL if early initialization + * fails during the first call. + */ + actx = state->async_ctx; + Assert(actx || result == PGRES_POLLING_FAILED); + + if (actx && actx->debugging) + { + actx->dbg_num_calls++; + if (result == PGRES_POLLING_OK || result == PGRES_POLLING_FAILED) + fprintf(stderr, "[libpq] total number of polls: %d\n", + actx->dbg_num_calls); + } + #ifndef WIN32 if (masked) { diff --git a/src/test/modules/oauth_validator/t/001_server.pl b/src/test/modules/oauth_validator/t/001_server.pl index 41672ebd5c6d..c0dafb8be764 100644 --- a/src/test/modules/oauth_validator/t/001_server.pl +++ b/src/test/modules/oauth_validator/t/001_server.pl @@ -418,6 +418,35 @@ sub connstr qr/failed to obtain access token: mutual TLS required for client \(invalid_client\)/ ); +# Count the number of calls to the internal flow when multiple retries are +# triggered. The exact number depends on many things -- the TCP stack, the +# version of Curl in use, random chance -- but a ridiculously high number +# suggests something is wrong with our ability to clear multiplexer events after +# they're no longer applicable. +my ($ret, $stdout, $stderr) = $node->psql( + 'postgres', + "SELECT 'connected for call count'", + extra_params => ['-w'], + connstr => connstr(stage => 'token', retries => 2), + on_error_stop => 0); + +is($ret, 0, "call count connection succeeds"); +like( + $stderr, + qr@Visit https://example\.com/ and enter the code: postgresuser@, + "call count: stderr matches"); + +my $count_pattern = qr/\[libpq\] total number of polls: (\d+)/; +if (like($stderr, $count_pattern, "call count: count is printed")) +{ + # For reference, a typical flow with two retries might take between 5-15 + # calls to the client implementation. And while this will probably continue + # to change across OSes and Curl updates, we're likely in trouble if we see + # hundreds or thousands of calls. + $stderr =~ $count_pattern; + cmp_ok($1, '<', 100, "call count is reasonably small"); +} + # Stress test: make sure our builtin flow operates correctly even if the client # application isn't respecting PGRES_POLLING_READING/WRITING signals returned # from PQconnectPoll(). @@ -428,7 +457,7 @@ sub connstr connstr(stage => 'all', retries => 1, interval => 1)); note "running '" . join("' '", @cmd) . "'"; -my ($stdout, $stderr) = run_command(\@cmd); +($stdout, $stderr) = run_command(\@cmd); like($stdout, qr/connection succeeded/, "stress-async: stdout matches"); unlike( From 3fbb6b92a74d72ced5bf10cc5251cdd29becc308 Mon Sep 17 00:00:00 2001 From: Jacob Champion Date: Wed, 5 Mar 2025 15:04:34 -0800 Subject: [PATCH 4/4] oauth: Add unit tests for multiplexer handling To better record the internal behaviors of oauth-curl.c, add a unit test suite for the socket and timer handling code. This is all based on TAP and driven by our existing Test::More infrastructure. --- src/interfaces/libpq-oauth/Makefile | 14 + src/interfaces/libpq-oauth/meson.build | 35 ++ src/interfaces/libpq-oauth/t/001_oauth.pl | 24 + src/interfaces/libpq-oauth/test-oauth-curl.c | 474 +++++++++++++++++++ 4 files changed, 547 insertions(+) create mode 100644 src/interfaces/libpq-oauth/t/001_oauth.pl create mode 100644 src/interfaces/libpq-oauth/test-oauth-curl.c diff --git a/src/interfaces/libpq-oauth/Makefile b/src/interfaces/libpq-oauth/Makefile index 682f17413b3a..e73573694b9a 100644 --- a/src/interfaces/libpq-oauth/Makefile +++ b/src/interfaces/libpq-oauth/Makefile @@ -79,5 +79,19 @@ uninstall: rm -f '$(DESTDIR)$(libdir)/$(stlib)' rm -f '$(DESTDIR)$(libdir)/$(shlib)' +.PHONY: all-tests +all-tests: oauth_tests$(X) + +oauth_tests$(X): test-oauth-curl.o oauth-utils.o $(WIN32RES) | submake-libpgport submake-libpq + $(CC) $(CFLAGS) $^ $(LDFLAGS) $(LDFLAGS_EX) $(SHLIB_LINK) -o $@ + +check: all-tests + $(prove_check) + +installcheck: all-tests + $(prove_installcheck) + clean distclean: clean-lib rm -f $(OBJS) $(OBJS_STATIC) $(OBJS_SHLIB) + rm -f test-oauth-curl.o oauth_tests$(X) + rm -rf tmp_check diff --git a/src/interfaces/libpq-oauth/meson.build b/src/interfaces/libpq-oauth/meson.build index df064c59a407..505e1671b863 100644 --- a/src/interfaces/libpq-oauth/meson.build +++ b/src/interfaces/libpq-oauth/meson.build @@ -47,3 +47,38 @@ libpq_oauth_so = shared_module(libpq_oauth_name, link_args: export_fmt.format(export_file.full_path()), kwargs: default_lib_args, ) + +libpq_oauth_test_deps = [] + +oauth_test_sources = files('test-oauth-curl.c') + libpq_oauth_so_sources + +if host_system == 'windows' + oauth_test_sources += rc_bin_gen.process(win32ver_rc, extra_args: [ + '--NAME', 'oauth_tests', + '--FILEDESC', 'OAuth unit test program',]) +endif + +libpq_oauth_test_deps += executable('oauth_tests', + oauth_test_sources, + dependencies: [frontend_shlib_code, libpq, libpq_oauth_deps], + kwargs: default_bin_args + { + 'c_args': default_bin_args.get('c_args', []) + libpq_oauth_so_c_args, + 'c_pch': pch_postgres_fe_h, + 'include_directories': [libpq_inc, postgres_inc], + 'install': false, + } +) + +testprep_targets += libpq_oauth_test_deps + +tests += { + 'name': 'libpq-oauth', + 'sd': meson.current_source_dir(), + 'bd': meson.current_build_dir(), + 'tap': { + 'tests': [ + 't/001_oauth.pl', + ], + 'deps': libpq_oauth_test_deps, + }, +} diff --git a/src/interfaces/libpq-oauth/t/001_oauth.pl b/src/interfaces/libpq-oauth/t/001_oauth.pl new file mode 100644 index 000000000000..e769856c2c9c --- /dev/null +++ b/src/interfaces/libpq-oauth/t/001_oauth.pl @@ -0,0 +1,24 @@ +# Copyright (c) 2025, PostgreSQL Global Development Group +use strict; +use warnings FATAL => 'all'; + +use PostgreSQL::Test::Utils; +use Test::More; + +# Defer entirely to the oauth_tests executable. stdout/err is routed through +# Test::More so that our logging infrastructure can handle it correctly. Using +# IPC::Run::new_chunker seems to help interleave the two streams a little better +# than without. +# +# TODO: prove can also deal with native executables itself, which we could +# probably make use of via PROVE_TESTS on the Makefile side. But the Meson setup +# calls Perl directly, which would require more code to work around... and +# there's still the matter of logging. +my $builder = Test::More->builder; +my $out = $builder->output; +my $err = $builder->failure_output; + +IPC::Run::run ['oauth_tests'], + '>', IPC::Run::new_chunker, sub { print {$out} $_[0] }, + '2>', IPC::Run::new_chunker, sub { print {$err} $_[0] } + or die "oauth_tests returned $?"; diff --git a/src/interfaces/libpq-oauth/test-oauth-curl.c b/src/interfaces/libpq-oauth/test-oauth-curl.c new file mode 100644 index 000000000000..1a03b0fc552e --- /dev/null +++ b/src/interfaces/libpq-oauth/test-oauth-curl.c @@ -0,0 +1,474 @@ +/* + * test-oauth-curl.c + * + * A unit test driver for libpq-oauth. This #includes oauth-curl.c, which lets + * the tests reference static functions and other internals. + * + * USE_ASSERT_CHECKING is required, to make it easy for tests to wrap + * must-succeed code as part of test setup. + * + * Copyright (c) 2025, PostgreSQL Global Development Group + */ + +#include "oauth-curl.c" + +#include + +#ifdef USE_ASSERT_CHECKING + +/* + * TAP Helpers + */ + +static int num_tests = 0; + +/* + * Reports ok/not ok to the TAP stream on stdout. + */ +#define ok(OK, TEST) \ + ok_impl(OK, TEST, #OK, __FILE__, __LINE__) + +static bool +ok_impl(bool ok, const char *test, const char *teststr, const char *file, int line) +{ + printf("%sok %d - %s\n", ok ? "" : "not ", ++num_tests, test); + + if (!ok) + { + printf("# at %s:%d:\n", file, line); + printf("# expression is false: %s\n", teststr); + } + + return ok; +} + +/* + * Like ok(this == that), but with more diagnostics on failure. + * + * Only works on ints, but luckily that's all we need here. Note that the much + * simpler-looking macro implementation + * + * is_diag(ok(THIS == THAT, TEST), THIS, #THIS, THAT, #THAT) + * + * suffers from multiple evaluation of the macro arguments... + */ +#define is(THIS, THAT, TEST) \ + do { \ + int this_ = (THIS), \ + that_ = (THAT); \ + is_diag( \ + ok_impl(this_ == that_, TEST, #THIS " == " #THAT, __FILE__, __LINE__), \ + this_, #THIS, that_, #THAT \ + ); \ + } while (0) + +static bool +is_diag(bool ok, int this, const char *thisstr, int that, const char *thatstr) +{ + if (!ok) + printf("# %s = %d; %s = %d\n", thisstr, this, thatstr, that); + + return ok; +} + +/* + * Utilities + */ + +/* + * Creates a partially-initialized async_ctx for the purposes of testing. Free + * with free_test_actx(). + */ +static struct async_ctx * +init_test_actx(void) +{ + struct async_ctx *actx; + + actx = calloc(1, sizeof(*actx)); + Assert(actx); + + actx->mux = PGINVALID_SOCKET; + actx->timerfd = -1; + actx->debugging = true; + + initPQExpBuffer(&actx->errbuf); + + Assert(setup_multiplexer(actx)); + + return actx; +} + +static void +free_test_actx(struct async_ctx *actx) +{ + termPQExpBuffer(&actx->errbuf); + + if (actx->mux != PGINVALID_SOCKET) + close(actx->mux); + if (actx->timerfd >= 0) + close(actx->timerfd); + + free(actx); +} + +static char dummy_buf[4 * 1024]; /* for fill_pipe/drain_pipe */ + +/* + * Writes to the write side of a pipe until it won't take any more data. Returns + * the amount written. + */ +static ssize_t +fill_pipe(int fd) +{ + int mode; + ssize_t written = 0; + + /* Don't block. */ + Assert((mode = fcntl(fd, F_GETFL)) != -1); + Assert(fcntl(fd, F_SETFL, mode | O_NONBLOCK) == 0); + + while (true) + { + ssize_t w; + + w = write(fd, dummy_buf, sizeof(dummy_buf)); + if (w < 0) + { + if (errno != EAGAIN && errno != EWOULDBLOCK) + { + perror("write to pipe"); + written = -1; + } + break; + } + + written += w; + } + + /* Reset the descriptor flags. */ + Assert(fcntl(fd, F_SETFD, mode) == 0); + + return written; +} + +/* + * Drains the requested amount of data from the read side of a pipe. + */ +static bool +drain_pipe(int fd, ssize_t n) +{ + Assert(n > 0); + + while (n) + { + size_t to_read = (n <= sizeof(dummy_buf)) ? n : sizeof(dummy_buf); + ssize_t drained; + + drained = read(fd, dummy_buf, to_read); + if (drained < 0) + { + perror("read from pipe"); + return false; + } + + n -= drained; + } + + return true; +} + +/* + * Tests whether the multiplexer is marked ready by the deadline. This is a + * macro so that file/line information makes sense during failures. + * + * NB: our current multiplexer implementations (epoll/kqueue) are *readable* + * when the underlying libcurl sockets are *writable*. This behavior is pinned + * here to record that expectation; PGRES_POLLING_READING is hardcoded + * throughout the flow and would need to be changed if a new multiplexer does + * something different. + */ +#define mux_is_ready(MUX, DEADLINE, TEST) \ + do { \ + int res_ = PQsocketPoll(MUX, 1, 0, DEADLINE); \ + Assert(res_ != -1); \ + ok(res_ > 0, "multiplexer is ready " TEST); \ + } while (0) + +/* + * The opposite of mux_is_ready(). + */ +#define mux_is_not_ready(MUX, TEST) \ + do { \ + int res_ = PQsocketPoll(MUX, 1, 0, 0); \ + Assert(res_ != -1); \ + is(res_, 0, "multiplexer is not ready " TEST); \ + } while (0) + +/* + * Test Suites + */ + +/* Per-suite timeout. Set via the PG_TEST_TIMEOUT_DEFAULT envvar. */ +static pg_usec_time_t timeout_us = 180 * 1000 * 1000; + +static void +test_set_timer(void) +{ + struct async_ctx *actx = init_test_actx(); + const pg_usec_time_t deadline = PQgetCurrentTimeUSec() + timeout_us; + + printf("# test_set_timer\n"); + + /* A zero-duration timer should result in a near-immediate ready signal. */ + Assert(set_timer(actx, 0)); + mux_is_ready(actx->mux, deadline, "when timer expires"); + is(timer_expired(actx), 1, "timer_expired() returns 1 when timer expires"); + + /* Resetting the timer far in the future should unset the ready signal. */ + Assert(set_timer(actx, INT_MAX)); + mux_is_not_ready(actx->mux, "when timer is reset to the future"); + is(timer_expired(actx), 0, "timer_expired() returns 0 with unexpired timer"); + + /* Setting another zero-duration timer should override the previous one. */ + Assert(set_timer(actx, 0)); + mux_is_ready(actx->mux, deadline, "when timer is re-expired"); + is(timer_expired(actx), 1, "timer_expired() returns 1 when timer is re-expired"); + + /* And disabling that timer should once again unset the ready signal. */ + Assert(set_timer(actx, -1)); + mux_is_not_ready(actx->mux, "when timer is unset"); + is(timer_expired(actx), 0, "timer_expired() returns 0 when timer is unset"); + + { + bool expired; + + /* Make sure drain_timer_events() functions correctly as well. */ + Assert(set_timer(actx, 0)); + mux_is_ready(actx->mux, deadline, "when timer is re-expired (drain_timer_events)"); + + Assert(drain_timer_events(actx, &expired)); + mux_is_not_ready(actx->mux, "when timer is drained after expiring"); + is(expired, 1, "drain_timer_events() reports expiration"); + is(timer_expired(actx), 0, "timer_expired() returns 0 after timer is drained"); + + /* A second drain should do nothing. */ + Assert(drain_timer_events(actx, &expired)); + mux_is_not_ready(actx->mux, "when timer is drained a second time"); + is(expired, 0, "drain_timer_events() reports no expiration"); + is(timer_expired(actx), 0, "timer_expired() still returns 0"); + } + + free_test_actx(actx); +} + +static void +test_register_socket(void) +{ + struct async_ctx *actx = init_test_actx(); + int pipefd[2]; + int rfd, + wfd; + bool bidirectional; + + /* Create a local pipe for communication. */ + Assert(pipe(pipefd) == 0); + rfd = pipefd[0]; + wfd = pipefd[1]; + + /* + * Some platforms (FreeBSD) implement bidirectional pipes, affecting the + * behavior of some of these tests. Store that knowledge for later. + */ + bidirectional = PQsocketPoll(rfd /* read */ , 0, 1 /* write */ , 0) > 0; + + /* + * This suite runs twice -- once using CURL_POLL_IN/CURL_POLL_OUT for + * read/write operations, respectively, and once using CURL_POLL_INOUT for + * both sides. + */ + for (int inout = 0; inout < 2; inout++) + { + const int in_event = inout ? CURL_POLL_INOUT : CURL_POLL_IN; + const int out_event = inout ? CURL_POLL_INOUT : CURL_POLL_OUT; + const pg_usec_time_t deadline = PQgetCurrentTimeUSec() + timeout_us; + size_t bidi_pipe_size = 0; /* silence compiler warnings */ + + printf("# test_register_socket %s\n", inout ? "(INOUT)" : ""); + + /* + * At the start of the test, the read side should be blocked and the + * write side should be open. (There's a mistake at the end of this + * loop otherwise.) + */ + Assert(PQsocketPoll(rfd, 1, 0, 0) == 0); + Assert(PQsocketPoll(wfd, 0, 1, 0) > 0); + + /* + * For bidirectional systems, emulate unidirectional behavior here by + * filling up the "read side" of the pipe. + */ + if (bidirectional) + Assert((bidi_pipe_size = fill_pipe(rfd)) > 0); + + /* Listen on the read side. The multiplexer shouldn't be ready yet. */ + Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0); + mux_is_not_ready(actx->mux, "when fd is not readable"); + + /* Writing to the pipe should result in a read-ready multiplexer. */ + Assert(write(wfd, "x", 1) == 1); + mux_is_ready(actx->mux, deadline, "when fd is readable"); + + /* + * Update the registration to wait on write events instead. The + * multiplexer should be unset. + */ + Assert(register_socket(NULL, rfd, CURL_POLL_OUT, actx, NULL) == 0); + mux_is_not_ready(actx->mux, "when waiting for writes on readable fd"); + + /* Re-register for read events. */ + Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0); + mux_is_ready(actx->mux, deadline, "when waiting for reads again"); + + /* Stop listening. The multiplexer should be unset. */ + Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0); + mux_is_not_ready(actx->mux, "when readable fd is removed"); + + /* Listen again. */ + Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0); + mux_is_ready(actx->mux, deadline, "when readable fd is re-added"); + + /* + * Draining the pipe should unset the multiplexer again, once the old + * event is drained. + */ + Assert(drain_pipe(rfd, 1)); + Assert(drain_socket_events(actx)); + mux_is_not_ready(actx->mux, "when fd is drained"); + + /* Undo any unidirectional emulation. */ + if (bidirectional) + Assert(drain_pipe(wfd, bidi_pipe_size)); + + /* Listen on the write side. An empty buffer should be writable. */ + Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0); + Assert(register_socket(NULL, wfd, out_event, actx, NULL) == 0); + mux_is_ready(actx->mux, deadline, "when fd is writable"); + + /* As above, wait on read events instead. */ + Assert(register_socket(NULL, wfd, CURL_POLL_IN, actx, NULL) == 0); + mux_is_not_ready(actx->mux, "when waiting for reads on writable fd"); + + /* Re-register for write events. */ + Assert(register_socket(NULL, wfd, out_event, actx, NULL) == 0); + mux_is_ready(actx->mux, deadline, "when waiting for writes again"); + + { + ssize_t written; + + /* + * Fill the pipe. Once the old writable event is drained, the mux + * should not be ready. + */ + Assert((written = fill_pipe(wfd)) > 0); + printf("# pipe buffer is full at %zd bytes\n", written); + + Assert(drain_socket_events(actx)); + mux_is_not_ready(actx->mux, "when fd buffer is full"); + + /* Drain the pipe again. */ + Assert(drain_pipe(rfd, written)); + mux_is_ready(actx->mux, deadline, "when fd buffer is drained"); + } + + /* Stop listening. */ + Assert(register_socket(NULL, wfd, CURL_POLL_REMOVE, actx, NULL) == 0); + mux_is_not_ready(actx->mux, "when fd is removed"); + + /* Make sure an expired timer doesn't interfere with event draining. */ + { + /* Make the rfd appear unidirectional if necessary. */ + if (bidirectional) + Assert((bidi_pipe_size = fill_pipe(rfd)) > 0); + + /* Set the timer and wait for it to expire. */ + Assert(set_timer(actx, 0)); + Assert(PQsocketPoll(actx->timerfd, 1, 0, deadline) > 0); + is(timer_expired(actx), 1, "timer is expired"); + + /* Register for read events and make the fd readable. */ + Assert(register_socket(NULL, rfd, in_event, actx, NULL) == 0); + Assert(write(wfd, "x", 1) == 1); + mux_is_ready(actx->mux, deadline, "when fd is readable and timer expired"); + + /* + * Draining the pipe should unset the multiplexer again, once the + * old event is drained and the timer is reset. + * + * Order matters to avoid false negatives. First drain the socket, + * then unset the timer. We're trying to catch the case where the + * pending timer expiration event takes the place of one of the + * socket events we're attempting to drain. + */ + Assert(drain_pipe(rfd, 1)); + Assert(drain_socket_events(actx)); + Assert(set_timer(actx, -1)); + + is(timer_expired(actx), 0, "timer is no longer expired"); + mux_is_not_ready(actx->mux, "when fd is drained and timer reset"); + + /* Stop listening. */ + Assert(register_socket(NULL, rfd, CURL_POLL_REMOVE, actx, NULL) == 0); + + /* Undo any unidirectional emulation. */ + if (bidirectional) + Assert(drain_pipe(wfd, bidi_pipe_size)); + } + } + + close(rfd); + close(wfd); + free_test_actx(actx); +} + +int +main(int argc, char *argv[]) +{ + const char *timeout; + + /* Grab the default timeout. */ + timeout = getenv("PG_TEST_TIMEOUT_DEFAULT"); + if (timeout) + { + int timeout_s = atoi(timeout); + + if (timeout_s > 0) + timeout_us = timeout_s * 1000 * 1000; + } + + /* + * Set up line buffering for our output, to let stderr interleave in the + * log files. + */ + setvbuf(stdout, NULL, PG_IOLBF, 0); + + test_set_timer(); + test_register_socket(); + + printf("1..%d\n", num_tests); + return 0; +} + +#else /* !USE_ASSERT_CHECKING */ + +/* + * Skip the test suite when we don't have assertions. + */ +int +main(int argc, char *argv[]) +{ + printf("1..0 # skip: cassert is not enabled\n"); + + return 0; +} + +#endif /* USE_ASSERT_CHECKING */