From 28dec3a926eb5dc543fac11a13ebef5dde1537e2 Mon Sep 17 00:00:00 2001 From: David W Bitner Date: Thu, 20 Nov 2025 11:33:06 -0600 Subject: [PATCH 1/8] Add ability to compare/sort/merge items using a sortby config --- crates/core/Cargo.toml | 3 + crates/core/src/lib.rs | 2 + crates/core/src/sort.rs | 525 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 530 insertions(+) create mode 100644 crates/core/src/sort.rs diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index ebbf6bf82..3937e5e85 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -44,10 +44,13 @@ indexmap.workspace = true log.workspace = true mime.workspace = true parquet = { workspace = true, optional = true } +compare = "0.1.0" serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true, features = ["preserve_order"] } stac-derive.workspace = true thiserror.workspace = true +stream-kmerge = "0.2.0" +futures = { workspace = true } tracing.workspace = true url = { workspace = true, features = ["serde"] } wkb = { workspace = true, optional = true } diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 7bed8dc05..1d309479a 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -123,6 +123,8 @@ pub mod link; mod migrate; pub mod mime; mod ndjson; +/// Sort STAC items. +pub mod sort; mod statistics; mod value; mod version; diff --git a/crates/core/src/sort.rs b/crates/core/src/sort.rs new file mode 100644 index 000000000..5a6109b41 --- /dev/null +++ b/crates/core/src/sort.rs @@ -0,0 +1,525 @@ +use crate::Item; +use compare::Compare; +use futures::Stream; +use serde::Deserialize; +use serde_json::Value; +use std::cmp::Ordering; +use stream_kmerge::kmerge_by; + +#[derive(Debug, Deserialize)] +struct SortConfig { + sortby: Vec, +} + +#[derive(Debug, Deserialize, Clone)] +struct SortField { + field: String, + direction: Direction, +} + +#[derive(Debug, Deserialize, PartialEq, Clone, Copy)] +#[serde(rename_all = "lowercase")] +enum Direction { + Asc, + Desc, +} + +/// A comparator for STAC Items. +/// +/// This struct implements [compare::Compare] for [Item], allowing it to be used +/// to sort items based on a configuration. +/// +/// # Examples +/// +/// ``` +/// use stac::{Item, sort::ItemComparator}; +/// use serde_json::json; +/// +/// let mut items = vec![Item::new("b"), Item::new("a")]; +/// let config = json!({ +/// "sortby": [ +/// { "field": "id", "direction": "asc" } +/// ] +/// }); +/// let comparator = ItemComparator::new(config).unwrap(); +/// comparator.sort(&mut items); +/// assert_eq!(items[0].id, "a"); +/// ``` +#[derive(Debug, Clone)] +pub struct ItemComparator { + sort_fields: Vec, +} + +impl ItemComparator { + /// Creates a new `ItemComparator` from a JSON configuration. + /// + /// The configuration should be a JSON object with a `sortby` field, which is + /// a list of objects with `field` and `direction` fields. + /// + /// # Examples + /// + /// ``` + /// use stac::sort::ItemComparator; + /// use serde_json::json; + /// + /// let config = json!({ + /// "sortby": [ + /// { "field": "id", "direction": "asc" } + /// ] + /// }); + /// let comparator = ItemComparator::new(config).unwrap(); + /// ``` + pub fn new(config: Value) -> Result { + let config: SortConfig = serde_json::from_value(config)?; + Ok(Self { + sort_fields: config.sortby, + }) + } + + /// Sorts a mutable slice of items. + /// + /// # Examples + /// + /// ``` + /// use stac::{Item, sort::ItemComparator}; + /// use serde_json::json; + /// + /// let mut items = vec![Item::new("b"), Item::new("a")]; + /// let config = json!({ + /// "sortby": [ + /// { "field": "id", "direction": "asc" } + /// ] + /// }); + /// let comparator = ItemComparator::new(config).unwrap(); + /// comparator.sort(&mut items); + /// assert_eq!(items[0].id, "a"); + /// ``` + pub fn sort(&self, items: &mut [Item]) { + items.sort_by(|a, b| self.compare(a, b)); + } +} + +impl Default for ItemComparator { + /// Creates a new `ItemComparator` with the default sort order. + /// + /// The default sort order is `datetime` descending, followed by `id` ascending. + fn default() -> Self { + Self { + sort_fields: vec![ + SortField { + field: "datetime".to_string(), + direction: Direction::Desc, + }, + SortField { + field: "id".to_string(), + direction: Direction::Asc, + }, + ], + } + } +} + +impl Compare for ItemComparator { + fn compare(&self, l: &Item, r: &Item) -> Ordering { + for sort_field in &self.sort_fields { + let ord = compare_field(l, r, &sort_field.field); + if ord != Ordering::Equal { + return match sort_field.direction { + Direction::Asc => ord, + Direction::Desc => ord.reverse(), + }; + } + } + Ordering::Equal + } +} + +fn compare_field(l: &Item, r: &Item, field: &str) -> Ordering { + match field { + "id" => l.id.cmp(&r.id), + "datetime" => { + let l_dt = l.properties.datetime.or(l.properties.start_datetime); + let r_dt = r.properties.datetime.or(r.properties.start_datetime); + l_dt.cmp(&r_dt) + } + "start_datetime" => { + let l_dt = l.properties.start_datetime.or(l.properties.datetime); + let r_dt = r.properties.start_datetime.or(r.properties.datetime); + l_dt.cmp(&r_dt) + } + "end_datetime" => l.properties.end_datetime.cmp(&r.properties.end_datetime), + "title" => l.properties.title.cmp(&r.properties.title), + "description" => l.properties.description.cmp(&r.properties.description), + "created" => l.properties.created.cmp(&r.properties.created), + "updated" => l.properties.updated.cmp(&r.properties.updated), + "collection" => l.collection.cmp(&r.collection), + _ => { + compare_values(l.properties.additional_fields.get(field), r.properties.additional_fields.get(field)) + } + } +} + +fn compare_values(l: Option<&Value>, r: Option<&Value>) -> Ordering { + match (l, r) { + (None, None) => Ordering::Equal, + (None, Some(_)) => Ordering::Less, + (Some(_), None) => Ordering::Greater, + (Some(l_val), Some(r_val)) => compare_json_values(l_val, r_val), + } +} + +fn compare_json_values(l: &Value, r: &Value) -> Ordering { + match (l, r) { + (Value::Null, Value::Null) => Ordering::Equal, + (Value::Null, _) => Ordering::Less, + (_, Value::Null) => Ordering::Greater, + (Value::Bool(a), Value::Bool(b)) => a.cmp(b), + (Value::Number(a), Value::Number(b)) => { + if let (Some(a_f), Some(b_f)) = (a.as_f64(), b.as_f64()) { + a_f.partial_cmp(&b_f).unwrap_or(Ordering::Equal) + } else if let (Some(a_i), Some(b_i)) = (a.as_i64(), b.as_i64()) { + a_i.cmp(&b_i) + } else if let (Some(a_u), Some(b_u)) = (a.as_u64(), b.as_u64()) { + a_u.cmp(&b_u) + } else { + Ordering::Equal + } + } + (Value::String(a), Value::String(b)) => a.cmp(b), + (Value::Array(a), Value::Array(b)) => { + let len = std::cmp::min(a.len(), b.len()); + for i in 0..len { + let ord = compare_json_values(&a[i], &b[i]); + if ord != Ordering::Equal { + return ord; + } + } + a.len().cmp(&b.len()) + } + (Value::Object(_), Value::Object(_)) => Ordering::Equal, + (Value::Bool(_), _) => Ordering::Less, + (_, Value::Bool(_)) => Ordering::Greater, + (Value::Number(_), _) => Ordering::Less, + (_, Value::Number(_)) => Ordering::Greater, + (Value::String(_), _) => Ordering::Less, + (_, Value::String(_)) => Ordering::Greater, + (Value::Array(_), _) => Ordering::Less, + (_, Value::Array(_)) => Ordering::Greater, + } +} + +/// Creates a function that returns a struct that implements Compare from the compare crate that can be used to compare stac items. +/// +/// # Examples +/// +/// ``` +/// use stac::{Item, sort::item_comparator}; +/// use serde_json::json; +/// +/// let mut items = vec![Item::new("b"), Item::new("a")]; +/// let config = json!({ +/// "sortby": [ +/// { "field": "id", "direction": "asc" } +/// ] +/// }); +/// let comparator = item_comparator(config).unwrap(); +/// comparator.sort(&mut items); +/// assert_eq!(items[0].id, "a"); +/// ``` +pub fn item_comparator(config: Value) -> Result { + ItemComparator::new(config) +} + +/// Sorts multiple streams of items into a single sorted stream. +/// +/// # Examples +/// +/// ``` +/// use stac::{Item, sort::sort_streams}; +/// use serde_json::json; +/// use futures::stream::{self, StreamExt}; +/// +/// # tokio_test::block_on(async { +/// let stream1 = stream::iter(vec![Item::new("a"), Item::new("c")]); +/// let stream2 = stream::iter(vec![Item::new("b"), Item::new("d")]); +/// let config = json!({ +/// "sortby": [ +/// { "field": "id", "direction": "asc" } +/// ] +/// }); +/// let mut sorted = sort_streams(vec![stream1, stream2], config).unwrap(); +/// assert_eq!(sorted.next().await.unwrap().id, "a"); +/// assert_eq!(sorted.next().await.unwrap().id, "b"); +/// assert_eq!(sorted.next().await.unwrap().id, "c"); +/// assert_eq!(sorted.next().await.unwrap().id, "d"); +/// # }); +/// ``` +pub fn sort_streams( + streams: I, + config: Value, +) -> Result, serde_json::Error> +where + S: Stream + Unpin, + I: IntoIterator, +{ + let comparator = ItemComparator::new(config)?; + Ok(kmerge_by(streams, move |a, b| { + comparator.compare(a, b).reverse() + })) +} + +#[cfg(test)] +mod tests { + use super::*; + use serde_json::json; + + #[test] + fn test_sort() { + let mut items = vec![ + Item::new("c"), + Item::new("a"), + Item::new("b"), + ]; + let config = json!({ + "sortby": [ + { "field": "id", "direction": "asc" } + ] + }); + let comparator = item_comparator(config).unwrap(); + items.sort_by(|a, b| comparator.compare(a, b)); + assert_eq!(items[0].id, "a"); + assert_eq!(items[1].id, "b"); + assert_eq!(items[2].id, "c"); + } + + #[test] + fn test_sort_desc() { + let mut items = vec![ + Item::new("c"), + Item::new("a"), + Item::new("b"), + ]; + let config = json!({ + "sortby": [ + { "field": "id", "direction": "desc" } + ] + }); + let comparator = item_comparator(config).unwrap(); + items.sort_by(|a, b| comparator.compare(a, b)); + assert_eq!(items[0].id, "c"); + assert_eq!(items[1].id, "b"); + assert_eq!(items[2].id, "a"); + } + + #[test] + fn test_sort_datetime() { + let mut item1 = Item::new("1"); + item1.properties.datetime = Some("2023-01-01T00:00:00Z".parse().unwrap()); + let mut item2 = Item::new("2"); + item2.properties.datetime = Some("2023-01-02T00:00:00Z".parse().unwrap()); + + let mut items = vec![item2.clone(), item1.clone()]; + let config = json!({ + "sortby": [ + { "field": "datetime", "direction": "asc" } + ] + }); + let comparator = item_comparator(config).unwrap(); + items.sort_by(|a, b| comparator.compare(a, b)); + assert_eq!(items[0].id, "1"); + assert_eq!(items[1].id, "2"); + } + + #[test] + fn test_sort_method_direct() { + let mut items = vec![ + Item::new("b"), + Item::new("a"), + ]; + let config = json!({ + "sortby": [ + { "field": "id", "direction": "asc" } + ] + }); + let comparator = item_comparator(config).unwrap(); + comparator.sort(&mut items); + assert_eq!(items[0].id, "a"); + assert_eq!(items[1].id, "b"); + } + + #[test] + fn test_default() { + let mut item1 = Item::new("1"); + item1.properties.datetime = Some("2023-01-01T00:00:00Z".parse().unwrap()); + let mut item2 = Item::new("2"); + item2.properties.datetime = Some("2023-01-02T00:00:00Z".parse().unwrap()); + let mut item3 = Item::new("3"); + item3.properties.datetime = Some("2023-01-01T00:00:00Z".parse().unwrap()); + + let mut items = vec![item1.clone(), item2.clone(), item3.clone()]; + let comparator = ItemComparator::default(); + comparator.sort(&mut items); + + // datetime desc, so item2 (Jan 2) comes first + assert_eq!(items[0].id, "2"); + // then item1 and item3 (Jan 1). id asc, so item1 comes before item3 + assert_eq!(items[1].id, "1"); + assert_eq!(items[2].id, "3"); + } + + #[test] + fn test_sort_datetime_fallback() { + let mut item1 = Item::new("1"); + item1.properties.datetime = None; + item1.properties.start_datetime = Some("2023-01-01T00:00:00Z".parse().unwrap()); + // datetime is None + + let mut item2 = Item::new("2"); + item2.properties.datetime = Some("2023-01-02T00:00:00Z".parse().unwrap()); + // start_datetime is None + + let mut items = vec![item2.clone(), item1.clone()]; + let config = json!({ + "sortby": [ + { "field": "datetime", "direction": "asc" } + ] + }); + let comparator = item_comparator(config).unwrap(); + comparator.sort(&mut items); + + // item1 (Jan 1 via start_datetime) < item2 (Jan 2 via datetime) + assert_eq!(items[0].id, "1"); + assert_eq!(items[1].id, "2"); + } + + #[test] + fn test_sort_start_datetime_fallback() { + let mut item1 = Item::new("1"); + item1.properties.datetime = Some("2023-01-01T00:00:00Z".parse().unwrap()); + // start_datetime is None + + let mut item2 = Item::new("2"); + item2.properties.start_datetime = Some("2023-01-02T00:00:00Z".parse().unwrap()); + // datetime is None + + let mut items = vec![item2.clone(), item1.clone()]; + let config = json!({ + "sortby": [ + { "field": "start_datetime", "direction": "asc" } + ] + }); + let comparator = item_comparator(config).unwrap(); + comparator.sort(&mut items); + + // item1 (Jan 1 via datetime) < item2 (Jan 2 via start_datetime) + assert_eq!(items[0].id, "1"); + assert_eq!(items[1].id, "2"); + } + + #[test] + fn test_complex_sort() { + let mut items = Vec::new(); + + // Collection A, Date 2, ID 1 + let mut item1 = Item::new("1"); + item1.collection = Some("A".to_string()); + item1.properties.datetime = Some("2023-01-02T00:00:00Z".parse().unwrap()); + items.push(item1); + + // Collection A, Date 2, ID 2 + let mut item2 = Item::new("2"); + item2.collection = Some("A".to_string()); + item2.properties.datetime = Some("2023-01-02T00:00:00Z".parse().unwrap()); + items.push(item2); + + // Collection A, Date 1, ID 3 + let mut item3 = Item::new("3"); + item3.collection = Some("A".to_string()); + item3.properties.datetime = Some("2023-01-01T00:00:00Z".parse().unwrap()); + items.push(item3); + + // Collection B, Date 2, ID 4 + let mut item4 = Item::new("4"); + item4.collection = Some("B".to_string()); + item4.properties.datetime = Some("2023-01-02T00:00:00Z".parse().unwrap()); + items.push(item4); + + // Collection B, Date 2, ID 5 + let mut item5 = Item::new("5"); + item5.collection = Some("B".to_string()); + item5.properties.datetime = Some("2023-01-02T00:00:00Z".parse().unwrap()); + items.push(item5); + + // Sort by: Collection ASC, Datetime DESC, ID DESC + let config = json!({ + "sortby": [ + { "field": "collection", "direction": "asc" }, + { "field": "datetime", "direction": "desc" }, + { "field": "id", "direction": "desc" } + ] + }); + + let comparator = item_comparator(config).unwrap(); + comparator.sort(&mut items); + + // Expected order: + // Collection A: + // Date 2: + // ID 2 (desc) + // ID 1 + // Date 1: + // ID 3 + // Collection B: + // Date 2: + // ID 5 (desc) + // ID 4 + + assert_eq!(items[0].id, "2"); + assert_eq!(items[1].id, "1"); + assert_eq!(items[2].id, "3"); + assert_eq!(items[3].id, "5"); + assert_eq!(items[4].id, "4"); + } + + #[test] + fn test_three_fields_with_custom_property() { + let mut items = Vec::new(); + + // Create items with custom property "cloud_cover" + for i in 0..6 { + let mut item = Item::new(i.to_string()); + let _ = item.properties.additional_fields.insert("cloud_cover".to_string(), json!(i % 2)); // 0, 1, 0, 1, 0, 1 + item.properties.datetime = Some("2023-01-01T00:00:00Z".parse().unwrap()); + items.push(item); + } + // Items: + // 0: cloud=0 + // 1: cloud=1 + // 2: cloud=0 + // 3: cloud=1 + // 4: cloud=0 + // 5: cloud=1 + + // Sort by: cloud_cover ASC, id DESC + let config = json!({ + "sortby": [ + { "field": "cloud_cover", "direction": "asc" }, + { "field": "id", "direction": "desc" } + ] + }); + + let comparator = item_comparator(config).unwrap(); + comparator.sort(&mut items); + + // Expected: + // cloud=0: 4, 2, 0 + // cloud=1: 5, 3, 1 + + assert_eq!(items[0].id, "4"); + assert_eq!(items[1].id, "2"); + assert_eq!(items[2].id, "0"); + assert_eq!(items[3].id, "5"); + assert_eq!(items[4].id, "3"); + assert_eq!(items[5].id, "1"); + } +} From 2141284528bbe63f8ccd0f826fac69c387e3e234 Mon Sep 17 00:00:00 2001 From: David W Bitner Date: Thu, 20 Nov 2025 11:41:01 -0600 Subject: [PATCH 2/8] linting --- crates/core/src/sort.rs | 45 +++++++++++++++++------------------------ 1 file changed, 19 insertions(+), 26 deletions(-) diff --git a/crates/core/src/sort.rs b/crates/core/src/sort.rs index 5a6109b41..8996e5c29 100644 --- a/crates/core/src/sort.rs +++ b/crates/core/src/sort.rs @@ -153,9 +153,10 @@ fn compare_field(l: &Item, r: &Item, field: &str) -> Ordering { "created" => l.properties.created.cmp(&r.properties.created), "updated" => l.properties.updated.cmp(&r.properties.updated), "collection" => l.collection.cmp(&r.collection), - _ => { - compare_values(l.properties.additional_fields.get(field), r.properties.additional_fields.get(field)) - } + _ => compare_values( + l.properties.additional_fields.get(field), + r.properties.additional_fields.get(field), + ), } } @@ -187,14 +188,14 @@ fn compare_json_values(l: &Value, r: &Value) -> Ordering { } (Value::String(a), Value::String(b)) => a.cmp(b), (Value::Array(a), Value::Array(b)) => { - let len = std::cmp::min(a.len(), b.len()); - for i in 0..len { - let ord = compare_json_values(&a[i], &b[i]); - if ord != Ordering::Equal { - return ord; - } - } - a.len().cmp(&b.len()) + let len = std::cmp::min(a.len(), b.len()); + for i in 0..len { + let ord = compare_json_values(&a[i], &b[i]); + if ord != Ordering::Equal { + return ord; + } + } + a.len().cmp(&b.len()) } (Value::Object(_), Value::Object(_)) => Ordering::Equal, (Value::Bool(_), _) => Ordering::Less, @@ -275,11 +276,7 @@ mod tests { #[test] fn test_sort() { - let mut items = vec![ - Item::new("c"), - Item::new("a"), - Item::new("b"), - ]; + let mut items = vec![Item::new("c"), Item::new("a"), Item::new("b")]; let config = json!({ "sortby": [ { "field": "id", "direction": "asc" } @@ -294,11 +291,7 @@ mod tests { #[test] fn test_sort_desc() { - let mut items = vec![ - Item::new("c"), - Item::new("a"), - Item::new("b"), - ]; + let mut items = vec![Item::new("c"), Item::new("a"), Item::new("b")]; let config = json!({ "sortby": [ { "field": "id", "direction": "desc" } @@ -332,10 +325,7 @@ mod tests { #[test] fn test_sort_method_direct() { - let mut items = vec![ - Item::new("b"), - Item::new("a"), - ]; + let mut items = vec![Item::new("b"), Item::new("a")]; let config = json!({ "sortby": [ { "field": "id", "direction": "asc" } @@ -488,7 +478,10 @@ mod tests { // Create items with custom property "cloud_cover" for i in 0..6 { let mut item = Item::new(i.to_string()); - let _ = item.properties.additional_fields.insert("cloud_cover".to_string(), json!(i % 2)); // 0, 1, 0, 1, 0, 1 + let _ = item + .properties + .additional_fields + .insert("cloud_cover".to_string(), json!(i % 2)); // 0, 1, 0, 1, 0, 1 item.properties.datetime = Some("2023-01-01T00:00:00Z".parse().unwrap()); items.push(item); } From 8a91febfbf947fd6891e04b811c031663f2383da Mon Sep 17 00:00:00 2001 From: David W Bitner Date: Thu, 20 Nov 2025 11:43:46 -0600 Subject: [PATCH 3/8] add more tests --- crates/core/src/sort.rs | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/crates/core/src/sort.rs b/crates/core/src/sort.rs index 8996e5c29..eb327085d 100644 --- a/crates/core/src/sort.rs +++ b/crates/core/src/sort.rs @@ -515,4 +515,30 @@ mod tests { assert_eq!(items[4].id, "3"); assert_eq!(items[5].id, "1"); } + + #[test] + fn test_sort_streams() { + use futures::stream::{self, StreamExt}; + + let stream1 = stream::iter(vec![Item::new("a"), Item::new("c")]); + let stream2 = stream::iter(vec![Item::new("b"), Item::new("d")]); + let config = json!({ + "sortby": [ + { "field": "id", "direction": "asc" } + ] + }); + let mut sorted = sort_streams(vec![stream1, stream2], config).unwrap(); + + let mut items = Vec::new(); + tokio_test::block_on(async { + while let Some(item) = sorted.next().await { + items.push(item); + } + }); + + assert_eq!(items[0].id, "a"); + assert_eq!(items[1].id, "b"); + assert_eq!(items[2].id, "c"); + assert_eq!(items[3].id, "d"); + } } From 93c708b160561356307575b8cc51f96f3c6567ef Mon Sep 17 00:00:00 2001 From: David W Bitner Date: Thu, 20 Nov 2025 11:46:31 -0600 Subject: [PATCH 4/8] add changelog --- crates/core/CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/crates/core/CHANGELOG.md b/crates/core/CHANGELOG.md index ab683f923..40307c352 100644 --- a/crates/core/CHANGELOG.md +++ b/crates/core/CHANGELOG.md @@ -8,6 +8,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), ### Added +- Add `stac::sort` module for sorting items and streams of items based on a JSON configuration ([#856](https://github.com/stac-utils/rustac/pull/856)) - Add `max_row_group_size` parameter to `geoparquet::WriterBuilder` with default value of `150_000` ([#846](https://github.com/stac-utils/rustac/pull/846)) ## [0.14.0] - 2025-11-14 From d09c84e7d84ccdc7d377bc536754bc6669630379 Mon Sep 17 00:00:00 2001 From: David W Bitner Date: Thu, 20 Nov 2025 12:12:37 -0600 Subject: [PATCH 5/8] remove usage of the compare trait --- crates/core/Cargo.toml | 1 - crates/core/src/sort.rs | 35 ++++++++++++++++------------------- 2 files changed, 16 insertions(+), 20 deletions(-) diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 3937e5e85..d1f161406 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -44,7 +44,6 @@ indexmap.workspace = true log.workspace = true mime.workspace = true parquet = { workspace = true, optional = true } -compare = "0.1.0" serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true, features = ["preserve_order"] } stac-derive.workspace = true diff --git a/crates/core/src/sort.rs b/crates/core/src/sort.rs index eb327085d..9b92f1c10 100644 --- a/crates/core/src/sort.rs +++ b/crates/core/src/sort.rs @@ -1,5 +1,4 @@ use crate::Item; -use compare::Compare; use futures::Stream; use serde::Deserialize; use serde_json::Value; @@ -26,8 +25,7 @@ enum Direction { /// A comparator for STAC Items. /// -/// This struct implements [compare::Compare] for [Item], allowing it to be used -/// to sort items based on a configuration. +/// This struct allows it to be used to sort items based on a configuration. /// /// # Examples /// @@ -97,6 +95,20 @@ impl ItemComparator { pub fn sort(&self, items: &mut [Item]) { items.sort_by(|a, b| self.compare(a, b)); } + + /// Compares two items. + pub fn compare(&self, l: &Item, r: &Item) -> Ordering { + for sort_field in &self.sort_fields { + let ord = compare_field(l, r, &sort_field.field); + if ord != Ordering::Equal { + return match sort_field.direction { + Direction::Asc => ord, + Direction::Desc => ord.reverse(), + }; + } + } + Ordering::Equal + } } impl Default for ItemComparator { @@ -119,21 +131,6 @@ impl Default for ItemComparator { } } -impl Compare for ItemComparator { - fn compare(&self, l: &Item, r: &Item) -> Ordering { - for sort_field in &self.sort_fields { - let ord = compare_field(l, r, &sort_field.field); - if ord != Ordering::Equal { - return match sort_field.direction { - Direction::Asc => ord, - Direction::Desc => ord.reverse(), - }; - } - } - Ordering::Equal - } -} - fn compare_field(l: &Item, r: &Item, field: &str) -> Ordering { match field { "id" => l.id.cmp(&r.id), @@ -209,7 +206,7 @@ fn compare_json_values(l: &Value, r: &Value) -> Ordering { } } -/// Creates a function that returns a struct that implements Compare from the compare crate that can be used to compare stac items. +/// Creates a function that returns a struct that can be used to compare stac items. /// /// # Examples /// From 3df3f3389b4ad2a242c13d684c9b6f24134d698a Mon Sep 17 00:00:00 2001 From: David W Bitner Date: Tue, 10 Mar 2026 16:21:28 -0500 Subject: [PATCH 6/8] sortable impl, feature guard streams --- crates/core/Cargo.toml | 5 ++-- crates/core/src/item_collection.rs | 28 +++++++++++++++++ crates/core/src/lib.rs | 1 + crates/core/src/sort.rs | 48 +++++++++++++++++++++++++++++- 4 files changed, 79 insertions(+), 3 deletions(-) diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 095c555fe..33a861ccc 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -12,6 +12,7 @@ repository.workspace = true rust-version.workspace = true [features] +stream = ["dep:futures", "dep:stream-kmerge"] std = [] geo = ["dep:geo"] geoarrow = [ @@ -51,8 +52,8 @@ serde_json = { workspace = true, features = ["preserve_order"] } serde_urlencoded.workspace = true stac-derive = { version = "0.3.0", path = "../derive" } thiserror.workspace = true -stream-kmerge = "0.2.0" -futures = { workspace = true } +stream-kmerge = { version = "0.2.0", optional = true } +futures = { workspace = true, optional = true } tracing.workspace = true url = { workspace = true, features = ["serde"] } wkb = { workspace = true, optional = true } diff --git a/crates/core/src/item_collection.rs b/crates/core/src/item_collection.rs index 5c0f6186c..d99e72fd1 100644 --- a/crates/core/src/item_collection.rs +++ b/crates/core/src/item_collection.rs @@ -56,6 +56,34 @@ pub struct ItemCollection { self_href: Option, } +impl ItemCollection { + /// Sorts the items in this collection. + /// + /// If a `config` is provided, it must be a valid JSON representation of a SortConfig. + /// If `None`, the default [`crate::sort::ItemComparator`] is used. + /// + /// # Examples + /// + /// ``` + /// use stac::{Item, ItemCollection}; + /// use serde_json::json; + /// + /// let mut ic = ItemCollection::from(vec![Item::new("b"), Item::new("a")]); + /// ic.sort(Some(json!({ + /// "sortby": [ + /// { "field": "id", "direction": "asc" } + /// ] + /// }))).unwrap(); + /// assert_eq!(ic.items[0].id, "a"); + /// ``` + pub fn sort(&mut self, config: Option) -> Result<()> { + use crate::sort::Sortable; + let items = std::mem::take(&mut self.items); + self.items = items.sort(config)?; + Ok(()) + } +} + impl From> for ItemCollection { fn from(items: Vec) -> Self { ItemCollection { diff --git a/crates/core/src/lib.rs b/crates/core/src/lib.rs index 519edf0fb..74e372c39 100644 --- a/crates/core/src/lib.rs +++ b/crates/core/src/lib.rs @@ -151,6 +151,7 @@ pub use json::{FromJson, ToJson}; pub use link::{Link, Links}; pub use migrate::Migrate; pub use ndjson::{FromNdjson, ToNdjson}; +pub use sort::Sortable; pub use statistics::Statistics; pub use value::Value; pub use version::Version; diff --git a/crates/core/src/sort.rs b/crates/core/src/sort.rs index 9b92f1c10..50a9ea563 100644 --- a/crates/core/src/sort.rs +++ b/crates/core/src/sort.rs @@ -1,10 +1,15 @@ use crate::Item; -use futures::Stream; use serde::Deserialize; use serde_json::Value; use std::cmp::Ordering; + + +#[cfg(feature = "stream")] +use futures::Stream; +#[cfg(feature = "stream")] use stream_kmerge::kmerge_by; + #[derive(Debug, Deserialize)] struct SortConfig { sortby: Vec, @@ -48,7 +53,44 @@ pub struct ItemComparator { sort_fields: Vec, } + +/// A trait for sorting iterables of STAC items. +pub trait Sortable { + /// Sorts the internal items. + /// + /// # Examples + /// + /// ``` + /// use stac::Item; + /// use stac::sort::Sortable; + /// use serde_json::json; + /// + /// let items = vec![Item::new("1"), Item::new("2")]; + /// let config = json!({ + /// "sortby": [ + /// { "field": "id", "direction": "desc" } + /// ] + /// }); + /// let items = items.sort(Some(config)).unwrap(); + /// assert_eq!(items[0].id, "2"); + /// ``` + fn sort(self, config: Option) -> crate::Result>; +} + +impl> Sortable for I { + fn sort(self, config: Option) -> crate::Result> { + let comparator = match config { + Some(config) => ItemComparator::new(config)?, + None => ItemComparator::default(), + }; + let mut items: Vec<_> = self.into_iter().collect(); + comparator.sort(&mut items); + Ok(items) + } +} + impl ItemComparator { + /// Creates a new `ItemComparator` from a JSON configuration. /// /// The configuration should be a JSON object with a `sortby` field, which is @@ -252,6 +294,9 @@ pub fn item_comparator(config: Value) -> Result( streams: I, config: Value, @@ -513,6 +558,7 @@ mod tests { assert_eq!(items[5].id, "1"); } + #[cfg(feature = "stream")] #[test] fn test_sort_streams() { use futures::stream::{self, StreamExt}; From 1f2a3d582632084226244342df6711d5170324ab Mon Sep 17 00:00:00 2001 From: David W Bitner Date: Tue, 10 Mar 2026 16:32:03 -0500 Subject: [PATCH 7/8] lint --- crates/core/src/sort.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/crates/core/src/sort.rs b/crates/core/src/sort.rs index 50a9ea563..493e34ade 100644 --- a/crates/core/src/sort.rs +++ b/crates/core/src/sort.rs @@ -3,13 +3,11 @@ use serde::Deserialize; use serde_json::Value; use std::cmp::Ordering; - #[cfg(feature = "stream")] use futures::Stream; #[cfg(feature = "stream")] use stream_kmerge::kmerge_by; - #[derive(Debug, Deserialize)] struct SortConfig { sortby: Vec, @@ -53,7 +51,6 @@ pub struct ItemComparator { sort_fields: Vec, } - /// A trait for sorting iterables of STAC items. pub trait Sortable { /// Sorts the internal items. @@ -90,7 +87,6 @@ impl> Sortable for I { } impl ItemComparator { - /// Creates a new `ItemComparator` from a JSON configuration. /// /// The configuration should be a JSON object with a `sortby` field, which is @@ -295,7 +291,6 @@ pub fn item_comparator(config: Value) -> Result( streams: I, From a73e1d831ebd68cdbb1a8c158c1aa51ac42fc1e8 Mon Sep 17 00:00:00 2001 From: David W Bitner Date: Wed, 11 Mar 2026 11:49:42 -0500 Subject: [PATCH 8/8] mostly ai generated test for federatedsearch --- crates/core/Cargo.toml | 2 +- crates/core/src/api/client.rs | 113 +++++++++ crates/core/src/api/federated.rs | 385 +++++++++++++++++++++++++++++++ crates/core/src/api/mod.rs | 6 + crates/core/src/sort.rs | 138 +++++++++++ 5 files changed, 643 insertions(+), 1 deletion(-) create mode 100644 crates/core/src/api/federated.rs diff --git a/crates/core/Cargo.toml b/crates/core/Cargo.toml index 33a861ccc..f7d6daf6a 100644 --- a/crates/core/Cargo.toml +++ b/crates/core/Cargo.toml @@ -62,7 +62,7 @@ wkb = { workspace = true, optional = true } assert-json-diff.workspace = true bytes.workspace = true rstest.workspace = true -tokio = { workspace = true, features = ["macros"] } +tokio = { workspace = true, features = ["macros", "rt"] } tokio-test.workspace = true [package.metadata.docs.rs] diff --git a/crates/core/src/api/client.rs b/crates/core/src/api/client.rs index 85b57a76c..a6a0dfe5c 100644 --- a/crates/core/src/api/client.rs +++ b/crates/core/src/api/client.rs @@ -2,6 +2,15 @@ use super::{ItemCollection, Items, Search}; use crate::{Collection, Error, Item}; use std::future::Future; +#[cfg(feature = "stream")] +use std::pin::Pin; + +/// A STAC API Item — a JSON map that may be a full or partial [`Item`]. +/// +/// Re-exported here to avoid importing from `super` in trait signatures. +#[cfg(feature = "stream")] +type ApiItem = super::Item; + /// A client that can search for STAC items. /// /// [`SearchClient::search`] is the only required method. [`SearchClient::item`] @@ -61,6 +70,110 @@ pub trait SearchClient: Send + Sync { } } +/// A client that can produce a paginated stream of STAC API items. +/// +/// This is the streaming counterpart to [`SearchClient::search`]. While +/// `SearchClient` returns a single page, `StreamSearchClient` returns a +/// stream that automatically pages through all results. +/// +/// For types that already implement [`SearchClient`], a default paginating +/// stream can be obtained via the [`stream_search_pages`] helper function. +/// +/// # Examples +/// +/// ```no_run +/// use stac::api::{Search, StreamSearchClient}; +/// use futures::StreamExt; +/// +/// # async fn example(client: impl StreamSearchClient) { +/// let mut stream = client.search_stream(Search::default()).await.unwrap(); +/// while let Some(result) = stream.next().await { +/// let item = result.unwrap(); +/// println!("{:?}", item.get("id")); +/// } +/// # } +/// ``` +#[cfg(feature = "stream")] +pub trait StreamSearchClient: Send + Sync { + /// The error type for this client. + type Error: Send; + + /// Returns a stream of API items by paging through results. + fn search_stream( + &self, + search: Search, + ) -> impl Future< + Output = Result< + Pin> + Send + '_>>, + Self::Error, + >, + > + Send; +} + +/// Creates a paginated stream from a [`SearchClient`]. +/// +/// This helper repeatedly calls [`SearchClient::search`], yielding each item +/// from each page. Pagination tokens are carried forward by merging +/// [`ItemCollection::next`] into [`Search::items::additional_fields`]. +/// +/// Use this inside a [`StreamSearchClient`] implementation to get token-based +/// pagination for free: +/// +/// ```ignore +/// impl StreamSearchClient for MyBackend { +/// type Error = MyError; +/// async fn search_stream(&self, search: Search) +/// -> Result> + Send + '_>>, Self::Error> +/// { +/// Ok(stream_search_pages(self, search)) +/// } +/// } +/// ``` +#[cfg(feature = "stream")] +pub fn stream_search_pages<'a, C>( + client: &'a C, + search: Search, +) -> Pin> + Send + 'a>> +where + C: SearchClient + ?Sized, +{ + use futures::stream; + + let stream = stream::unfold( + (client, Some(search), Vec::::new()), + |(client, next_search_opt, mut buffer)| async move { + // Yield buffered items first. + if !buffer.is_empty() { + let item = buffer.remove(0); + return Some((Ok(item), (client, next_search_opt, buffer))); + } + + // Fetch the next page. + let search = next_search_opt?; + match client.search(search.clone()).await { + Ok(page) => { + if page.items.is_empty() { + return None; + } + let next = page.next.and_then(|next_params| { + let mut next_search = search; + for (k, v) in next_params { + let _ = next_search.items.additional_fields.insert(k, v); + } + Some(next_search) + }); + let mut items = page.items; + let first = items.remove(0); + Some((Ok(first), (client, next, items))) + } + Err(e) => Some((Err(e), (client, None, Vec::new()))), + } + }, + ); + + Box::pin(stream) +} + /// A client that can retrieve STAC collections. /// /// [`CollectionSearchClient::collections`] is the only required method. diff --git a/crates/core/src/api/federated.rs b/crates/core/src/api/federated.rs new file mode 100644 index 000000000..4ba35b886 --- /dev/null +++ b/crates/core/src/api/federated.rs @@ -0,0 +1,385 @@ +use super::{Item, ItemCollection, Search, SearchClient, StreamSearchClient}; +use crate::Error; +use futures::StreamExt; +use std::future::Future; +use std::pin::Pin; + +/// A federated search client that fans out queries to multiple backends +/// and merges results into a single sorted stream. +/// +/// Each inner client is queried in parallel via +/// [`StreamSearchClient::search_stream`], and the per-client streams are +/// merged using [`sort_sortable_streams`](crate::sort::sort_sortable_streams) +/// so that the combined output respects the requested sort order. +/// +/// `FederatedSearchClient` itself implements both [`SearchClient`] and +/// [`StreamSearchClient`], so it can be composed with other clients or nested. +/// +/// # Examples +/// +/// ```no_run +/// use stac::api::{FederatedSearchClient, Search, SearchClient}; +/// +/// # async fn example + stac::api::StreamSearchClient>(clients: Vec) { +/// let federated = FederatedSearchClient::new(clients); +/// let results = federated.search(Search::default()).await.unwrap(); +/// # } +/// ``` +#[derive(Debug)] +pub struct FederatedSearchClient { + clients: Vec, +} + +impl FederatedSearchClient { + /// Creates a new federated client wrapping the given backends. + pub fn new(clients: Vec) -> Self { + Self { clients } + } + + /// Returns a reference to the inner clients. + pub fn clients(&self) -> &[C] { + &self.clients + } +} + +impl SearchClient for FederatedSearchClient +where + C: SearchClient + StreamSearchClient + Send + Sync, + ::Error: Into + Send + 'static, + ::Error: Into + Send + 'static, +{ + type Error = Error; + + fn search( + &self, + search: Search, + ) -> impl Future> + Send { + async move { + let mut stream = StreamSearchClient::search_stream(self, search).await?; + let mut items = Vec::new(); + while let Some(result) = stream.next().await { + items.push(result?); + } + Ok(ItemCollection::from(items)) + } + } +} + +impl StreamSearchClient for FederatedSearchClient +where + C: SearchClient + StreamSearchClient + Send + Sync, + ::Error: Into + Send + 'static, + ::Error: Into + Send + 'static, +{ + type Error = Error; + + fn search_stream( + &self, + search: Search, + ) -> impl Future< + Output = Result< + Pin> + Send + '_>>, + Self::Error, + >, + > + Send { + async move { + // Build the sort config from the search's sortby field. + let sort_config = if search.items.sortby.is_empty() { + // Use the default sort (datetime desc, id asc). + serde_json::json!({ + "sortby": [ + { "field": "datetime", "direction": "desc" }, + { "field": "id", "direction": "asc" } + ] + }) + } else { + serde_json::json!({"sortby": search.items.sortby}) + }; + + // Fan out: get a paging stream from each client. + let mut streams: Vec + Send + '_>>> = + Vec::with_capacity(self.clients.len()); + + for client in &self.clients { + let client_stream = client + .search_stream(search.clone()) + .await + .map_err(Into::into)?; + + // Map Result → Item, filtering out errors. + let mapped = client_stream.filter_map(|result| async move { + match result { + Ok(item) => Some(item), + Err(e) => { + let err: Error = e.into(); + tracing::warn!("federated client error, skipping: {err}"); + None + } + } + }); + streams.push(Box::pin(mapped)); + } + + // Merge all streams using sorted k-way merge. + let merged = + crate::sort::sort_sortable_streams(streams, sort_config).map_err(Error::from)?; + + // Wrap each yielded item back into Ok(...) for the trait signature. + let result_stream = merged.map(Ok); + + let result: Pin> + Send + '_>> = + Box::pin(result_stream); + Ok(result) + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::api::stream_search_pages; + use serde_json::{Map, Value, json}; + + /// A mock search client that returns pre-configured API items with + /// token-based pagination (using a "skip" key). + struct MockClient { + /// Items stored in this mock, already sorted in the order the + /// mock should return them. + items: Vec, + page_size: usize, + } + + impl MockClient { + fn new(items: Vec, page_size: usize) -> Self { + Self { items, page_size } + } + } + + impl SearchClient for MockClient { + type Error = Error; + + async fn search(&self, search: Search) -> Result { + let skip = search + .additional_fields + .get("skip") + .and_then(|v| v.as_u64()) + .unwrap_or(0) as usize; + + let page_items: Vec = self + .items + .iter() + .skip(skip) + .take(self.page_size) + .cloned() + .collect(); + + let mut ic = ItemCollection::from(page_items); + let next_skip = skip + self.page_size; + if next_skip < self.items.len() { + let mut next = Map::new(); + let _ = next.insert("skip".to_string(), json!(next_skip)); + ic.next = Some(next); + } + Ok(ic) + } + } + + impl StreamSearchClient for MockClient { + type Error = Error; + + async fn search_stream( + &self, + search: Search, + ) -> Result< + Pin> + Send + '_>>, + Self::Error, + > { + Ok(stream_search_pages(self, search)) + } + } + + /// Helper to build an API item (JSON map) with id and datetime. + fn make_item(id: &str, datetime: &str) -> Item { + let val = json!({ + "type": "Feature", + "stac_version": "1.0.0", + "id": id, + "geometry": null, + "bbox": null, + "properties": { + "datetime": datetime + }, + "links": [], + "assets": {} + }); + serde_json::from_value::>(val).unwrap() + } + + #[tokio::test] + async fn federated_empty_clients() { + let federated: FederatedSearchClient = FederatedSearchClient::new(vec![]); + let ic = federated.search(Search::default()).await.unwrap(); + assert!(ic.items.is_empty()); + } + + #[tokio::test] + async fn federated_single_client() { + let items = vec![ + make_item("c", "2024-03-01T00:00:00Z"), + make_item("b", "2024-02-01T00:00:00Z"), + make_item("a", "2024-01-01T00:00:00Z"), + ]; + let client = MockClient::new(items, 2); + let federated = FederatedSearchClient::new(vec![client]); + + let mut stream = StreamSearchClient::search_stream(&federated, Search::default()) + .await + .unwrap(); + + // Default sort: datetime desc, id asc — items should come out + // in the order the mock provides them (already datetime desc). + let first = stream.next().await.unwrap().unwrap(); + assert_eq!(first["id"], "c"); + let second = stream.next().await.unwrap().unwrap(); + assert_eq!(second["id"], "b"); + let third = stream.next().await.unwrap().unwrap(); + assert_eq!(third["id"], "a"); + assert!(stream.next().await.is_none()); + } + + #[tokio::test] + async fn federated_merge_sorted_by_datetime_desc() { + // Client 1: items with datetimes 2024-03, 2024-01 + let client1 = MockClient::new( + vec![ + make_item("x", "2024-03-01T00:00:00Z"), + make_item("y", "2024-01-01T00:00:00Z"), + ], + 10, + ); + // Client 2: items with datetimes 2024-04, 2024-02 + let client2 = MockClient::new( + vec![ + make_item("a", "2024-04-01T00:00:00Z"), + make_item("b", "2024-02-01T00:00:00Z"), + ], + 10, + ); + + let federated = FederatedSearchClient::new(vec![client1, client2]); + let mut stream = StreamSearchClient::search_stream(&federated, Search::default()) + .await + .unwrap(); + + // Default sort: datetime desc, id asc. + // Expected order: 2024-04 (a), 2024-03 (x), 2024-02 (b), 2024-01 (y) + let ids: Vec = { + let mut v = Vec::new(); + while let Some(Ok(item)) = stream.next().await { + v.push(item["id"].as_str().unwrap().to_string()); + } + v + }; + assert_eq!(ids, vec!["a", "x", "b", "y"]); + } + + #[tokio::test] + async fn federated_merge_sorted_by_id_asc() { + use crate::api::Sortby; + + let client1 = MockClient::new( + vec![ + make_item("alpha", "2024-01-01T00:00:00Z"), + make_item("charlie", "2024-03-01T00:00:00Z"), + ], + 10, + ); + let client2 = MockClient::new( + vec![ + make_item("bravo", "2024-02-01T00:00:00Z"), + make_item("delta", "2024-04-01T00:00:00Z"), + ], + 10, + ); + + let federated = FederatedSearchClient::new(vec![client1, client2]); + let search = Search::default().sortby(vec![Sortby::asc("id".to_string())]); + let mut stream = StreamSearchClient::search_stream(&federated, search) + .await + .unwrap(); + + let ids: Vec = { + let mut v = Vec::new(); + while let Some(Ok(item)) = stream.next().await { + v.push(item["id"].as_str().unwrap().to_string()); + } + v + }; + assert_eq!(ids, vec!["alpha", "bravo", "charlie", "delta"]); + } + + #[tokio::test] + async fn federated_search_collects_all_items() { + let client1 = MockClient::new( + vec![ + make_item("c", "2024-03-01T00:00:00Z"), + make_item("a", "2024-01-01T00:00:00Z"), + ], + 1, // page size 1 — forces multiple pages + ); + let client2 = MockClient::new(vec![make_item("b", "2024-02-01T00:00:00Z")], 10); + + let federated = FederatedSearchClient::new(vec![client1, client2]); + let ic = federated.search(Search::default()).await.unwrap(); + + // Should have all 3 items + assert_eq!(ic.items.len(), 3); + + // Verify sorted order (datetime desc): c, b, a + let ids: Vec<&str> = ic.items.iter().map(|i| i["id"].as_str().unwrap()).collect(); + assert_eq!(ids, vec!["c", "b", "a"]); + } + + #[tokio::test] + async fn federated_three_clients_interleaved() { + let client1 = MockClient::new(vec![make_item("1", "2024-06-01T00:00:00Z")], 10); + let client2 = MockClient::new(vec![make_item("2", "2024-05-01T00:00:00Z")], 10); + let client3 = MockClient::new(vec![make_item("3", "2024-04-01T00:00:00Z")], 10); + + let federated = FederatedSearchClient::new(vec![client1, client2, client3]); + let mut stream = StreamSearchClient::search_stream(&federated, Search::default()) + .await + .unwrap(); + + let ids: Vec = { + let mut v = Vec::new(); + while let Some(Ok(item)) = stream.next().await { + v.push(item["id"].as_str().unwrap().to_string()); + } + v + }; + assert_eq!(ids, vec!["1", "2", "3"]); + } + + #[tokio::test] + async fn federated_same_datetime_sorts_by_id() { + // When datetimes are equal, default sort uses id asc as tiebreaker. + let client1 = MockClient::new(vec![make_item("zebra", "2024-01-01T00:00:00Z")], 10); + let client2 = MockClient::new(vec![make_item("apple", "2024-01-01T00:00:00Z")], 10); + + let federated = FederatedSearchClient::new(vec![client1, client2]); + let mut stream = StreamSearchClient::search_stream(&federated, Search::default()) + .await + .unwrap(); + + let ids: Vec = { + let mut v = Vec::new(); + while let Some(Ok(item)) = stream.next().await { + v.push(item["id"].as_str().unwrap().to_string()); + } + v + }; + // Same datetime → id asc → apple before zebra + assert_eq!(ids, vec!["apple", "zebra"]); + } +} diff --git a/crates/core/src/api/mod.rs b/crates/core/src/api/mod.rs index e8fbc8e6c..de1063323 100644 --- a/crates/core/src/api/mod.rs +++ b/crates/core/src/api/mod.rs @@ -37,6 +37,8 @@ mod client; mod collections; mod conformance; +#[cfg(feature = "stream")] +mod federated; mod fields; mod filter; mod item_collection; @@ -49,11 +51,15 @@ mod url_builder; #[cfg(feature = "geoarrow")] pub use client::ArrowSearchClient; pub use client::{CollectionSearchClient, SearchClient, TransactionClient}; +#[cfg(feature = "stream")] +pub use client::{StreamSearchClient, stream_search_pages}; pub use collections::Collections; pub use conformance::{ COLLECTIONS_URI, CORE_URI, Conformance, FEATURES_URI, FILTER_URIS, GEOJSON_URI, ITEM_SEARCH_URI, OGC_API_FEATURES_URI, }; +#[cfg(feature = "stream")] +pub use federated::FederatedSearchClient; pub use fields::Fields; pub use filter::Filter; pub use item_collection::{Context, ItemCollection}; diff --git a/crates/core/src/sort.rs b/crates/core/src/sort.rs index 493e34ade..9e2395acb 100644 --- a/crates/core/src/sort.rs +++ b/crates/core/src/sort.rs @@ -26,6 +26,80 @@ enum Direction { Desc, } +/// A trait for items that can be sorted by [`ItemComparator`]. +/// +/// This is implemented for both [`Item`] (the typed STAC struct) and +/// [`serde_json::Map`] (the API-level item representation), +/// so that sorting and stream-merging work with either type. +pub trait SortableItem { + /// Resolves the value of a named field for comparison purposes. + /// + /// Well-known fields like `"id"`, `"datetime"`, `"collection"` etc. should + /// be returned as [`Value`] references. Unknown fields should be looked up + /// in properties / additional fields. + fn resolve_sort_field(&self, field: &str) -> Option; +} + +impl SortableItem for Item { + fn resolve_sort_field(&self, field: &str) -> Option { + match field { + "id" => Some(Value::String(self.id.clone())), + "datetime" => self + .properties + .datetime + .or(self.properties.start_datetime) + .map(|dt| Value::String(dt.to_rfc3339())), + "start_datetime" => self + .properties + .start_datetime + .or(self.properties.datetime) + .map(|dt| Value::String(dt.to_rfc3339())), + "end_datetime" => self + .properties + .end_datetime + .map(|dt| Value::String(dt.to_rfc3339())), + "title" => self + .properties + .title + .as_ref() + .map(|s| Value::String(s.clone())), + "description" => self + .properties + .description + .as_ref() + .map(|s| Value::String(s.clone())), + "created" => self + .properties + .created + .as_ref() + .map(|s| Value::String(s.clone())), + "updated" => self + .properties + .updated + .as_ref() + .map(|s| Value::String(s.clone())), + "collection" => self.collection.as_ref().map(|s| Value::String(s.clone())), + _ => self.properties.additional_fields.get(field).cloned(), + } + } +} + +impl SortableItem for serde_json::Map { + fn resolve_sort_field(&self, field: &str) -> Option { + // First check top-level keys. + if let Some(v) = self.get(field) { + return Some(v.clone()); + } + // Then check inside "properties". + if let Some(Value::Object(props)) = self.get("properties") { + if let Some(v) = props.get(field) { + return Some(v.clone()); + } + } + None + } +} + /// A comparator for STAC Items. /// /// This struct allows it to be used to sort items based on a configuration. @@ -147,6 +221,26 @@ impl ItemComparator { } Ordering::Equal } + + /// Compares two [`SortableItem`]s using the configured sort fields. + /// + /// This is the generic version of [`compare`](Self::compare) that works + /// with any type implementing [`SortableItem`], including both typed + /// [`Item`]s and API-level JSON maps. + pub fn compare_sortable(&self, l: &T, r: &T) -> Ordering { + for sort_field in &self.sort_fields { + let l_val = l.resolve_sort_field(&sort_field.field); + let r_val = r.resolve_sort_field(&sort_field.field); + let ord = compare_values(l_val.as_ref(), r_val.as_ref()); + if ord != Ordering::Equal { + return match sort_field.direction { + Direction::Asc => ord, + Direction::Desc => ord.reverse(), + }; + } + } + Ordering::Equal + } } impl Default for ItemComparator { @@ -306,6 +400,50 @@ where })) } +/// Sorts multiple streams of [`SortableItem`]s into a single sorted stream. +/// +/// This is the generic version of [`sort_streams`] that works with any type +/// implementing [`SortableItem`], including API-level items +/// (`serde_json::Map`). +/// +/// # Examples +/// +/// ``` +/// use stac::sort::sort_sortable_streams; +/// use serde_json::{json, Map, Value}; +/// use futures::stream::{self, StreamExt}; +/// +/// # tokio_test::block_on(async { +/// let item_a: Map = serde_json::from_value(json!({"id": "a"})).unwrap(); +/// let item_b: Map = serde_json::from_value(json!({"id": "b"})).unwrap(); +/// let stream1 = stream::iter(vec![item_a]); +/// let stream2 = stream::iter(vec![item_b]); +/// let config = json!({ +/// "sortby": [ +/// { "field": "id", "direction": "asc" } +/// ] +/// }); +/// let mut sorted = sort_sortable_streams(vec![stream1, stream2], config).unwrap(); +/// let first = sorted.next().await.unwrap(); +/// assert_eq!(first["id"], "a"); +/// # }); +/// ``` +#[cfg(feature = "stream")] +pub fn sort_sortable_streams( + streams: I, + config: Value, +) -> Result, serde_json::Error> +where + T: SortableItem + Unpin, + S: Stream + Unpin, + I: IntoIterator, +{ + let comparator = ItemComparator::new(config)?; + Ok(kmerge_by(streams, move |a, b| { + comparator.compare_sortable(a, b).reverse() + })) +} + #[cfg(test)] mod tests { use super::*;