Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,19 @@ All notable changes to this project will be documented in this file. It uses the
bounding memory for large `COPY FROM` and `INSERT SELECT` ([#303]).
* Added pushdown for the three-argument forms of `ltrim`, `rtrim`, and
`btrim` ([#307]).
* Added support for binary driver to `clickhouse_raw_query()`.
* Added support for binary driver to `clickhouse_raw_query()` ([#309]).
* Added `clickhouse_query(server, sql)`, a set-returning function that runs a
query against a configured foreign server and returns its rows typed by the
caller's column definition list.
caller's column definition list ([#309]).
* Added pushdown support for partial aggregates under partitionwise
aggregation, so a query over a partitioned table mixing local and foreign
partitions computes the foreign partition's aggregate on ClickHouse
instead of fetching its rows. Covers decomposable aggregates (`count`,
`sum`, `min`, `max`, `bool_and`/`bool_or`, `bit_and`/`bit_or`), plus `avg`
over integers and `avg`/`var_pop`/`var_samp`/`stddev_pop`/`stddev_samp`
over floats. Aggregates with an internal transition state (anything over
`numeric`, `avg(bigint)`, `avg(interval)`) still fall back to local
aggregation. Requires `enable_partitionwise_aggregate` ([#298]).

### 🐞 Bug Fixes

Expand Down Expand Up @@ -84,6 +93,8 @@ All notable changes to this project will be documented in this file. It uses the
"ClickHouse/pg_clickhouse#293 Add ClickHouse server-version detection plumbing"
[#296]: https://github.com/ClickHouse/pg_clickhouse/pull/296
"ClickHouse/pg_clickhouse#296 Fix benchmark queries that crash/hang with binary driver"
[#298]: https://github.com/ClickHouse/pg_clickhouse/pull/298
"ClickHouse/pg_clickhouse#296 Simple partitioned aggregation"
[#300]: https://github.com/ClickHouse/pg_clickhouse/pull/300
"ClickHouse/pg_clickhouse#300 fix(http): handle subsecond precision"
[#301]: https://github.com/ClickHouse/pg_clickhouse/pull/301
Expand All @@ -92,6 +103,8 @@ All notable changes to this project will be documented in this file. It uses the
"ClickHouse/pg_clickhouse#303 Flush buffered data during binary insert"
[#307]: https://github.com/ClickHouse/pg_clickhouse/pull/307
"ClickHouse/pg_clickhouse#307 Push down three-argument trim functions"
[#307]: https://github.com/ClickHouse/pg_clickhouse/pull/309
"ClickHouse/pg_clickhouse#309 add binary support to clickhouse_raw_query, add clickhouse_query"

## [v0.3.2] — 2026-06-16

Expand Down
252 changes: 252 additions & 0 deletions doc/offload-partition.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,252 @@
-- Offload contiguous set of local RANGE partitions to single ClickHouse table,
-- replacing them with one wide foreign-table partition.
--
-- The resulting partition spans union of old_parts' bounds, so it can only ever
-- match whole partitions. The remote table ch_table must already exist with
-- matching columns & hold this range only; it defaults to parent's name.
CREATE FUNCTION clickhouse_offload_range(
parent regclass,
old_parts regclass[],
server name,
ch_table text DEFAULT NULL,
table_opts text DEFAULT NULL,
new_part name DEFAULT NULL
) RETURNS bigint
LANGUAGE plpgsql AS $offload$
DECLARE
partstrat "char";
partnatts int;
keyattnum int;
keycol name;
keytype text;
schemaname name;
parentname name;
coldefs text;
newrel name := new_part;
opts text;
p regclass;
bound text;
m text[];
rows_csv text := '';
from_value text;
to_value text;
contiguous boolean;
n bigint;
local_rows bigint := 0;
BEGIN
-- Restrict to single-column RANGE partitioning, the time-series case
SELECT pt.partstrat, pt.partnatts, pt.partattrs[0]
INTO partstrat, partnatts, keyattnum
FROM pg_catalog.pg_partitioned_table pt
WHERE pt.partrelid = parent;
IF NOT FOUND THEN
RAISE EXCEPTION 'pg_clickhouse: % is not a partitioned table', parent;
END IF;
IF partstrat <> 'r' OR partnatts <> 1 OR keyattnum = 0 THEN
RAISE EXCEPTION
'pg_clickhouse: clickhouse_offload_range supports single-column RANGE partitioning only';
END IF;

SELECT a.attname, pg_catalog.format_type(a.atttypid, a.atttypmod),
n.nspname, c.relname
INTO keycol, keytype, schemaname, parentname
FROM pg_catalog.pg_class c
JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid AND a.attnum = keyattnum
WHERE c.oid = parent;

IF ch_table IS NULL THEN
ch_table := parentname;
END IF;

-- Lock sources, count them, and collect their bounds. Deriving the range
-- from the catalog rules out splitting a partition or leaving a gap
FOREACH p IN ARRAY old_parts LOOP
IF NOT EXISTS (SELECT 1 FROM pg_catalog.pg_inherits
WHERE inhrelid = p AND inhparent = parent) THEN
RAISE EXCEPTION 'pg_clickhouse: % is not a partition of %', p, parent;
END IF;
EXECUTE pg_catalog.format('LOCK TABLE %s IN SHARE MODE', p);
EXECUTE pg_catalog.format('SELECT pg_catalog.count(*) FROM %s', p) INTO n;
local_rows := local_rows + n;

SELECT pg_catalog.pg_get_expr(c.relpartbound, c.oid) INTO bound
FROM pg_catalog.pg_class c WHERE c.oid = p;
m := pg_catalog.regexp_match(bound, '^FOR VALUES FROM \((.+?)\) TO \((.+)\)$');
IF m IS NULL OR m[1] = 'MINVALUE' OR m[2] = 'MAXVALUE' THEN
RAISE EXCEPTION 'pg_clickhouse: % has unsupported bound %', p, bound;
END IF;
rows_csv := rows_csv || CASE WHEN rows_csv = '' THEN '' ELSE ', ' END
|| pg_catalog.format('(%s::%s, %s::%s)', m[1], keytype, m[2], keytype);
END LOOP;

-- Combined bound is min lower to max upper; require the pieces to tile it
-- with no gap (overlap is impossible between partitions of one parent)
EXECUTE pg_catalog.format($q$
WITH b(lo, hi) AS (VALUES %s),
o AS (SELECT lo, hi, pg_catalog.lead(lo) OVER (ORDER BY lo) AS nlo FROM b)
SELECT pg_catalog.min(lo)::text, pg_catalog.max(hi)::text,
coalesce(pg_catalog.bool_and(hi = nlo) FILTER (WHERE nlo IS NOT NULL), true)
FROM o
$q$, rows_csv) INTO from_value, to_value, contiguous;

IF NOT contiguous THEN
RAISE EXCEPTION
'pg_clickhouse: old_parts bounds are not contiguous; they leave a gap';
END IF;

IF newrel IS NULL THEN
newrel := pg_catalog.left(
pg_catalog.regexp_replace(
pg_catalog.format('%s_%s_%s', parentname, from_value, to_value),
'[^a-zA-Z0-9]+', '_', 'g'),
63);
END IF;

-- Stage the destination as a standalone foreign table so the copy lands
-- before it becomes queryable through parent. Inline CHECK matches the
-- partition bound so ATTACH skips its validation scan of the remote table.
-- Columns are spelled out: CREATE FOREIGN TABLE rejects LIKE
SELECT pg_catalog.string_agg(
pg_catalog.format('%I %s%s', a.attname,
pg_catalog.format_type(a.atttypid, a.atttypmod),
CASE WHEN a.attnotnull THEN ' NOT NULL' ELSE '' END),
', ' ORDER BY a.attnum)
INTO coldefs
FROM pg_catalog.pg_attribute a
WHERE a.attrelid = parent AND a.attnum > 0 AND NOT a.attisdropped;

opts := pg_catalog.format('table_name %L', ch_table);
IF table_opts IS NOT NULL AND table_opts <> '' THEN
opts := opts || ', ' || table_opts;
END IF;
EXECUTE pg_catalog.format(
'CREATE FOREIGN TABLE %I.%I (%s, CHECK (%I IS NOT NULL AND %I >= %L AND %I < %L)) SERVER %I OPTIONS (%s)',
schemaname, newrel, coldefs,
keycol, keycol, from_value, keycol, to_value, server, opts);

FOREACH p IN ARRAY old_parts LOOP
EXECUTE pg_catalog.format('INSERT INTO %I.%I SELECT * FROM %s',
schemaname, newrel, p);
END LOOP;

-- Atomic cutover: drop locals, attach the foreign partition in their place
FOREACH p IN ARRAY old_parts LOOP
EXECUTE pg_catalog.format('DROP TABLE %s', p);
END LOOP;
EXECUTE pg_catalog.format(
'ALTER TABLE %s ATTACH PARTITION %I.%I FOR VALUES FROM (%L) TO (%L)',
parent, schemaname, newrel, from_value, to_value);

RETURN local_rows;
END;
$offload$;

-- Mirrors parent's columns, mapping each PostgreSQL type to ClickHouse and
-- wrapping nullable columns in Nullable(); arrays become Array(element) and stay
-- bare since ClickHouse forbids Nullable arrays. Partition key stays non-Nullable
-- since MergeTree ORDER BY rejects Nullable keys. ch_table defaults to parent's
-- name and lands in server's database, same target clickhouse_offload_range
-- attaches. Connection is taken from server, honoring its driver. Returns the
-- executed DDL.
CREATE FUNCTION clickhouse_offload_create_table(
parent regclass,
server name,
ch_table text DEFAULT NULL,
order_by text DEFAULT NULL,
engine text DEFAULT 'MergeTree'
) RETURNS text
LANGUAGE plpgsql AS $create$
DECLARE
partstrat "char";
partnatts int;
keyattnum int;
keycol name;
parentname name;
coldefs text;
badcol text;
ddl text;
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_catalog.pg_foreign_server WHERE srvname = server) THEN
RAISE EXCEPTION 'pg_clickhouse: server % does not exist', server;
END IF;

SELECT pt.partstrat, pt.partnatts, pt.partattrs[0]
INTO partstrat, partnatts, keyattnum
FROM pg_catalog.pg_partitioned_table pt
WHERE pt.partrelid = parent;
IF NOT FOUND THEN
RAISE EXCEPTION 'pg_clickhouse: % is not a partitioned table', parent;
END IF;
IF partstrat <> 'r' OR partnatts <> 1 OR keyattnum = 0 THEN
RAISE EXCEPTION
'pg_clickhouse: clickhouse_offload_create_table supports single-column RANGE partitioning only';
END IF;

SELECT c.relname, a.attname
INTO parentname, keycol
FROM pg_catalog.pg_class c
JOIN pg_catalog.pg_attribute a ON a.attrelid = c.oid AND a.attnum = keyattnum
WHERE c.oid = parent;

IF ch_table IS NULL THEN
ch_table := parentname;
END IF;
IF order_by IS NULL THEN
order_by := pg_catalog.quote_ident(keycol);
END IF;

-- Invert type map (see str_types_map in pglink.c). For array columns map the
-- element type and wrap in Array(); et is the element type, NULL for scalars.
-- chtype is the scalar/element ClickHouse type, NULL when unmapped
SELECT pg_catalog.string_agg(
pg_catalog.format('%s %s', pg_catalog.quote_ident(a.attname),
CASE
-- ClickHouse forbids Nullable(Array(...)); arrays stay bare
WHEN et.oid IS NOT NULL THEN pg_catalog.format('Array(%s)', chtype)
WHEN a.attnotnull OR a.attnum = keyattnum THEN chtype
ELSE pg_catalog.format('Nullable(%s)', chtype)
END),
', ' ORDER BY a.attnum) FILTER (WHERE chtype IS NOT NULL),
pg_catalog.min(pg_catalog.format('%I %s', a.attname,
pg_catalog.format_type(a.atttypid, a.atttypmod)))
FILTER (WHERE chtype IS NULL)
INTO coldefs, badcol
FROM pg_catalog.pg_attribute a
JOIN pg_catalog.pg_type t ON t.oid = a.atttypid
LEFT JOIN pg_catalog.pg_type et ON et.oid = t.typelem AND t.typcategory = 'A'
CROSS JOIN LATERAL (SELECT CASE coalesce(et.typname, t.typname)
WHEN 'bool' THEN 'Bool'
WHEN 'int2' THEN 'Int16'
WHEN 'int4' THEN 'Int32'
WHEN 'int8' THEN 'Int64'
WHEN 'float4' THEN 'Float32'
WHEN 'float8' THEN 'Float64'
WHEN 'numeric' THEN CASE WHEN a.atttypmod = -1 THEN NULL
ELSE pg_catalog.format('Decimal(%s, %s)',
((a.atttypmod - 4) >> 16) & 65535,
(a.atttypmod - 4) & 65535) END
WHEN 'date' THEN 'Date32'
WHEN 'timestamp' THEN 'DateTime64(6)'
WHEN 'timestamptz' THEN $$DateTime64(6, 'UTC')$$
WHEN 'uuid' THEN 'UUID'
WHEN 'text' THEN 'String'
WHEN 'varchar' THEN 'String'
WHEN 'bpchar' THEN 'String'
WHEN 'bytea' THEN 'String'
WHEN 'json' THEN 'String'
WHEN 'jsonb' THEN 'String'
END) AS m(chtype)
WHERE a.attrelid = parent AND a.attnum > 0 AND NOT a.attisdropped;

IF badcol IS NOT NULL THEN
RAISE EXCEPTION 'pg_clickhouse: cannot map column % to a ClickHouse type', badcol;
END IF;

ddl := pg_catalog.format('CREATE TABLE %s (%s) ENGINE = %s ORDER BY %s',
ch_table, coldefs, engine, order_by);
-- DDL yields no rows; column list is a formality clickhouse_query requires
PERFORM * FROM clickhouse_query(server, ddl) AS (ddl_result text);
RETURN ddl;
END;
$create$;
81 changes: 81 additions & 0 deletions doc/pg_clickhouse.md
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,83 @@ try=# EXPLAIN (ANALYZE, VERBOSE)
the number of rows that must be pulled back into Postgres from 1000 (all of
them) to just 8, one for each node.

### Partitioned Tables

A PostgreSQL [partitioned table] can mix local partitions with foreign
partitions backed by ClickHouse. A common layout offloads older data to
ClickHouse while recent data stays in PostgreSQL:

```pgsql
CREATE TABLE events (id int, ts date, val int, amt float8)
PARTITION BY RANGE (ts);

-- 2023 data lives on ClickHouse
CREATE FOREIGN TABLE events_2023 PARTITION OF events
FOR VALUES FROM ('2023-01-01') TO ('2024-01-01')
SERVER ch_svr OPTIONS (table_name 'events');

-- 2024 data stays local
CREATE TABLE events_2024 PARTITION OF events
FOR VALUES FROM ('2024-01-01') TO ('2025-01-01');
```

For example on how to move data from local to foreign partitions, see
[offload-partition.sql](offload-partition.sql).

Aggregates spanning both local and foreign partitions need [partitionwise
aggregation], which PostgreSQL disables by default:

```pgsql
SET enable_partitionwise_aggregate = on;
```

With `enable_partitionwise_aggregate` enabled, PostgreSQL computes a *partial
aggregate* below `Append`, then a *finalize aggregate* above combines those
partials into result. pg_clickhouse pushes the foreign partition's partial
down to ClickHouse:

```pgsql
try=# EXPLAIN (VERBOSE, COSTS OFF)
SELECT count(*), sum(val), min(ts), max(ts) FROM events;
QUERY PLAN
-------------------------------------------------------------------------------------------------------------------------
Finalize Aggregate
Output: count(*), sum(events.val), min(events.ts), max(events.ts)
-> Append
-> Foreign Scan
Output: (PARTIAL count(*)), (PARTIAL sum(events.val)), (PARTIAL min(events.ts)), (PARTIAL max(events.ts))
Relations: Aggregate on (events_2023 events)
Remote SQL: SELECT count(*), sum(val), min(ts), max(ts) FROM "default".events
-> Partial Aggregate
Output: PARTIAL count(*), PARTIAL sum(events_1.val), PARTIAL min(events_1.ts), PARTIAL max(events_1.ts)
-> Seq Scan on public.events_2024 events_1
Output: events_1.val, events_1.ts
```

#### When partial aggregates push down

PostgreSQL represents a partial aggregate as a *transition state* that the
finalize step combines across partitions. pg_clickhouse can push a partition's
partial down only when it can express it as a ClickHouse value:

* **Decomposable aggregates** whose transition state is already the final
value, push down directly: `count`, `sum`, `min`, `max`, `bool_and`/`every`,
`bool_or`, `bit_and`, `bit_or`, and `bit_xor`.
* **`avg` over integers** pushes its `{count, sum}` state as an array.
* **`avg`, `var_pop`, `var_samp`, `stddev_pop`, and `stddev_samp` over
floating point** push their `{N, sum, sum of squared deviations}` state as
an array.

`FILTER (WHERE …)` pushes down with these aggregate functions.

#### When they fall back

Aggregates whose transition state is PostgreSQL's opaque `internal` type have
no portable representation, so the foreign partition instead fetches its rows
and aggregates them locally. This covers anything over `numeric`, plus
`avg(bigint)` and `avg(interval)`. `DISTINCT`, ordered-set, and variadic
aggregates also fall back.

### PREPARE, EXECUTE, DEALLOCATE

As of v0.1.2, pg_clickhouse supports parameterized queries, mainly created
Expand Down Expand Up @@ -1586,6 +1663,10 @@ Copyright (c) 2025-2026, ClickHouse.
"PostgreSQL Docs: EXPLAIN"
[SELECT]: https://www.postgresql.org/docs/current/sql-select.html
"PostgreSQL Docs: SELECT"
[partitioned table]: https://www.postgresql.org/docs/current/ddl-partitioning.html
"PostgreSQL Docs: Table Partitioning"
[partitionwise aggregation]: https://www.postgresql.org/docs/current/runtime-config-query.html#GUC-ENABLE-PARTITIONWISE-AGGREGATE
"PostgreSQL Docs: enable_partitionwise_aggregate"
[PREPARE]: https://www.postgresql.org/docs/current/sql-prepare.html
"PostgreSQL Docs: PREPARE"
[EXECUTE]: https://www.postgresql.org/docs/current/sql-execute.html
Expand Down
Loading
Loading