Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions packages/backend/src/clusters/clusters.controller.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import { Observable, timer, exhaustMap, map, from } from 'rxjs';
import { AuthGuard } from '../auth/guards/jwt.guard.js';

import { ClustersService } from './clusters.service.js';
import { GetAggregateStatsZodDto } from './dto/get-aggregate-stats.dto.js';
import { GetClusterLogsQueryZodDto, GetClusterLogsZodDto } from './dto/get-cluster-logs.dto.js';
import { GetClusterStatsZodDto } from './dto/get-cluster-stats.dto.js';
import { GetClusterZodDto } from './dto/get-cluster.dto.js';
Expand Down Expand Up @@ -68,6 +69,20 @@ export class ClustersController {
);
}

@Sse('stats/stream')
@ApiOkResponse({
description:
'SSE stream that emits the stats of every running cluster at a configurable interval (default 5s). Clusters whose stats failed to fetch are omitted.',
type: GetAggregateStatsZodDto,
})
@ApiProduces('text/event-stream')
streamAggregateStats(@Query() query: SseIntervalQueryZodDto): Observable<MessageEvent> {
return timer(0, query.interval * 1000).pipe(
exhaustMap(() => from(this.clustersService.aggregateStats())),
map((stats) => ({ data: stats }) as MessageEvent),
);
}

@Get(':id')
@ApiOkResponse({
description: 'The cluster based on the given ID.',
Expand Down
27 changes: 27 additions & 0 deletions packages/backend/src/clusters/clusters.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ import { BadRequestException, Injectable, NotFoundException } from '@nestjs/comm
import { DockerService } from '../docker/docker.service.js';
import { PrismaService } from '../prisma/prisma.service.js';

import { GetAggregateStatsDto } from './dto/get-aggregate-stats.dto.js';

@Injectable()
export class ClustersService {
constructor(
Expand Down Expand Up @@ -144,4 +146,29 @@ export class ClustersService {

return await this.dockerService.getContainerStats(resource.containerId);
}

async aggregateStats(): Promise<GetAggregateStatsDto> {
const clusters = await this.prismaService.cluster.findMany({
where: { status: 'RUNNING', containerId: { not: null } },
select: { id: true, containerId: true },
});

const runnable = clusters.filter((c): c is { id: number; containerId: string } => c.containerId !== null);

const results = await Promise.allSettled(
runnable.map(async (c) => ({
id: c.id,
stats: await this.dockerService.getContainerStats(c.containerId),
})),
);

const aggregate: GetAggregateStatsDto = {};
for (const result of results) {
if (result.status === 'fulfilled') {
aggregate[result.value.id] = result.value.stats;
}
}

return aggregate;
}
}
12 changes: 12 additions & 0 deletions packages/backend/src/clusters/dto/get-aggregate-stats.dto.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
import { createZodDto } from 'nestjs-zod';
import { z } from 'zod';

import { GetClusterStatsSchema } from './get-cluster-stats.dto.js';

export const GetAggregateStatsSchema = z.record(z.coerce.number().int().nonnegative(), GetClusterStatsSchema).meta({
description: 'Stats of every running cluster, keyed by cluster ID. Clusters whose stats failed to fetch are omitted.',
});

export class GetAggregateStatsZodDto extends createZodDto(GetAggregateStatsSchema) {}

export type GetAggregateStatsDto = z.infer<typeof GetAggregateStatsSchema>;
Loading