Skip to content

Add non-blocking async COPY FROM support (pg_putcopydata_async, pg_putcopyend_async, pg_flush)#176

Open
jjn1056 wants to merge 12 commits intobucardo:masterfrom
jjn1056:async-copy-from
Open

Add non-blocking async COPY FROM support (pg_putcopydata_async, pg_putcopyend_async, pg_flush)#176
jjn1056 wants to merge 12 commits intobucardo:masterfrom
jjn1056:async-copy-from

Conversation

@jjn1056
Copy link
Copy Markdown

@jjn1056 jjn1056 commented Mar 18, 2026

Closes #177

Summary

Adds three new methods to enable non-blocking COPY FROM STDIN for async Perl libraries:

  • pg_putcopydata_async — non-blocking version of pg_putcopydata. Returns 1/0/-1 matching PQputCopyData semantics.
  • pg_putcopyend_async — non-blocking version of pg_putcopyend. Polls for server result without blocking.
  • pg_flush — exposes PQflush for manual output buffer management. Returns 0/1/-1 matching PQflush semantics.

Motivation

DBD::Pg has async support for queries (pg_async/PG_ASYNC) and async COPY TO (pg_getcopydata_async), but COPY FROM is unconditionally blocking. pg_putcopydata blocks inside PQputCopyData when the TCP send buffer fills because DBD::Pg never calls PQsetnonblocking.

This prevents async Perl libraries (Future::IO-based, EV::Pg-style, IO::Async) from performing non-blocking bulk data loading. COPY is the standard PostgreSQL mechanism for bulk import and is dramatically faster than multi-row INSERT.

Approach

The implementation follows the pattern already established by pg_getcopydata/pg_getcopydata_async:

  • pg_db_putcopydata gains an async flag parameter (0 for blocking, 1 for async)
  • The blocking path (async=0) is unchanged — existing pg_putcopydata behavior is identical
  • The async path calls PQsetnonblocking(conn, 1) on first use, scoped to COPY state (safe because no other operations are permitted during COPY)
  • PQsetnonblocking is restored to 0 when pg_putcopyend_async completes
  • Return values match libpq conventions exactly (verified against EV::Pg and libpq docs)

Return value conventions

pg_putcopydata_async: 1 = queued, 0 = buffer full (retry), -1 = error
pg_putcopyend_async: 1 = done, 0 = not ready (poll), -1 = error
pg_flush: 0 = flushed, 1 = pending (poll), -1 = error

After pg_putcopydata_async returns 1, the caller calls pg_flush to push data to the server. This matches the standard libpq pattern: PQputCopyData queues, PQflush sends.

Implementation details

  • Pg.h: Added TRACE_PQFLUSH and TRACE_PQSETNONBLOCKING macros
  • dbdimp.h: Added copy_nonblocking field to imp_dbh_t, new function declarations
  • dbdimp.c: Modified pg_db_putcopydata to accept async flag, added pg_db_putcopyend_async (uses copystate=-1 sentinel for "end sent, awaiting result" phase), added pg_db_flush
  • Pg.xs: Added XS bindings following the getcopydata/getcopydata_async pattern
  • Pg.pm: Added install_method calls and POD documentation with examples

Tests

27 new test assertions in t/07copy.t covering:

  • Basic async put/flush/end cycle with data verification
  • pg_flush return values
  • Wrong-state errors (not in COPY, COPY OUT state, no argument)
  • do() rejection during async COPY IN
  • Recovery after interrupted COPY
  • Binary COPY round-trip via async methods
  • Multiple async COPY cycles on the same connection (PQsetnonblocking toggling)
  • Large data set (1000 rows) with buffer-full handling
  • Backward compatibility (blocking methods still work after async used)

Full test suite passes: 3,773 tests, 0 failures, 0 regressions.

Backward compatibility

Zero risk. The blocking pg_putcopydata is unchanged — the XS wrapper passes async=0. No existing behavior is affected. New methods are purely additive.

jjn1056 and others added 5 commits March 17, 2026 14:24
Add pg_putcopydata_async, pg_putcopyend_async, and pg_flush methods to
enable non-blocking COPY FROM STDIN for async Perl libraries.

pg_putcopydata_async enables PQsetnonblocking on the connection (safe
during COPY state since no other operations are permitted), then uses
PQflush to manage the output buffer. Returns 0 when buffer is full or
2 when flush is pending, allowing the caller to poll the socket and
retry without blocking the event loop.

pg_putcopyend_async sends PQputCopyEnd and polls for the server result
using PQconsumeInput/PQisBusy. Uses copystate=-1 as a sentinel to
track the "end sent, awaiting result" phase across retries. Restores
blocking mode automatically on completion.

