Add non-blocking async COPY FROM support (pg_putcopydata_async, pg_putcopyend_async, pg_flush)#176
Add non-blocking async COPY FROM support (pg_putcopydata_async, pg_putcopyend_async, pg_flush)#176jjn1056 wants to merge 12 commits intobucardo:masterfrom
Conversation
Add pg_putcopydata_async, pg_putcopyend_async, and pg_flush methods to enable non-blocking COPY FROM STDIN for async Perl libraries. pg_putcopydata_async enables PQsetnonblocking on the connection (safe during COPY state since no other operations are permitted), then uses PQflush to manage the output buffer. Returns 0 when buffer is full or 2 when flush is pending, allowing the caller to poll the socket and retry without blocking the event loop. pg_putcopyend_async sends PQputCopyEnd and polls for the server result using PQconsumeInput/PQisBusy. Uses copystate=-1 as a sentinel to track the "end sent, awaiting result" phase across retries. Restores blocking mode automatically on completion. pg_flush exposes PQflush for callers that need to complete a pending flush between putcopydata_async or putcopyend_async calls. The blocking pg_putcopydata is unchanged (passes async=0 internally). All 3,763 existing tests continue to pass with zero regressions. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Drop the invented return value 2 from pg_putcopydata_async and pg_putcopyend_async. The interface now matches libpq and EV::Pg: pg_putcopydata_async: 1 (queued), 0 (buffer full), -1 (error) pg_putcopyend_async: 1 (done), 0 (not ready), -1 (error) pg_flush: 0 (flushed), 1 (pending), -1 (error) After pg_putcopydata_async returns 1, caller calls pg_flush to push data to the server. This is the standard libpq pattern: PQputCopyData queues, PQflush sends. No DBD::Pg-specific protocol to learn. pg_putcopydata_async no longer calls PQflush internally for async mode; that responsibility moves to pg_flush. The COPY_BOTH path (logical replication) still auto-flushes as before. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Show simple and robust usage patterns with real data (matching the existing pg_putcopydata pizza examples), IO::Select for socket polling, explicit column lists, and the full pg_putcopyend_async poll loop. Add a note about COPY text format and alternatives. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- pg_putcopydata_async fails in COPY OUT state - pg_putcopydata_async fails with no argument - do() fails during async COPY IN (mirrors blocking test) - Recovery after rude non-COPY attempt during async COPY - Binary COPY round-trip via async methods - Multiple async COPY cycles on the same connection Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Make explicit that the connection itself is restricted to COPY operations, but the non-blocking methods let the event loop service other connections and tasks between calls. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
|
Spelling and perlcritic test failures. Set AUTHOR_TESTING=1 and RELEASE_TESTING=1 to reproduce and revise. Does this PR completely supersede the functionality (and tests) in PR #163 ? If so, I'll close that issue. |
|
This PR does not supersede PR #163. They address different directions of COPY:
They're complementary and could both land independently. Re: spelling and perlcritic failures — working on fixes now. |
Fix spelling and perlcritic test failures found with AUTHOR_TESTING=1 and RELEASE_TESTING=1. Rename binarycopy_async temp table to use the dbd_pg_ prefix per project convention. Add dbd_pg_test_async_copy to the @tables cleanup list in dbdpg_test_setup.pl. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
33bea32 to
8a03679
Compare
|
@esabol Thanks for looking at this. Sorry took me so long to get back to you; I got pulled into a critical end of quarter thing at the office and was tapped out :). I went thru all these and doubled checked for regressions, etc. I think this along with PR 163 gets us really stellar asynchronous support for COPY, which is for my use cases a big win. Let me know if you spot anything else I need to tweak, and again thanks for your time running this all important project for the Perl community! |
|
One of the parts I wasn't sure about was if we needed to version gateway any of this. As far as I could tell this should work even on vastly out of date versions of Postgresql, like 9.5 but I might try to spin some sort dockers with that just to double check. |
|
@jjn1056 wrote:
Ah. Thanks for checking. Technically, according to the README, DBD::Pg is supposed to support PostgreSQL going all the way back to 8.0. It's unfortunate we don't have the CI apparatus to test that. |
|
I'll see if I can find anyone with a working postgresql 8 docker. It would be handy because I want to look at pipelining next and that requires Pg 14+ so we will need to think about some sort of conditional compile eventually; so if we need it now would be good to get agreement on the way we should do that. |
turnstep
left a comment
There was a problem hiding this comment.
Left a few notes, but I need to circle back when I can give this a lot more focus. Excellent work, though, I'm excited to get this in and have some solid async support finally.
| } | ||
| else if (0 == copystatus) { /* non-blocking mode only */ | ||
| else if (0 == copystatus) { | ||
| /* Non-blocking mode: output buffer full, caller should wait for write-ready */ |
There was a problem hiding this comment.
Should we set imp_dbh->copystate here?
There was a problem hiding this comment.
I don't think we should change copystate here — we're still in COPY_IN, the buffer being full is a transient condition. The caller will retry the same pg_putcopydata_async call after the socket becomes write-ready, and that re-entry checks for PGRES_COPY_IN as a precondition. Changing copystate would break that check. What do you think?
There was a problem hiding this comment.
Going to have to circle back to this when I have a clear schedule and a clear head. It all made sense when I wrote that, but now I'm not sure. :)
|
|
||
| ## Non-blocking end: poll until server confirms | ||
| while ((my $end = $dbh->pg_putcopyend_async()) == 0) { | ||
| $sel->can_read(); |
There was a problem hiding this comment.
Er...can_read or can_write? Can buffer be full here?
There was a problem hiding this comment.
You're right — can_read is only correct for the result-polling phase, but pg_putcopyend_async also returns 0 during the flush phase (which needs write-ready). Rather than try to solve that with IO::Select, I replaced the example with Time::HiRes::sleep-based polling to match the style used by the existing async examples (pg_ready, pg_cancel, etc.) elsewhere in the POD. This also sidesteps the read vs write ambiguity entirely. In practice, users of these methods will be in a real event loop (IO::Async, Mojo::IOLoop, AnyEvent, etc.) where they'd register the socket for both read and write events — but examples for specific event loops probably belong in a cookbook or separate document rather than the core POD.
| TRACE_PQERRORMESSAGE; | ||
| pg_error(aTHX_ dbh, PGRES_FATAL_ERROR, PQerrorMessage(imp_dbh->conn)); | ||
| if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_putcopydata (error: setnonblocking)\n", THEADER_slow); | ||
| return -1; |
There was a problem hiding this comment.
Do we need to change copystate?
There was a problem hiding this comment.
I don't think so — copystate tracks the COPY protocol state (COPY_IN, COPY_OUT, etc.) while copy_nonblocking tracks whether we've enabled non-blocking mode on the connection. Switching to non-blocking doesn't change the protocol state — we're still in COPY_IN. The two flags are intentionally orthogonal. What do you think?
Per turnstep's review: use the shorter form consistently across documentation, matching what the C comments already use. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Per turnstep's review: the simple example had unchecked return values, an ambiguous flush loop condition, and created a new IO::Select on every iteration. The robust example already covers proper usage. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Per turnstep's review and to match the existing async example style used throughout the POD (pg_ready, pg_cancel examples all use sleep-based polling). Also sidesteps the can_read vs can_write ambiguity in pg_putcopyend_async's multi-phase return. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Don't sweat this too much. Support for old unsupported versions of Postgres (and v8 is so damn old) has always been on a "best effort" basis. Maybe we should make that more explicit in the docs? I still spin up 8.4 for testing, but it's getting harder and harder to compile those old versions. I never want to stop supporting older versions just because they are old, and am willing to have the code go through some contortions to keep them working, but rest assured, there are limits. :) |
Per turnstep's review: a -1 from pg_flush was silently ignored. Now checks for error and breaks out of the loop, matching the pattern used for pg_putcopydata_async error handling. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Move the pg_flush error check (-1) outside the while loop so that `last` exits the outer for loop instead of just the inner while. Previously, a flush error during the retry loop would only break the while, allowing the for loop to continue processing rows. Suggested-by: Ed Sabol Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Closes #177
Summary
Adds three new methods to enable non-blocking COPY FROM STDIN for async Perl libraries:
Motivation
DBD::Pg has async support for queries (pg_async/PG_ASYNC) and async COPY TO (pg_getcopydata_async), but COPY FROM is unconditionally blocking. pg_putcopydata blocks inside PQputCopyData when the TCP send buffer fills because DBD::Pg never calls PQsetnonblocking.
This prevents async Perl libraries (Future::IO-based, EV::Pg-style, IO::Async) from performing non-blocking bulk data loading. COPY is the standard PostgreSQL mechanism for bulk import and is dramatically faster than multi-row INSERT.
Approach
The implementation follows the pattern already established by pg_getcopydata/pg_getcopydata_async:
asyncflag parameter (0 for blocking, 1 for async)Return value conventions
pg_putcopydata_async: 1 = queued, 0 = buffer full (retry), -1 = error
pg_putcopyend_async: 1 = done, 0 = not ready (poll), -1 = error
pg_flush: 0 = flushed, 1 = pending (poll), -1 = error
After pg_putcopydata_async returns 1, the caller calls pg_flush to push data to the server. This matches the standard libpq pattern: PQputCopyData queues, PQflush sends.
Implementation details
copy_nonblockingfield to imp_dbh_t, new function declarationsTests
27 new test assertions in t/07copy.t covering:
Full test suite passes: 3,773 tests, 0 failures, 0 regressions.
Backward compatibility
Zero risk. The blocking pg_putcopydata is unchanged — the XS wrapper passes async=0. No existing behavior is affected. New methods are purely additive.