From 5bc4d5d51b6394763737b0f1611a1fa21043887c Mon Sep 17 00:00:00 2001 From: shaoyijie Date: Mon, 18 May 2026 01:30:37 -0700 Subject: [PATCH 1/2] feat(python): expose register_table_function for Paimon UDTFs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add `SQLContext.register_table_function(name, default_database=None)` to the Python binding so Paimon table-valued functions can be registered from Python — the binding previously had no way to reach `register_udtf`. A single dispatch method keeps the API surface stable: it currently supports `vector_search` and `full_text_search`, and the same `match` will pick up `referenced_files_size` / `physical_files_size` once those land, without changing the Python signature. The function binds to the current catalog. `SQLContext::current_catalog` is made public so that the binding can obtain that catalog without keeping a duplicate handle of its own. The binding also enables the `fulltext` feature so `register_full_text_search` is available. 
Co-Authored-By: Claude Opus 4.7 --- bindings/python/Cargo.toml | 2 +- bindings/python/src/context.rs | 36 ++++++++++++++++++- .../datafusion/src/sql_context.rs | 7 +++- 3 files changed, 42 insertions(+), 3 deletions(-) diff --git a/bindings/python/Cargo.toml b/bindings/python/Cargo.toml index 6ed24065..0c3b487c 100644 --- a/bindings/python/Cargo.toml +++ b/bindings/python/Cargo.toml @@ -31,6 +31,6 @@ arrow = { workspace = true, features = ["pyarrow"] } datafusion = { workspace = true } datafusion-ffi = { workspace = true } paimon = { path = "../../crates/paimon", features = ["storage-all"] } -paimon-datafusion = { path = "../../crates/integrations/datafusion" } +paimon-datafusion = { path = "../../crates/integrations/datafusion", features = ["fulltext"] } pyo3 = { version = "0.28", features = ["abi3-py310"] } tokio = { workspace = true } diff --git a/bindings/python/src/context.rs b/bindings/python/src/context.rs index e1050d38..1b7e3250 100644 --- a/bindings/python/src/context.rs +++ b/bindings/python/src/context.rs @@ -23,7 +23,10 @@ use datafusion::catalog::CatalogProvider; use datafusion_ffi::catalog_provider::FFI_CatalogProvider; use datafusion_ffi::proto::logical_extension_codec::FFI_LogicalExtensionCodec; use paimon::{CatalogFactory, Options}; -use paimon_datafusion::{PaimonCatalogProvider, SQLContext}; +use paimon_datafusion::{ + register_full_text_search, register_vector_search, PaimonCatalogProvider, SQLContext, +}; +use pyo3::exceptions::PyValueError; use pyo3::prelude::*; use pyo3::types::PyCapsule; @@ -148,6 +151,37 @@ impl PySQLContext { .map_err(df_to_py_err) } + /// Registers a built-in Paimon table-valued function (UDTF) on the session + /// so it can be used in SQL, e.g. + /// `SELECT * FROM vector_search('items', 'embedding', '[1.0, 0.0]', 10)`. + /// + /// `name` selects the function; supported values are `vector_search` and + /// `full_text_search`. 
The function is bound to the current catalog, so a + /// catalog must already be registered (the first `register_catalog` call + /// also sets it current). `default_database` defaults to `"default"` and + /// resolves the table-name argument the function receives in SQL. + #[pyo3(signature = (name, default_database=None))] + fn register_table_function( + &self, + name: String, + default_database: Option, + ) -> PyResult<()> { + let catalog = self.inner.current_catalog().map_err(df_to_py_err)?; + let default_database = default_database.as_deref().unwrap_or("default"); + let ctx = self.inner.ctx(); + match name.as_str() { + "vector_search" => register_vector_search(ctx, catalog, default_database), + "full_text_search" => register_full_text_search(ctx, catalog, default_database), + other => { + return Err(PyValueError::new_err(format!( + "unknown table function '{other}'; \ + supported: 'vector_search', 'full_text_search'" + ))) + } + } + Ok(()) + } + fn sql(&self, py: Python<'_>, sql: String) -> PyResult>> { let rt = runtime(); let batches = rt.block_on(async { diff --git a/crates/integrations/datafusion/src/sql_context.rs b/crates/integrations/datafusion/src/sql_context.rs index b54f443d..141832b5 100644 --- a/crates/integrations/datafusion/src/sql_context.rs +++ b/crates/integrations/datafusion/src/sql_context.rs @@ -1220,7 +1220,12 @@ impl SQLContext { .clone() } - fn current_catalog(&self) -> DFResult> { + /// Returns the Paimon catalog currently set as default. + /// + /// Exposed so callers that need the registered [`Catalog`] (for example to + /// register a table-valued function against it) can retrieve it without + /// keeping a duplicate handle of their own. 
def test_register_table_function_vector_search():
    """`vector_search` can be registered once a catalog is current."""
    with tempfile.TemporaryDirectory() as warehouse:
        ctx = SQLContext()
        ctx.register_catalog("paimon", {"warehouse": warehouse})

        # Registering against the current catalog should not raise.
        ctx.register_table_function("vector_search")


def test_register_table_function_full_text_search():
    """`full_text_search` can be registered once a catalog is current."""
    with tempfile.TemporaryDirectory() as warehouse:
        ctx = SQLContext()
        ctx.register_catalog("paimon", {"warehouse": warehouse})

        ctx.register_table_function("full_text_search")


def test_register_table_function_with_default_database():
    """The optional `default_database` keyword argument is accepted."""
    with tempfile.TemporaryDirectory() as warehouse:
        ctx = SQLContext()
        ctx.register_catalog("paimon", {"warehouse": warehouse})

        # The optional default_database keyword is accepted.
        ctx.register_table_function("vector_search", default_database="default")


def test_register_table_function_unknown_name():
    """An unsupported function name raises an error that names the offender."""
    with tempfile.TemporaryDirectory() as warehouse:
        ctx = SQLContext()
        ctx.register_catalog("paimon", {"warehouse": warehouse})

        # The "nothing was raised" failure lives in `else`, outside the
        # `except Exception` handler, so it cannot be caught and mistaken for
        # the error under test.
        try:
            ctx.register_table_function("does_not_exist")
        except Exception as e:
            assert "unknown table function" in str(e).lower()
            assert "does_not_exist" in str(e)
        else:
            raise AssertionError("Expected an error for an unknown table function")


def test_register_table_function_without_catalog():
    """Registering before any catalog exists raises a catalog-related error."""
    # With no catalog registered there is no current catalog to bind to.
    ctx = SQLContext()
    # The sentinel failure must stay out of the `try` body: its message
    # contains the word "catalog", so catching it would satisfy the assertion
    # below and let the test pass even when nothing was raised.
    try:
        ctx.register_table_function("vector_search")
    except Exception as e:
        assert "catalog" in str(e).lower()
    else:
        raise AssertionError("Expected an error when no catalog is registered")