A type-safe NestJS integration SDK for the FastBI API. It exposes every API domain — KPIs, reports, streaming, AI analytics, database connections, dashboards, MCP orchestration, and more — as injectable NestJS services with full TypeScript types.
Scope. This SDK is an integration layer. All business logic, data processing, and computation live in the FastBI Go API. The SDK handles authentication, HTTP transport, type safety, and Server-Sent Event (SSE) streaming.
| Requirement | Version |
|---|---|
| Node.js | >= 20.0 |
| NestJS | >= 10.0 |
| TypeScript | >= 5.0 |
npm install @fastbi/sdk-nestjs

Peer dependencies (already present in a standard NestJS project):
npm install @nestjs/common @nestjs/core rxjs reflect-metadata

// app.module.ts
import { Module } from '@nestjs/common';
import { FastBIModule } from '@fastbi/sdk-nestjs';
@Module({
imports: [
FastBIModule.forRoot({
baseUrl: 'http://fastbi-api.internal:3050',
clientId: process.env.FASTBI_CLIENT_ID!,
secret: process.env.FASTBI_SECRET!,
}),
],
})
export class AppModule {}

import { Injectable } from '@nestjs/common';
import { KPIsService } from '@fastbi/sdk-nestjs';
@Injectable()
export class RevenueService {
constructor(private readonly kpis: KPIsService) {}
async getRevenueKPI(org: string) {
return this.kpis.getByCode('REVENUE_GROWTH_RATE', org);
}
}

JWT tokens are fetched and refreshed automatically; you never manage them manually.
FastBIModule.forRoot({
baseUrl: 'http://localhost:3050', // FastBI API base URL
clientId: 'lts_a7f_5202l', // Client ID from FastBI
secret: 'k9Hp4$mQ!2vN6rT1', // Corresponding secret
tokenRefreshMarginSeconds: 60, // Refresh token 60s before expiry (default: 60)
timeoutMs: 30_000, // Request timeout in ms (default: 30 000)
})

// app.module.ts
import { ConfigModule, ConfigService } from '@nestjs/config';
import { FastBIModule } from '@fastbi/sdk-nestjs';
FastBIModule.forRootAsync({
imports: [ConfigModule],
inject: [ConfigService],
useFactory: (config: ConfigService) => ({
baseUrl: config.getOrThrow('FASTBI_BASE_URL'),
clientId: config.getOrThrow('FASTBI_CLIENT_ID'),
secret: config.getOrThrow('FASTBI_SECRET'),
}),
})

| Service | Description |
|---|---|
| AuthService | Token generation, validation, and cache management |
| KPIsService | CRUD operations and version history for KPIs |
| SchedulesService | CRON-based report schedule management |
| RunsService | Report run lifecycle and state machine transitions |
| NarrativesService | AI narrative section generation and retrieval |
| JobsService | Report job queue monitoring and dead-letter retry |
| KnowledgeBaseService | TF-IDF knowledge base retrieval and re-indexing |
| ArtifactsService | Excel, CSV, and PDF export and download |
| ConnectionsService | External database registration, querying, and caching |
| DashboardsService | Dashboard templates, dashboards, and widgets |
| StreamService | SSE subscriptions and real-time pipeline management |
| AnalyticsService | Revenue insights, product mix, and forecasting |
| AIService | Statistical AI: customers, financials, KPI deviations, products |
| MCPService | Multi-agent coordinator: route queries, execute goals |
| SalesService | Natural language sales queries and trend prediction |
| MLService | Linear regression training and prediction |
| AccountingService | Fiscal workflow and compliance calendar management |
// Tokens are managed automatically. Manual access is rarely needed.
const token = await authService.getToken();
const result = await authService.validate(someToken);
// result → { message: 'SUCCESS', status: 'AUTHORIZED' }
// Force re-authentication on the next request
authService.invalidate();

// Create
const kpi = await kpis.create({
organization_name: 'acme-corp',
code: 'GROSS_MARGIN',
name: 'Gross Margin',
description: 'Gross margin as a percentage of revenue.',
category: 'margin',
unit: 'percentage',
aggregation: 'avg',
comparison_operator: 'gte',
target_value: 40,
alert_enabled: true,
alert_severity: 'high',
reporting_period: 'monthly',
lookback_days: 30,
});
// Read
const fetched = await kpis.getByCode('GROSS_MARGIN', 'acme-corp');
console.log(fetched.status); // 'green' | 'yellow' | 'red'
// Update
await kpis.update('GROSS_MARGIN', 'acme-corp', { target_value: 45 });
// List (paginated)
const page = await kpis.list({ organization_name: 'acme-corp', limit: 20, offset: 0 });
// Version history
const history = await kpis.getHistory('GROSS_MARGIN', 'acme-corp');
// Soft delete
await kpis.delete('GROSS_MARGIN', 'acme-corp');

// Create a weekly schedule
const schedule = await schedules.create({
organization_name: 'acme-corp',
name: 'Weekly Executive Report',
cadence: 'weekly',
timezone: 'America/Sao_Paulo',
export_format: 'pdf',
language: 'en',
});
// Trigger immediately (returns a job_id)
const { job_id } = await schedules.triggerNow('Weekly Executive Report', 'acme-corp');
// List active schedules
const page = await schedules.list({ organization_name: 'acme-corp', is_active: true });
// Deactivate
await schedules.deactivate('Weekly Executive Report', 'acme-corp');

A FastBI report run follows a state machine:
scheduled → queued → generating → review_pending → approved
→ formatting → exporting → distributing → delivered
↗
failed ← (any stage)
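
The diagram above can be read as a transition table. The following is a minimal client-side sketch, not the API's own implementation: the status names come from the diagram, while `RunStatus`, `NEXT`, and `canTransition` are hypothetical names, and the exact set of allowed transitions (including treating `delivered` and `failed` as terminal) is an assumption. The FastBI API remains the source of truth and answers invalid transitions with a 409.

```typescript
// Status names taken from the state machine diagram.
type RunStatus =
  | 'scheduled' | 'queued' | 'generating' | 'review_pending' | 'approved'
  | 'formatting' | 'exporting' | 'distributing' | 'delivered' | 'failed';

// Assumed forward transitions, inferred from the diagram.
const NEXT: Record<RunStatus, RunStatus[]> = {
  scheduled: ['queued'],
  queued: ['generating'],
  generating: ['review_pending'],
  // A rejected review sends the run back to 'generating'.
  review_pending: ['approved', 'generating'],
  approved: ['formatting'],
  formatting: ['exporting'],
  exporting: ['distributing'],
  distributing: ['delivered'],
  delivered: [],
  failed: [],
};

// Returns true when the assumed table allows moving from `from` to `to`.
function canTransition(from: RunStatus, to: RunStatus): boolean {
  // 'failed' is reachable from any stage that is still in progress (assumption).
  if (to === 'failed') {
    return from !== 'delivered' && from !== 'failed';
  }
  return NEXT[from].indexOf(to) !== -1;
}
```

A pre-check like this can give faster feedback in a UI, but it should not replace handling the 409 response from the API.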
// Create a run
const run = await runs.create({
organization_name: 'acme-corp',
schedule_name: 'Weekly Executive Report',
});
// Transition state
await runs.transition(run.id, { to_status: 'generating' }, 'acme-corp');
// Get review details (sections + overrides)
const review = await runs.getReview(run.id, 'acme-corp');
// Approve
await runs.approveReview(run.id, {
organization_name: 'acme-corp',
reviewer_email: 'cfo@acme.com',
comments: 'Approved.',
});
// Reject (returns run to 'generating')
await runs.rejectReview(run.id, {
organization_name: 'acme-corp',
reviewer_email: 'cfo@acme.com',
comments: 'Revenue figures need correction.',
});
// Override a single narrative section
await runs.overrideSection(run.id, 'exec_summary', {
organization_name: 'acme-corp',
override_by_email: 'analyst@acme.com',
override_text: 'Corrected executive summary text.',
override_reason: 'Source data updated after generation.',
});

const result = await narratives.generate({
run_id: run.id,
organization_name: 'acme-corp',
top_k: 5,
lookback_days: 30,
sections: [
{ section_id: 'exec_summary', title: 'Executive Summary', section_type: 'executive' },
{ section_id: 'revenue', title: 'Revenue Analysis', section_type: 'revenue' },
],
});
console.log('Overall integrity:', result.overall_integrity_score);
result.sections.forEach((s) => {
if (s.is_flagged) {
console.warn(`Section "${s.section_title}" flagged: ${s.flag_reason}`);
}
});

Narrative sections that deviate from the source data by more than 5% in either direction are flagged with is_flagged: true. The original AI text is always preserved; overrides are tracked in an append-only table.
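
To make the threshold concrete, here is a small sketch of a ±5% check. It is illustrative only: the actual comparison runs inside the FastBI API, this document does not specify how the deviation is computed, and the helper name `deviatesFromSource` and the use of relative deviation are assumptions.

```typescript
// Hypothetical helper: returns true when `narrativeValue` differs from
// `sourceValue` by more than `tolerance` (relative deviation, default 5%).
function deviatesFromSource(
  narrativeValue: number,
  sourceValue: number,
  tolerance = 0.05,
): boolean {
  // Relative deviation is undefined for a zero source value; in this sketch
  // any non-zero narrative value then counts as a deviation.
  if (sourceValue === 0) {
    return narrativeValue !== 0;
  }
  const relative = Math.abs(narrativeValue - sourceValue) / Math.abs(sourceValue);
  return relative > tolerance;
}
```

Under this reading, a narrative that reports 106 against a source value of 100 would be flagged, while 104 would not.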
// Generate exports
await artifacts.renderPdf(run.id, 'acme-corp');
await artifacts.exportExcel(run.id, 'acme-corp');
await artifacts.exportCsv(run.id, 'acme-corp');
// Get artifact metadata
const info = await artifacts.getPdfInfo(run.id);
console.log('File size:', info.file_size_bytes);
// Get download URLs (include Authorization header when requesting)
const pdfUrl = artifacts.getPdfDownloadUrl(run.id);
const excelUrl = artifacts.getExcelDownloadUrl(run.id);
const csvUrl = artifacts.getCsvDownloadUrl(run.id);

// Register an external data warehouse
await connections.register({
organization_name: 'acme-corp',
connection_name: 'prod-dw',
db_type: 'postgres',
host: 'dw.internal.acme.com',
port: 5432,
database_name: 'analytics',
username: 'reader',
password: process.env.DW_PASSWORD!,
ssl_mode: 'require',
max_rows_limit: 5_000,
query_timeout_seconds: 60,
});
// Execute a parameterized, cached query
const result = await connections.query('prod-dw', {
organization_name: 'acme-corp',
query: 'SELECT region, SUM(amount) FROM sales WHERE date >= $1 GROUP BY region',
params: ['2026-01-01'],
use_cache: true,
cache_ttl_seconds: 3_600,
});
// Batch execute
const batch = await connections.batchQuery({
organization_name: 'acme-corp',
queries: [
{ connection_name: 'prod-dw', query: 'SELECT COUNT(*) FROM orders' },
{ connection_name: 'prod-dw', query: 'SELECT AVG(value) FROM orders' },
],
});

All streaming methods return a () => void cleanup function. Call it to close the SSE connection.
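
When a caller holds several subscriptions at once, it can help to collect their cleanup functions in one place. The sketch below assumes a hypothetical `CleanupRegistry` helper that is not part of the SDK; it only relies on each subscription handing back a `() => void`.

```typescript
type Cleanup = () => void;

// Hypothetical helper: tracks cleanup functions so they can be closed
// individually or all at once, each at most one time.
class CleanupRegistry {
  private readonly cleanups = new Set<Cleanup>();

  // Registers a cleanup and returns a wrapper that runs it at most once.
  add(cleanup: Cleanup): Cleanup {
    const once: Cleanup = () => {
      // Set.delete returns false if the wrapper was already removed.
      if (this.cleanups.delete(once)) {
        cleanup();
      }
    };
    this.cleanups.add(once);
    return once;
  }

  // Runs every cleanup that has not been called yet.
  closeAll(): void {
    for (const stop of Array.from(this.cleanups)) {
      stop();
    }
  }

  // Number of cleanups still pending.
  get size(): number {
    return this.cleanups.size;
  }
}
```

A service could wrap each value returned by a subscribe call in `registry.add(...)` and call `registry.closeAll()` from its own teardown hook.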
// Subscribe to multiple topics
const stop = await stream.subscribe(
['kpi', 'anomaly'],
(event) => {
if (event.type === 'kpi.updated') {
console.log(`${event.payload.code}: ${event.payload.current_value}`);
}
if (event.type === 'anomaly.detected') {
console.warn(`Anomaly: ${event.payload.metric_name}`);
}
},
);
// Close when done
stop();
// Subscribe to a single KPI
const stopKPI = await stream.subscribeToKPI('GROSS_MARGIN', handler);
// Subscribe to anomalies for one org
const stopAnomaly = await stream.subscribeToAnomalies('acme-corp', handler);
// Stream report job progress
const stopJob = await stream.subscribeToReportJob(job_id, (event) => {
if (event.type === 'report.progress') {
console.log(`Progress: ${event.payload.progress}%`);
}
if (event.type === 'report.completed') {
console.log('Report ready:', event.payload.result);
}
});
// Create and stream an aggregation pipeline
await stream.createPipeline({
name: 'revenue-60s-sum',
source_topic: 'kpi',
field: 'payload.current_value',
aggregation: 'sum',
window: { type: 'tumbling', size_seconds: 60 },
});
const stopPipeline = await stream.subscribeToPipeline('revenue-60s-sum', (event) => {
console.log('Window sum:', event.payload);
});

All AI methods are pure statistical computation, with no external LLM calls.
const params = { organization_name: 'acme-corp', start_date: '2026-01-01', end_date: '2026-06-30' };
// Customer analysis
const atRisk = await ai.getAtRiskCustomers(params);
const segments = await ai.getCustomerSegments(params);
const churn = await ai.getChurnSignals(params);
// Financial
const leaks = await ai.getMarginLeaks(params);
const cogs = await ai.getCogsBreakdown(params);
// KPIs
const deviations = await ai.getKPIDeviations(params);
const trends = await ai.getKPITrends(params);
// Products
const bcg = await ai.getBCGMatrix(params);
const underperforming = await ai.getUnderperformingProducts(params);
// Aggregate intelligence
const health = await ai.getHealthScore(params);
const insights = await ai.getActiveInsights(params);

// Route a natural language query to the best-matching analytic skills
const skills = await mcp.routeQuery({
organization_name: 'acme-corp',
query: 'Why did Q2 revenue decline in the South region?',
max_skills: 3,
});
// → [{ skill_name, category, description, score }]
// Execute a named goal across multiple skills
const result = await mcp.executeGoal({
organization_name: 'acme-corp',
goal: 'diagnose_decline',
dimension: 'region',
max_skills: 5,
});
// Full analysis (all domains)
const full = await mcp.runFullAnalysis({ organization_name: 'acme-corp' });
// Rebuild skill index after deploying new analytics endpoints
await mcp.rebuildSkillIndex();

// Train a linear regression model
const { model_info } = await ml.train({
organization_name: 'acme-corp',
feature_column: 'ad_spend',
target_column: 'revenue',
data: [
{ ad_spend: 1000, revenue: 8500 },
{ ad_spend: 2000, revenue: 17200 },
],
});
console.log('R²:', model_info.r_squared);
// Single prediction
const { prediction } = await ml.predict({ organization_name: 'acme-corp', feature_value: 5000 });
// Batch predictions
const { predictions } = await ml.batchPredict({
organization_name: 'acme-corp',
feature_values: [3000, 5000, 8000],
});

Every failed API call throws an Error enriched with two extra properties:
try {
await kpis.getByCode('NONEXISTENT', 'acme-corp');
} catch (err: unknown) {
const e = err as Error & { statusCode?: number; details?: string };
console.error(e.message); // Human-readable error from the API
console.error(e.statusCode); // HTTP status code (404, 401, 409, 500 …)
console.error(e.details); // Optional technical details
}

Common status codes:
| Code | Meaning |
|---|---|
| 400 | Validation error — check request body |
| 401 | Token expired or invalid |
| 404 | Resource not found |
| 409 | State machine conflict (invalid run transition) |
| 500 | Server error |
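
The cast in the `catch` block above can be replaced by a type guard so that the extra properties are checked at runtime. This is a minimal sketch: `FastBIErrorLike`, `isFastBIError`, and `explainError` are hypothetical names rather than SDK exports, and the messages simply mirror the status code table.

```typescript
// Shape described in this section: an Error with two optional extras.
interface FastBIErrorLike extends Error {
  statusCode?: number;
  details?: string;
}

// Narrows an unknown value to an Error that carries a numeric statusCode.
function isFastBIError(err: unknown): err is FastBIErrorLike & { statusCode: number } {
  return err instanceof Error && typeof (err as FastBIErrorLike).statusCode === 'number';
}

// Maps the documented status codes to a short explanation.
function explainError(err: unknown): string {
  if (!isFastBIError(err)) {
    return 'Unknown error';
  }
  switch (err.statusCode) {
    case 400:
      return 'Validation error';
    case 401:
      return 'Token expired or invalid';
    case 404:
      return 'Resource not found';
    case 409:
      return 'State machine conflict';
    default:
      return err.statusCode >= 500 ? 'Server error' : `HTTP ${err.statusCode}`;
  }
}
```

Inside a `catch (err: unknown)` block, `explainError(err)` can then be called without any cast.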
FASTBI_BASE_URL=http://fastbi-api.internal:3050
FASTBI_CLIENT_ID=lts_a7f_5202l
FASTBI_SECRET=k9Hp4$mQ!2vN6rT1

# Install dependencies
npm install
# Build the SDK
npm run build
# Run an example (requires a running FastBI API)
# The secret is single-quoted so the shell does not expand `$mQ` or `!2`
FASTBI_BASE_URL=http://localhost:3050 \
FASTBI_CLIENT_ID=lts_a7f_5202l \
FASTBI_SECRET='k9Hp4$mQ!2vN6rT1' \
npx ts-node examples/01-basic-authentication.ts

Available examples:
| File | What it covers |
|---|---|
| 01-basic-authentication.ts | Token generation and validation |
| 02-kpi-management.ts | Full KPI CRUD lifecycle |
| 03-report-lifecycle.ts | Schedule → run → narrative → review → PDF export |
| 04-real-time-streaming.ts | SSE subscriptions and aggregation pipelines |
| 05-database-connections.ts | Register DB, query, batch, cache, saved queries |
| 06-dashboard-management.ts | Templates, dashboards, widgets |
| 07-ai-analysis.ts | AI stats, NL sales queries, linear regression |
| 08-mcp-orchestration.ts | Multi-agent query routing and goal execution |
| 09-async-module-config.ts | forRootAsync with ConfigService |
src/
├── index.ts # Public API barrel export
├── fastbi.module.ts # NestJS DynamicModule (forRoot / forRootAsync)
├── fastbi.config.ts # FastBIConfig interface + DI token
├── common/types.ts # Shared types (PaginatedResponse, UUID, RFC3339)
├── auth/ # AuthService — token lifecycle
├── http/ # FastBIClient — Axios with auth interceptor
├── kpis/ # KPIsService + types
├── reports/
│ ├── schedules.service.ts # CRON schedule management
│ ├── runs.service.ts # Run state machine + review
│ ├── narratives.service.ts # AI narrative generation
│ ├── jobs.service.ts # Job queue monitoring
│ ├── knowledge-base.service.ts
│ ├── artifacts.service.ts # Excel / CSV / PDF export
│ └── reports.types.ts # All report-domain types
├── connections/ # External database integration
├── dashboards/ # Templates, dashboards, widgets
├── stream/ # SSE subscriptions + pipeline REST ops
├── analytics/ # Revenue insights, forecasting
├── ai/ # Statistical AI (customers, financials, KPIs, products)
├── mcp/ # Multi-agent coordinator
├── sales/ # NL sales queries + trend prediction
├── ml/ # Linear regression
└── accounting/ # Fiscal workflows + compliance calendar
No business logic: The SDK is a thin HTTP client. All computation runs in the FastBI Go API.
Auto-refresh tokens: AuthService caches the JWT and refreshes it transparently once the token comes within the configured margin of its expiry (default: 60 s before the 10-minute expiry).
Organization scoping: All multi-tenant endpoints receive organization_name (never a raw UUID). The API resolves names to IDs internally.
SSE lifecycle management: Every subscribe* method returns a cleanup function. StreamService also implements OnModuleDestroy to close all open connections on shutdown.
Semantic identifiers in routes: KPIs are identified by code, schedules by name, dashboards by slug, connections by connection_name. No raw UUIDs in URLs.
Consistent error shape: All errors expose .statusCode and .details for programmatic handling.
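
The token auto-refresh behaviour described above can be sketched as a small cache with a refresh margin. This is an illustration under stated assumptions, not the SDK's actual AuthService: `TokenCache`, `FetchedToken`, and the injected `fetchToken` and `now` functions are hypothetical names chosen so the logic can be exercised without a running API.

```typescript
// Hypothetical shape of a fetched token (not the SDK's real type).
interface FetchedToken {
  token: string;
  expiresAtMs: number; // absolute expiry, in epoch milliseconds
}

class TokenCache {
  private cached: FetchedToken | null = null;
  private inFlight: Promise<FetchedToken> | null = null;
  private readonly fetchToken: () => Promise<FetchedToken>;
  private readonly marginMs: number;
  private readonly now: () => number;

  constructor(
    fetchToken: () => Promise<FetchedToken>,
    refreshMarginSeconds = 60,
    now: () => number = () => Date.now(),
  ) {
    this.fetchToken = fetchToken;
    this.marginMs = refreshMarginSeconds * 1000;
    this.now = now;
  }

  // Returns the cached token unless it is within the margin of expiry.
  async getToken(): Promise<string> {
    const current = this.cached;
    if (current !== null && this.now() < current.expiresAtMs - this.marginMs) {
      return current.token;
    }
    // Concurrent callers share a single in-flight refresh.
    const pending = this.inFlight ?? this.fetchToken();
    this.inFlight = pending;
    try {
      const fresh = await pending;
      this.cached = fresh;
      return fresh.token;
    } finally {
      this.inFlight = null;
    }
  }

  // Forces a fresh fetch on the next getToken() call.
  invalidate(): void {
    this.cached = null;
  }
}
```

With a 10-minute token and the default 60 s margin, a cache like this would reuse the token for the first nine minutes and fetch a new one on the first request after that.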
MIT