diff --git a/.gitignore b/.gitignore index 64007203..61c09050 100644 --- a/.gitignore +++ b/.gitignore @@ -20,5 +20,6 @@ .idea .vscode **/.DS_Store -dist/* +**/dist/ +docs/site/ .qoder diff --git a/crates/integrations/datafusion/src/lib.rs b/crates/integrations/datafusion/src/lib.rs index 47f1bab9..1c830177 100644 --- a/crates/integrations/datafusion/src/lib.rs +++ b/crates/integrations/datafusion/src/lib.rs @@ -43,8 +43,10 @@ mod filter_pushdown; #[cfg(feature = "fulltext")] mod full_text_search; mod merge_into; +mod physical_files_size; mod physical_plan; mod procedures; +mod referenced_files_size; mod relation_planner; pub mod runtime; mod sql_context; @@ -67,7 +69,9 @@ pub use catalog::{PaimonCatalogProvider, PaimonSchemaProvider}; pub use error::to_datafusion_error; #[cfg(feature = "fulltext")] pub use full_text_search::{register_full_text_search, FullTextSearchFunction}; +pub use physical_files_size::{register_physical_files_size, PhysicalFilesSizeFunction}; pub use physical_plan::PaimonTableScan; +pub use referenced_files_size::{register_referenced_files_size, ReferencedFilesSizeFunction}; pub use relation_planner::PaimonRelationPlanner; pub use sql_context::SQLContext; pub use table::PaimonTableProvider; diff --git a/crates/integrations/datafusion/src/physical_files_size.rs b/crates/integrations/datafusion/src/physical_files_size.rs new file mode 100644 index 00000000..14c3af41 --- /dev/null +++ b/crates/integrations/datafusion/src/physical_files_size.rs @@ -0,0 +1,175 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Table function that computes the total physical file sizes in the table directory. +//! +//! Usage: `SELECT * FROM physical_files_size('db.table_name')` + +use std::any::Any; +use std::fmt::Debug; +use std::sync::{Arc, OnceLock}; + +use async_trait::async_trait; +use datafusion::arrow::array::{Int64Array, RecordBatch}; +use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion::catalog::Session; +use datafusion::catalog::TableFunctionImpl; +use datafusion::datasource::memory::MemorySourceConfig; +use datafusion::datasource::{TableProvider, TableType}; +use datafusion::error::Result as DFResult; +use datafusion::logical_expr::Expr; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::prelude::SessionContext; +use paimon::catalog::Catalog; +use paimon::table::referenced_files::{collect_physical_files_summary, PhysicalFilesSummary}; +use paimon::table::Table; + +use crate::error::to_datafusion_error; +use crate::runtime::{await_with_runtime, block_on_with_runtime}; +use crate::table_function_args::{extract_string_literal, parse_table_identifier}; + +const FUNCTION_NAME: &str = "physical_files_size"; + +pub fn register_physical_files_size( + ctx: &SessionContext, + catalog: Arc, + default_database: &str, +) { + ctx.register_udtf( + FUNCTION_NAME, + Arc::new(PhysicalFilesSizeFunction::new(catalog, default_database)), + ); +} + +pub struct PhysicalFilesSizeFunction { + catalog: Arc, + default_database: String, +} + +impl Debug for PhysicalFilesSizeFunction { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PhysicalFilesSizeFunction") + .field("default_database", &self.default_database) + .finish() + } +} + +impl PhysicalFilesSizeFunction { + pub fn new(catalog: Arc, default_database: &str) -> Self { + Self { + catalog, + default_database: default_database.to_string(), + } + } +} + +impl TableFunctionImpl for PhysicalFilesSizeFunction { + fn call(&self, args: &[Expr]) -> DFResult> { + if args.len() != 1 { + return Err(datafusion::error::DataFusionError::Plan( + "physical_files_size requires 1 argument: (table_name)".to_string(), + )); + } + + let table_name = extract_string_literal(FUNCTION_NAME, &args[0], "table_name")?; + let identifier = + parse_table_identifier(FUNCTION_NAME, &table_name, &self.default_database)?; + + let catalog = Arc::clone(&self.catalog); + let table = block_on_with_runtime( + async move { catalog.get_table(&identifier).await }, + "physical_files_size: catalog access thread panicked", + ) + .map_err(to_datafusion_error)?; + + Ok(Arc::new(PhysicalFilesSizeTableProvider { table })) + } +} + +fn output_schema() -> SchemaRef { + static SCHEMA: OnceLock = OnceLock::new(); + SCHEMA + .get_or_init(|| { + Arc::new(Schema::new(vec![ + Field::new("manifest_file_count", DataType::Int64, false), + Field::new("manifest_file_size", DataType::Int64, false), + Field::new("data_file_count", DataType::Int64, false), + Field::new("data_file_size", DataType::Int64, false), + Field::new("index_file_count", DataType::Int64, false), + Field::new("index_file_size", DataType::Int64, false), + ])) + }) + .clone() +} + +#[derive(Debug)] +struct PhysicalFilesSizeTableProvider { + table: Table, +} + +#[async_trait] +impl TableProvider for PhysicalFilesSizeTableProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + output_schema() + } + + fn table_type(&self) -> TableType { + TableType::View + } + + async fn scan( + &self, + _state: &dyn Session, + projection: Option<&Vec>, + _filters: &[Expr], + _limit: Option, + ) -> DFResult> { + let table = self.table.clone(); + let summary = await_with_runtime(async move { + collect_physical_files_summary(table.file_io(), table.location()).await + }) + .await + .map_err(to_datafusion_error)?; + + let batch = summary_to_record_batch(&summary)?; + let schema = output_schema(); + + Ok(MemorySourceConfig::try_new_exec( + &[vec![batch]], + schema, + projection.cloned(), + )?) + } +} + +fn summary_to_record_batch(s: &PhysicalFilesSummary) -> DFResult { + Ok(RecordBatch::try_new( + output_schema(), + vec![ + Arc::new(Int64Array::from(vec![s.manifest_file_count])), + Arc::new(Int64Array::from(vec![s.manifest_file_size])), + Arc::new(Int64Array::from(vec![s.data_file_count])), + Arc::new(Int64Array::from(vec![s.data_file_size])), + Arc::new(Int64Array::from(vec![s.index_file_count])), + Arc::new(Int64Array::from(vec![s.index_file_size])), + ], + )?) +} diff --git a/crates/integrations/datafusion/src/referenced_files_size.rs b/crates/integrations/datafusion/src/referenced_files_size.rs new file mode 100644 index 00000000..387a538f --- /dev/null +++ b/crates/integrations/datafusion/src/referenced_files_size.rs @@ -0,0 +1,205 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Table function that computes per-snapshot referenced file size summaries. +//! +//! Usage: `SELECT * FROM referenced_files_size('db.table_name')` + +use std::any::Any; +use std::fmt::Debug; +use std::sync::{Arc, OnceLock}; + +use async_trait::async_trait; +use datafusion::arrow::array::{Int64Array, RecordBatch, StringArray}; +use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef}; +use datafusion::catalog::Session; +use datafusion::catalog::TableFunctionImpl; +use datafusion::datasource::memory::MemorySourceConfig; +use datafusion::datasource::{TableProvider, TableType}; +use datafusion::error::Result as DFResult; +use datafusion::logical_expr::Expr; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::prelude::SessionContext; +use paimon::catalog::Catalog; +use paimon::table::referenced_files::{collect_referenced_files_summary, ReferencedFilesSummary}; +use paimon::table::Table; + +use crate::error::to_datafusion_error; +use crate::runtime::{await_with_runtime, block_on_with_runtime}; +use crate::table_function_args::{extract_string_literal, parse_table_identifier}; + +const FUNCTION_NAME: &str = "referenced_files_size"; + +pub fn register_referenced_files_size( + ctx: &SessionContext, + catalog: Arc, + default_database: &str, +) { + ctx.register_udtf( + FUNCTION_NAME, + Arc::new(ReferencedFilesSizeFunction::new(catalog, default_database)), + ); +} + +pub struct ReferencedFilesSizeFunction { + catalog: Arc, + default_database: String, +} + +impl Debug for ReferencedFilesSizeFunction { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ReferencedFilesSizeFunction") + .field("default_database", &self.default_database) + .finish() + } +} + +impl ReferencedFilesSizeFunction { + pub fn new(catalog: Arc, default_database: &str) -> Self { + Self { + catalog, + default_database: default_database.to_string(), + } + } +} + +impl TableFunctionImpl for ReferencedFilesSizeFunction { + fn call(&self, args: &[Expr]) -> DFResult> { + if args.len() != 1 { + return Err(datafusion::error::DataFusionError::Plan( + "referenced_files_size requires 1 argument: (table_name)".to_string(), + )); + } + + let table_name = extract_string_literal(FUNCTION_NAME, &args[0], "table_name")?; + let identifier = + parse_table_identifier(FUNCTION_NAME, &table_name, &self.default_database)?; + + let catalog = Arc::clone(&self.catalog); + let table = block_on_with_runtime( + async move { catalog.get_table(&identifier).await }, + "referenced_files_size: catalog access thread panicked", + ) + .map_err(to_datafusion_error)?; + + Ok(Arc::new(ReferencedFilesSizeTableProvider { table })) + } +} + +fn output_schema() -> SchemaRef { + static SCHEMA: OnceLock = OnceLock::new(); + SCHEMA + .get_or_init(|| { + Arc::new(Schema::new(vec![ + Field::new("source", DataType::Utf8, false), + Field::new("manifest_file_count", DataType::Int64, false), + Field::new("manifest_file_size", DataType::Int64, false), + Field::new("data_file_count", DataType::Int64, false), + Field::new("data_file_size", DataType::Int64, false), + Field::new("index_file_count", DataType::Int64, false), + Field::new("index_file_size", DataType::Int64, false), + ])) + }) + .clone() +} + +#[derive(Debug)] +struct ReferencedFilesSizeTableProvider { + table: Table, +} + +#[async_trait] +impl TableProvider for ReferencedFilesSizeTableProvider { + fn as_any(&self) -> &dyn Any { + self + } + + fn schema(&self) -> SchemaRef { + output_schema() + } + + fn table_type(&self) -> TableType { + TableType::View + } + + async fn scan( + &self, + _state: &dyn Session, + projection: Option<&Vec>, + _filters: &[Expr], + _limit: Option, + ) -> DFResult> { + let table = self.table.clone(); + let summaries = await_with_runtime(async move { + let schema = table.schema(); + let partition_keys = schema.partition_keys(); + let partition_fields = schema.partition_fields(); + collect_referenced_files_summary( + table.file_io(), + table.location(), + partition_keys, + &partition_fields, + ) + .await + }) + .await + .map_err(to_datafusion_error)?; + + let batch = summaries_to_record_batch(&summaries)?; + let schema = output_schema(); + + Ok(MemorySourceConfig::try_new_exec( + &[vec![batch]], + schema, + projection.cloned(), + )?) + } +} + +fn summaries_to_record_batch(summaries: &[ReferencedFilesSummary]) -> DFResult { + let n = summaries.len(); + let mut sources = Vec::with_capacity(n); + let mut mf_counts = Vec::with_capacity(n); + let mut mf_sizes = Vec::with_capacity(n); + let mut df_counts = Vec::with_capacity(n); + let mut df_sizes = Vec::with_capacity(n); + let mut if_counts = Vec::with_capacity(n); + let mut if_sizes = Vec::with_capacity(n); + + for s in summaries { + sources.push(s.source.as_str()); + mf_counts.push(s.manifest_file_count); + mf_sizes.push(s.manifest_file_size); + df_counts.push(s.data_file_count); + df_sizes.push(s.data_file_size); + if_counts.push(s.index_file_count); + if_sizes.push(s.index_file_size); + } + + Ok(RecordBatch::try_new( + output_schema(), + vec![ + Arc::new(StringArray::from(sources)), + Arc::new(Int64Array::from(mf_counts)), + Arc::new(Int64Array::from(mf_sizes)), + Arc::new(Int64Array::from(df_counts)), + Arc::new(Int64Array::from(df_sizes)), + Arc::new(Int64Array::from(if_counts)), + Arc::new(Int64Array::from(if_sizes)), + ], + )?) +} diff --git a/crates/integrations/datafusion/src/sql_context.rs b/crates/integrations/datafusion/src/sql_context.rs index b54f443d..973e263f 100644 --- a/crates/integrations/datafusion/src/sql_context.rs +++ b/crates/integrations/datafusion/src/sql_context.rs @@ -506,7 +506,7 @@ impl SQLContext { // Sort replacements by position (descending) so that replacements // from right to left don't shift indices of earlier ones - replacements.sort_by(|a, b| b.0 .0.cmp(&a.0 .0)); + replacements.sort_by_key(|r| std::cmp::Reverse(r.0 .0)); // Build the rewritten SQL by replacing each clause from right to left let mut rewritten_sql = sql.to_string(); diff --git a/crates/paimon/src/io/file_io.rs b/crates/paimon/src/io/file_io.rs index eef5dd65..7e78004c 100644 --- a/crates/paimon/src/io/file_io.rs +++ b/crates/paimon/src/io/file_io.rs @@ -158,6 +158,44 @@ impl FileIO { Ok(statuses) } + /// List all files recursively under the given directory path. + pub async fn list_status_recursive(&self, path: &str) -> Result> { + let (op, relative_path) = self.storage.create(path)?; + let base_path = &path[..path.len() - relative_path.len()]; + let list_path = normalize_root(relative_path); + + let entries = + op.list_with(&list_path) + .recursive(true) + .await + .context(IoUnexpectedSnafu { + message: format!("Failed to list files recursively in '{path}'"), + })?; + + let mut statuses = Vec::new(); + let list_path_normalized = list_path.trim_start_matches('/'); + for entry in entries { + let entry_path = entry.path(); + if entry_path.trim_start_matches('/') == list_path_normalized { + continue; + } + let meta = entry.metadata(); + if meta.is_dir() { + continue; + } + statuses.push(FileStatus { + size: meta.content_length(), + is_dir: false, + path: format!("{base_path}{entry_path}"), + last_modified: meta + .last_modified() + .map(|v| DateTime::::from(SystemTime::from(v))), + }); + } + + Ok(statuses) + } + /// Check if exists. /// /// References: diff --git a/crates/paimon/src/spec/index_manifest.rs b/crates/paimon/src/spec/index_manifest.rs index 25994437..cdafc522 100644 --- a/crates/paimon/src/spec/index_manifest.rs +++ b/crates/paimon/src/spec/index_manifest.rs @@ -122,6 +122,18 @@ impl IndexManifest { Self::read_from_bytes(&content) } + /// Read index manifest entries and return their byte size. + pub async fn read_with_size( + file_io: &FileIO, + path: &str, + ) -> Result<(Vec, i64)> { + let input_file = file_io.new_input(path)?; + let content = input_file.read().await?; + let size = content.len() as i64; + let entries = Self::read_from_bytes(&content)?; + Ok((entries, size)) + } + /// Read index manifest entries from Avro-encoded bytes. pub fn read_from_bytes(bytes: &[u8]) -> Result> { crate::spec::avro::from_avro_bytes_fast(bytes) diff --git a/crates/paimon/src/table/mod.rs b/crates/paimon/src/table/mod.rs index c84d7c8c..c00fa786 100644 --- a/crates/paimon/src/table/mod.rs +++ b/crates/paimon/src/table/mod.rs @@ -40,6 +40,7 @@ mod kv_file_writer; mod partition_filter; mod postpone_file_writer; mod read_builder; +pub mod referenced_files; pub(crate) mod rest_env; pub(crate) mod row_id_predicate; pub(crate) mod schema_manager; diff --git a/crates/paimon/src/table/referenced_files.rs b/crates/paimon/src/table/referenced_files.rs new file mode 100644 index 00000000..2c41c675 --- /dev/null +++ b/crates/paimon/src/table/referenced_files.rs @@ -0,0 +1,920 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Collect deduplicated referenced file size summaries for all snapshots of a table. +//! +//! Reference: [LocalOrphanFilesClean](https://github.com/apache/paimon/blob/master/paimon-core/src/main/java/org/apache/paimon/operation/LocalOrphanFilesClean.java) + +use std::collections::HashMap; +use std::sync::Mutex; + +use crate::io::FileIO; +use crate::spec::{ + bucket_dir_name, BinaryRow, DataField, IndexManifest, Manifest, ManifestEntry, + ManifestFileMeta, PartitionComputer, +}; +use crate::table::{BranchManager, SnapshotManager, TagManager}; +use futures::future::try_join_all; +use futures::stream::{self, StreamExt, TryStreamExt}; + +/// Per-scope aggregated summary of referenced files (deduplicated). +/// +/// Each row represents the unique referenced files for a scope: +/// - `"total"`: all snapshots across all branches and tags +/// - `"branch:main"`: main branch snapshots + tags +/// - `"branch:"`: a specific branch +/// +/// Files are deduplicated by file name within each scope, so the sum +/// represents actual disk usage that is still referenced (protected from cleanup). +/// Both ADD and DELETE manifest entries are included since both reference +/// physical files that cannot be removed until the snapshot expires. +#[derive(Debug, Clone, Default)] +pub struct ReferencedFilesSummary { + pub source: String, + pub manifest_file_count: i64, + pub manifest_file_size: i64, + pub data_file_count: i64, + pub data_file_size: i64, + pub index_file_count: i64, + pub index_file_size: i64, +} + +/// Deduplicated file set for a scope, keyed by file name. +#[derive(Default)] +struct ScopeFileSet { + manifest_files: HashMap, + data_files: HashMap, + index_files: HashMap, +} + +impl ScopeFileSet { + fn to_summary(&self, source: &str) -> ReferencedFilesSummary { + ReferencedFilesSummary { + source: source.to_string(), + manifest_file_count: self.manifest_files.len() as i64, + manifest_file_size: self.manifest_files.values().sum(), + data_file_count: self.data_files.len() as i64, + data_file_size: self.data_files.values().sum(), + index_file_count: self.index_files.len() as i64, + index_file_size: self.index_files.values().sum(), + } + } + + fn merge(&mut self, other: &ScopeFileSet) { + for (k, v) in &other.manifest_files { + self.manifest_files.entry(k.clone()).or_insert(*v); + } + for (k, v) in &other.data_files { + self.data_files.entry(k.clone()).or_insert(*v); + } + for (k, v) in &other.index_files { + self.index_files.entry(k.clone()).or_insert(*v); + } + } +} + +const SNAPSHOT_CONCURRENCY: usize = 32; + +/// Cached data file entries (file_name, file_size) per manifest file full path. +type ManifestCache = Mutex>>; + +/// Resolves extra file paths for stat-ing their real sizes. +struct ExtraFileResolver { + table_location: String, + partition_computer: Option, +} + +impl ExtraFileResolver { + fn new(table_location: &str, partition_keys: &[String], schema_fields: &[DataField]) -> Self { + let partition_computer = if partition_keys.is_empty() { + None + } else { + PartitionComputer::new( + partition_keys, + schema_fields, + "__DEFAULT_PARTITION__", + false, + ) + .ok() + }; + Self { + table_location: table_location.to_string(), + partition_computer, + } + } + + fn resolve_extra_file_path( + &self, + partition_bytes: &[u8], + bucket: i32, + extra_file_name: &str, + external_path: Option<&str>, + ) -> Option { + if let Some(ext_path) = external_path { + let dir = ext_path.trim_end_matches('/'); + return Some(format!("{}/{}", dir, extra_file_name)); + } + let partition_path = if let Some(ref computer) = self.partition_computer { + let row = BinaryRow::from_serialized_bytes(partition_bytes).ok()?; + computer.generate_partition_path(&row).ok()? + } else { + String::new() + }; + let bucket_dir = bucket_dir_name(bucket); + Some(format!( + "{}/{}{}/{}", + self.table_location, partition_path, bucket_dir, extra_file_name + )) + } +} + +/// Collect per-scope deduplicated referenced file size summaries for a table. +/// +/// Returns rows: +/// 1. `"total"` — union of all referenced files from main branch, tags, and branches +/// 2. `"branch:main"` — main branch snapshots + tag snapshots +/// 3. `"branch:"` — one row per branch +/// +/// Snapshots are processed concurrently (up to 32 at a time). Within each +/// snapshot, manifest list and manifest file reads are also concurrent. +/// A shared cache avoids re-reading the same manifest file across snapshots. +/// +/// Files are deduplicated by name within each scope to produce an accurate +/// count of unique referenced files. Both ADD and DELETE entries are included +/// since both reference physical files protected from cleanup. +/// +/// Manifest list files and index manifest files are counted as manifest files, +/// consistent with `physical_files_size` classification. +/// +/// Extra files referenced by data file entries are stat-ed to obtain their +/// real sizes, using partition/bucket info to construct full paths. +pub async fn collect_referenced_files_summary( + file_io: &FileIO, + table_location: &str, + partition_keys: &[String], + schema_fields: &[DataField], +) -> crate::Result> { + let manifest_cache: ManifestCache = Mutex::new(HashMap::new()); + let manifest_cache_ref = &manifest_cache; + let extra_resolver = ExtraFileResolver::new(table_location, partition_keys, schema_fields); + let extra_resolver_ref = &extra_resolver; + + let sm = SnapshotManager::new(file_io.clone(), table_location.to_string()); + let tm = TagManager::new(file_io.clone(), table_location.to_string()); + + // 1. Main branch snapshots + tags (concurrently) + // For main branch, snapshot reading and manifest resolution both use root SM. + let (main_files, tag_files) = tokio::try_join!( + collect_scope_files(file_io, &sm, &sm, manifest_cache_ref, extra_resolver_ref), + collect_tag_files( + file_io, + &sm, + &sm, + &tm, + manifest_cache_ref, + extra_resolver_ref + ), + )?; + let mut main_files = main_files; + main_files.merge(&tag_files); + + // 2. Branch file sets (all branches concurrently) + let bm = BranchManager::new(file_io.clone(), table_location.to_string()); + let branch_names = bm.list_all().await?; + + let sm_ref = &sm; + let branch_futures: Vec<_> = branch_names + .iter() + .map(|branch_name| { + let branch_sm = sm.with_branch(branch_name); + let branch_tm = tm.with_branch(branch_name); + async move { + // Branch SM reads snapshot/tag files from branch path, + // but manifest paths are always resolved from the table root. + let (mut branch_files, branch_tag_files) = tokio::try_join!( + collect_scope_files( + file_io, + &branch_sm, + sm_ref, + manifest_cache_ref, + extra_resolver_ref + ), + collect_tag_files( + file_io, + &branch_sm, + sm_ref, + &branch_tm, + manifest_cache_ref, + extra_resolver_ref + ), + )?; + branch_files.merge(&branch_tag_files); + Ok::<_, crate::Error>(branch_files) + } + }) + .collect(); + let branch_results = try_join_all(branch_futures).await?; + + // 3. Assemble output: total, main, branches + let mut total_files = ScopeFileSet::default(); + total_files.merge(&main_files); + for bs in &branch_results { + total_files.merge(bs); + } + + let mut result = vec![ + total_files.to_summary("total"), + main_files.to_summary("branch:main"), + ]; + for (name, files) in branch_names.iter().zip(&branch_results) { + result.push(files.to_summary(&format!("branch:{name}"))); + } + Ok(result) +} + +async fn collect_scope_files( + file_io: &FileIO, + sm: &SnapshotManager, + manifest_sm: &SnapshotManager, + manifest_cache: &ManifestCache, + extra_resolver: &ExtraFileResolver, +) -> crate::Result { + let snapshot_ids = sm.list_all_ids().await?; + + let per_snapshot: Vec> = stream::iter(snapshot_ids) + .map(|snapshot_id| { + let sm = sm.clone(); + async move { + collect_single_snapshot_files( + file_io, + &sm, + manifest_sm, + snapshot_id, + manifest_cache, + extra_resolver, + ) + .await + } + }) + .buffer_unordered(SNAPSHOT_CONCURRENCY) + .try_collect() + .await?; + + let mut merged = ScopeFileSet::default(); + for fs in per_snapshot.into_iter().flatten() { + merged.merge(&fs); + } + Ok(merged) +} + +async fn collect_tag_files( + file_io: &FileIO, + _sm: &SnapshotManager, + manifest_sm: &SnapshotManager, + tm: &TagManager, + manifest_cache: &ManifestCache, + extra_resolver: &ExtraFileResolver, +) -> crate::Result { + let tag_names = tm.list_all_names().await?; + + let tag_futures: Vec<_> = tag_names + .iter() + .map(|tag_name| async move { + let snapshot = match tm.get(tag_name).await? { + Some(s) => s, + None => return Ok(None), + }; + collect_snapshot_files( + file_io, + manifest_sm, + &snapshot, + manifest_cache, + extra_resolver, + ) + .await + }) + .collect(); + let tag_results = try_join_all(tag_futures).await?; + + let mut merged = ScopeFileSet::default(); + for fs in tag_results.into_iter().flatten() { + merged.merge(&fs); + } + Ok(merged) +} + +async fn collect_single_snapshot_files( + file_io: &FileIO, + sm: &SnapshotManager, + manifest_sm: &SnapshotManager, + snapshot_id: i64, + manifest_cache: &ManifestCache, + extra_resolver: &ExtraFileResolver, +) -> crate::Result> { + let snapshot = match try_get_snapshot(sm, snapshot_id).await? { + Some(s) => s, + None => return Ok(None), + }; + + collect_snapshot_files( + file_io, + manifest_sm, + &snapshot, + manifest_cache, + extra_resolver, + ) + .await +} + +async fn collect_snapshot_files( + file_io: &FileIO, + manifest_sm: &SnapshotManager, + snapshot: &crate::spec::Snapshot, + manifest_cache: &ManifestCache, + extra_resolver: &ExtraFileResolver, +) -> crate::Result> { + let mut file_set = ScopeFileSet::default(); + + // Collect manifest list file names (these are manifest-type files themselves) + let mut manifest_list_names = vec![ + snapshot.base_manifest_list().to_string(), + snapshot.delta_manifest_list().to_string(), + ]; + if let Some(cl) = snapshot.changelog_manifest_list() { + manifest_list_names.push(cl.to_string()); + } + + // Pre-compute paths (always resolved from table root) + let manifest_list_paths: Vec = manifest_list_names + .iter() + .map(|name| manifest_sm.manifest_path(name)) + .collect(); + + // Read all manifest lists concurrently and record their sizes + let manifest_list_futures: Vec<_> = manifest_list_paths + .iter() + .map(|path| try_read_manifest_list_with_size(file_io, path)) + .collect(); + let manifest_list_results = try_join_all(manifest_list_futures).await?; + + // Register manifest list files themselves as manifest files + for (name, (_, size)) in manifest_list_names.iter().zip(&manifest_list_results) { + if *size > 0 { + file_set.manifest_files.entry(name.clone()).or_insert(*size); + } + } + + // Flatten all manifest file metas from all manifest lists + let all_manifest_metas: Vec<&ManifestFileMeta> = manifest_list_results + .iter() + .flat_map(|(metas, _)| metas.iter()) + .collect(); + + // Register manifest files + for meta in &all_manifest_metas { + file_set + .manifest_files + .entry(meta.file_name().to_string()) + .or_insert(meta.file_size()); + } + + // Read manifest files to get data file entries, using cache by full path + let manifest_paths: Vec = all_manifest_metas + .iter() + .map(|meta| manifest_sm.manifest_path(meta.file_name())) + .collect(); + + let uncached_indices: Vec = manifest_paths + .iter() + .enumerate() + .filter(|(_, path)| { + let cache = manifest_cache.lock().unwrap(); + !cache.contains_key(path.as_str()) + }) + .map(|(i, _)| i) + .collect(); + + if !uncached_indices.is_empty() { + let uncached_paths: Vec<&str> = uncached_indices + .iter() + .map(|&i| manifest_paths[i].as_str()) + .collect(); + + let manifest_futures: Vec<_> = uncached_paths + .iter() + .map(|path| try_read_manifest(file_io, path)) + .collect(); + let results = try_join_all(manifest_futures).await?; + + // Collect extra files that need stat-ing + let mut extra_file_stat_tasks: Vec<(usize, usize, String)> = Vec::new(); + let mut all_file_entries: Vec> = Vec::with_capacity(results.len()); + + for (manifest_idx, entries) in results.iter().enumerate() { + let mut file_entries: Vec<(String, i64)> = Vec::new(); + for e in entries { + file_entries.push((e.file().file_name.clone(), e.file().file_size)); + for extra in &e.file().extra_files { + let entry_idx = file_entries.len(); + let full_path = extra_resolver.resolve_extra_file_path( + e.partition(), + e.bucket(), + extra, + e.file().external_path.as_deref(), + ); + if let Some(path) = full_path { + extra_file_stat_tasks.push((manifest_idx, entry_idx, path)); + } + file_entries.push((extra.clone(), 0)); + } + } + all_file_entries.push(file_entries); + } + + // Batch stat extra files concurrently + if !extra_file_stat_tasks.is_empty() { + let stat_futures: Vec<_> = extra_file_stat_tasks + .iter() + .map(|(_, _, path)| try_stat_file_size(file_io, path)) + .collect(); + let stat_results = try_join_all(stat_futures).await?; + + for ((manifest_idx, entry_idx, _), size) in + extra_file_stat_tasks.iter().zip(stat_results) + { + if size > 0 { + all_file_entries[*manifest_idx][*entry_idx].1 = size; + } + } + } + + let mut cache = manifest_cache.lock().unwrap(); + for (path, file_entries) in uncached_paths.into_iter().zip(all_file_entries) { + cache.insert(path.to_string(), file_entries); + } + } + + // Collect data files from cache (deduplicated by HashMap key) + { + let cache = manifest_cache.lock().unwrap(); + for path in &manifest_paths { + if let Some(entries) = cache.get(path.as_str()) { + for (name, size) in entries { + file_set.data_files.entry(name.clone()).or_insert(*size); + } + } + } + } + + // Read index manifest if present + if let Some(index_manifest_name) = snapshot.index_manifest() { + // The index manifest file itself is a manifest-type file + let index_manifest_path = manifest_sm.manifest_path(index_manifest_name); + let index_entries = + try_read_index_manifest_with_size(file_io, &index_manifest_path).await?; + + if index_entries.1 > 0 { + file_set + .manifest_files + .entry(index_manifest_name.to_string()) + .or_insert(index_entries.1); + } + + for entry in &index_entries.0 { + file_set + .index_files + .entry(entry.index_file.file_name.clone()) + .or_insert(entry.index_file.file_size as i64); + } + } + + // Collect statistics file if present + if let Some(statistics_name) = snapshot.statistics() { + let statistics_path = format!( + "{}/statistics/{}", + extra_resolver.table_location, statistics_name + ); + let size = try_stat_file_size(file_io, &statistics_path).await?; + if size > 0 { + file_set + .manifest_files + .entry(statistics_name.to_string()) + .or_insert(size); + } + } + + Ok(Some(file_set)) +} + +async fn try_get_snapshot( + sm: &SnapshotManager, + snapshot_id: i64, +) -> crate::Result> { + match sm.get_snapshot(snapshot_id).await { + Ok(s) => Ok(Some(s)), + Err(crate::Error::IoUnexpected { ref source, .. }) + if source.kind() == opendal::ErrorKind::NotFound => + { + Ok(None) + } + Err(crate::Error::DataInvalid { ref message, .. }) + if message.contains("does not exist") => + { + Ok(None) + } + Err(e) => Err(e), + } +} + +/// Read a manifest list file. Returns (entries, file_size_in_bytes). +async fn try_read_manifest_list_with_size( + file_io: &FileIO, + path: &str, +) -> crate::Result<(Vec, i64)> { + let input = file_io.new_input(path)?; + match input.read().await { + Ok(bytes) => { + let size = bytes.len() as i64; + let metas = crate::spec::avro::from_avro_bytes_fast(&bytes)?; + Ok((metas, size)) + } + Err(crate::Error::IoUnexpected { ref source, .. }) + if source.kind() == opendal::ErrorKind::NotFound => + { + Ok((Vec::new(), 0)) + } + Err(e) => Err(e), + } +} + +async fn try_read_manifest(file_io: &FileIO, path: &str) -> crate::Result> { + match Manifest::read(file_io, path).await { + Ok(entries) => Ok(entries), + Err(crate::Error::IoUnexpected { ref source, .. }) + if source.kind() == opendal::ErrorKind::NotFound => + { + Ok(Vec::new()) + } + Err(e) => Err(e), + } +} + +/// Stat a file to get its size. Returns 0 if the file is not found. +async fn try_stat_file_size(file_io: &FileIO, path: &str) -> crate::Result { + let input = file_io.new_input(path)?; + match input.metadata().await { + Ok(status) => Ok(status.size as i64), + Err(crate::Error::IoUnexpected { ref source, .. }) + if source.kind() == opendal::ErrorKind::NotFound => + { + Ok(0) + } + Err(e) => Err(e), + } +} + +/// Read an index manifest file. Returns (entries, file_size_in_bytes). +async fn try_read_index_manifest_with_size( + file_io: &FileIO, + path: &str, +) -> crate::Result<(Vec, i64)> { + match IndexManifest::read_with_size(file_io, path).await { + Ok(result) => Ok(result), + Err(crate::Error::IoUnexpected { ref source, .. }) + if source.kind() == opendal::ErrorKind::NotFound => + { + Ok((Vec::new(), 0)) + } + Err(e) => Err(e), + } +} + +/// Summary of all physical files in the table directory, categorized by file type. +#[derive(Debug, Clone, Default)] +pub struct PhysicalFilesSummary { + pub manifest_file_count: i64, + pub manifest_file_size: i64, + pub data_file_count: i64, + pub data_file_size: i64, + pub index_file_count: i64, + pub index_file_size: i64, +} + +/// Categorize a file name into a file type. +/// Everything that is not a manifest/statistics or index file is classified as data. +fn classify_file_name(file_name: &str) -> FileType { + if file_name.starts_with("manifest-") + || file_name.starts_with("index-manifest-") + || file_name.starts_with("statistics-") + { + FileType::Manifest + } else if file_name.starts_with("index-") { + FileType::Index + } else { + FileType::Data + } +} + +enum FileType { + Manifest, + Data, + Index, +} + +const DIR_LIST_CONCURRENCY: usize = 32; + +/// Scan the table directory and compute total file sizes grouped by type. +/// +/// First lists top-level subdirectories, then concurrently lists each +/// subdirectory recursively (up to 32 in parallel) to maximize throughput +/// on object stores with many partition directories. +/// +/// Files are classified by their file name prefix: +/// - `manifest-*` / `index-manifest-*` → manifest +/// - `index-*` (excluding `index-manifest-*`) → index +/// - Everything else → data +pub async fn collect_physical_files_summary( + file_io: &FileIO, + table_location: &str, +) -> crate::Result { + // List top-level entries to discover subdirectories and top-level files + let top_entries = match file_io.list_status(table_location).await { + Ok(s) => s, + Err(crate::Error::IoUnexpected { ref source, .. }) + if source.kind() == opendal::ErrorKind::NotFound => + { + return Ok(PhysicalFilesSummary::default()); + } + Err(e) => return Err(e), + }; + + let mut summary = PhysicalFilesSummary::default(); + + // Classify top-level files directly + let mut sub_dirs = Vec::new(); + for entry in &top_entries { + if entry.is_dir { + sub_dirs.push(entry.path.clone()); + } else { + let file_name = entry.path.rsplit('/').next().unwrap_or(&entry.path); + accumulate_file(&mut summary, file_name, entry.size); + } + } + + // Concurrently list each subdirectory recursively + let dir_results: Vec>> = stream::iter(sub_dirs) + .map(|dir_path| async move { + match file_io.list_status_recursive(&dir_path).await { + Ok(s) => Ok(s), + Err(crate::Error::IoUnexpected { ref source, .. }) + if source.kind() == opendal::ErrorKind::NotFound => + { + Ok(Vec::new()) + } + Err(e) => Err(e), + } + }) + .buffer_unordered(DIR_LIST_CONCURRENCY) + .collect() + .await; + + for result in dir_results { + let statuses = result?; + for status in &statuses { + let file_name = status.path.rsplit('/').next().unwrap_or(&status.path); + accumulate_file(&mut summary, file_name, status.size); + } + } + + Ok(summary) +} + +fn accumulate_file(summary: &mut PhysicalFilesSummary, file_name: &str, size: u64) { + match classify_file_name(file_name) { + FileType::Manifest => { + summary.manifest_file_count += 1; + summary.manifest_file_size += size as i64; + } + FileType::Data => { + summary.data_file_count += 1; + summary.data_file_size += size as i64; + } + FileType::Index => { + summary.index_file_count += 1; + summary.index_file_size += size as i64; + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::io::FileIOBuilder; + use crate::spec::{CommitKind, Snapshot}; + use crate::table::{BranchManager, SnapshotManager, TagManager}; + + fn test_file_io() -> FileIO { + FileIOBuilder::new("memory").build().unwrap() + } + + #[tokio::test] + async fn test_collect_empty_table() { + let file_io = test_file_io(); + let result = + collect_referenced_files_summary(&file_io, "memory:/test_empty_table", &[], &[]) + .await + .unwrap(); + // total + branch:main + assert_eq!(result.len(), 2); + assert_eq!(result[0].source, "total"); + assert_eq!(result[0].data_file_count, 0); + assert_eq!(result[1].source, "branch:main"); + assert_eq!(result[1].data_file_count, 0); + } + + #[tokio::test] + async fn test_collect_with_missing_manifest() { + let table_path = "memory:/test_missing_manifest"; + let file_io = test_file_io(); + file_io + .mkdirs(&format!("{table_path}/snapshot/")) + .await + .unwrap(); + file_io + .mkdirs(&format!("{table_path}/manifest/")) + .await + .unwrap(); + + let sm = SnapshotManager::new(file_io.clone(), table_path.to_string()); + + // Create a snapshot that references non-existent manifest lists + let snapshot = Snapshot::builder() + .version(3) + .id(1) + .schema_id(0) + .base_manifest_list("non-existent-base".to_string()) + .delta_manifest_list("non-existent-delta".to_string()) + .commit_user("test".to_string()) + .commit_identifier(0) + .commit_kind(CommitKind::APPEND) + .time_millis(1000) + .build(); + sm.commit_snapshot(&snapshot).await.unwrap(); + + let result = collect_referenced_files_summary(&file_io, table_path, &[], &[]) + .await + .unwrap(); + // total + branch:main + assert_eq!(result.len(), 2); + assert_eq!(result[0].source, "total"); + assert_eq!(result[0].manifest_file_count, 0); + assert_eq!(result[0].data_file_count, 0); + assert_eq!(result[1].source, "branch:main"); + assert_eq!(result[1].manifest_file_count, 0); + assert_eq!(result[1].data_file_count, 0); + } + + #[tokio::test] + async fn test_branch_tag_referenced_files() { + use crate::spec::stats::BinaryTableStats; + use crate::spec::{DataFileMeta, FileKind, Manifest, ManifestFileMeta, ManifestList}; + + let table_path = "memory:/test_branch_tag"; + let file_io = test_file_io(); + + file_io + .mkdirs(&format!("{table_path}/snapshot/")) + .await + .unwrap(); + file_io + .mkdirs(&format!("{table_path}/manifest/")) + .await + .unwrap(); + + let sm = SnapshotManager::new(file_io.clone(), table_path.to_string()); + let empty_stats = BinaryTableStats::new(vec![0u8; 8], vec![0u8; 8], vec![Some(0)]); + + // Write a manifest file (referenced by branch tag only) at the TABLE ROOT + let manifest_name = "manifest-branch-only-1"; + let manifest_path = format!("{table_path}/manifest/{manifest_name}"); + let data_file = DataFileMeta { + file_name: "data-branch-tag-file-1.parquet".to_string(), + file_size: 4096, + row_count: 100, + min_key: vec![], + max_key: vec![], + key_stats: BinaryTableStats::new(vec![], vec![], vec![]), + value_stats: BinaryTableStats::new(vec![], vec![], vec![]), + min_sequence_number: 0, + max_sequence_number: 0, + schema_id: 0, + level: 0, + extra_files: vec![], + creation_time: None, + delete_row_count: Some(0), + embedded_index: None, + file_source: None, + value_stats_cols: None, + external_path: None, + first_row_id: None, + write_cols: None, + }; + let entry = ManifestEntry::new(FileKind::Add, vec![0u8; 12], 0, 1, data_file, 2); + Manifest::write(&file_io, &manifest_path, &[entry]) + .await + .unwrap(); + + // Write a manifest list that references the above manifest (at the table root) + let manifest_list_name = "manifest-list-branch-tag-base"; + let manifest_list_path = format!("{table_path}/manifest/{manifest_list_name}"); + let manifest_meta = + ManifestFileMeta::new(manifest_name.to_string(), 512, 1, 0, empty_stats.clone(), 0); + ManifestList::write(&file_io, &manifest_list_path, &[manifest_meta]) + .await + .unwrap(); + + // Write an empty delta manifest list at the table root + let delta_list_name = "manifest-list-branch-tag-delta"; + let delta_list_path = format!("{table_path}/manifest/{delta_list_name}"); + ManifestList::write(&file_io, &delta_list_path, &[]) + .await + .unwrap(); + + // Create a main branch snapshot (with non-existent manifest lists) + let main_snapshot = Snapshot::builder() + .version(3) + .id(1) + .schema_id(0) + .base_manifest_list("manifest-list-main-base".to_string()) + .delta_manifest_list("manifest-list-main-delta".to_string()) + .commit_user("test".to_string()) + .commit_identifier(0) + .commit_kind(CommitKind::APPEND) + .time_millis(1000) + .build(); + sm.commit_snapshot(&main_snapshot).await.unwrap(); + + // Create branch b1 with NO snapshots + let bm = BranchManager::new(file_io.clone(), table_path.to_string()); + bm.create_branch("b1").await.unwrap(); + + // Create a tag under branch b1 that references the readable manifest lists + let branch_tm = TagManager::new(file_io.clone(), table_path.to_string()).with_branch("b1"); + let branch_tag_snapshot = Snapshot::builder() + .version(3) + .id(100) + .schema_id(0) + .base_manifest_list(manifest_list_name.to_string()) + .delta_manifest_list(delta_list_name.to_string()) + .commit_user("test".to_string()) + .commit_identifier(0) + .commit_kind(CommitKind::APPEND) + .time_millis(2000) + .build(); + branch_tm.create("v1", &branch_tag_snapshot).await.unwrap(); + + let result = collect_referenced_files_summary(&file_io, table_path, &[], &[]) + .await + .unwrap(); + + // Should have: total, branch:main, branch:b1 + assert_eq!(result.len(), 3); + assert_eq!(result[0].source, "total"); + assert_eq!(result[1].source, "branch:main"); + assert_eq!(result[2].source, "branch:b1"); + + // branch:b1 must have non-zero counts from the branch tag's readable manifests. + // The manifest list + manifest file + delta manifest list = 3 manifest files. + assert!( + result[2].manifest_file_count > 0, + "branch:b1 must have manifest files from branch tag, got {}", + result[2].manifest_file_count + ); + assert!( + result[2].manifest_file_size > 0, + "branch:b1 must have non-zero manifest file size, got {}", + result[2].manifest_file_size + ); + // The manifest references one data file + assert_eq!(result[2].data_file_count, 1); + assert_eq!(result[2].data_file_size, 4096); + + // total should include branch:b1's files + assert!(result[0].data_file_count >= 1); + assert!(result[0].data_file_size >= 4096); + } +} diff --git a/docs/src/sql.md b/docs/src/sql.md index 1d745d57..67ae413a 100644 --- a/docs/src/sql.md +++ b/docs/src/sql.md @@ -537,6 +537,105 @@ SELECT * FROM full_text_search('paimon.my_db.docs', 'content', 'paimon search', The function searches across all Tantivy full-text index files for the target column, merges results by relevance score, and returns the top-k matching rows. If no matching index is found, an empty result is returned. +## Referenced Files Size + +The `referenced_files_size` table-valued function computes aggregated manifest/data/index file size summaries for all snapshots referenced by a table, including snapshots from the main branch, tags, and other branches. This is useful for understanding storage usage and for orphan file analysis. + +Historical snapshots may be in the process of being cleaned up — if a manifest file has already been deleted, it is gracefully skipped (counted as 0 files/bytes). + +### Registration + +```rust +use paimon_datafusion::register_referenced_files_size; + +register_referenced_files_size(&ctx, catalog.clone(), "default"); +``` + +### Usage + +```sql +SELECT * FROM referenced_files_size('table_name') +``` + +| Argument | Type | Description | +|---|---|---| +| `table_name` | STRING | Table name, fully qualified (`catalog.db.table`) or short form | + +Example: + +```sql +SELECT * FROM referenced_files_size('paimon.my_db.orders'); +``` + +### Output Schema + +| Column | Type | Description | +|---|---|---| +| `source` | STRING | Scope: `total` or `branch:` | +| `manifest_file_count` | BIGINT | Number of manifest files | +| `manifest_file_size` | BIGINT | Total size of manifest files (bytes) | +| `data_file_count` | BIGINT | Number of data files | +| `data_file_size` | BIGINT | Total size of data files (bytes) | +| `index_file_count` | BIGINT | Number of index files | +| `index_file_size` | BIGINT | Total size of index files (bytes) | + +The output contains one row per scope: +- `total` — sum across all branches and tags +- `branch:main` — main branch snapshots + tag snapshots +- `branch:` — one row per other branch + +To get the total referenced size: + +```sql +SELECT manifest_file_size + data_file_size + index_file_size AS total_size +FROM referenced_files_size('paimon.my_db.orders') +WHERE source = 'total'; +``` + +## Physical Files Size + +The `physical_files_size` table-valued function scans the table directory recursively and computes the total size of all physical files on disk, categorized by file type. By comparing with `referenced_files_size`, you can identify orphan files that are no longer referenced by any snapshot. + +Files are classified by their file name prefix: +- `manifest-*` / `index-manifest-*` → manifest +- `index-*` (excluding `index-manifest-*`) → index +- Everything else → data + +### Registration + +```rust +use paimon_datafusion::register_physical_files_size; + +register_physical_files_size(&ctx, catalog.clone(), "default"); +``` + +### Usage + +```sql +SELECT * FROM physical_files_size('table_name') +``` + +| Argument | Type | Description | +|---|---|---| +| `table_name` | STRING | Table name, fully qualified (`catalog.db.table`) or short form | + +Example: + +```sql +SELECT * FROM physical_files_size('paimon.my_db.orders'); +``` + +### Output Schema + +| Column | Type | Description | +|---|---|---| +| `manifest_file_count` | BIGINT | Number of manifest files on disk | +| `manifest_file_size` | BIGINT | Total size of manifest files (bytes) | +| `data_file_count` | BIGINT | Number of data files on disk | +| `data_file_size` | BIGINT | Total size of data files (bytes) | +| `index_file_count` | BIGINT | Number of index files on disk | +| `index_file_size` | BIGINT | Total size of index files (bytes) | + ## Time Travel Paimon supports time travel queries to read historical data.