From 0949666e7faec84a52b3ac85446440bdb21b1581 Mon Sep 17 00:00:00 2001 From: A1ex Date: Fri, 15 May 2026 20:40:00 -0400 Subject: [PATCH] fix(backend): use `exhaustMap` for SSE routes --- packages/backend/src/clusters/clusters.controller.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/packages/backend/src/clusters/clusters.controller.ts b/packages/backend/src/clusters/clusters.controller.ts index 7a04fb1..2223296 100644 --- a/packages/backend/src/clusters/clusters.controller.ts +++ b/packages/backend/src/clusters/clusters.controller.ts @@ -24,7 +24,7 @@ import { ApiTags, ApiUnauthorizedResponse, } from '@nestjs/swagger'; -import { Observable, timer, switchMap, from } from 'rxjs'; +import { Observable, timer, exhaustMap, map, from } from 'rxjs'; import { AuthGuard } from '../auth/guards/jwt.guard.js'; @@ -63,8 +63,8 @@ export class ClustersController { @ApiProduces('text/event-stream') streamAll(@Query() query: SseIntervalQueryZodDto): Observable { return timer(0, query.interval * 1000).pipe( - switchMap(() => from(this.clustersService.findAll())), - switchMap((clusters) => [{ data: clusters } as MessageEvent]), + exhaustMap(() => from(this.clustersService.findAll())), + map((clusters) => ({ data: clusters }) as MessageEvent), ); } @@ -202,8 +202,8 @@ export class ClustersController { }) streamStats(@Param('id', ParseIntPipe) id: number, @Query() query: SseIntervalQueryZodDto): Observable { return timer(0, query.interval * 1000).pipe( - switchMap(() => from(this.clustersService.stats(id))), - switchMap((stats) => [{ data: stats } as MessageEvent]), + exhaustMap(() => from(this.clustersService.stats(id))), + map((stats) => ({ data: stats }) as MessageEvent), ); } }