From 115bf342dcd3dd154760b40eec0dd6f358387cad Mon Sep 17 00:00:00 2001 From: A1ex Date: Fri, 15 May 2026 20:47:11 -0400 Subject: [PATCH] feat(backend): sse stream returning stats for every running cluster --- .../src/clusters/clusters.controller.ts | 15 +++++++++++ .../backend/src/clusters/clusters.service.ts | 27 +++++++++++++++++++ .../clusters/dto/get-aggregate-stats.dto.ts | 12 +++++++++ 3 files changed, 54 insertions(+) create mode 100644 packages/backend/src/clusters/dto/get-aggregate-stats.dto.ts diff --git a/packages/backend/src/clusters/clusters.controller.ts b/packages/backend/src/clusters/clusters.controller.ts index 2223296..a99e2e9 100644 --- a/packages/backend/src/clusters/clusters.controller.ts +++ b/packages/backend/src/clusters/clusters.controller.ts @@ -29,6 +29,7 @@ import { Observable, timer, exhaustMap, map, from } from 'rxjs'; import { AuthGuard } from '../auth/guards/jwt.guard.js'; import { ClustersService } from './clusters.service.js'; +import { GetAggregateStatsZodDto } from './dto/get-aggregate-stats.dto.js'; import { GetClusterLogsQueryZodDto, GetClusterLogsZodDto } from './dto/get-cluster-logs.dto.js'; import { GetClusterStatsZodDto } from './dto/get-cluster-stats.dto.js'; import { GetClusterZodDto } from './dto/get-cluster.dto.js'; @@ -68,6 +69,20 @@ export class ClustersController { ); } + @Sse('stats/stream') + @ApiOkResponse({ + description: + 'SSE stream that emits the stats of every running cluster at a configurable interval (default 5s). Clusters whose stats failed to fetch are omitted.', + type: GetAggregateStatsZodDto, + }) + @ApiProduces('text/event-stream') + streamAggregateStats(@Query() query: SseIntervalQueryZodDto): Observable { + return timer(0, query.interval * 1000).pipe( + exhaustMap(() => from(this.clustersService.aggregateStats())), + map((stats) => ({ data: stats }) as MessageEvent), + ); + } + @Get(':id') @ApiOkResponse({ description: 'The cluster based on the given ID.', diff --git a/packages/backend/src/clusters/clusters.service.ts b/packages/backend/src/clusters/clusters.service.ts index 3edbeb1..12748fd 100644 --- a/packages/backend/src/clusters/clusters.service.ts +++ b/packages/backend/src/clusters/clusters.service.ts @@ -6,6 +6,8 @@ import { BadRequestException, Injectable, NotFoundException } from '@nestjs/comm import { DockerService } from '../docker/docker.service.js'; import { PrismaService } from '../prisma/prisma.service.js'; +import { GetAggregateStatsDto } from './dto/get-aggregate-stats.dto.js'; + @Injectable() export class ClustersService { constructor( @@ -144,4 +146,29 @@ export class ClustersService { return await this.dockerService.getContainerStats(resource.containerId); } + + async aggregateStats(): Promise { + const clusters = await this.prismaService.cluster.findMany({ + where: { status: 'RUNNING', containerId: { not: null } }, + select: { id: true, containerId: true }, + }); + + const runnable = clusters.filter((c): c is { id: number; containerId: string } => c.containerId !== null); + + const results = await Promise.allSettled( + runnable.map(async (c) => ({ + id: c.id, + stats: await this.dockerService.getContainerStats(c.containerId), + })), + ); + + const aggregate: GetAggregateStatsDto = {}; + for (const result of results) { + if (result.status === 'fulfilled') { + aggregate[result.value.id] = result.value.stats; + } + } + + return aggregate; + } } diff --git a/packages/backend/src/clusters/dto/get-aggregate-stats.dto.ts b/packages/backend/src/clusters/dto/get-aggregate-stats.dto.ts new file mode 100644 index 0000000..6ce1e14 --- /dev/null +++ b/packages/backend/src/clusters/dto/get-aggregate-stats.dto.ts @@ -0,0 +1,12 @@ +import { createZodDto } from 'nestjs-zod'; +import { z } from 'zod'; + +import { GetClusterStatsSchema } from './get-cluster-stats.dto.js'; + +export const GetAggregateStatsSchema = z.record(z.coerce.number().int().nonnegative(), GetClusterStatsSchema).meta({ + description: 'Stats of every running cluster, keyed by cluster ID. Clusters whose stats failed to fetch are omitted.', +}); + +export class GetAggregateStatsZodDto extends createZodDto(GetAggregateStatsSchema) {} + +export type GetAggregateStatsDto = z.infer;