Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
105 changes: 89 additions & 16 deletions pkg/activity/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,19 +43,28 @@ func (s *RedisStore) IncrementMany(ctx context.Context, events []Event) error {

aggregated := make(map[counterKey]uint32)
for _, event := range events {
if event.EnvUUID == "" || event.NodeUUID == "" || event.Type >= EventTypeCount {
if event.EnvUUID == "" || event.Type >= EventTypeCount {
continue
}

if event.Count == 0 {
continue
}

key := counterKey{
key: DayKey(s.prefix, event.EnvUUID, event.NodeUUID, event.At),
offset: bitOffset(event.Type, bucketHour(event.At)),
offset := bitOffset(event.Type, bucketHour(event.At))
if event.NodeUUID != "" {
nodeKey := counterKey{
key: DayKey(s.prefix, event.EnvUUID, event.NodeUUID, event.At),
offset: offset,
}
aggregated[nodeKey] += uint32(event.Count)
}

envKey := counterKey{
key: EnvDayKey(s.prefix, event.EnvUUID, event.At),
offset: offset,
}
aggregated[key] += uint32(event.Count)
aggregated[envKey] += uint32(event.Count)
}

if len(aggregated) == 0 {
Expand Down Expand Up @@ -95,17 +104,7 @@ func (s *RedisStore) ReadSeries(ctx context.Context, envUUID string, nodeUUIDs [
pipe := s.client.Pipeline()
fetches := make([]dayFetch, 0, len(nodeUUIDs)*days)
for _, nodeUUID := range nodeUUIDs {
out[nodeUUID] = NodeTileSeries{
Start: start,
BucketSeconds: BucketSeconds,
Enroll: make([]uint16, bucketCount),
Config: make([]uint16, bucketCount),
Status: make([]uint16, bucketCount),
Result: make([]uint16, bucketCount),
QueryRead: make([]uint16, bucketCount),
QueryWrite: make([]uint16, bucketCount),
Total: make([]uint16, bucketCount),
}
out[nodeUUID] = newSeries(start, bucketCount)

for dayIndex := 0; dayIndex < days; dayIndex++ {
day := start.Add(time.Duration(dayIndex) * 24 * time.Hour)
Expand Down Expand Up @@ -161,6 +160,80 @@ func (s *RedisStore) ReadSeries(ctx context.Context, envUUID string, nodeUUIDs [
return out, nil
}

// ReadEnvSeries returns a dense environment activity series for the requested day window.
func (s *RedisStore) ReadEnvSeries(ctx context.Context, envUUID string, end time.Time, days int) (EnvSeries, error) {
if days <= 0 {
days = 1
}

start := dayStart(end).Add(-time.Duration(days-1) * 24 * time.Hour)
bucketCount := days * BucketsPerDay
series := newSeries(start, bucketCount)

pipe := s.client.Pipeline()
fetches := make([]*redis.StringCmd, 0, days)
for dayIndex := 0; dayIndex < days; dayIndex++ {
day := start.Add(time.Duration(dayIndex) * 24 * time.Hour)
fetches = append(fetches, pipe.Get(ctx, EnvDayKey(s.prefix, envUUID, day)))
}

if len(fetches) == 0 {
return series, nil
}

if _, err := pipe.Exec(ctx); err != nil && !errors.Is(err, redis.Nil) {
return EnvSeries{}, err
}

for dayIndex, cmd := range fetches {
blob, err := cmd.Bytes()
if err != nil && !errors.Is(err, redis.Nil) {
return EnvSeries{}, err
}
if len(blob) == 0 {
continue
}

fillSeries(&series, dayIndex*BucketsPerDay, decodeDay(blob))
}

return series, nil
}

func newSeries(start time.Time, bucketCount int) NodeTileSeries {
return NodeTileSeries{
Start: start,
BucketSeconds: BucketSeconds,
Enroll: make([]uint16, bucketCount),
Config: make([]uint16, bucketCount),
Status: make([]uint16, bucketCount),
Result: make([]uint16, bucketCount),
QueryRead: make([]uint16, bucketCount),
QueryWrite: make([]uint16, bucketCount),
Total: make([]uint16, bucketCount),
}
}

func fillSeries(series *NodeTileSeries, base int, decoded [EventTypeCount][BucketsPerDay]uint16) {
for hour := 0; hour < BucketsPerDay; hour++ {
idx := base + hour
series.Enroll[idx] = decoded[EventEnroll][hour]
series.Config[idx] = decoded[EventConfig][hour]
series.Status[idx] = decoded[EventStatus][hour]
series.Result[idx] = decoded[EventResult][hour]
series.QueryRead[idx] = decoded[EventQueryRead][hour]
series.QueryWrite[idx] = decoded[EventQueryWrite][hour]
series.Total[idx] = saturatingSum(
decoded[EventEnroll][hour],
decoded[EventConfig][hour],
decoded[EventStatus][hour],
decoded[EventResult][hour],
decoded[EventQueryRead][hour],
decoded[EventQueryWrite][hour],
)
}
}

func decodeDay(blob []byte) [EventTypeCount][BucketsPerDay]uint16 {
var out [EventTypeCount][BucketsPerDay]uint16

Expand Down
64 changes: 64 additions & 0 deletions pkg/activity/redis_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,12 @@ func TestDayKeyUsesEnvNodeAndUTCDate(t *testing.T) {
if got := dayStart(day); !got.Equal(wantDayStart) {
t.Fatalf("expected UTC day start %s, got %s", wantDayStart, got)
}

gotEnv := EnvDayKey("nodeact:v1", "ENV1", day)
wantEnv := "nodeact:v1:env:ENV1:20260615"
if gotEnv != wantEnv {
t.Fatalf("expected env key %q, got %q", wantEnv, gotEnv)
}
}

func TestStoreIncrementManyAndReadSeries(t *testing.T) {
Expand Down Expand Up @@ -89,6 +95,12 @@ func TestStoreIncrementManyAndReadSeries(t *testing.T) {
if got := fake.expireFor("nodeact:v1:ENV1:NODE1:20260615"); got != 24*time.Hour {
t.Fatalf("expected 24h TTL for 20260615 key, got %s", got)
}
if got := fake.expireFor("nodeact:v1:env:ENV1:20260614"); got != 24*time.Hour {
t.Fatalf("expected 24h TTL for env 20260614 key, got %s", got)
}
if got := fake.expireFor("nodeact:v1:env:ENV1:20260615"); got != 24*time.Hour {
t.Fatalf("expected 24h TTL for env 20260615 key, got %s", got)
}

empty := out["NODE2"]
if len(empty.Total) != 48 {
Expand All @@ -100,3 +112,55 @@ func TestStoreIncrementManyAndReadSeries(t *testing.T) {
}
}
}

func TestStoreReadEnvSeries(t *testing.T) {
client, fake := newTestRedisClient(t)
store := NewRedisStore(client, "nodeact:v1", 7, 24*time.Hour)

ctx := context.Background()
end := time.Date(2026, 6, 15, 23, 50, 0, 0, time.UTC)

err := store.IncrementMany(ctx, []Event{
{EnvUUID: "ENV1", NodeUUID: "NODE1", Type: EventStatus, At: time.Date(2026, 6, 15, 11, 20, 0, 0, time.UTC), Count: 2},
{EnvUUID: "ENV1", NodeUUID: "NODE2", Type: EventStatus, At: time.Date(2026, 6, 15, 11, 40, 0, 0, time.UTC), Count: 1},
{EnvUUID: "ENV1", NodeUUID: "NODE2", Type: EventQueryRead, At: time.Date(2026, 6, 15, 12, 5, 0, 0, time.UTC), Count: 4},
{EnvUUID: "ENV1", NodeUUID: "NODE3", Type: EventEnroll, At: time.Date(2026, 6, 15, 12, 10, 0, 0, time.UTC), Count: 1},
{EnvUUID: "ENV2", NodeUUID: "NODE9", Type: EventResult, At: time.Date(2026, 6, 15, 12, 10, 0, 0, time.UTC), Count: 7},
})
if err != nil {
t.Fatalf("increment failed: %v", err)
}

series, err := store.ReadEnvSeries(ctx, "ENV1", end, 1)
if err != nil {
t.Fatalf("read env series failed: %v", err)
}

if len(series.Total) != 24 {
t.Fatalf("expected 24 buckets, got %d", len(series.Total))
}

wantStart := time.Date(2026, 6, 15, 0, 0, 0, 0, time.UTC)
if !series.Start.Equal(wantStart) {
t.Fatalf("expected start %s, got %s", wantStart, series.Start)
}

if series.Status[11] != 3 {
t.Fatalf("expected env status bucket 11 to be 3, got %d", series.Status[11])
}
if series.QueryRead[12] != 4 {
t.Fatalf("expected env query-read bucket 12 to be 4, got %d", series.QueryRead[12])
}
if series.Enroll[12] != 1 {
t.Fatalf("expected env enroll bucket 12 to be 1, got %d", series.Enroll[12])
}
if series.Total[11] != 3 || series.Total[12] != 5 {
t.Fatalf("unexpected env totals bucket11=%d bucket12=%d", series.Total[11], series.Total[12])
}
if series.Result[12] != 0 {
t.Fatalf("expected ENV2 result data not to leak into ENV1, got %d", series.Result[12])
}
if got := fake.expireFor("nodeact:v1:env:ENV1:20260615"); got != 24*time.Hour {
t.Fatalf("expected env key TTL to be 24h, got %s", got)
}
}
8 changes: 8 additions & 0 deletions pkg/activity/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,19 @@ type NodeTileSeries struct {
Total []uint16 `json:"total"`
}

// EnvSeries is a dense activity series ready for environment-level graphs.
type EnvSeries = NodeTileSeries

// DayKey returns the Redis key for one node's UTC activity day blob.
func DayKey(prefix, envUUID, nodeUUID string, day time.Time) string {
return fmt.Sprintf("%s:%s:%s:%s", prefix, envUUID, nodeUUID, day.UTC().Format("20060102"))
}

// EnvDayKey returns the Redis key for one environment's UTC activity day blob.
func EnvDayKey(prefix, envUUID string, day time.Time) string {
return fmt.Sprintf("%s:env:%s:%s", prefix, envUUID, day.UTC().Format("20060102"))
}

func bucketHour(t time.Time) int {
return t.UTC().Hour()
}
Expand Down
Loading