@@ -128,14 +128,19 @@ impl Snapshot {
128128 } )
129129 }
130130
131+ /// Returns the engine from the config, or falls back to the log store's default engine.
132+ fn resolve_engine ( config : & DeltaTableConfig , log_store : & dyn LogStore ) -> Arc < dyn Engine > {
133+ config. engine . as_ref ( ) . map ( |e| e. 0 . clone ( ) )
134+ . unwrap_or_else ( || log_store. engine ( None ) )
135+ }
136+
131137 /// Create a new [`Snapshot`] instance
132138 pub async fn try_new (
133139 log_store : & dyn LogStore ,
134140 config : DeltaTableConfig ,
135141 version : Option < i64 > ,
136142 ) -> DeltaResult < Self > {
137- // TODO: bundle operation_id with logstore ...
138- let engine = log_store. engine ( None ) ;
143+ let engine = Self :: resolve_engine ( & config, log_store) ;
139144
140145 // NB: kernel engine uses Url::join to construct paths,
141146 // if the path does not end with a slash, the would override the entire path.
@@ -148,6 +153,11 @@ impl Snapshot {
148153 Self :: try_new_with_engine ( log_store, engine, table_root, config, version. map ( |v| v as u64 ) ) . await
149154 }
150155
156+ /// Returns the configured engine, falling back to the log store's default engine.
157+ pub ( crate ) fn engine ( & self , log_store : & dyn LogStore ) -> Arc < dyn Engine > {
158+ Self :: resolve_engine ( & self . config , log_store)
159+ }
160+
151161 pub fn scan_builder ( & self ) -> ScanBuilder {
152162 ScanBuilder :: new ( self . inner . clone ( ) )
153163 }
@@ -276,8 +286,7 @@ impl Snapshot {
276286 Err ( err) => return Box :: pin ( once ( ready ( Err ( err) ) ) ) ,
277287 } ;
278288
279- // TODO: bundle operation id with log store ...
280- let engine = log_store. engine ( None ) ;
289+ let engine = self . engine ( log_store) ;
281290 let stream = scan
282291 . scan_metadata ( engine)
283292 . map ( |d| Ok ( rb_from_scan_meta ( d?) ?) ) ;
@@ -301,7 +310,7 @@ impl Snapshot {
301310 Err ( err) => return Box :: pin ( once ( ready ( Err ( err) ) ) ) ,
302311 } ;
303312
304- let engine = log_store . engine ( None ) ;
313+ let engine = self . engine ( log_store ) ;
305314 let stream = scan
306315 . scan_metadata_from ( engine, existing_version, existing_data, existing_predicate)
307316 . map ( |d| Ok ( rb_from_scan_meta ( d?) ?) ) ;
@@ -439,8 +448,7 @@ impl Snapshot {
439448 let mut builder = RecordBatchReceiverStreamBuilder :: new ( 100 ) ;
440449 let tx = builder. tx ( ) ;
441450
442- // TODO: bundle operation id with log store ...
443- let engine = log_store. engine ( None ) ;
451+ let engine = self . engine ( log_store) ;
444452
445453 let remove_data = match self . inner . log_segment ( ) . read_actions (
446454 engine. as_ref ( ) ,
@@ -486,8 +494,7 @@ impl Snapshot {
486494 log_store : & dyn LogStore ,
487495 app_id : String ,
488496 ) -> DeltaResult < Option < i64 > > {
489- // TODO: bundle operation id with log store ...
490- let engine = log_store. engine ( None ) ;
497+ let engine = self . engine ( log_store) ;
491498 let inner = self . inner . clone ( ) ;
492499 let version =
493500 spawn_blocking_with_span ( move || inner. get_app_id_version ( & app_id, engine. as_ref ( ) ) )
@@ -506,7 +513,7 @@ impl Snapshot {
506513 log_store : & dyn LogStore ,
507514 domain : impl ToString ,
508515 ) -> DeltaResult < Option < String > > {
509- let engine = log_store . engine ( None ) ;
516+ let engine = self . engine ( log_store ) ;
510517 let inner = self . inner . clone ( ) ;
511518 let domain = domain. to_string ( ) ;
512519 let metadata =
@@ -617,10 +624,11 @@ impl EagerSnapshot {
617624 return Ok ( ( ) ) ;
618625 }
619626
627+ let engine = self . snapshot . engine ( log_store) ;
620628 self . snapshot = self
621629 . snapshot
622630 . clone ( )
623- . update ( log_store . engine ( None ) , target_version)
631+ . update ( engine, target_version)
624632 . await ?;
625633
626634 self . files = self
0 commit comments