Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
rust nix daemon client if `--experimental-nix-client` is passed.
- Node `target` liveliness is now determined directly by initiating a nix daemon
handshake if `--experimental-nix-client` is passed.
- Node evaluations are now cached between wire invocations in flake hives,
invalidating old caches with a connection to the local nix daemon.

### Changed

Expand Down
43 changes: 42 additions & 1 deletion crates/cli/src/apply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ use std::io::Read;
use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use thiserror::Error;
use tokio::sync::oneshot;
use tracing::{error, info};
use wire_core::cache::InspectionCache;
use wire_core::hive::executor::execute;
use wire_core::hive::node::{Name, Node};
use wire_core::hive::plan::{Goal, plan_for_node};
Expand Down Expand Up @@ -102,6 +104,7 @@ pub async fn apply<F>(
partition: Partitions,
make_goal: F,
mut modifiers: SubCommandModifiers,
cache: Arc<Option<InspectionCache>>,
) -> Result<()>
where
F: Fn(&Name, &Node) -> Goal,
Expand All @@ -122,6 +125,16 @@ where
.map(|(name, _)| name.clone())
.collect();

let mut cached_evaluations = if let Some(ref cache) = *cache.clone()
&& let HiveLocation::Flake { ref prefetch, .. } = *location
{
cache
.get_evaluations(prefetch, &selected_names, should_quit.clone())
.await
} else {
None
};

let num_selected = selected_names.len();

let partitioned_names = partition_arr(selected_names, &partition);
Expand All @@ -137,6 +150,8 @@ where
let _ = tx.send(UiMessage::AddMany(partitioned_names.clone()));
}

let mut evaluation_cache_tasks = Vec::new();

let mut set = hive
.nodes
.iter_mut()
Expand All @@ -151,8 +166,30 @@ where
location.clone(),
&modifiers,
should_quit.clone(),
cached_evaluations
.as_mut()
.and_then(|cache| cache.remove(name)),
);
execute(plan).map(move |result| (name, result))

let (sender, receiver) = oneshot::channel();

let location = location.clone();
let cache = cache.clone();
let cache_name = name.clone();

evaluation_cache_tasks.push(tokio::spawn(async move {
if let Some(ref cache) = *cache
&& let HiveLocation::Flake { ref prefetch, .. } = *location
&& let Ok(evaluated_path) = receiver.await
{
cache
.store_evaluation(prefetch, &cache_name, evaluated_path)
.await;
}
}));

let name_clone = name.clone();
execute(plan, sender).map(move |result| (name_clone, result))
})
.peekable();

Expand All @@ -163,6 +200,10 @@ where
let futures = futures::stream::iter(set).buffer_unordered(args.parallel);
let result = futures.collect::<Vec<_>>().await;

for task in evaluation_cache_tasks {
let _ = task.await;
}

let (successful, errors): (Vec<_>, Vec<_>) =
result
.into_iter()
Expand Down
12 changes: 7 additions & 5 deletions crates/cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ async fn main() -> Result<()> {
let signals_task = tokio::spawn(handle_signals(signals, should_shutdown.clone()));

let location = get_hive_location(args.path, modifiers).await?;
let cache = InspectionCache::new().await;
let cache = Arc::new(InspectionCache::new().await);

match args.command {
cli::Commands::Apply(apply_args) => {
Expand All @@ -90,7 +90,7 @@ async fn main() -> Result<()> {

apply::apply(
&mut hive,
should_shutdown,
should_shutdown.clone(),
location,
apply_args.common,
Partitions::default(),
Expand All @@ -109,6 +109,7 @@ async fn main() -> Result<()> {
})
},
modifiers,
cache.clone(),
)
.await?;
}
Expand All @@ -117,12 +118,13 @@ async fn main() -> Result<()> {

apply::apply(
&mut hive,
should_shutdown,
should_shutdown.clone(),
location,
build_args.common,
build_args.partition.unwrap_or_default(),
|_name, _node| Goal::Build,
modifiers,
cache.clone(),
)
.await?;
}
Expand Down Expand Up @@ -150,8 +152,8 @@ async fn main() -> Result<()> {
}),
}

if let Some(cache) = cache {
cache.gc().await.into_diagnostic()?;
if let Some(cache) = &*cache {
cache.gc(should_shutdown).await.into_diagnostic()?;
}

signals_handle.close();
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
create table evaluation_cache (
flake_path_digest blob,
flake_path_name text,
flake_hash_sri text,
node_name text,

output_path_digest blob not null,
output_path_name text not null,

primary key (flake_path_digest, flake_path_name, flake_hash_sri, node_name)
) strict;
Loading
Loading