Skip to content

Commit b94f184

Browse files
dvlascenco authored and VLCDaniel committed
feat: delta engine injection support
1 parent 08c2fb1 commit b94f184

5 files changed

Lines changed: 53 additions & 14 deletions

File tree

crates/core/src/delta_datafusion/mod.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ pub use cdf::scan::DeltaCdfTableProvider;
7272
pub(crate) use data_validation::{
7373
DataValidationExec, constraints_to_exprs, generated_columns_to_exprs, validation_predicates,
7474
};
75+
pub use engine::DataFusionEngine;
7576
pub(crate) use find_files::*;
7677
pub use table_provider::{
7778
DeltaScan, DeltaScanConfig, DeltaScanConfigBuilder, DeltaTableProvider, TableProviderBuilder,

crates/core/src/delta_datafusion/table_provider/next/mod.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@ use datafusion::{
4040
physical_plan::ExecutionPlan,
4141
};
4242
use datafusion::catalog::{ScanArgs, ScanResult};
43+
use delta_kernel::Engine;
4344
use delta_kernel::table_configuration::TableConfiguration;
4445
use serde::{Deserialize, Serialize};
4546

@@ -175,7 +176,9 @@ impl TableProvider for DeltaScan {
175176
filters: &[Expr],
176177
limit: Option<usize>,
177178
) -> Result<Arc<dyn ExecutionPlan>> {
178-
let engine = DataFusionEngine::new_from_session(session);
179+
let engine: Arc<dyn Engine> = self.snapshot.snapshot().config.engine.as_ref()
180+
.map(|e| e.0.clone())
181+
.unwrap_or_else(|| DataFusionEngine::new_from_session(session));
179182

180183
// Filter out file_id column from projection if present
181184
let file_id_idx = self
@@ -218,7 +221,9 @@ impl TableProvider for DeltaScan {
218221
}
219222

220223
async fn scan_with_args<'a>(&self, state: &dyn Session, args: ScanArgs<'a>) -> Result<ScanResult> {
221-
let engine = DataFusionEngine::new_from_session(state);
224+
let engine: Arc<dyn Engine> = self.snapshot.snapshot().config.engine.as_ref()
225+
.map(|e| e.0.clone())
226+
.unwrap_or_else(|| DataFusionEngine::new_from_session(state));
222227

223228
// Filter out file_id column from projection if present
224229
let file_id_idx = self

crates/core/src/kernel/snapshot/mod.rs

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -128,14 +128,19 @@ impl Snapshot {
128128
})
129129
}
130130

