diff --git a/packages/browser-rum-core/src/boot/startRum.ts b/packages/browser-rum-core/src/boot/startRum.ts index 34ddb94cfe..e7c240b72d 100644 --- a/packages/browser-rum-core/src/boot/startRum.ts +++ b/packages/browser-rum-core/src/boot/startRum.ts @@ -17,6 +17,8 @@ import { startGlobalContext, startUserContext, startTabContext, + isExperimentalFeatureEnabled, + ExperimentalFeature, } from '@datadog/browser-core' import { createDOMMutationObservable } from '../browser/domMutationObservable' import { createWindowOpenObservable } from '../browser/windowOpenObservable' @@ -24,6 +26,7 @@ import { startInternalContext } from '../domain/contexts/internalContext' import { LifeCycle, LifeCycleEventType } from '../domain/lifeCycle' import { startViewHistory } from '../domain/contexts/viewHistory' import { startRequestCollection } from '../domain/requestCollection' +import { startWebSocketCollection } from '../domain/webSocketCollection' import { startActionCollection } from '../domain/action/actionCollection' import { startErrorCollection } from '../domain/error/errorCollection' import { startResourceCollection } from '../domain/resource/resourceCollection' @@ -229,6 +232,16 @@ export function startRumEventCollection( const vitalCollection = startVitalCollection(lifeCycle, pageStateHistory) + if (configuration.trackWebSockets && isExperimentalFeatureEnabled(ExperimentalFeature.TRACK_WEB_SOCKETS)) { + const webSocketCollection = startWebSocketCollection( + lifeCycle, + configuration, + viewHistory, + vitalCollection.addDurationVital + ) + cleanupTasks.push(webSocketCollection.stop) + } + const internalContext = startInternalContext( configuration.applicationId, sessionManager, diff --git a/packages/browser-rum-core/src/domain/lifeCycle.ts b/packages/browser-rum-core/src/domain/lifeCycle.ts index 94a3c7d763..6662efe9ec 100644 --- a/packages/browser-rum-core/src/domain/lifeCycle.ts +++ b/packages/browser-rum-core/src/domain/lifeCycle.ts @@ -3,6 +3,7 @@ import { AbstractLifeCycle } from '@datadog/browser-core' import type { RumEventDomainContext } from '../domainContext.types' import type { RawRumEvent, AssembledRumEvent } from '../rawRumEvent.types' import type { RequestCompleteEvent, RequestStartEvent } from './requestCollection' +import type { WebSocketCompleteEvent } from './webSocketCollection' import type { AutoAction } from './action/actionCollection' import type { ViewEvent, ViewCreatedEvent, ViewEndedEvent, BeforeViewUpdateEvent } from './view/trackViews' import type { DurationVitalStart } from './vital/vitalCollection' @@ -21,6 +22,7 @@ export const enum LifeCycleEventType { AFTER_VIEW_ENDED, REQUEST_STARTED, REQUEST_COMPLETED, + WEBSOCKET_COMPLETED, // The SESSION_EXPIRED lifecycle event has been introduced to represent when a session has expired // and trigger cleanup tasks related to this, prior to renewing the session. Its implementation is @@ -66,6 +68,7 @@ declare const LifeCycleEventTypeAsConst: { AFTER_VIEW_ENDED: LifeCycleEventType.AFTER_VIEW_ENDED REQUEST_STARTED: LifeCycleEventType.REQUEST_STARTED REQUEST_COMPLETED: LifeCycleEventType.REQUEST_COMPLETED + WEBSOCKET_COMPLETED: LifeCycleEventType.WEBSOCKET_COMPLETED SESSION_EXPIRED: LifeCycleEventType.SESSION_EXPIRED SESSION_RENEWED: LifeCycleEventType.SESSION_RENEWED PAGE_MAY_EXIT: LifeCycleEventType.PAGE_MAY_EXIT @@ -88,6 +91,7 @@ export interface LifeCycleEventMap { [LifeCycleEventTypeAsConst.AFTER_VIEW_ENDED]: ViewEndedEvent [LifeCycleEventTypeAsConst.REQUEST_STARTED]: RequestStartEvent [LifeCycleEventTypeAsConst.REQUEST_COMPLETED]: RequestCompleteEvent + [LifeCycleEventTypeAsConst.WEBSOCKET_COMPLETED]: WebSocketCompleteEvent [LifeCycleEventTypeAsConst.SESSION_EXPIRED]: void [LifeCycleEventTypeAsConst.SESSION_RENEWED]: void [LifeCycleEventTypeAsConst.PAGE_MAY_EXIT]: PageMayExitEvent diff --git a/packages/browser-rum-core/src/domain/resource/matchRequestResourceEntry.ts b/packages/browser-rum-core/src/domain/resource/matchRequestResourceEntry.ts new file mode 100644 index 0000000000..ae6404b667 --- /dev/null +++ b/packages/browser-rum-core/src/domain/resource/matchRequestResourceEntry.ts @@ -0,0 +1,64 @@ +import type { Duration, RelativeTime } from '@datadog/browser-core' +import { addDuration } from '@datadog/browser-core' +import type { RumPerformanceResourceTiming } from '../../browser/performanceObservable' +import type { RequestCompleteEvent } from '../requestCollection' +import { hasValidResourceEntryDuration, hasValidResourceEntryTimings } from './resourceUtils' + +interface Timing { + startTime: RelativeTime + duration: Duration +} + +const alreadyMatchedEntries = new WeakSet() + +/** + * Look for corresponding timing in resource timing buffer + * + * Observations: + * - Timing (start, end) are nested inside the request (start, end) + * - Some timing can be not exactly nested, being off by < 1 ms + * + * Strategy: + * - from valid nested entries (with 1 ms error margin) + * - filter out timing that were already matched to a request + * - then, if a single timing match, return the timing + * - otherwise we can't decide, return undefined + */ +export function matchRequestResourceEntry(request: RequestCompleteEvent) { + if (!performance || !('getEntriesByName' in performance)) { + return + } + const sameNameEntries = performance.getEntriesByName(request.url, 'resource') as RumPerformanceResourceTiming[] + + if (!sameNameEntries.length || !('toJSON' in sameNameEntries[0])) { + return + } + + const candidates = sameNameEntries + .filter((entry) => !alreadyMatchedEntries.has(entry)) + .filter((entry) => hasValidResourceEntryDuration(entry) && hasValidResourceEntryTimings(entry)) + .filter((entry) => + isBetween( + entry, + request.startClocks.relative, + endTime({ startTime: request.startClocks.relative, duration: request.duration }) + ) + ) + + if (candidates.length === 1) { + alreadyMatchedEntries.add(candidates[0]) + + return candidates[0].toJSON() as RumPerformanceResourceTiming + } + + return +} + +function endTime(timing: Timing) { + return addDuration(timing.startTime, timing.duration) +} + +function isBetween(timing: Timing, start: RelativeTime, end: RelativeTime) { + const errorMargin = 1 as Duration + return timing.startTime >= start - errorMargin && endTime(timing) <= addDuration(end, errorMargin) +} diff --git a/packages/browser-rum-core/src/domain/resource/resourceCollection.spec.ts b/packages/browser-rum-core/src/domain/resource/resourceCollection.spec.ts index 419cd1eafc..923a87b2f6 100644 --- a/packages/browser-rum-core/src/domain/resource/resourceCollection.spec.ts +++ b/packages/browser-rum-core/src/domain/resource/resourceCollection.spec.ts @@ -22,6 +22,7 @@ import { LifeCycle, LifeCycleEventType } from '../lifeCycle' import type { RequestCompleteEvent } from '../requestCollection' import { getDocumentTraceId } from '../tracing/getDocumentTraceId' import { createSpanIdentifier, createTraceIdentifier } from '../tracing/identifier' +import type { WebSocketCompleteEvent } from '../webSocketCollection' import { REQUEST_MATCHING_DELAY, startResourceCollection } from './resourceCollection' function buildMatchHeadersForAllUrls(headerNames: MatchOption[]): MatchHeader[] { @@ -1278,6 +1279,129 @@ describe('resourceCollection', () => { }) }) + describe('websocket', () => { + const wsUrl = 'wss://example.com/socket' + const ONE_MILLISECOND_IN_NANOSECONDS = 1e6 + + function toServerDurationFromMs(durationInMilliseconds: number): ServerDuration { + return (durationInMilliseconds * ONE_MILLISECOND_IN_NANOSECONDS) as ServerDuration + } + + function getRawWebsocketResourceEvent(index = 0): RawRumResourceEvent { + return rawRumEvents[index].rawRumEvent as RawRumResourceEvent + } + + function getWebsocketResource(index = 0) { + return getRawWebsocketResourceEvent(index).resource + } + + function notifyWebSocket(overrides: Partial = {}) { + const defaultStartTime = 1_700_000_000_000 as TimeStamp + const defaultStartRelativeTime = 200 as RelativeTime + const defaultEndTime = 1_700_000_005_000 as TimeStamp + const defaultEndRelativeTime = 5_200 as RelativeTime + const defaultMessagesIn = { count: 3, size: 300 } + const defaultMessagesOut = { count: 2, size: 200 } + const defaultCloseCode = 1000 + + const event: WebSocketCompleteEvent = { + connectionId: 'connection-uuid', + url: wsUrl, + startClocks: { relative: defaultStartRelativeTime, timeStamp: defaultStartTime }, + endClocks: { relative: defaultEndRelativeTime, timeStamp: defaultEndTime }, + messagesIn: defaultMessagesIn, + messagesOut: defaultMessagesOut, + longestSilence: 0 as Duration, + bufferedAmountMax: 0, + handshakeSucceeded: false, + trackingEndReason: 'close_event', + closeCode: defaultCloseCode, + closeReason: 'bye', + wasClean: true, + ...overrides, + } + lifeCycle.notify(LifeCycleEventType.WEBSOCKET_COMPLETED, event) + runTasks() + return event + } + + it('emits a resource event with type=websocket on close', () => { + setupResourceCollection() + + const protocol = 'chat.v1' + const viewId = 'view-1' + const timeToFirstMessageIn = 10 as Duration + const timeToFirstMessageOut = 25 as Duration + const lastMessageAt = 1_700_000_004_000 as TimeStamp + const longestSilence = 200 as Duration + const bufferedAmountMax = 1024 + const idleDurationBeforeClose = 1000 as Duration + const setupDuration = 42 as Duration + + const event = notifyWebSocket({ + protocol, + startViewId: viewId, + endViewId: viewId, + firstMessageInOffset: timeToFirstMessageIn, + firstMessageOutOffset: timeToFirstMessageOut, + lastMessageAt, + longestSilence, + bufferedAmountMax, + idleDurationBeforeClose, + setupDuration, + handshakeSucceeded: true, + }) + + const expectedEventCount = 1 + const expectedResourceDuration = toServerDurationFromMs(event.endClocks.relative - event.startClocks.relative) + + expect(rawRumEvents.length).toBe(expectedEventCount) + + const rawEvent = getRawWebsocketResourceEvent() + expect(rawEvent.resource.type).toBe(ResourceType.WEBSOCKET) + expect(rawEvent.resource.status_code).toBeUndefined() + expect(rawEvent.resource.url).toBe(wsUrl) + expect(rawEvent.resource.duration).toBe(expectedResourceDuration) + expect(rawEvent.date).toBe(event.startClocks.timeStamp) + expect(rawEvent.resource.websocket).toEqual({ + connection_id: event.connectionId, + handshake_succeeded: true, + start_time: event.startClocks.timeStamp, + end_time: event.endClocks.timeStamp, + start_view_id: viewId, + end_view_id: viewId, + tracking_end_reason: 'close_event', + close_code: event.closeCode, + close_reason: 'bye', + was_clean: true, + messages_in: event.messagesIn, + messages_out: event.messagesOut, + time_to_first_message_in: timeToFirstMessageIn, + time_to_first_message_out: timeToFirstMessageOut, + last_message_at: lastMessageAt, + longest_silence: longestSilence, + idle_duration_before_close: idleDurationBeforeClose, + buffered_amount_max: bufferedAmountMax, + protocol, + setup_duration: setupDuration, + }) + }) + + it('emits an event spanning two views', () => { + setupResourceCollection() + const startViewId = 'view-a' + const endViewId = 'view-b' + notifyWebSocket({ startViewId, endViewId }) + + const expectedEventCount = 1 + expect(rawRumEvents.length).toBe(expectedEventCount) + + const websocket = getWebsocketResource().websocket! + expect(websocket.start_view_id).toBe(startViewId) + expect(websocket.end_view_id).toBe(endViewId) + }) + }) + function runTasks() { // Request-type entries are queued through a `setTimeout(…, REQUEST_MATCHING_DELAY)` before // they reach the task queue — advance past it so they get pushed. diff --git a/packages/browser-rum-core/src/domain/resource/resourceCollection.ts b/packages/browser-rum-core/src/domain/resource/resourceCollection.ts index 35ad6d1cc5..37b95f4602 100644 --- a/packages/browser-rum-core/src/domain/resource/resourceCollection.ts +++ b/packages/browser-rum-core/src/domain/resource/resourceCollection.ts @@ -1,46 +1,50 @@ import type { Duration } from '@datadog/browser-core' import { + addTelemetryDebug, combine, - generateUUID, - toServerDuration, - relativeToClocks, createTaskQueue, + display, + elapsed, + generateUUID, + matchList, mockable, + relativeToClocks, + RequestType, + ResourceType, runOnReadyState, - matchList, safeTruncate, - display, - addTelemetryDebug, - RequestType, setTimeout, + toServerDuration, } from '@datadog/browser-core' -import type { MatchHeader, RumConfiguration } from '../configuration' -import { RumPerformanceEntryType, createPerformanceObservable } from '../../browser/performanceObservable' +import { createPerformanceObservable, RumPerformanceEntryType } from '../../browser/performanceObservable' +import { getNavigationEntry } from '../../browser/performanceUtils' import type { RumResourceEventDomainContext } from '../../domainContext.types' import type { NetworkHeaders, RawRumResourceEvent, ResourceRequest, ResourceResponse } from '../../rawRumEvent.types' import { RumEventType } from '../../rawRumEvent.types' -import type { RawRumEventCollectedData, LifeCycle } from '../lifeCycle' +import type { MatchHeader, RumConfiguration } from '../configuration' +import { startEventTracker } from '../eventTracker' +import { extractRegexMatch } from '../extractRegexMatch' +import type { LifeCycle, RawRumEventCollectedData } from '../lifeCycle' import { LifeCycleEventType } from '../lifeCycle' import type { RequestCompleteEvent } from '../requestCollection' -import { createSpanIdentifier } from '../tracing/identifier' import { getDocumentTraceId } from '../tracing/getDocumentTraceId' -import { getNavigationEntry } from '../../browser/performanceUtils' -import { startEventTracker } from '../eventTracker' -import { extractRegexMatch } from '../extractRegexMatch' +import { createSpanIdentifier } from '../tracing/identifier' +import type { WebSocketCompleteEvent } from '../webSocketCollection' +import type { GraphQlMetadata } from './graphql' +import { extractGraphQlMetadata, findGraphQlConfiguration } from './graphql' +import { createRequestRegistry } from './requestRegistry' +import type { ResourceLikeEntry } from './resourceUtils' import { + computeResourceEntryDeliveryType, computeResourceEntryDetails, computeResourceEntryDuration, - computeResourceEntryType, - computeResourceEntrySize, computeResourceEntryProtocol, - computeResourceEntryDeliveryType, + computeResourceEntrySize, + computeResourceEntryType, isResourceEntryRequestType, + isResourceEntryWebSocketType, sanitizeIfLongDataUrl, } from './resourceUtils' -import type { ResourceLikeEntry } from './resourceUtils' -import { createRequestRegistry } from './requestRegistry' -import type { GraphQlMetadata } from './graphql' -import { extractGraphQlMetadata, findGraphQlConfiguration } from './graphql' import type { ManualResourceData } from './trackManualResources' import { trackManualResources } from './trackManualResources' @@ -52,11 +56,19 @@ export function startResourceCollection(lifeCycle: LifeCycle, configuration: Rum const taskQueue = mockable(createTaskQueue)() const requestRegistry = createRequestRegistry(lifeCycle) + lifeCycle.subscribe(LifeCycleEventType.WEBSOCKET_COMPLETED, (event: WebSocketCompleteEvent) => { + handleResource(() => assembleWebSocketResource(event)) + }) + const performanceResourceSubscription = createPerformanceObservable(configuration, { type: RumPerformanceEntryType.RESOURCE, buffered: true, }).subscribe((entries) => { for (const entry of entries) { + if (isResourceEntryWebSocketType(entry)) { + continue + } + if (isResourceEntryRequestType(entry)) { // The PerformanceObserver callback can fire before the fetch's resolve microtask runs // (notably on Firefox), so the matching REQUEST_COMPLETED isn't in the registry yet. @@ -109,6 +121,55 @@ export function startResourceCollection(lifeCycle: LifeCycle, configuration: Rum } } +function assembleWebSocketResource( + event: WebSocketCompleteEvent +): RawRumEventCollectedData | undefined { + const duration = elapsed(event.startClocks.timeStamp, event.endClocks.timeStamp) + + const rawRumEvent: RawRumResourceEvent = { + date: event.startClocks.timeStamp, + type: RumEventType.RESOURCE, + resource: { + id: generateUUID(), + type: ResourceType.WEBSOCKET, + url: event.url, + duration: toServerDuration(duration), + websocket: { + connection_id: event.connectionId, + handshake_succeeded: event.handshakeSucceeded, + start_time: event.startClocks.timeStamp, + end_time: event.endClocks.timeStamp, + start_view_id: event.startViewId, + end_view_id: event.endViewId, + tracking_end_reason: event.trackingEndReason, + close_code: event.closeCode, + close_reason: event.closeReason, + was_clean: event.wasClean, + messages_in: event.messagesIn, + messages_out: event.messagesOut, + time_to_first_message_in: event.firstMessageInOffset, + time_to_first_message_out: event.firstMessageOutOffset, + last_message_at: event.lastMessageAt, + longest_silence: event.longestSilence, + idle_duration_before_close: event.idleDurationBeforeClose, + buffered_amount_max: event.bufferedAmountMax, + protocol: event.protocol, + setup_duration: event.setupDuration, + }, + }, + _dd: { + discarded: false, + }, + } + + return { + startClocks: event.startClocks, + duration, + rawRumEvent, + domainContext: {}, + } +} + function assembleResource( entry: ResourceLikeEntry, request: RequestCompleteEvent | undefined, diff --git a/packages/browser-rum-core/src/domain/resource/resourceUtils.ts b/packages/browser-rum-core/src/domain/resource/resourceUtils.ts index 11d68b5a9d..d46fafd3ee 100644 --- a/packages/browser-rum-core/src/domain/resource/resourceUtils.ts +++ b/packages/browser-rum-core/src/domain/resource/resourceUtils.ts @@ -73,6 +73,10 @@ export function isResourceEntryRequestType(entry: ResourceLikeEntry): entry is R return entry.initiatorType === 'xmlhttprequest' || entry.initiatorType === 'fetch' } +export function isResourceEntryWebSocketType(entry: RumPerformanceResourceTiming) { + return /^wss?:\/\//i.test(entry.name) +} + export function computeResourceEntryDuration(entry: ResourceLikeEntry): Duration { const { duration, startTime, responseEnd } = entry // For navigation timing, `duration` is the page load time rather than the response time, diff --git a/packages/browser-rum-core/src/domain/webSocketCollection.spec.ts b/packages/browser-rum-core/src/domain/webSocketCollection.spec.ts new file mode 100644 index 0000000000..e623bca6ac --- /dev/null +++ b/packages/browser-rum-core/src/domain/webSocketCollection.spec.ts @@ -0,0 +1,460 @@ +import type { ClocksState, Duration, RelativeTime, WebSocketContext } from '@datadog/browser-core' +import { elapsed, initWebSocketObservable, Observable, relativeToClocks } from '@datadog/browser-core' +import { registerCleanupTask } from '@datadog/browser-core/test' +import { mockRumConfiguration, mockViewHistory } from '../../test' +import { VitalType } from '../rawRumEvent.types' +import type { ViewHistoryEntry } from './contexts/viewHistory' +import { LifeCycle, LifeCycleEventType } from './lifeCycle' +import type { DurationVital } from './vital/vitalCollection' +import type { WebSocketCompleteEvent } from './webSocketCollection' +import { startWebSocketCollection, trackWebSocket, WEBSOCKET_CONNECTING_VITAL_NAME } from './webSocketCollection' + +describe('webSocketCollection', () => { + let lifeCycle: LifeCycle + let wsObservable: Observable + let completed: WebSocketCompleteEvent[] + let wsInstance: WebSocket + + beforeEach(() => { + lifeCycle = new LifeCycle() + wsObservable = new Observable() + completed = [] + wsInstance = {} as WebSocket + lifeCycle.subscribe(LifeCycleEventType.WEBSOCKET_COMPLETED, (webSocket) => { + completed.push(webSocket) + }) + }) + + function startTracking( + viewHistory = mockViewHistory(), + addDurationVital: (vital: DurationVital) => void = jasmine.createSpy() + ) { + return trackWebSocket(lifeCycle, wsObservable, viewHistory, addDurationVital) + } + + function notifyConnecting( + startRelative = 0, + url = 'wss://example.com/socket', + startClocks?: ClocksState, + protocols?: string | string[] + ) { + wsObservable.notify({ + state: 'connecting', + instance: wsInstance, + url, + ...(protocols !== undefined ? { protocols } : {}), + startClocks: startClocks ?? relativeToClocks(startRelative as RelativeTime), + }) + } + + function notifyOpen(openRelative = 10, protocol = '', openClocks?: ClocksState) { + wsObservable.notify({ + state: 'open', + instance: wsInstance, + openClocks: openClocks ?? relativeToClocks(openRelative as RelativeTime), + protocol, + }) + } + + function notifyMessageIn(at: number, size: number) { + wsObservable.notify({ state: 'message-in', instance: wsInstance, size, at: relativeToClocks(at as RelativeTime) }) + } + + function notifyMessageOut(at: number, size: number, bufferedAmountPreSend = 0) { + wsObservable.notify({ + state: 'message-out', + instance: wsInstance, + size, + bufferedAmountPreSend, + at: relativeToClocks(at as RelativeTime), + }) + } + + function notifyClose(at: number, code: number, reason: string, wasClean: boolean, atClocks?: ClocksState) { + wsObservable.notify({ + state: 'close', + instance: wsInstance, + code, + reason, + wasClean, + at: atClocks ?? relativeToClocks(at as RelativeTime), + }) + } + + it('emits a completed event on close with tracking_end_reason="close_event"', () => { + const url = 'wss://example.com/socket' + const protocol = 'chat.v1' + const messageInSize = 100 + const messageOutSize = 50 + const bufferedAmount = 8 + const closeCode = 1000 + const closeReason = 'bye' + + startTracking() + notifyConnecting(0, url) + notifyOpen(10, protocol) + notifyMessageIn(20, messageInSize) + notifyMessageOut(30, messageOutSize, bufferedAmount) + notifyClose(40, closeCode, closeReason, true) + + expect(completed.length).toBe(1) + const webSocket = completed[0] + expect(webSocket.trackingEndReason).toBe('close_event') + expect(webSocket.closeCode).toBe(closeCode) + expect(webSocket.closeReason).toBe(closeReason) + expect(webSocket.wasClean).toBeTrue() + expect(webSocket.url).toBe(url) + expect(webSocket.protocol).toBe(protocol) + expect(webSocket.messagesIn).toEqual({ count: 1, size: messageInSize }) + expect(webSocket.messagesOut).toEqual({ count: 1, size: messageOutSize }) + expect(webSocket.bufferedAmountMax).toBe(bufferedAmount) + }) + + it('generates a unique connection_id per connection', () => { + startTracking() + notifyConnecting() + notifyClose(1, 1000, 'a', true) + const firstId = completed[0].connectionId + + wsInstance = {} as WebSocket + notifyConnecting() + notifyClose(1, 1000, 'b', true) + + expect(completed[1].connectionId).not.toBe(firstId) + }) + + it('records firstMessageInOffset / firstMessageOutOffset as offsets from open', () => { + const openAt = 10 + const firstMessageInAt = 13 + const firstMessageOutAt = 17 + + startTracking() + notifyConnecting() + notifyOpen(openAt) + notifyMessageIn(firstMessageInAt, 1) + notifyMessageIn(25, 1) // not first; should not update + notifyMessageOut(firstMessageOutAt, 1) + notifyClose(30, 1000, 'bye', true) + + const webSocket = completed[0] + expect(webSocket.firstMessageInOffset).toBe((firstMessageInAt - openAt) as Duration) + expect(webSocket.firstMessageOutOffset).toBe((firstMessageOutAt - openAt) as Duration) + }) + + it('tracks longestSilence across consecutive messages (in or out)', () => { + const previousMessageAt = 25 + const longestGap = 15 + + startTracking() + notifyConnecting() + notifyOpen(10) + notifyMessageIn(20, 1) // first - no gap + notifyMessageOut(previousMessageAt, 1) // gap = 5 + notifyMessageIn(previousMessageAt + longestGap, 1) // gap = longestGap (max) + notifyMessageOut(50, 1) // gap = 10 + notifyClose(100, 1000, 'bye', true) + + expect(completed[0].longestSilence).toBe(longestGap as Duration) + }) + + it('records idleDurationBeforeClose from last message to close', () => { + const lastMessageAt = 20 + const closeAt = 50 + + startTracking() + notifyConnecting() + notifyOpen(10) + notifyMessageIn(lastMessageAt, 1) + notifyClose(closeAt, 1000, 'bye', true) + + expect(completed[0].idleDurationBeforeClose).toBe((closeAt - lastMessageAt) as Duration) + }) + + it('leaves idleDurationBeforeClose undefined when no message was received', () => { + startTracking() + notifyConnecting() + notifyOpen(10) + notifyClose(50, 1000, 'bye', true) + + expect(completed[0].idleDurationBeforeClose).toBeUndefined() + }) + + it('records setupDuration as elapsed time from connecting to open', () => { + const startAt = 0 as RelativeTime + const openAt = 10 as RelativeTime + const startClocks = relativeToClocks(startAt) + const openClocks = relativeToClocks(openAt) + const expectedSetupDuration = elapsed(startClocks.timeStamp, openClocks.timeStamp) + + startTracking() + notifyConnecting(startAt, 'wss://example.com/socket', startClocks) + notifyOpen(openAt, '', openClocks) + notifyClose(40, 1000, 'bye', true) + + expect(completed[0].setupDuration).toBe(expectedSetupDuration) + }) + + describe('handshakeSucceeded', () => { + it('is true when the open event fired before completion', () => { + startTracking() + notifyConnecting() + notifyOpen(10) + notifyClose(40, 1000, 'bye', true) + expect(completed[0].handshakeSucceeded).toBeTrue() + + const tracker = startTracking() + notifyConnecting() + notifyOpen(10) + tracker.flushOpenConnections('session_end') + expect(completed[1].handshakeSucceeded).toBeTrue() + }) + + it('is false when the open event never fired before completion', () => { + startTracking() + notifyConnecting() + notifyClose(25, 1006, 'abnormal', false) + expect(completed[0].handshakeSucceeded).toBeFalse() + + const tracker = startTracking() + notifyConnecting() + tracker.flushOpenConnections('session_end') + expect(completed[1].handshakeSucceeded).toBeFalse() + }) + }) + + it('records setupDuration as elapsed time from connecting to close when open never fires', () => { + const startAt = 0 as RelativeTime + const closeAt = 25 as RelativeTime + const closeCode = 1006 + const closeReason = 'abnormal' + const startClocks = relativeToClocks(startAt) + const closeClocks = relativeToClocks(closeAt) + const expectedSetupDuration = elapsed(startClocks.timeStamp, closeClocks.timeStamp) + + startTracking() + notifyConnecting(startAt, 'wss://example.com/socket', startClocks) + notifyClose(closeAt, closeCode, closeReason, false, closeClocks) + + expect(completed[0].setupDuration).toBe(expectedSetupDuration) + }) + + it('records setupDuration on session_end flush when the connection never opened', () => { + const tracker = startTracking() + notifyConnecting() + tracker.flushOpenConnections('session_end') + + const webSocket = completed[0] + expect(webSocket.setupDuration).toBe(elapsed(webSocket.startClocks.timeStamp, webSocket.endClocks.timeStamp)) + }) + + it('does not extend setupDuration on session_end flush after open', () => { + const startAt = 0 as RelativeTime + const openAt = 10 as RelativeTime + const startClocks = relativeToClocks(startAt) + const openClocks = relativeToClocks(openAt) + const expectedSetupDuration = elapsed(startClocks.timeStamp, openClocks.timeStamp) + + const tracker = startTracking() + notifyConnecting(startAt, 'wss://example.com/socket', startClocks) + notifyOpen(openAt, '', openClocks) + tracker.flushOpenConnections('session_end') + + expect(completed[0].setupDuration).toBe(expectedSetupDuration) + }) + + it('samples buffered_amount_max from message-out events', () => { + const peakBufferedAmount = 100 + + startTracking() + notifyConnecting() + notifyOpen(10) + notifyMessageOut(20, 1, 10) + notifyMessageOut(30, 1, peakBufferedAmount) + notifyMessageOut(40, 1, 50) + notifyClose(50, 1000, 'bye', true) + + expect(completed[0].bufferedAmountMax).toBe(peakBufferedAmount) + }) + + it('captures startViewId and endViewId from viewHistory', () => { + const viewByRelative: Record = { + 0: { id: 'view-start', startClocks: relativeToClocks(0 as RelativeTime) }, + 100: { id: 'view-end', startClocks: relativeToClocks(100 as RelativeTime) }, + } + const viewHistory = mockViewHistory() + spyOn(viewHistory, 'findView').and.callFake((startTime?: RelativeTime) => + startTime !== undefined ? viewByRelative[startTime as number] : undefined + ) + + startTracking(viewHistory) + notifyConnecting() + notifyClose(100, 1000, 'bye', true) + + const webSocket = completed[0] + expect(webSocket.startViewId).toBe('view-start') + expect(webSocket.endViewId).toBe('view-end') + }) + + it('flushOpenConnections finalizes still-open connections with tracking_end_reason="session_end"', () => { + const tracker = startTracking() + notifyConnecting() + notifyOpen(10) + notifyMessageIn(20, 1) + + tracker.flushOpenConnections('session_end') + + expect(completed.length).toBe(1) + expect(completed[0].trackingEndReason).toBe('session_end') + expect(completed[0].handshakeSucceeded).toBeTrue() + expect(completed[0].closeCode).toBeUndefined() + expect(completed[0].closeReason).toBeUndefined() + expect(completed[0].wasClean).toBeUndefined() + }) + + it('does not finalize twice when close arrives after flushOpenConnections', () => { + const tracker = startTracking() + notifyConnecting() + notifyOpen(10) + tracker.flushOpenConnections('session_end') + notifyClose(20, 1000, 'bye', true) + + expect(completed.length).toBe(1) + expect(completed[0].trackingEndReason).toBe('session_end') + }) + + it('stop() unsubscribes from the observable and ignores further events', () => { + const tracker = startTracking() + notifyConnecting() + tracker.stop() + notifyClose(20, 1000, 'bye', true) + + expect(completed.length).toBe(0) + }) + + describe('websocket-connecting vital', () => { + it('emits a duration-0 vital on connecting when trackWebSockets is true', () => { + const addDurationVital = jasmine.createSpy<(vital: DurationVital) => void>() + startTracking(mockViewHistory(), addDurationVital) + notifyConnecting() + + expect(addDurationVital).toHaveBeenCalledOnceWith( + jasmine.objectContaining({ + name: WEBSOCKET_CONNECTING_VITAL_NAME, + type: VitalType.DURATION, + duration: 0, + }) + ) + }) + + it('uses the same id as the subsequent WEBSOCKET_COMPLETED connectionId', () => { + const addDurationVital = jasmine.createSpy<(vital: DurationVital) => void>() + startTracking(mockViewHistory(), addDurationVital) + notifyConnecting() + notifyClose(1, 1000, 'bye', true) + + expect(addDurationVital).toHaveBeenCalledOnceWith(jasmine.objectContaining({ id: completed[0].connectionId })) + }) + + it('includes url, protocols, and startViewId in the vital context', () => { + const viewByRelative: Record = { + 0: { id: 'view-start', startClocks: relativeToClocks(0 as RelativeTime) }, + } + const viewHistory = mockViewHistory() + spyOn(viewHistory, 'findView').and.callFake((startTime?: RelativeTime) => + startTime !== undefined ? viewByRelative[startTime as number] : undefined + ) + const addDurationVital = jasmine.createSpy<(vital: DurationVital) => void>() + const url = 'wss://example.com/socket' + const protocols = ['chat.v1', 'json'] + + startTracking(viewHistory, addDurationVital) + notifyConnecting(0, url, undefined, protocols) + + expect(addDurationVital).toHaveBeenCalledOnceWith( + jasmine.objectContaining({ + context: { + url, + protocols, + startViewId: 'view-start', + }, + }) + ) + }) + }) + + describe('startWebSocketCollection', () => { + const wsInstance = {} as WebSocket + const wsUrl = 'wss://example.com/socket' + + function notifyConnectionConnecting(startRelative = 0 as RelativeTime) { + initWebSocketObservable({}).notify({ + state: 'connecting', + instance: wsInstance, + url: wsUrl, + startClocks: relativeToClocks(startRelative), + }) + } + + it('finalizes open connections with tracking_end_reason="session_end" when the session expires', () => { + const collection = startWebSocketCollection( + lifeCycle, + mockRumConfiguration(), + mockViewHistory(), + jasmine.createSpy() + ) + registerCleanupTask(() => collection.stop()) + notifyConnectionConnecting() + + lifeCycle.notify(LifeCycleEventType.SESSION_EXPIRED) + + expect(completed.length).toBe(1) + expect(completed[0].trackingEndReason).toBe('session_end') + expect(completed[0].handshakeSucceeded).toBeFalse() + expect(completed[0].closeCode).toBeUndefined() + expect(completed[0].closeReason).toBeUndefined() + expect(completed[0].wasClean).toBeUndefined() + }) + + it('finalizes open connections with tracking_end_reason="session_end" when stop() is called', () => { + const collection = startWebSocketCollection( + lifeCycle, + mockRumConfiguration(), + mockViewHistory(), + jasmine.createSpy() + ) + notifyConnectionConnecting() + collection.stop() + + expect(completed.length).toBe(1) + expect(completed[0].trackingEndReason).toBe('session_end') + expect(completed[0].handshakeSucceeded).toBeFalse() + expect(completed[0].closeCode).toBeUndefined() + }) + + it('ignores further WebSocket events from the same instance after stop()', () => { + const collection = startWebSocketCollection( + lifeCycle, + mockRumConfiguration(), + mockViewHistory(), + jasmine.createSpy() + ) + notifyConnectionConnecting() + collection.stop() + + const eventCountAfterStop = completed.length + + // After stop(), the tracker has unsubscribed from the observable: further notifications + // about the same instance must be ignored so we don't double-emit or leak state. + initWebSocketObservable({}).notify({ + state: 'close', + instance: wsInstance, + code: 1000, + reason: 'bye', + wasClean: true, + at: relativeToClocks(1000 as RelativeTime), + }) + + expect(completed.length).toBe(eventCountAfterStop) + }) + }) +}) diff --git a/packages/browser-rum-core/src/domain/webSocketCollection.ts b/packages/browser-rum-core/src/domain/webSocketCollection.ts new file mode 100644 index 0000000000..f5239e4b57 --- /dev/null +++ b/packages/browser-rum-core/src/domain/webSocketCollection.ts @@ -0,0 +1,237 @@ +import type { ClocksState, Duration, Observable, TimeStamp, WebSocketContext } from '@datadog/browser-core' +import { clocksNow, elapsed, generateUUID, initWebSocketObservable, sanitize } from '@datadog/browser-core' +import { VitalType } from '../rawRumEvent.types' +import type { ViewHistory } from './contexts/viewHistory' +import type { RumConfiguration } from './configuration' +import type { LifeCycle } from './lifeCycle' +import { LifeCycleEventType } from './lifeCycle' +import type { DurationVital } from './vital/vitalCollection' + +export const WEBSOCKET_CONNECTING_VITAL_NAME = 'websocket-connecting' + +export type WebSocketTrackingEndReason = 'close_event' | 'session_end' + +export interface WebSocketCompleteEvent { + connectionId: string + url: string + protocol?: string + startClocks: ClocksState + endClocks: ClocksState + startViewId?: string + endViewId?: string + messagesIn: { count: number; size: number } + messagesOut: { count: number; size: number } + firstMessageInOffset?: Duration + firstMessageOutOffset?: Duration + lastMessageAt?: TimeStamp + longestSilence: Duration + bufferedAmountMax: number + idleDurationBeforeClose?: Duration + closeCode?: number + closeReason?: string + wasClean?: boolean + handshakeSucceeded: boolean + trackingEndReason: WebSocketTrackingEndReason + setupDuration?: Duration +} + +interface WebSocketConnection { + connectionId: string + url: string + protocol?: string + startClocks: ClocksState + openClocks?: ClocksState + startViewId?: string + messagesIn: { count: number; size: number } + messagesOut: { count: number; size: number } + firstMessageInOffset?: Duration + firstMessageOutOffset?: Duration + lastMessageAt?: TimeStamp + longestSilence: Duration + bufferedAmountMax: number + setupDuration?: Duration +} + +export interface WebSocketConnectionTracker { + flushOpenConnections: (reason: WebSocketTrackingEndReason) => void + stop: () => void +} + +export function startWebSocketCollection( + lifeCycle: LifeCycle, + configuration: RumConfiguration, + viewHistory: ViewHistory, + addDurationVital: (vital: DurationVital) => void +) { + const tracker = trackWebSocket( + lifeCycle, + initWebSocketObservable({ allowUntrustedEvents: configuration.allowUntrustedEvents }), + viewHistory, + addDurationVital + ) + + const sessionExpiredSubscription = lifeCycle.subscribe(LifeCycleEventType.SESSION_EXPIRED, () => { + tracker.flushOpenConnections('session_end') + }) + + return { + stop: () => { + sessionExpiredSubscription.unsubscribe() + tracker.flushOpenConnections('session_end') + tracker.stop() + }, + } +} + +export function trackWebSocket( + lifeCycle: LifeCycle, + webSocketContextObservable: Observable, + viewHistory: ViewHistory, + addDurationVital: (vital: DurationVital) => void +): WebSocketConnectionTracker { + const webSocketRegistry = new Map() + + const subscription = webSocketContextObservable.subscribe((context) => { + switch (context.state) { + case 'connecting': { + const connectionId = generateUUID() + const startViewId = viewHistory.findView(context.startClocks.relative)?.id + const webSocket: WebSocketConnection = { + connectionId, + url: context.url, + startClocks: context.startClocks, + startViewId, + messagesIn: { count: 0, size: 0 }, + messagesOut: { count: 0, size: 0 }, + longestSilence: 0 as Duration, + bufferedAmountMax: 0, + } + webSocketRegistry.set(context.instance, webSocket) + + addDurationVital({ + id: connectionId, + name: WEBSOCKET_CONNECTING_VITAL_NAME, + type: VitalType.DURATION, + startClocks: context.startClocks, + duration: 0 as Duration, + context: sanitize({ + url: context.url, + ...(context.protocols !== undefined ? { protocols: context.protocols } : {}), + ...(startViewId !== undefined ? { startViewId } : {}), + }), + }) + return + } + case 'open': { + const webSocket = webSocketRegistry.get(context.instance) + if (!webSocket) { + return + } + webSocket.openClocks = context.openClocks + webSocket.protocol = context.protocol + webSocket.setupDuration = elapsed(webSocket.startClocks.timeStamp, context.openClocks.timeStamp) + return + } + case 'message-in': { + const webSocket = webSocketRegistry.get(context.instance) + if (!webSocket) { + return + } + webSocket.messagesIn.count += 1 + webSocket.messagesIn.size += context.size + recordMessageTiming(webSocket, context.at, 'in') + return + } + case 'message-out': { + const webSocket = webSocketRegistry.get(context.instance) + if (!webSocket) { + return + } + webSocket.messagesOut.count += 1 + webSocket.messagesOut.size += context.size + if (context.bufferedAmountPreSend > webSocket.bufferedAmountMax) { + webSocket.bufferedAmountMax = context.bufferedAmountPreSend + } + recordMessageTiming(webSocket, context.at, 'out') + return + } + case 'close': { + const webSocket = webSocketRegistry.get(context.instance) + if (!webSocket) { + return + } + webSocketRegistry.delete(context.instance) + lifeCycle.notify(LifeCycleEventType.WEBSOCKET_COMPLETED, buildCompletedEvent(webSocket, context, 'close_event')) + return + } + } + }) + + function buildCompletedEvent( + webSocket: WebSocketConnection, + endInfo: { at: ClocksState; code?: number; reason?: string; wasClean?: boolean }, + trackingEndReason: WebSocketTrackingEndReason + ): WebSocketCompleteEvent { + const endClocks = endInfo.at + const endViewId = viewHistory.findView(endClocks.relative)?.id + const idleDurationBeforeClose = + webSocket.lastMessageAt !== undefined ? elapsed(webSocket.lastMessageAt, endClocks.timeStamp) : undefined + + return { + connectionId: webSocket.connectionId, + url: webSocket.url, + protocol: webSocket.protocol, + startClocks: webSocket.startClocks, + endClocks, + startViewId: webSocket.startViewId, + endViewId, + messagesIn: webSocket.messagesIn, + messagesOut: webSocket.messagesOut, + firstMessageInOffset: webSocket.firstMessageInOffset, + firstMessageOutOffset: webSocket.firstMessageOutOffset, + lastMessageAt: webSocket.lastMessageAt, + longestSilence: webSocket.longestSilence, + bufferedAmountMax: webSocket.bufferedAmountMax, + idleDurationBeforeClose, + closeCode: endInfo.code, + closeReason: endInfo.reason, + wasClean: endInfo.wasClean, + handshakeSucceeded: webSocket.openClocks !== undefined, + trackingEndReason, + setupDuration: webSocket.setupDuration ?? elapsed(webSocket.startClocks.timeStamp, endClocks.timeStamp), + } + } + + return { + flushOpenConnections: (reason) => { + const at = clocksNow() + webSocketRegistry.forEach((webSocket) => { + lifeCycle.notify(LifeCycleEventType.WEBSOCKET_COMPLETED, buildCompletedEvent(webSocket, { at }, reason)) + }) + webSocketRegistry.clear() + }, + stop: () => { + subscription.unsubscribe() + webSocketRegistry.clear() + }, + } +} + +function recordMessageTiming(webSocket: WebSocketConnection, at: ClocksState, direction: 'in' | 'out') { + if (webSocket.openClocks) { + const offset = elapsed(webSocket.openClocks.timeStamp, at.timeStamp) + if (direction === 'in' && webSocket.firstMessageInOffset === undefined) { + webSocket.firstMessageInOffset = offset + } else if (direction === 'out' && webSocket.firstMessageOutOffset === undefined) { + webSocket.firstMessageOutOffset = offset + } + } + + if (webSocket.lastMessageAt !== undefined) { + const gap = elapsed(webSocket.lastMessageAt, at.timeStamp) + if (gap > webSocket.longestSilence) { + webSocket.longestSilence = gap + } + } + webSocket.lastMessageAt = at.timeStamp +} diff --git a/packages/browser-rum-core/src/domainContext.types.ts b/packages/browser-rum-core/src/domainContext.types.ts index 9b18e87a47..5d81c93d74 100644 --- a/packages/browser-rum-core/src/domainContext.types.ts +++ b/packages/browser-rum-core/src/domainContext.types.ts @@ -9,7 +9,7 @@ export type RumEventDomainContext = T extends type : T extends typeof RumEventType.ACTION ? RumActionEventDomainContext : T extends typeof RumEventType.RESOURCE - ? RumResourceEventDomainContext | RumManualResourceEventDomainContext + ? RumResourceEventDomainContext | RumManualResourceEventDomainContext | RumWebSocketResourceEventDomainContext : T extends typeof RumEventType.ERROR ? RumErrorEventDomainContext : T extends typeof RumEventType.LONG_TASK @@ -48,6 +48,8 @@ export interface RumManualResourceEventDomainContext { isManual: true } +export type RumWebSocketResourceEventDomainContext = Record + export interface RumErrorEventDomainContext { error: unknown handlingStack?: string diff --git a/scripts/dev-server/lib/server.ts b/scripts/dev-server/lib/server.ts index c045a925a8..d19330af71 100644 --- a/scripts/dev-server/lib/server.ts +++ b/scripts/dev-server/lib/server.ts @@ -31,6 +31,7 @@ export function runServer({ writeIntakeFile = true }: { writeIntakeFile?: boolea app.use((_req, res, next) => { res.setHeader('Document-Policy', 'js-profiling') + res.setHeader('Access-Control-Allow-Origin', '*') next() })