diff --git a/pkg/activity/redis.go b/pkg/activity/redis.go index d1d9bb50..4f0f7944 100644 --- a/pkg/activity/redis.go +++ b/pkg/activity/redis.go @@ -43,7 +43,7 @@ func (s *RedisStore) IncrementMany(ctx context.Context, events []Event) error { aggregated := make(map[counterKey]uint32) for _, event := range events { - if event.EnvUUID == "" || event.NodeUUID == "" || event.Type >= EventTypeCount { + if event.EnvUUID == "" || event.Type >= EventTypeCount { continue } @@ -51,11 +51,20 @@ func (s *RedisStore) IncrementMany(ctx context.Context, events []Event) error { continue } - key := counterKey{ - key: DayKey(s.prefix, event.EnvUUID, event.NodeUUID, event.At), - offset: bitOffset(event.Type, bucketHour(event.At)), + offset := bitOffset(event.Type, bucketHour(event.At)) + if event.NodeUUID != "" { + nodeKey := counterKey{ + key: DayKey(s.prefix, event.EnvUUID, event.NodeUUID, event.At), + offset: offset, + } + aggregated[nodeKey] += uint32(event.Count) + } + + envKey := counterKey{ + key: EnvDayKey(s.prefix, event.EnvUUID, event.At), + offset: offset, } - aggregated[key] += uint32(event.Count) + aggregated[envKey] += uint32(event.Count) } if len(aggregated) == 0 { @@ -95,17 +104,7 @@ func (s *RedisStore) ReadSeries(ctx context.Context, envUUID string, nodeUUIDs [ pipe := s.client.Pipeline() fetches := make([]dayFetch, 0, len(nodeUUIDs)*days) for _, nodeUUID := range nodeUUIDs { - out[nodeUUID] = NodeTileSeries{ - Start: start, - BucketSeconds: BucketSeconds, - Enroll: make([]uint16, bucketCount), - Config: make([]uint16, bucketCount), - Status: make([]uint16, bucketCount), - Result: make([]uint16, bucketCount), - QueryRead: make([]uint16, bucketCount), - QueryWrite: make([]uint16, bucketCount), - Total: make([]uint16, bucketCount), - } + out[nodeUUID] = newSeries(start, bucketCount) for dayIndex := 0; dayIndex < days; dayIndex++ { day := start.Add(time.Duration(dayIndex) * 24 * time.Hour) @@ -161,6 +160,80 @@ func (s *RedisStore) ReadSeries(ctx context.Context, envUUID string, nodeUUIDs [ return out, nil } +// ReadEnvSeries returns a dense environment activity series for the requested day window. +func (s *RedisStore) ReadEnvSeries(ctx context.Context, envUUID string, end time.Time, days int) (EnvSeries, error) { + if days <= 0 { + days = 1 + } + + start := dayStart(end).Add(-time.Duration(days-1) * 24 * time.Hour) + bucketCount := days * BucketsPerDay + series := newSeries(start, bucketCount) + + pipe := s.client.Pipeline() + fetches := make([]*redis.StringCmd, 0, days) + for dayIndex := 0; dayIndex < days; dayIndex++ { + day := start.Add(time.Duration(dayIndex) * 24 * time.Hour) + fetches = append(fetches, pipe.Get(ctx, EnvDayKey(s.prefix, envUUID, day))) + } + + if len(fetches) == 0 { + return series, nil + } + + if _, err := pipe.Exec(ctx); err != nil && !errors.Is(err, redis.Nil) { + return EnvSeries{}, err + } + + for dayIndex, cmd := range fetches { + blob, err := cmd.Bytes() + if err != nil && !errors.Is(err, redis.Nil) { + return EnvSeries{}, err + } + if len(blob) == 0 { + continue + } + + fillSeries(&series, dayIndex*BucketsPerDay, decodeDay(blob)) + } + + return series, nil +} + +func newSeries(start time.Time, bucketCount int) NodeTileSeries { + return NodeTileSeries{ + Start: start, + BucketSeconds: BucketSeconds, + Enroll: make([]uint16, bucketCount), + Config: make([]uint16, bucketCount), + Status: make([]uint16, bucketCount), + Result: make([]uint16, bucketCount), + QueryRead: make([]uint16, bucketCount), + QueryWrite: make([]uint16, bucketCount), + Total: make([]uint16, bucketCount), + } +} + +func fillSeries(series *NodeTileSeries, base int, decoded [EventTypeCount][BucketsPerDay]uint16) { + for hour := 0; hour < BucketsPerDay; hour++ { + idx := base + hour + series.Enroll[idx] = decoded[EventEnroll][hour] + series.Config[idx] = decoded[EventConfig][hour] + series.Status[idx] = decoded[EventStatus][hour] + series.Result[idx] = decoded[EventResult][hour] + series.QueryRead[idx] = decoded[EventQueryRead][hour] + series.QueryWrite[idx] = decoded[EventQueryWrite][hour] + series.Total[idx] = saturatingSum( + decoded[EventEnroll][hour], + decoded[EventConfig][hour], + decoded[EventStatus][hour], + decoded[EventResult][hour], + decoded[EventQueryRead][hour], + decoded[EventQueryWrite][hour], + ) + } +} + func decodeDay(blob []byte) [EventTypeCount][BucketsPerDay]uint16 { var out [EventTypeCount][BucketsPerDay]uint16 diff --git a/pkg/activity/redis_test.go b/pkg/activity/redis_test.go index 3c01ffdf..55489d23 100644 --- a/pkg/activity/redis_test.go +++ b/pkg/activity/redis_test.go @@ -24,6 +24,12 @@ func TestDayKeyUsesEnvNodeAndUTCDate(t *testing.T) { if got := dayStart(day); !got.Equal(wantDayStart) { t.Fatalf("expected UTC day start %s, got %s", wantDayStart, got) } + + gotEnv := EnvDayKey("nodeact:v1", "ENV1", day) + wantEnv := "nodeact:v1:env:ENV1:20260615" + if gotEnv != wantEnv { + t.Fatalf("expected env key %q, got %q", wantEnv, gotEnv) + } } func TestStoreIncrementManyAndReadSeries(t *testing.T) { @@ -89,6 +95,12 @@ func TestStoreIncrementManyAndReadSeries(t *testing.T) { if got := fake.expireFor("nodeact:v1:ENV1:NODE1:20260615"); got != 24*time.Hour { t.Fatalf("expected 24h TTL for 20260615 key, got %s", got) } + if got := fake.expireFor("nodeact:v1:env:ENV1:20260614"); got != 24*time.Hour { + t.Fatalf("expected 24h TTL for env 20260614 key, got %s", got) + } + if got := fake.expireFor("nodeact:v1:env:ENV1:20260615"); got != 24*time.Hour { + t.Fatalf("expected 24h TTL for env 20260615 key, got %s", got) + } empty := out["NODE2"] if len(empty.Total) != 48 { @@ -100,3 +112,55 @@ func TestStoreIncrementManyAndReadSeries(t *testing.T) { } } } + +func TestStoreReadEnvSeries(t *testing.T) { + client, fake := newTestRedisClient(t) + store := NewRedisStore(client, "nodeact:v1", 7, 24*time.Hour) + + ctx := context.Background() + end := time.Date(2026, 6, 15, 23, 50, 0, 0, time.UTC) + + err := store.IncrementMany(ctx, []Event{ + {EnvUUID: "ENV1", NodeUUID: "NODE1", Type: EventStatus, At: time.Date(2026, 6, 15, 11, 20, 0, 0, time.UTC), Count: 2}, + {EnvUUID: "ENV1", NodeUUID: "NODE2", Type: EventStatus, At: time.Date(2026, 6, 15, 11, 40, 0, 0, time.UTC), Count: 1}, + {EnvUUID: "ENV1", NodeUUID: "NODE2", Type: EventQueryRead, At: time.Date(2026, 6, 15, 12, 5, 0, 0, time.UTC), Count: 4}, + {EnvUUID: "ENV1", NodeUUID: "NODE3", Type: EventEnroll, At: time.Date(2026, 6, 15, 12, 10, 0, 0, time.UTC), Count: 1}, + {EnvUUID: "ENV2", NodeUUID: "NODE9", Type: EventResult, At: time.Date(2026, 6, 15, 12, 10, 0, 0, time.UTC), Count: 7}, + }) + if err != nil { + t.Fatalf("increment failed: %v", err) + } + + series, err := store.ReadEnvSeries(ctx, "ENV1", end, 1) + if err != nil { + t.Fatalf("read env series failed: %v", err) + } + + if len(series.Total) != 24 { + t.Fatalf("expected 24 buckets, got %d", len(series.Total)) + } + + wantStart := time.Date(2026, 6, 15, 0, 0, 0, 0, time.UTC) + if !series.Start.Equal(wantStart) { + t.Fatalf("expected start %s, got %s", wantStart, series.Start) + } + + if series.Status[11] != 3 { + t.Fatalf("expected env status bucket 11 to be 3, got %d", series.Status[11]) + } + if series.QueryRead[12] != 4 { + t.Fatalf("expected env query-read bucket 12 to be 4, got %d", series.QueryRead[12]) + } + if series.Enroll[12] != 1 { + t.Fatalf("expected env enroll bucket 12 to be 1, got %d", series.Enroll[12]) + } + if series.Total[11] != 3 || series.Total[12] != 5 { + t.Fatalf("unexpected env totals bucket11=%d bucket12=%d", series.Total[11], series.Total[12]) + } + if series.Result[12] != 0 { + t.Fatalf("expected ENV2 result data not to leak into ENV1, got %d", series.Result[12]) + } + if got := fake.expireFor("nodeact:v1:env:ENV1:20260615"); got != 24*time.Hour { + t.Fatalf("expected env key TTL to be 24h, got %s", got) + } +} diff --git a/pkg/activity/types.go b/pkg/activity/types.go index 1a66a52a..e2c26b26 100644 --- a/pkg/activity/types.go +++ b/pkg/activity/types.go @@ -49,11 +49,19 @@ type NodeTileSeries struct { Total []uint16 `json:"total"` } +// EnvSeries is a dense activity series ready for environment-level graphs. +type EnvSeries = NodeTileSeries + // DayKey returns the Redis key for one node's UTC activity day blob. func DayKey(prefix, envUUID, nodeUUID string, day time.Time) string { return fmt.Sprintf("%s:%s:%s:%s", prefix, envUUID, nodeUUID, day.UTC().Format("20060102")) } +// EnvDayKey returns the Redis key for one environment's UTC activity day blob. +func EnvDayKey(prefix, envUUID string, day time.Time) string { + return fmt.Sprintf("%s:env:%s:%s", prefix, envUUID, day.UTC().Format("20060102")) +} + func bucketHour(t time.Time) int { return t.UTC().Hour() }