131+
/// Returns the engine from the config, or falls back to the log store's default engine.
132+
fn resolve_engine(config: &DeltaTableConfig, log_store: &dyn LogStore) -> Arc<dyn Engine> {
133+
config.engine.as_ref().map(|e| e.0.clone())
134+
.unwrap_or_else(|| log_store.engine(None))
135+
}
136+
131137
/// Create a new [`Snapshot`] instance
132138
pub async fn try_new(
133139
log_store: &dyn LogStore,
134140
config: DeltaTableConfig,
135141
version: Option<i64>,
136142
) -> DeltaResult<Self> {
137-
// TODO: bundle operation_id with logstore ...
138-
let engine = log_store.engine(None);
143+
let engine = Self::resolve_engine(&config, log_store);
139144

140145
// NB: kernel engine uses Url::join to construct paths,
141146
// if the path does not end with a slash, the join would override the entire path.
@@ -148,6 +153,11 @@ impl Snapshot {
148153
Self::try_new_with_engine(log_store, engine, table_root, config, version.map(|v| v as u64)).await
149154
}
150155

156+
/// Returns the configured engine, falling back to the log store's default engine.
157+
pub(crate) fn engine(&self, log_store: &dyn LogStore) -> Arc<dyn Engine> {
158+
Self::resolve_engine(&self.config, log_store)
159+
}
160+
151161
pub fn scan_builder(&self) -> ScanBuilder {
152162
ScanBuilder::new(self.inner.clone())
153163
}
@@ -276,8 +286,7 @@ impl Snapshot {
276286
Err(err) => return Box::pin(once(ready(Err(err)))),
277287
};
278288

279-
// TODO: bundle operation id with log store ...
280-
let engine = log_store.engine(None);
289+
let engine = self.engine(log_store);
281290
let stream = scan
282291
.scan_metadata(engine)
283292
.map(|d| Ok(rb_from_scan_meta(d?)?));
@@ -301,7 +310,7 @@ impl Snapshot {
301310
Err(err) => return Box::pin(once(ready(Err(err)))),
302311
};
303312

304-
let engine = log_store.engine(None);
313+
let engine = self.engine(log_store);
305314
let stream = scan
306315
.scan_metadata_from(engine, existing_version, existing_data, existing_predicate)
307316
.map(|d| Ok(rb_from_scan_meta(d?)?));
@@ -439,8 +448,7 @@ impl Snapshot {
439448
let mut builder = RecordBatchReceiverStreamBuilder::new(100);
440449
let tx = builder.tx();
441450

442-
// TODO: bundle operation id with log store ...
443-
let engine = log_store.engine(None);
451+
let engine = self.engine(log_store);
444452

445453
let remove_data = match self.inner.log_segment().read_actions(
446454
engine.as_ref(),
@@ -486,8 +494,7 @@ impl Snapshot {
486494
log_store: &dyn LogStore,
487495
app_id: String,
488496
) -> DeltaResult<Option<i64>> {
489-
// TODO: bundle operation id with log store ...
490-
let engine = log_store.engine(None);
497+
let engine = self.engine(log_store);
491498
let inner = self.inner.clone();
492499
let version =
493500
spawn_blocking_with_span(move || inner.get_app_id_version(&app_id, engine.as_ref()))
@@ -506,7 +513,7 @@ impl Snapshot {
506513
log_store: &dyn LogStore,
507514
domain: impl ToString,
508515
) -> DeltaResult<Option<String>> {
509-
let engine = log_store.engine(None);
516+
let engine = self.engine(log_store);
510517
let inner = self.inner.clone();
511518
let domain = domain.to_string();
512519
let metadata =
@@ -617,10 +624,11 @@ impl EagerSnapshot {
617624
return Ok(());
618625
}
619626

627+
let engine = self.snapshot.engine(log_store);
620628
self.snapshot = self
621629
.snapshot
622630
.clone()
623-
.update(log_store.engine(None), target_version)
631+
.update(engine, target_version)
624632
.await?;
625633

626634
self.files = self

crates/core/src/lib.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ pub use self::schema::partitions::*;
104104
pub use self::schema::*;
105105
pub use self::table::DeltaTable;
106106
pub use self::table::builder::{
107-
DeltaTableBuilder, DeltaTableConfig, DeltaVersion, ensure_table_uri,
107+
DeltaTableBuilder, DeltaTableConfig, DeltaVersion, EngineRef, ensure_table_uri,
108108
};
109109
pub use self::table::config::TableProperty;
110110
pub use object_store::{Error as ObjectStoreError, ObjectMeta, ObjectStore, path::Path};

crates/core/src/table/builder.rs

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ use serde::{Deserialize, Serialize};
1111
use tracing::debug;
1212
use url::Url;
1313

14+
use delta_kernel::Engine;
15+
1416
use super::normalize_table_url;
1517
use crate::logstore::storage::IORuntime;
1618
use crate::logstore::{LogStoreRef, StorageConfig, object_store_factories};
@@ -29,6 +31,16 @@ pub enum DeltaVersion {
2931
Timestamp(DateTime<Utc>),
3032
}
3133

34+
/// Wrapper around `Arc<dyn Engine>` that implements `Debug` and `Clone`.
35+
#[derive(Clone)]
36+
pub struct EngineRef(pub Arc<dyn Engine>);
37+
38+
impl std::fmt::Debug for EngineRef {
39+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40+
write!(f, "EngineRef(<engine>)")
41+
}
42+
}
43+
3244
/// Configuration options for delta table
3345
#[derive(Debug, Serialize, Deserialize, Clone, DeltaConfig)]
3446
#[serde(rename_all = "camelCase")]
@@ -61,6 +73,12 @@ pub struct DeltaTableConfig {
6173
#[delta(skip)]
6274
pub log_size_limiter: Option<LogSizeLimiter>,
6375

76+
/// Custom kernel engine to use for snapshot loading and scan operations.
77+
/// When set, this engine is used instead of the default engine derived from the log store.
78+
#[serde(skip_serializing, skip_deserializing)]
79+
#[delta(skip)]
80+
pub engine: Option<EngineRef>,
81+
6482
/// HSTACK: skip stats parsing during file listing. Runtime-only (not persisted).
6583
/// Default `true` for performance; set to `false` when stats-based pruning helps the query.
6684
#[serde(skip_serializing, skip_deserializing)]
@@ -76,6 +94,7 @@ impl Default for DeltaTableConfig {
7694
log_batch_size: 1024,
7795
io_runtime: None,
7896
log_size_limiter: None,
97+
engine: None,
7998
skip_stats_in_file_listing: true,
8099
}
81100
}
@@ -156,6 +175,12 @@ impl DeltaTableBuilder {
156175
self
157176
}
158177

178+
/// Sets a custom kernel `Engine` to use for snapshot loading and scan operations.
179+
pub fn with_engine(mut self, engine: Arc<dyn Engine>) -> Self {
180+
self.table_config.engine = Some(EngineRef(engine));
181+
self
182+
}
183+
159184
/// Sets `version` to the builder
160185
pub fn with_version(mut self, version: i64) -> Self {
161186
self.version = DeltaVersion::Version(version);

0 commit comments

Comments
 (0)