Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions dimos/memory2/stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,36 @@ def limit(self, k: int) -> Stream[T, O]:
def offset(self, n: int) -> Stream[T, O]:
return self._replace_query(offset_val=n)

# Time windowing — None means unbounded on that side. ``*_time`` is relative
# to the stream's first observation; ``*_timestamp`` is absolute epoch seconds.
def from_time(self, seconds: float | None) -> Stream[T, O]:
"""Keep observations from ``seconds`` after the first (relative)."""
if seconds is None:
return self
try:
t0 = self.first().ts
except LookupError:
return self # already empty → empty window, not a crash
return self.after(t0 + seconds)

def to_time(self, seconds: float | None) -> Stream[T, O]:
"""Keep ``seconds`` of observations from the current start (relative duration)."""
if seconds is None:
return self
try:
t0 = self.first().ts
except LookupError:
return self
return self.before(t0 + seconds)

def from_timestamp(self, ts: float | None) -> Stream[T, O]:
"""Keep observations after absolute epoch ``ts``."""
return self if ts is None else self.after(ts)

def to_timestamp(self, ts: float | None) -> Stream[T, O]:
"""Keep observations up to absolute epoch ``ts``."""
return self if ts is None else self.before(ts)

def search(self, query: Embedding, k: int | None = None) -> Stream[T, EmbeddedObservation[T]]:
"""Rank observations by cosine similarity to *query*.

Expand Down
56 changes: 56 additions & 0 deletions dimos/memory2/test_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -781,3 +781,59 @@ def consumer():
assert results == ["b", "d"]
assert results == ["b", "d"]
assert results == ["b", "d"]


class TestTimeWindowing:
"""``*_time`` is relative to the first observation, ``*_timestamp`` is absolute.

A trailing ``to_time`` measures a duration from the current start, so chaining
reads as "skip, then take the following N seconds".
"""

def test_from_time_skips_relative_seconds(self, make_stream):
stream = make_stream(10) # ts 0..9
assert [o.data for o in stream.from_time(3)] == [40, 50, 60, 70, 80, 90]

def test_to_time_keeps_leading_seconds(self, make_stream):
stream = make_stream(10)
assert [o.data for o in stream.to_time(3)] == [0, 10, 20]

def test_chained_skip_then_following_duration(self, make_stream):
stream = make_stream(10) # ts 0..9
# skip first 2s, then the following 3s -> ts 3, 4, 5
assert [o.data for o in stream.from_time(2).to_time(3)] == [30, 40, 50]

def test_from_timestamp_is_absolute(self, make_stream):
stream = make_stream(10, start_ts=1000.0) # ts 1000..1009
assert [o.data for o in stream.from_timestamp(1003.0)] == [40, 50, 60, 70, 80, 90]

def test_to_timestamp_is_absolute(self, make_stream):
stream = make_stream(10, start_ts=1000.0)
assert [o.data for o in stream.to_timestamp(1003.0)] == [0, 10, 20]

def test_absolute_range(self, make_stream):
stream = make_stream(10, start_ts=1000.0)
# all between 1002 and 1006 -> ts 1003, 1004, 1005
windowed = stream.from_timestamp(1002.0).to_timestamp(1006.0)
assert [o.data for o in windowed] == [30, 40, 50]

def test_absolute_start_relative_duration(self, make_stream):
stream = make_stream(10, start_ts=1000.0)
# seek to 1003, then the following 3s -> ts 1004, 1005, 1006
assert [o.data for o in stream.from_timestamp(1003.0).to_time(3)] == [40, 50, 60]

def test_none_bounds_are_noops(self, make_stream):
stream = make_stream(5)
full = [0, 10, 20, 30, 40]
assert [o.data for o in stream.from_time(None)] == full
assert [o.data for o in stream.to_time(None)] == full
assert [o.data for o in stream.from_timestamp(None)] == full
assert [o.data for o in stream.to_timestamp(None)] == full

def test_relative_bounds_on_empty_yield_empty(self, make_stream):
# Relative bounds resolve an anchor from first(); an empty slice (here,
# seeking past all data) must yield empty, not raise LookupError.
stream = make_stream(10, start_ts=1000.0)
assert stream.from_timestamp(9999.0).to_time(30).to_list() == []
assert make_stream(0).from_time(5).to_list() == []
assert make_stream(0).to_time(5).to_list() == []
Loading