pg_flush exposes PQflush for callers that need to complete a pending
flush between putcopydata_async or putcopyend_async calls.

The blocking pg_putcopydata is unchanged (passes async=0 internally).
All 3,763 existing tests continue to pass with zero regressions.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Drop the invented return value 2 from pg_putcopydata_async and
pg_putcopyend_async. The interface now matches libpq and EV::Pg:

pg_putcopydata_async: 1 (queued), 0 (buffer full), -1 (error)
pg_putcopyend_async:  1 (done), 0 (not ready), -1 (error)
pg_flush:             0 (flushed), 1 (pending), -1 (error)

After pg_putcopydata_async returns 1, caller calls pg_flush to push
data to the server. This is the standard libpq pattern: PQputCopyData
queues, PQflush sends. No DBD::Pg-specific protocol to learn.

pg_putcopydata_async no longer calls PQflush internally for async
mode; that responsibility moves to pg_flush. The COPY_BOTH path
(logical replication) still auto-flushes as before.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Show simple and robust usage patterns with real data (matching the
existing pg_putcopydata pizza examples), IO::Select for socket
polling, explicit column lists, and the full pg_putcopyend_async
poll loop. Add a note about COPY text format and alternatives.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- pg_putcopydata_async fails in COPY OUT state
- pg_putcopydata_async fails with no argument
- do() fails during async COPY IN (mirrors blocking test)
- Recovery after rude non-COPY attempt during async COPY
- Binary COPY round-trip via async methods
- Multiple async COPY cycles on the same connection

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Make explicit that the connection itself is restricted to COPY
operations, but the non-blocking methods let the event loop service
other connections and tasks between calls.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@esabol
Copy link
Copy Markdown
Collaborator

esabol commented Mar 18, 2026

Spelling and perlcritic test failures. Set AUTHOR_TESTING=1 and RELEASE_TESTING=1 to reproduce and revise.

Does this PR completely supersede the functionality (and tests) in PR #163 ? If so, I'll close that issue.

Comment thread t/07copy.t Outdated
Comment thread t/07copy.t
@jjn1056
Copy link
Copy Markdown
Author

jjn1056 commented Mar 29, 2026

This PR does not supersede PR #163. They address different directions of COPY:

They're complementary and could both land independently.

Re: spelling and perlcritic failures — working on fixes now.

Fix spelling and perlcritic test failures found with AUTHOR_TESTING=1
and RELEASE_TESTING=1. Rename binarycopy_async temp table to use the
dbd_pg_ prefix per project convention. Add dbd_pg_test_async_copy to
the @tables cleanup list in dbdpg_test_setup.pl.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@jjn1056 jjn1056 force-pushed the async-copy-from branch 2 times, most recently from 33bea32 to 8a03679 Compare March 29, 2026 14:53
@jjn1056
Copy link
Copy Markdown
Author

jjn1056 commented Mar 29, 2026

@esabol Thanks for looking at this. Sorry took me so long to get back to you; I got pulled into a critical end of quarter thing at the office and was tapped out :).

I went thru all these and doubled checked for regressions, etc. I think this along with PR 163 gets us really stellar asynchronous support for COPY, which is for my use cases a big win.

Let me know if you spot anything else I need to tweak, and again thanks for your time running this all important project for the Perl community!

@esabol
Copy link
Copy Markdown
Collaborator

esabol commented Mar 29, 2026

Thanks, @jjn1056 ! The latest commit passes the CI workflow. Everything looks good to me, but we need to get @turnstep to do the final review.

@esabol esabol requested a review from turnstep March 29, 2026 21:27
@jjn1056
Copy link
Copy Markdown
Author

jjn1056 commented Mar 30, 2026

One of the parts I wasn't sure about was if we needed to version gateway any of this. As far as I could tell this should work even on vastly out of date versions of Postgresql, like 9.5 but I might try to spin some sort dockers with that just to double check.

@esabol
Copy link
Copy Markdown
Collaborator

esabol commented Mar 30, 2026

@jjn1056 wrote:

One of the parts I wasn't sure about was if we needed to version gateway any of this. As far as I could tell this should work even on vastly out of date versions of Postgresql, like 9.5 but I might try to spin some sort dockers with that just to double check.

Ah. Thanks for checking. Technically, according to the README, DBD::Pg is supposed to support PostgreSQL going all the way back to 8.0. It's unfortunate we don't have the CI apparatus to test that.

@jjn1056
Copy link
Copy Markdown
Author

jjn1056 commented Mar 30, 2026

