diff --git a/dimos/memory2/stream.py b/dimos/memory2/stream.py index 8f0a767aa7..d0f4f14ca0 100644 --- a/dimos/memory2/stream.py +++ b/dimos/memory2/stream.py @@ -196,6 +196,36 @@ def limit(self, k: int) -> Stream[T, O]: def offset(self, n: int) -> Stream[T, O]: return self._replace_query(offset_val=n) + # Time windowing — None means unbounded on that side. ``*_time`` is relative + # to the stream's first observation; ``*_timestamp`` is absolute epoch seconds. + def from_time(self, seconds: float | None) -> Stream[T, O]: + """Keep observations from ``seconds`` after the first (relative).""" + if seconds is None: + return self + try: + t0 = self.first().ts + except LookupError: + return self # already empty → empty window, not a crash + return self.after(t0 + seconds) + + def to_time(self, seconds: float | None) -> Stream[T, O]: + """Keep ``seconds`` of observations from the current start (relative duration).""" + if seconds is None: + return self + try: + t0 = self.first().ts + except LookupError: + return self + return self.before(t0 + seconds) + + def from_timestamp(self, ts: float | None) -> Stream[T, O]: + """Keep observations after absolute epoch ``ts``.""" + return self if ts is None else self.after(ts) + + def to_timestamp(self, ts: float | None) -> Stream[T, O]: + """Keep observations up to absolute epoch ``ts``.""" + return self if ts is None else self.before(ts) + def search(self, query: Embedding, k: int | None = None) -> Stream[T, EmbeddedObservation[T]]: """Rank observations by cosine similarity to *query*. diff --git a/dimos/memory2/test_stream.py b/dimos/memory2/test_stream.py index 32cb542b83..fe92650a12 100644 --- a/dimos/memory2/test_stream.py +++ b/dimos/memory2/test_stream.py @@ -781,3 +781,59 @@ def consumer(): assert results == ["b", "d"] assert results == ["b", "d"] assert results == ["b", "d"] + + +class TestTimeWindowing: + """``*_time`` is relative to the first observation, ``*_timestamp`` is absolute. + + A trailing ``to_time`` measures a duration from the current start, so chaining + reads as "skip, then take the following N seconds". + """ + + def test_from_time_skips_relative_seconds(self, make_stream): + stream = make_stream(10) # ts 0..9 + assert [o.data for o in stream.from_time(3)] == [40, 50, 60, 70, 80, 90] + + def test_to_time_keeps_leading_seconds(self, make_stream): + stream = make_stream(10) + assert [o.data for o in stream.to_time(3)] == [0, 10, 20] + + def test_chained_skip_then_following_duration(self, make_stream): + stream = make_stream(10) # ts 0..9 + # skip first 2s, then the following 3s -> ts 3, 4, 5 + assert [o.data for o in stream.from_time(2).to_time(3)] == [30, 40, 50] + + def test_from_timestamp_is_absolute(self, make_stream): + stream = make_stream(10, start_ts=1000.0) # ts 1000..1009 + assert [o.data for o in stream.from_timestamp(1003.0)] == [40, 50, 60, 70, 80, 90] + + def test_to_timestamp_is_absolute(self, make_stream): + stream = make_stream(10, start_ts=1000.0) + assert [o.data for o in stream.to_timestamp(1003.0)] == [0, 10, 20] + + def test_absolute_range(self, make_stream): + stream = make_stream(10, start_ts=1000.0) + # all between 1002 and 1006 -> ts 1003, 1004, 1005 + windowed = stream.from_timestamp(1002.0).to_timestamp(1006.0) + assert [o.data for o in windowed] == [30, 40, 50] + + def test_absolute_start_relative_duration(self, make_stream): + stream = make_stream(10, start_ts=1000.0) + # seek to 1003, then the following 3s -> ts 1004, 1005, 1006 + assert [o.data for o in stream.from_timestamp(1003.0).to_time(3)] == [40, 50, 60] + + def test_none_bounds_are_noops(self, make_stream): + stream = make_stream(5) + full = [0, 10, 20, 30, 40] + assert [o.data for o in stream.from_time(None)] == full + assert [o.data for o in stream.to_time(None)] == full + assert [o.data for o in stream.from_timestamp(None)] == full + assert [o.data for o in stream.to_timestamp(None)] == full + + def test_relative_bounds_on_empty_yield_empty(self, make_stream): + # Relative bounds resolve an anchor from first(); an empty slice (here, + # seeking past all data) must yield empty, not raise LookupError. + stream = make_stream(10, start_ts=1000.0) + assert stream.from_timestamp(9999.0).to_time(30).to_list() == [] + assert make_stream(0).from_time(5).to_list() == [] + assert make_stream(0).to_time(5).to_list() == []