From ff6eeda2e0bfd16b0b9239fcfaf2ef25d9f12f3e Mon Sep 17 00:00:00 2001 From: marshmallow Date: Sat, 23 May 2026 10:45:13 +1000 Subject: [PATCH] cache node evaluation --- ...cd0a9c2b45ae2feeda76f7002b334bac28674.json | 12 + ...6f863ece674126914d7146a75a79c0c79aa6f.json | 74 +++++ ...2d28066719fab4319880dfdd2de215c08ba6a.json | 50 ++++ ...b6332ad23bf70286f29665f5b3e556aa32c5c.json | 12 + ...273b352f5c02457bbbaea8aabb737622ccb8d.json | 12 + CHANGELOG.md | 2 + crates/cli/src/apply.rs | 43 ++- crates/cli/src/main.rs | 12 +- .../20260523003752_attribute_lookup.sql | 11 + crates/core/src/cache/mod.rs | 268 +++++++++++++++++- crates/core/src/hive/executor.rs | 19 +- crates/core/src/hive/mod.rs | 18 +- crates/core/src/hive/plan.rs | 72 ++++- crates/core/src/hive/steps/evaluate.rs | 23 +- crates/nix_client/src/lib.rs | 2 +- 15 files changed, 590 insertions(+), 40 deletions(-) create mode 100644 .sqlx/query-6d7658c36b965ffe57cbaaa5550cd0a9c2b45ae2feeda76f7002b334bac28674.json create mode 100644 .sqlx/query-6e4989977a0257d5f67151d58936f863ece674126914d7146a75a79c0c79aa6f.json create mode 100644 .sqlx/query-7b75a4af74b33f56f2ceeea1e462d28066719fab4319880dfdd2de215c08ba6a.json create mode 100644 .sqlx/query-7d06a5ac5518429aa99edd5d878b6332ad23bf70286f29665f5b3e556aa32c5c.json create mode 100644 .sqlx/query-d2e92efc45d4c8b951656a48329273b352f5c02457bbbaea8aabb737622ccb8d.json create mode 100644 crates/core/src/cache/migrations/20260523003752_attribute_lookup.sql diff --git a/.sqlx/query-6d7658c36b965ffe57cbaaa5550cd0a9c2b45ae2feeda76f7002b334bac28674.json b/.sqlx/query-6d7658c36b965ffe57cbaaa5550cd0a9c2b45ae2feeda76f7002b334bac28674.json new file mode 100644 index 00000000..001f848c --- /dev/null +++ b/.sqlx/query-6d7658c36b965ffe57cbaaa5550cd0a9c2b45ae2feeda76f7002b334bac28674.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "delete from evaluation_cache where output_path_digest = $1 and output_path_name = $2", + "describe": { + "columns": [], + "parameters": { + "Right": 2 + }, + "nullable": [] + }, + "hash": "6d7658c36b965ffe57cbaaa5550cd0a9c2b45ae2feeda76f7002b334bac28674" +} diff --git a/.sqlx/query-6e4989977a0257d5f67151d58936f863ece674126914d7146a75a79c0c79aa6f.json b/.sqlx/query-6e4989977a0257d5f67151d58936f863ece674126914d7146a75a79c0c79aa6f.json new file mode 100644 index 00000000..9b2fd67b --- /dev/null +++ b/.sqlx/query-6e4989977a0257d5f67151d58936f863ece674126914d7146a75a79c0c79aa6f.json @@ -0,0 +1,74 @@ +{ + "db_name": "SQLite", + "query": "select rowid, flake_path_digest, flake_path_name, output_path_digest, output_path_name from evaluation_cache where rowid > $1 order by rowid asc limit 50", + "describe": { + "columns": [ + { + "name": "rowid", + "ordinal": 0, + "type_info": "Integer", + "origin": { + "Table": { + "table": "evaluation_cache", + "name": "rowid" + } + } + }, + { + "name": "flake_path_digest", + "ordinal": 1, + "type_info": "Blob", + "origin": { + "Table": { + "table": "evaluation_cache", + "name": "flake_path_digest" + } + } + }, + { + "name": "flake_path_name", + "ordinal": 2, + "type_info": "Text", + "origin": { + "Table": { + "table": "evaluation_cache", + "name": "flake_path_name" + } + } + }, + { + "name": "output_path_digest", + "ordinal": 3, + "type_info": "Blob", + "origin": { + "Table": { + "table": "evaluation_cache", + "name": "output_path_digest" + } + } + }, + { + "name": "output_path_name", + "ordinal": 4, + "type_info": "Text", + "origin": { + "Table": { + "table": "evaluation_cache", + "name": "output_path_name" + } + } + } + ], + "parameters": { + "Right": 1 + }, + "nullable": [ + false, + false, + false, + false, + false + ] + }, + "hash": "6e4989977a0257d5f67151d58936f863ece674126914d7146a75a79c0c79aa6f" +} diff --git a/.sqlx/query-7b75a4af74b33f56f2ceeea1e462d28066719fab4319880dfdd2de215c08ba6a.json b/.sqlx/query-7b75a4af74b33f56f2ceeea1e462d28066719fab4319880dfdd2de215c08ba6a.json new file mode 100644 index 00000000..c7462712 --- /dev/null +++ b/.sqlx/query-7b75a4af74b33f56f2ceeea1e462d28066719fab4319880dfdd2de215c08ba6a.json @@ -0,0 +1,50 @@ +{ + "db_name": "SQLite", + "query": "\n select\n evaluation_cache.output_path_digest,\n evaluation_cache.output_path_name,\n evaluation_cache.node_name\n from\n evaluation_cache\n where\n evaluation_cache.flake_path_digest = $1\n and evaluation_cache.flake_path_name = $2\n and evaluation_cache.flake_hash_sri = $3\n and evaluation_cache.node_name in (select value from json_each($4))\n ", + "describe": { + "columns": [ + { + "name": "output_path_digest", + "ordinal": 0, + "type_info": "Blob", + "origin": { + "Table": { + "table": "evaluation_cache", + "name": "output_path_digest" + } + } + }, + { + "name": "output_path_name", + "ordinal": 1, + "type_info": "Text", + "origin": { + "Table": { + "table": "evaluation_cache", + "name": "output_path_name" + } + } + }, + { + "name": "node_name", + "ordinal": 2, + "type_info": "Text", + "origin": { + "Table": { + "table": "evaluation_cache", + "name": "node_name" + } + } + } + ], + "parameters": { + "Right": 4 + }, + "nullable": [ + false, + false, + false + ] + }, + "hash": "7b75a4af74b33f56f2ceeea1e462d28066719fab4319880dfdd2de215c08ba6a" +} diff --git a/.sqlx/query-7d06a5ac5518429aa99edd5d878b6332ad23bf70286f29665f5b3e556aa32c5c.json b/.sqlx/query-7d06a5ac5518429aa99edd5d878b6332ad23bf70286f29665f5b3e556aa32c5c.json new file mode 100644 index 00000000..db1e643c --- /dev/null +++ b/.sqlx/query-7d06a5ac5518429aa99edd5d878b6332ad23bf70286f29665f5b3e556aa32c5c.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "\n insert into evaluation_cache (\n flake_path_digest,\n flake_path_name,\n flake_hash_sri,\n node_name,\n output_path_digest,\n output_path_name\n ) values ($1, $2, $3, $4, $5, $6)\n ", + "describe": { + "columns": [], + "parameters": { + "Right": 6 + }, + "nullable": [] + }, + "hash": "7d06a5ac5518429aa99edd5d878b6332ad23bf70286f29665f5b3e556aa32c5c" +} diff --git a/.sqlx/query-d2e92efc45d4c8b951656a48329273b352f5c02457bbbaea8aabb737622ccb8d.json b/.sqlx/query-d2e92efc45d4c8b951656a48329273b352f5c02457bbbaea8aabb737622ccb8d.json new file mode 100644 index 00000000..dcb25d0a --- /dev/null +++ b/.sqlx/query-d2e92efc45d4c8b951656a48329273b352f5c02457bbbaea8aabb737622ccb8d.json @@ -0,0 +1,12 @@ +{ + "db_name": "SQLite", + "query": "delete from evaluation_cache where flake_path_digest = $1 and flake_path_name = $2 and output_path_digest = $3 and output_path_name = $4", + "describe": { + "columns": [], + "parameters": { + "Right": 4 + }, + "nullable": [] + }, + "hash": "d2e92efc45d4c8b951656a48329273b352f5c02457bbbaea8aabb737622ccb8d" +} diff --git a/CHANGELOG.md b/CHANGELOG.md index 95a5b3bc..0d9ea3f5 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 rust nix daemon client if `--experimental-nix-client` is passed. - Node `target` liveliness is now determined directly by initiating a nix daemon handshake if `--experimental-nix-client` is passed. +- Node evaluations are now cached between wire invocations in flake hives, + invalidating old caches with a connection to the local nix daemon. ### Changed diff --git a/crates/cli/src/apply.rs b/crates/cli/src/apply.rs index f1dcb2cc..2b94b99b 100644 --- a/crates/cli/src/apply.rs +++ b/crates/cli/src/apply.rs @@ -10,7 +10,9 @@ use std::io::Read; use std::sync::Arc; use std::sync::atomic::AtomicBool; use thiserror::Error; +use tokio::sync::oneshot; use tracing::{error, info}; +use wire_core::cache::InspectionCache; use wire_core::hive::executor::execute; use wire_core::hive::node::{Name, Node}; use wire_core::hive::plan::{Goal, plan_for_node}; @@ -102,6 +104,7 @@ pub async fn apply( partition: Partitions, make_goal: F, mut modifiers: SubCommandModifiers, + cache: Arc>, ) -> Result<()> where F: Fn(&Name, &Node) -> Goal, @@ -122,6 +125,16 @@ where .map(|(name, _)| name.clone()) .collect(); + let mut cached_evaluations = if let Some(ref cache) = *cache.clone() + && let HiveLocation::Flake { ref prefetch, .. } = *location + { + cache + .get_evaluations(prefetch, &selected_names, should_quit.clone()) + .await + } else { + None + }; + let num_selected = selected_names.len(); let partitioned_names = partition_arr(selected_names, &partition); @@ -137,6 +150,8 @@ where let _ = tx.send(UiMessage::AddMany(partitioned_names.clone())); } + let mut evaluation_cache_tasks = Vec::new(); + let mut set = hive .nodes .iter_mut() @@ -151,8 +166,30 @@ where location.clone(), &modifiers, should_quit.clone(), + cached_evaluations + .as_mut() + .and_then(|cache| cache.remove(name)), ); - execute(plan).map(move |result| (name, result)) + + let (sender, receiver) = oneshot::channel(); + + let location = location.clone(); + let cache = cache.clone(); + let cache_name = name.clone(); + + evaluation_cache_tasks.push(tokio::spawn(async move { + if let Some(ref cache) = *cache + && let HiveLocation::Flake { ref prefetch, .. } = *location + && let Ok(evaluated_path) = receiver.await + { + cache + .store_evaluation(prefetch, &cache_name, evaluated_path) + .await; + } + })); + + let name_clone = name.clone(); + execute(plan, sender).map(move |result| (name_clone, result)) }) .peekable(); @@ -163,6 +200,10 @@ where let futures = futures::stream::iter(set).buffer_unordered(args.parallel); let result = futures.collect::>().await; + for task in evaluation_cache_tasks { + let _ = task.await; + } + let (successful, errors): (Vec<_>, Vec<_>) = result .into_iter() diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs index cd148475..35fa38ae 100644 --- a/crates/cli/src/main.rs +++ b/crates/cli/src/main.rs @@ -78,7 +78,7 @@ async fn main() -> Result<()> { let signals_task = tokio::spawn(handle_signals(signals, should_shutdown.clone())); let location = get_hive_location(args.path, modifiers).await?; - let cache = InspectionCache::new().await; + let cache = Arc::new(InspectionCache::new().await); match args.command { cli::Commands::Apply(apply_args) => { @@ -90,7 +90,7 @@ async fn main() -> Result<()> { apply::apply( &mut hive, - should_shutdown, + should_shutdown.clone(), location, apply_args.common, Partitions::default(), @@ -109,6 +109,7 @@ async fn main() -> Result<()> { }) }, modifiers, + cache.clone(), ) .await?; } @@ -117,12 +118,13 @@ async fn main() -> Result<()> { apply::apply( &mut hive, - should_shutdown, + should_shutdown.clone(), location, build_args.common, build_args.partition.unwrap_or_default(), |_name, _node| Goal::Build, modifiers, + cache.clone(), ) .await?; } @@ -150,8 +152,8 @@ async fn main() -> Result<()> { }), } - if let Some(cache) = cache { - cache.gc().await.into_diagnostic()?; + if let Some(cache) = &*cache { + cache.gc(should_shutdown).await.into_diagnostic()?; } signals_handle.close(); diff --git a/crates/core/src/cache/migrations/20260523003752_attribute_lookup.sql b/crates/core/src/cache/migrations/20260523003752_attribute_lookup.sql new file mode 100644 index 00000000..c0e39377 --- /dev/null +++ b/crates/core/src/cache/migrations/20260523003752_attribute_lookup.sql @@ -0,0 +1,11 @@ +create table evaluation_cache ( + flake_path_digest blob, + flake_path_name text, + flake_hash_sri text, + node_name text, + + output_path_digest blob not null, + output_path_name text not null, + + primary key (flake_path_digest, flake_path_name, flake_hash_sri, node_name) +) strict; diff --git a/crates/core/src/cache/mod.rs b/crates/core/src/cache/mod.rs index 299245d3..63d0721a 100644 --- a/crates/core/src/cache/mod.rs +++ b/crates/core/src/cache/mod.rs @@ -2,21 +2,26 @@ // Copyright 2024-2025 wire Contributors use std::{ + collections::HashMap, env, path::{Path, PathBuf}, + sync::{Arc, atomic::AtomicBool}, }; +use itertools::Itertools; use sqlx::{ Pool, Sqlite, migrate::{Migrator, MigrateError}, sqlite::{SqliteConnectOptions, SqlitePoolOptions}, }; use tokio::fs::create_dir_all; -use tracing::{debug, error, info, trace}; +use tracing::{Level, debug, error, info, instrument, trace}; +use wire_nix_client::NixClient; use crate::{ SafeStorePath, - hive::{FlakePrefetch, Hive}, + commands::trace_nix_log_message, + hive::{FlakePrefetch, Hive, node::Name}, }; #[derive(Clone)] @@ -258,7 +263,151 @@ impl InspectionCache { } } - pub async fn gc(&self) -> Result<(), sqlx::Error> { + #[instrument(skip(self), ret(level = Level::DEBUG))] + pub async fn get_evaluations( + &self, + prefetch: &FlakePrefetch, + nodes: &[Name], + should_quit: Arc, + ) -> Option>> { + struct Query { + output_path_digest: Vec, + output_path_name: String, + node_name: String, + } + + let mut client = NixClient::open_local(trace_nix_log_message, should_quit, false).await.inspect_err(|err| error!(err = ?err, "failed to open local nix client to verify cached evaluations")).ok()?; + + let store_path_digest = &prefetch.store_path.digest()[..]; + let store_path_name = prefetch.store_path.name(); + let flake_hash_sri = prefetch.hash.to_sri_string(); + let nodes_json = serde_json::to_string(nodes).unwrap_or_else(|_| "[]".to_string()); + + let evaluation_cache = sqlx::query_as!( + Query, + " + select + evaluation_cache.output_path_digest, + evaluation_cache.output_path_name, + evaluation_cache.node_name + from + evaluation_cache + where + evaluation_cache.flake_path_digest = $1 + and evaluation_cache.flake_path_name = $2 + and evaluation_cache.flake_hash_sri = $3 + and evaluation_cache.node_name in (select value from json_each($4)) + ", + store_path_digest, + store_path_name, + flake_hash_sri, + nodes_json + ) + .fetch_all(&self.pool) + .await + .inspect_err(|x| error!("failed to fetch cached node evaluations: {x}")) + .ok()?; + + let evaluation_cache: HashMap<_, _> = evaluation_cache + .into_iter() + .map(|x| { + ( + x.node_name, + SafeStorePath::::from_name_and_digest( + &x.output_path_name, + &x.output_path_digest, + ) + .inspect_err(|err| error!("failed to parse StorePath from cache: {err:?}")) + .ok(), + ) + }) + .filter_map(|(name, path)| path.map(|path| (name, path))) + .filter_map(|(name, path)| { + nodes + .iter() + .find(|x| x.0.as_ref() == name.as_str()) + .map(|name| (name.clone(), path)) + }) + .collect(); + + let valid_paths = client + .query_valid_paths(evaluation_cache.values().cloned().collect_vec(), false) + .await + .inspect_err( + |err| error!(err = ?err, "failed to query valid paths for evaluation cache"), + ) + .ok()?; + + // delete paths that no longer exist in the nix store + let invalid_paths: Vec<_> = evaluation_cache + .iter() + .filter(|(_, path)| !valid_paths.contains(path)) + .map(|(_, path)| (path.digest().to_vec(), path.name().clone())) + .collect(); + + for (output_path_digest, output_path_name) in invalid_paths { + if let Err(err) = sqlx::query!( + "delete from evaluation_cache where output_path_digest = $1 and output_path_name = $2", + output_path_digest, + output_path_name + ) + .execute(&self.pool) + .await + { + error!(err = ?err, "failed to delete invalid evaluation cache entry"); + } + } + + Some( + evaluation_cache + .into_iter() + .filter(|(_, path)| valid_paths.contains(path)) + .collect(), + ) + } + + pub async fn store_evaluation( + &self, + prefetch: &FlakePrefetch, + name: &Name, + path: SafeStorePath, + ) { + trace!(evaluated_path = ?path, prefetch = ?prefetch, "storing evaluated output"); + + let store_path_digest = &prefetch.store_path.digest()[..]; + let store_path_name = prefetch.store_path.name(); + let output_path_digest = &path.digest()[..]; + let output_path_name = path.name(); + let flake_hash_sri = prefetch.hash.to_sri_string(); + let node_name = name.to_string(); + + let cached_evaluation = sqlx::query!( + " + insert into evaluation_cache ( + flake_path_digest, + flake_path_name, + flake_hash_sri, + node_name, + output_path_digest, + output_path_name + ) values ($1, $2, $3, $4, $5, $6) + ", + store_path_digest, + store_path_name, + flake_hash_sri, + node_name, + output_path_digest, + output_path_name + ) + .execute(&self.pool) + .await; + + if let Err(err) = cached_evaluation { + error!("could not insert cached_evaluation: {err}"); + } + } + + pub async fn gc(&self, should_quit: Arc) -> Result<(), sqlx::Error> { // keep newest 30 AND // delete caches that refer to a blob w/ wrong schema sqlx::query!( @@ -305,6 +454,119 @@ where .execute(&self.pool) .await?; + self.gc_evaluation_cache(should_quit).await?; + + Ok(()) + } + + pub async fn gc_evaluation_cache( + &self, + should_quit: Arc, + ) -> Result<(), sqlx::Error> { + struct EvaluationPaths { + flake_path_digest: Vec, + flake_path_name: String, + output_path_digest: Vec, + output_path_name: String, + rowid: i64, + } + + let mut previous_rowid = 0; + + let mut client = + match NixClient::open_local(trace_nix_log_message, should_quit, false).await { + Ok(c) => c, + Err(err) => { + error!(err = ?err, "failed to open local nix client for gc"); + return Ok(()); + } + }; + + // delete caches whose flake/output paths no longer exist in the nix store + loop { + let evaluation_paths: Vec<_> = sqlx::query_as!( + EvaluationPaths, + "select rowid, flake_path_digest, flake_path_name, output_path_digest, output_path_name from evaluation_cache where rowid > $1 order by rowid asc limit 50", + previous_rowid + ) + .fetch_all(&self.pool) + .await?; + + if evaluation_paths.is_empty() { + break; + } + + previous_rowid = evaluation_paths + .last() + .map(|r| r.rowid) + .unwrap_or(previous_rowid); + + // build list of all store paths to check + let mut all_paths: Vec> = Vec::new(); + + for path in &evaluation_paths { + if let Ok(p) = SafeStorePath::::from_name_and_digest( + &path.flake_path_name, + &path.flake_path_digest, + ) { + all_paths.push(p); + } + if let Ok(p) = SafeStorePath::::from_name_and_digest( + &path.output_path_name, + &path.output_path_digest, + ) { + all_paths.push(p); + } + } + + // query which cached paths are valid in the nix store + let valid_paths = match client.query_valid_paths(all_paths.clone(), false).await { + Ok(v) => v, + Err(err) => { + error!(err = ?err, "failed to query valid paths for gc"); + return Ok(()); + } + }; + + let valid_set: std::collections::HashSet<_> = valid_paths.into_iter().collect(); + + // delete invalid evaluation_cache entries + for path in &evaluation_paths { + let mut is_invalid = false; + + if let Ok(p) = SafeStorePath::::from_name_and_digest( + &path.flake_path_name, + &path.flake_path_digest, + ) && !valid_set.contains(&p) + { + is_invalid = true; + } + + if let Ok(p) = SafeStorePath::::from_name_and_digest( + &path.output_path_name, + &path.output_path_digest, + ) && !valid_set.contains(&p) + { + is_invalid = true; + } + + if !is_invalid { + continue; + } + + let _ = sqlx::query!( + "delete from evaluation_cache where flake_path_digest = $1 and flake_path_name = $2 and output_path_digest = $3 and output_path_name = $4", + path.flake_path_digest.clone(), + path.flake_path_name, + path.output_path_digest.clone(), + path.output_path_name + ) + .execute(&self.pool) + .await + .inspect_err(|err| error!(err = ?err, "failed to delete from evaluation path")); + } + } + Ok(()) } } diff --git a/crates/core/src/hive/executor.rs b/crates/core/src/hive/executor.rs index caf496cf..fca2f8c9 100644 --- a/crates/core/src/hive/executor.rs +++ b/crates/core/src/hive/executor.rs @@ -6,6 +6,7 @@ use crate::{ use std::debug_assert_matches; use std::sync::Arc; +use tokio::sync::oneshot; use tracing::{Instrument, Span, debug, error, event, instrument}; use crate::{ @@ -38,6 +39,7 @@ async fn evaluate_task( hive_location: Arc, name: Name, modifiers: SubCommandModifiers, + on_new_evaluation: oneshot::Sender>, ) { let output = evaluate_hive_attribute(&hive_location, &EvalGoal::GetTopLevel(&name), modifiers) .await @@ -57,6 +59,10 @@ async fn evaluate_task( debug!(output = ?output, done = true); + if let Ok(ref path) = output { + let _ = on_new_evaluation.send(path.clone()); + } + let _ = tx.send(output); } @@ -64,7 +70,10 @@ async fn evaluate_task( /// Performs some optimisations such as greedily executing evaluation before /// other steps independent of evaluation's result. #[instrument(skip_all, fields(node = %plan.context.name))] -pub async fn execute(mut plan: NodePlan) -> Result<(), HiveLibError> { +pub async fn execute( + mut plan: NodePlan, + on_new_evaluation: oneshot::Sender>, +) -> Result<(), HiveLibError> { app_shutdown_guard(&plan.context)?; let (tx, rx) = tokio::sync::oneshot::channel(); @@ -90,6 +99,7 @@ pub async fn execute(mut plan: NodePlan) -> Result<(), HiveLibError> { plan.context.hive_location.clone(), plan.context.name.clone(), plan.context.modifiers, + on_new_evaluation, ) .in_current_span(), ); @@ -143,6 +153,8 @@ pub async fn execute(mut plan: NodePlan) -> Result<(), HiveLibError> { #[cfg(test)] mod tests { + use tokio::sync::oneshot; + use crate::{ SubCommandModifiers, errors::HiveLibError, @@ -182,9 +194,12 @@ mod tests { location.clone().into(), &SubCommandModifiers::default(), should_quit.clone(), + None, ); - let status = execute(plan).await; + let channel = oneshot::channel(); + + let status = execute(plan, channel.0).await; assert_matches!(status, Err(HiveLibError::Sigint)); } diff --git a/crates/core/src/hive/mod.rs b/crates/core/src/hive/mod.rs index 525d1dfc..69796044 100644 --- a/crates/core/src/hive/mod.rs +++ b/crates/core/src/hive/mod.rs @@ -53,12 +53,12 @@ impl Hive { #[instrument(skip_all, name = "eval_hive")] pub async fn new_from_path( location: &HiveLocation, - cache: Option, + cache: Arc>, modifiers: SubCommandModifiers, ) -> Result { info!("evaluating hive {location:?}"); - if let Some(ref cache) = cache + if let Some(ref cache) = *cache && let HiveLocation::Flake { prefetch, .. } = location && let Some(hive) = cache.get_hive(prefetch).await { @@ -71,7 +71,7 @@ impl Hive { HiveLibError::HiveInitialisationError(HiveInitialisationError::ParseEvaluateError(err)) })?; - if let Some(cache) = cache + if let Some(ref cache) = *cache && let HiveLocation::Flake { prefetch, .. } = location { cache.store_hive(prefetch, &output).await; @@ -316,7 +316,7 @@ mod tests { async fn test_hive_file() { let location = location!(get_test_path!()); - let hive = Hive::new_from_path(&location, None, SubCommandModifiers::default()) + let hive = Hive::new_from_path(&location, None.into(), SubCommandModifiers::default()) .await .unwrap(); @@ -342,7 +342,7 @@ mod tests { async fn non_trivial_hive() { let location = location!(get_test_path!()); - let hive = Hive::new_from_path(&location, None, SubCommandModifiers::default()) + let hive = Hive::new_from_path(&location, None.into(), SubCommandModifiers::default()) .await .unwrap(); @@ -389,7 +389,7 @@ mod tests { ) .await .unwrap(); - let hive = Hive::new_from_path(&location, None, SubCommandModifiers::default()) + let hive = Hive::new_from_path(&location, None.into(), SubCommandModifiers::default()) .await .unwrap(); @@ -416,7 +416,7 @@ mod tests { let location = location!(get_test_path!()); assert_matches!( - Hive::new_from_path(&location, None, SubCommandModifiers::default()).await, + Hive::new_from_path(&location, None.into(), SubCommandModifiers::default()).await, Err(HiveLibError::NixEvalError { source: CommandError::CommandFailed { logs, @@ -433,7 +433,7 @@ mod tests { let location = location!(get_test_path!()); assert_matches!( - Hive::new_from_path(&location, None, SubCommandModifiers::default()).await, + Hive::new_from_path(&location, None.into(), SubCommandModifiers::default()).await, Err(HiveLibError::NixEvalError { source: CommandError::CommandFailed { logs, @@ -451,7 +451,7 @@ mod tests { location.push("non_trivial_hive"); let location = location!(location); - let mut hive = Hive::new_from_path(&location, None, SubCommandModifiers::default()) + let mut hive = Hive::new_from_path(&location, None.into(), SubCommandModifiers::default()) .await .unwrap(); diff --git a/crates/core/src/hive/plan.rs b/crates/core/src/hive/plan.rs index dd279c93..cf13fae1 100644 --- a/crates/core/src/hive/plan.rs +++ b/crates/core/src/hive/plan.rs @@ -6,7 +6,7 @@ use std::{ use tokio::sync::RwLock; use crate::{ - SubCommandModifiers, + SafeStorePath, SubCommandModifiers, hive::{ HiveLocation, node::{ @@ -121,6 +121,7 @@ fn apply_plan( modifiers: SubCommandModifiers, hive_location: Arc, should_quit: Arc, + cached_evaluation: Option>, ) -> NodePlan { let ApplyGoalArgs { goal, @@ -135,6 +136,7 @@ fn apply_plan( let mut steps: Vec = Vec::new(); let mut end: Vec = Vec::new(); let target = SharedTarget(Arc::new(RwLock::new(node.target.clone()))); + let has_cached_evaluation = cached_evaluation.is_some(); if !*should_apply_locally { steps.push(Step::Ping(Ping { @@ -143,7 +145,7 @@ fn apply_plan( } if !matches!(goal, ApplyGoal::Keys) { - steps.push(Step::Evaluate(Evaluate)); + steps.push(Step::Evaluate(Evaluate { cached_evaluation })); } if !matches!(goal, ApplyGoal::Keys) @@ -212,7 +214,7 @@ fn apply_plan( build_id_names: Arc::new(Mutex::new(HashMap::new())), }, steps, - greedy_evaluate: !matches!(&goal, ApplyGoal::Keys), + greedy_evaluate: !matches!(&goal, ApplyGoal::Keys) && !has_cached_evaluation, ignore_failed_ping: matches!(handle_unreachable, HandleUnreachable::Ignore), } } @@ -225,7 +227,10 @@ pub fn plan_for_node( hive_location: Arc, modifiers: &SubCommandModifiers, should_quit: Arc, + cached_evaluation: Option>, ) -> NodePlan { + let greedy_evaluate = cached_evaluation.is_none(); + match goal { Goal::Build => NodePlan { context: Context { @@ -237,13 +242,21 @@ pub fn plan_for_node( build_id_names: Arc::new(Mutex::new(HashMap::new())), }, steps: vec![ - Step::Evaluate(Evaluate), + Step::Evaluate(Evaluate { cached_evaluation }), Step::Build(Build { target: None }), ], - greedy_evaluate: true, + greedy_evaluate, ignore_failed_ping: false, }, - Goal::Apply(args) => apply_plan(args, node, &name, *modifiers, hive_location, should_quit), + Goal::Apply(args) => apply_plan( + args, + node, + &name, + *modifiers, + hive_location, + should_quit, + cached_evaluation, + ), } } @@ -304,11 +317,18 @@ mod tests { location.clone().into(), &SubCommandModifiers::default(), should_quit.clone(), + None, ); assert_eq!( plan.steps, - vec![Evaluate.into(), Build { target: None }.into()] + vec![ + Evaluate { + cached_evaluation: None + } + .into(), + Build { target: None }.into() + ] ); } @@ -337,6 +357,7 @@ mod tests { location.clone().into(), &SubCommandModifiers::default(), should_quit.clone(), + None, ); assert_eq!( @@ -346,7 +367,10 @@ mod tests { target: target.clone() } .into(), - crate::hive::steps::evaluate::Evaluate.into(), + crate::hive::steps::evaluate::Evaluate { + cached_evaluation: None + } + .into(), crate::hive::steps::push::PushEvaluatedOutput { substitute_on_destination: true, target: target.clone() @@ -378,6 +402,7 @@ mod tests { location.clone().into(), &SubCommandModifiers::default(), should_quit.clone(), + None, ); assert_eq!( @@ -387,7 +412,10 @@ mod tests { target: target.clone() } .into(), - crate::hive::steps::evaluate::Evaluate.into(), + crate::hive::steps::evaluate::Evaluate { + cached_evaluation: None + } + .into(), crate::hive::steps::build::Build { target: None }.into(), crate::hive::steps::push::PushBuildOutput { substitute_on_destination: true, @@ -428,6 +456,7 @@ mod tests { location.clone().into(), &SubCommandModifiers::default(), should_quit.clone(), + None, ); assert_eq!( @@ -488,6 +517,7 @@ mod tests { location.clone().into(), &SubCommandModifiers::default(), should_quit.clone(), + None, ); assert_eq!( @@ -554,6 +584,7 @@ mod tests { location.clone().into(), &SubCommandModifiers::default(), should_quit.clone(), + None, ); assert_eq!( @@ -563,7 +594,10 @@ mod tests { target: target.clone() } .into(), - Evaluate.into(), + Evaluate { + cached_evaluation: None + } + .into(), PushEvaluatedOutput { substitute_on_destination: true, target: target.clone() @@ -598,6 +632,7 @@ mod tests { location.clone().into(), &SubCommandModifiers::default(), should_quit.clone(), + None, ); assert_eq!( @@ -607,7 +642,10 @@ mod tests { target: target.clone() } .into(), - Evaluate.into(), + Evaluate { + cached_evaluation: None + } + .into(), PushEvaluatedOutput { substitute_on_destination: true, target: target.clone() @@ -654,6 +692,7 @@ mod tests { location.clone().into(), &SubCommandModifiers::default(), should_quit.clone(), + None, ); assert_eq!( @@ -663,7 +702,10 @@ mod tests { target: target.clone() } .into(), - Evaluate.into(), + Evaluate { + cached_evaluation: None + } + .into(), PushEvaluatedOutput { substitute_on_destination: true, target: target.clone() @@ -705,12 +747,16 @@ mod tests { location.clone().into(), &SubCommandModifiers::default(), should_quit.clone(), + None, ); assert_eq!( plan.steps, vec![ - Evaluate.into(), + Evaluate { + cached_evaluation: None + } + .into(), Build { target: None }.into(), SwitchToConfiguration { goal: SwitchToConfigurationGoal::Switch, diff --git a/crates/core/src/hive/steps/evaluate.rs b/crates/core/src/hive/steps/evaluate.rs index 2573b8a3..7b227988 100644 --- a/crates/core/src/hive/steps/evaluate.rs +++ b/crates/core/src/hive/steps/evaluate.rs @@ -3,15 +3,18 @@ use std::fmt::Display; -use tracing::instrument; +use tracing::{info, instrument}; use crate::{ - HiveLibError, + HiveLibError, SafeStorePath, hive::node::{Context, ExecuteStep}, }; #[derive(Debug, PartialEq)] -pub struct Evaluate; +pub struct Evaluate { + /// evaluation that was previously built & cached + pub cached_evaluation: Option>, +} impl Display for Evaluate { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { @@ -22,9 +25,17 @@ impl Display for Evaluate { impl ExecuteStep for Evaluate { #[instrument(skip_all, name = "eval")] async fn execute(&self, ctx: &mut Context) -> Result<(), HiveLibError> { - let rx = ctx.state.evaluation_rx.take().unwrap(); - - ctx.state.evaluation = Some(rx.await.unwrap()?); + if let Some(ref cached_evaluation) = self.cached_evaluation { + info!( + "Skipping evaluation, cached as {}", + cached_evaluation.to_absolute_path() + ); + ctx.state.evaluation = Some(cached_evaluation.clone()); + } else { + let rx = ctx.state.evaluation_rx.take().unwrap(); + + ctx.state.evaluation = Some(rx.await.unwrap()?); + } Ok(()) } diff --git a/crates/nix_client/src/lib.rs b/crates/nix_client/src/lib.rs index 9d0ef6f6..17b0127f 100644 --- a/crates/nix_client/src/lib.rs +++ b/crates/nix_client/src/lib.rs @@ -767,7 +767,7 @@ where return Err(NixDaemonClientError::NixDaemonProtocolVersion { wanted: ProtocolVersion::from_parts(1, 19), have: self.writer.version(), - operation: "QueryMissing".into(), + operation: "QueryDerivationOutputMap".into(), }); }