I'll see if I can find anyone with a working postgresql 8 docker. It would be handy because I want to look at pipelining next and that requires Pg 14+ so we will need to think about some sort of conditional compile eventually; so if we need it now would be good to get agreement on the way we should do that.

Copy link
Copy Markdown
Contributor

@turnstep turnstep left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left a few notes, but I need to circle back when I can give this a lot more focus. Excellent work, though, I'm excited to get this in and have some solid async support finally.

Comment thread dbdimp.c Outdated
Comment thread Pg.pm
Comment thread Pg.pm Outdated
Comment thread Pg.pm Outdated
Comment thread Pg.pm Outdated
Comment thread dbdimp.c
}
else if (0 == copystatus) { /* non-blocking mode only */
else if (0 == copystatus) {
/* Non-blocking mode: output buffer full, caller should wait for write-ready */
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we set imp_dbh->copystate here?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we should change copystate here — we're still in COPY_IN, the buffer being full is a transient condition. The caller will retry the same pg_putcopydata_async call after the socket becomes write-ready, and that re-entry checks for PGRES_COPY_IN as a precondition. Changing copystate would break that check. What do you think?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Going to have to circle back to this when I have a clear schedule and a clear head. It all made sense when I wrote that, but now I'm not sure. :)

Comment thread Pg.pm Outdated

## Non-blocking end: poll until server confirms
while ((my $end = $dbh->pg_putcopyend_async()) == 0) {
$sel->can_read();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Er...can_read or can_write? Can buffer be full here?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right — can_read is only correct for the result-polling phase, but pg_putcopyend_async also returns 0 during the flush phase (which needs write-ready). Rather than try to solve that with IO::Select, I replaced the example with Time::HiRes::sleep-based polling to match the style used by the existing async examples (pg_ready, pg_cancel, etc.) elsewhere in the POD. This also sidesteps the read vs write ambiguity entirely. In practice, users of these methods will be in a real event loop (IO::Async, Mojo::IOLoop, AnyEvent, etc.) where they'd register the socket for both read and write events — but examples for specific event loops probably belong in a cookbook or separate document rather than the core POD.

Comment thread dbdimp.c
TRACE_PQERRORMESSAGE;
pg_error(aTHX_ dbh, PGRES_FATAL_ERROR, PQerrorMessage(imp_dbh->conn));
if (TEND_slow) TRC(DBILOGFP, "%sEnd pg_db_putcopydata (error: setnonblocking)\n", THEADER_slow);
return -1;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to change copystate?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so — copystate tracks the COPY protocol state (COPY_IN, COPY_OUT, etc.) while copy_nonblocking tracks whether we've enabled non-blocking mode on the connection. Switching to non-blocking doesn't change the protocol state — we're still in COPY_IN. The two flags are intentionally orthogonal. What do you think?

Comment thread t/07copy.t
jjn1056 and others added 3 commits April 3, 2026 20:12
Per turnstep's review: use the shorter form consistently across
documentation, matching what the C comments already use.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Per turnstep's review: the simple example had unchecked return values,
an ambiguous flush loop condition, and created a new IO::Select on
every iteration. The robust example already covers proper usage.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Per turnstep's review and to match the existing async example style
used throughout the POD (pg_ready, pg_cancel examples all use
sleep-based polling). Also sidesteps the can_read vs can_write
ambiguity in pg_putcopyend_async's multi-phase return.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@turnstep
Copy link
Copy Markdown
Contributor

turnstep commented Apr 4, 2026

Ah. Thanks for checking. Technically, according to the README, DBD::Pg is supposed to support PostgreSQL going all the way back to 8.0. It's unfortunate we don't have the CI apparatus to test that.

Don't sweat this too much. Support for old unsupported versions of Postgres (and v8 is so damn old) has always been on a "best effort" basis. Maybe we should make that more explicit in the docs? I still spin up 8.4 for testing, but it's getting harder and harder to compile those old versions. I never want to stop supporting older versions just because they are old, and am willing to have the code go through some contortions to keep them working, but rest assured, there are limits. :)

jjn1056 and others added 2 commits April 3, 2026 21:35
Per turnstep's review: a -1 from pg_flush was silently ignored.
Now checks for error and breaks out of the loop, matching the
pattern used for pg_putcopydata_async error handling.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Move the pg_flush error check (-1) outside the while loop so that
`last` exits the outer for loop instead of just the inner while.
Previously, a flush error during the retry loop would only break
the while, allowing the for loop to continue processing rows.

Suggested-by: Ed Sabol
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Add non-blocking async COPY FROM STDIN support

3 participants