From 32dd818abb31db150150b412ecfa5daeb77a356c Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Mon, 1 Jun 2026 12:38:47 +0800 Subject: [PATCH 1/2] memory2: time windowing on Stream Add from_time/to_time (relative to the first observation) and from_timestamp/to_timestamp (absolute epoch seconds) for windowing a stream by time. A trailing to_time is a duration measured from the current start, so from_time(2).to_time(30) reads as "skip 2s, take the following 30s"; frames mix freely (from_timestamp(ts).to_time(30)). Shared base for the stream-alignment (#2306) and go2dds (#2314) branches, which both need this windowing API. --- dimos/memory2/stream.py | 18 ++++++++++++++ dimos/memory2/test_stream.py | 48 ++++++++++++++++++++++++++++++++++++ 2 files changed, 66 insertions(+) diff --git a/dimos/memory2/stream.py b/dimos/memory2/stream.py index 8f0a767aa7..7a37869f5b 100644 --- a/dimos/memory2/stream.py +++ b/dimos/memory2/stream.py @@ -196,6 +196,24 @@ def limit(self, k: int) -> Stream[T, O]: def offset(self, n: int) -> Stream[T, O]: return self._replace_query(offset_val=n) + # Time windowing — None means unbounded on that side. ``*_time`` is relative + # to the stream's first observation; ``*_timestamp`` is absolute epoch seconds. + def from_time(self, seconds: float | None) -> Stream[T, O]: + """Keep observations from ``seconds`` after the first (relative).""" + return self if seconds is None else self.after(self.first().ts + seconds) + + def to_time(self, seconds: float | None) -> Stream[T, O]: + """Keep ``seconds`` of observations from the current start (relative duration).""" + return self if seconds is None else self.before(self.first().ts + seconds) + + def from_timestamp(self, ts: float | None) -> Stream[T, O]: + """Keep observations after absolute epoch ``ts``.""" + return self if ts is None else self.after(ts) + + def to_timestamp(self, ts: float | None) -> Stream[T, O]: + """Keep observations up to absolute epoch ``ts``.""" + return self if ts is None else self.before(ts) + def search(self, query: Embedding, k: int | None = None) -> Stream[T, EmbeddedObservation[T]]: """Rank observations by cosine similarity to *query*. diff --git a/dimos/memory2/test_stream.py b/dimos/memory2/test_stream.py index 32cb542b83..d17cb157ba 100644 --- a/dimos/memory2/test_stream.py +++ b/dimos/memory2/test_stream.py @@ -781,3 +781,51 @@ def consumer(): assert results == ["b", "d"] assert results == ["b", "d"] assert results == ["b", "d"] + + +class TestTimeWindowing: + """``*_time`` is relative to the first observation, ``*_timestamp`` is absolute. + + A trailing ``to_time`` measures a duration from the current start, so chaining + reads as "skip, then take the following N seconds". + """ + + def test_from_time_skips_relative_seconds(self, make_stream): + stream = make_stream(10) # ts 0..9 + assert [o.data for o in stream.from_time(3)] == [40, 50, 60, 70, 80, 90] + + def test_to_time_keeps_leading_seconds(self, make_stream): + stream = make_stream(10) + assert [o.data for o in stream.to_time(3)] == [0, 10, 20] + + def test_chained_skip_then_following_duration(self, make_stream): + stream = make_stream(10) # ts 0..9 + # skip first 2s, then the following 3s -> ts 3, 4, 5 + assert [o.data for o in stream.from_time(2).to_time(3)] == [30, 40, 50] + + def test_from_timestamp_is_absolute(self, make_stream): + stream = make_stream(10, start_ts=1000.0) # ts 1000..1009 + assert [o.data for o in stream.from_timestamp(1003.0)] == [40, 50, 60, 70, 80, 90] + + def test_to_timestamp_is_absolute(self, make_stream): + stream = make_stream(10, start_ts=1000.0) + assert [o.data for o in stream.to_timestamp(1003.0)] == [0, 10, 20] + + def test_absolute_range(self, make_stream): + stream = make_stream(10, start_ts=1000.0) + # all between 1002 and 1006 -> ts 1003, 1004, 1005 + windowed = stream.from_timestamp(1002.0).to_timestamp(1006.0) + assert [o.data for o in windowed] == [30, 40, 50] + + def test_absolute_start_relative_duration(self, make_stream): + stream = make_stream(10, start_ts=1000.0) + # seek to 1003, then the following 3s -> ts 1004, 1005, 1006 + assert [o.data for o in stream.from_timestamp(1003.0).to_time(3)] == [40, 50, 60] + + def test_none_bounds_are_noops(self, make_stream): + stream = make_stream(5) + full = [0, 10, 20, 30, 40] + assert [o.data for o in stream.from_time(None)] == full + assert [o.data for o in stream.to_time(None)] == full + assert [o.data for o in stream.from_timestamp(None)] == full + assert [o.data for o in stream.to_timestamp(None)] == full From 6ab8f0ae635c162aedf5963c76838c5db3261705 Mon Sep 17 00:00:00 2001 From: Ivan Nikolic Date: Mon, 1 Jun 2026 13:24:55 +0800 Subject: [PATCH 2/2] memory2: from_time/to_time yield empty on empty stream, not LookupError MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit from_time/to_time resolve their anchor via first(), which raised LookupError when the stream was already empty (e.g. from_timestamp(future_ts).to_time(30) — the mixed pattern the PR advertises). Guard the eager first() so an empty slice returns an empty window, matching the lazy from_timestamp/to_timestamp. Adds a regression test. --- dimos/memory2/stream.py | 16 ++++++++++++++-- dimos/memory2/test_stream.py | 8 ++++++++ 2 files changed, 22 insertions(+), 2 deletions(-) diff --git a/dimos/memory2/stream.py b/dimos/memory2/stream.py index 7a37869f5b..d0f4f14ca0 100644 --- a/dimos/memory2/stream.py +++ b/dimos/memory2/stream.py @@ -200,11 +200,23 @@ def offset(self, n: int) -> Stream[T, O]: # to the stream's first observation; ``*_timestamp`` is absolute epoch seconds. def from_time(self, seconds: float | None) -> Stream[T, O]: """Keep observations from ``seconds`` after the first (relative).""" - return self if seconds is None else self.after(self.first().ts + seconds) + if seconds is None: + return self + try: + t0 = self.first().ts + except LookupError: + return self # already empty → empty window, not a crash + return self.after(t0 + seconds) def to_time(self, seconds: float | None) -> Stream[T, O]: """Keep ``seconds`` of observations from the current start (relative duration).""" - return self if seconds is None else self.before(self.first().ts + seconds) + if seconds is None: + return self + try: + t0 = self.first().ts + except LookupError: + return self + return self.before(t0 + seconds) def from_timestamp(self, ts: float | None) -> Stream[T, O]: """Keep observations after absolute epoch ``ts``.""" diff --git a/dimos/memory2/test_stream.py b/dimos/memory2/test_stream.py index d17cb157ba..fe92650a12 100644 --- a/dimos/memory2/test_stream.py +++ b/dimos/memory2/test_stream.py @@ -829,3 +829,11 @@ def test_none_bounds_are_noops(self, make_stream): assert [o.data for o in stream.to_time(None)] == full assert [o.data for o in stream.from_timestamp(None)] == full assert [o.data for o in stream.to_timestamp(None)] == full + + def test_relative_bounds_on_empty_yield_empty(self, make_stream): + # Relative bounds resolve an anchor from first(); an empty slice (here, + # seeking past all data) must yield empty, not raise LookupError. + stream = make_stream(10, start_ts=1000.0) + assert stream.from_timestamp(9999.0).to_time(30).to_list() == [] + assert make_stream(0).from_time(5).to_list() == [] + assert make_stream(0).to_time(5).to_list() == []