Initial commit: SIC harness (backend, web, pi-adapter, configs, docs)

- pnpm monorepo: apps/api (Fastify + SQLite + SSE), apps/web (React+Vite), packages/shared, packages/pi-adapter
- Local auth (admin/webhook-runner roles) + Keycloak JWT ready
- Multi-session chat with reliable history (user persisted before LLM, assistant persisted after stream)
- Markdown knowledge base with /api/docs/search + /api/docs/:id
- YAML webhook catalog with backend-only execution, retry/backoff, audit (webhook_runs), and per-user rate limit
- Skills config (sre-on-call, blameless-postmortem, security-incident) injected into LLM system prompt
- LLM provider failover chain (config/models.yml fallback + LLM_FALLBACK_CHAIN override)
- Context-aware webhooks panel + backend id-mention safety net
- Per-message stats (time/duration/tokens/model), Markdown+GFM render, code & table copy/download buttons
- Vitest suite, end-to-end smoke test (scripts/smoke.mjs), per-session system prompt override
- /metrics Prometheus endpoint + /api/metrics JSON, request-id correlation
- dotenv with explicit repo-root path; envString/envNumber helpers (handles empty-string env)
- Runbooks + SOPs under knowledge/ in English; README, docs, and INDEX.md in English
This commit is contained in:
2026-06-29 16:20:53 +02:00
commit 62728b2200
89 changed files with 11992 additions and 0 deletions

View File

@@ -0,0 +1,76 @@
import type { AuthUser } from "@pi-chat/shared";
import type { FastifyRequest } from "fastify";
import { createRemoteJWKSet, jwtVerify } from "jose";
import { envString } from "../env.js";
type KeycloakClaims = {
sub?: string;
preferred_username?: string;
email?: string;
realm_access?: { roles?: string[] };
resource_access?: Record<string, { roles?: string[] }>;
};
const authMode = () => envString(process.env.AUTH_MODE, "local");
const oidcIssuer = () => envString(process.env.OIDC_ISSUER, "https://auth.rikrdo.com/realms/homelab");
const oidcAudience = () => envString(process.env.OIDC_AUDIENCE, "pi-chat");
let jwks: ReturnType<typeof createRemoteJWKSet> | undefined;
const getLocalUser = (): AuthUser => ({
id: "local-user",
username: "local-user",
roles: ["admin", "webhook-runner"],
});
const bearerTokenFrom = (request: FastifyRequest) => {
const header = request.headers.authorization;
if (!header?.startsWith("Bearer ")) {
throw new Error("auth_missing_bearer_token");
}
return header.slice("Bearer ".length).trim();
};
const rolesFromClaims = (claims: KeycloakClaims) => {
const audience = oidcAudience();
const realmRoles = claims.realm_access?.roles ?? [];
const clientRoles = claims.resource_access?.[audience]?.roles ?? [];
return [...new Set([...realmRoles, ...clientRoles])];
};
const getKeycloakUser = async (request: FastifyRequest): Promise<AuthUser> => {
const issuer = oidcIssuer();
const audience = oidcAudience();
jwks ??= createRemoteJWKSet(new URL(`${issuer}/protocol/openid-connect/certs`));
const { payload } = await jwtVerify(bearerTokenFrom(request), jwks, {
issuer,
audience,
});
const claims = payload as KeycloakClaims;
if (!claims.sub) {
throw new Error("auth_missing_subject");
}
return {
id: claims.sub,
username: claims.preferred_username,
email: claims.email,
roles: rolesFromClaims(claims),
};
};
export const getAuthUser = async (request: FastifyRequest): Promise<AuthUser> => {
if (authMode() === "local") {
return getLocalUser();
}
if (authMode() === "keycloak") {
return getKeycloakUser(request);
}
throw new Error(`auth_mode_not_supported:${authMode()}`);
};

View File

@@ -0,0 +1,6 @@
import type { FastifyInstance } from "fastify";
import { getAuthUser } from "./index.js";
export const registerAuthRoutes = async (app: FastifyInstance) => {
app.get("/api/me", async (request) => ({ user: await getAuthUser(request) }));
};

355
apps/api/src/chat/routes.ts Normal file
View File

@@ -0,0 +1,355 @@
import { createOpenAICompatiblePiAdapter } from "@pi-chat/pi-adapter";
import type { InternalDocReference, RecommendedAction } from "@pi-chat/shared";
import type { FastifyInstance, FastifyReply } from "fastify";
import { z } from "zod";
import { getAuthUser } from "../auth/index.js";
import type { AppDatabase } from "../db/database.js";
import { createDocsRepository, type KnowledgeSearchResult } from "../docs/repository.js";
import { findModelDefinition, getDefaultModelId, resolveFallbackChain, resolveModelApiKey } from "../models/config.js";
import { envNumber } from "../env.js";
import { chatRateLimiterFromEnv } from "../rate-limit.js";
import { getEnabledSkillPrompts } from "../skills/config.js";
import { createMessageRepository, createSessionRepository } from "../sessions/repository.js";
import { createWebhookAuditRepository } from "../webhooks/audit.js";
import { canUseWebhook, loadWebhookDefinitions } from "../webhooks/config.js";
const chatStreamBody = z.object({
sessionId: z.string().min(1),
message: z.string().trim().min(1).max(envNumber(process.env.CHAT_MESSAGE_MAX_CHARS, 8_000)),
model: z.string().trim().default(getDefaultModelId()),
});
const sendEvent = (reply: FastifyReply, event: unknown) => {
reply.raw.write(`data: ${JSON.stringify(event)}\n\n`);
};
const sendAnswerTokens = (reply: FastifyReply, answer: string) => {
for (const token of answer.match(/\S+\s*/g) ?? []) {
sendEvent(reply, { type: "token", token });
}
};
const toDocReference = (doc: KnowledgeSearchResult): InternalDocReference => ({
id: doc.id,
title: doc.title,
source: doc.source,
relevance: doc.relevance,
});
const shouldAutoTitle = (title: string | null) => !title || title === "New session";
const titleFromMessage = (message: string) => {
const normalized = message.replace(/\s+/g, " ").trim();
return normalized.length > 48 ? `${normalized.slice(0, 45)}...` : normalized;
};
// Match a webhook id against a chunk of text. Accepts the full id (with - or _)
// OR its first word (e.g. id "dns-flush" matches "dns" in "flush the dns cache").
// Both sides are normalized to lowercase and dashes/underscores → spaces.
const matchesId = (text: string, id: string): boolean => {
const normalized = text.toLowerCase().replace(/[-_]+/g, " ");
const normalizedId = id.toLowerCase().replace(/[-_]+/g, " ");
if (normalized.includes(normalizedId)) return true;
const firstWord = normalizedId.split(" ")[0] ?? "";
if (firstWord.length >= 3 && new RegExp(`\\b${firstWord}\\b`, "i").test(normalized)) {
return true;
}
return false;
};
const enrichActionsWithMentions = (
userMessage: string,
answer: string,
recommended: RecommendedAction[],
available: RecommendedAction[],
): RecommendedAction[] => {
const seen = new Set(recommended.map((action) => action.id));
const enriched = [...recommended];
for (const candidate of available) {
if (seen.has(candidate.id)) continue;
if (matchesId(userMessage, candidate.id) || matchesId(answer, candidate.id)) {
enriched.push({
...candidate,
// Lower confidence than LLM-recommended ones so the UI can show the
// difference if needed; still actionable.
confidence: 0.4,
reason: "Mentioned in the conversation",
});
seen.add(candidate.id);
}
}
return enriched;
};
export const registerChatRoutes = async (app: FastifyInstance, db: AppDatabase) => {
const sessions = createSessionRepository(db);
const messages = createMessageRepository(db);
const docs = createDocsRepository();
const audit = createWebhookAuditRepository(db);
const rateLimiter = chatRateLimiterFromEnv();
app.post("/api/chat/stream", async (request, reply) => {
const user = await getAuthUser(request);
const decision = rateLimiter.consume(user.id);
if (!decision.ok) {
const retryAfterSec = Math.max(1, Math.ceil(decision.retryAfterMs / 1000));
app.log.warn({ user: user.id, retryAfterSec }, "chat rate limit exceeded");
return reply
.code(429)
.header("retry-after", String(retryAfterSec))
.header("x-ratelimit-remaining", "0")
.send({
error: "rate_limited",
retry_after_ms: decision.retryAfterMs,
});
}
reply.header("x-ratelimit-remaining", String(decision.remaining));
const body = chatStreamBody.parse(request.body);
const session = sessions.get(user.id, body.sessionId);
const selectedModel = findModelDefinition(body.model);
if (!session) {
return reply.code(404).send({ error: "session_not_found" });
}
if (!selectedModel) {
return reply.code(400).send({ error: "model_not_found" });
}
const resolvedKey = resolveModelApiKey(selectedModel);
app.log.debug(
{
model: selectedModel.id,
keyLen: resolvedKey.length,
llmKeyLen: (process.env.LLM_API_KEY ?? "").length,
},
"llm api key resolved",
);
// Build the ordered fallback chain starting at the selected model. The
// adapter is built fresh per model because base URL, key and provider
// model name can differ across chain entries.
const chain = resolveFallbackChain(selectedModel.id);
const chainModels = chain
.map((id) => findModelDefinition(id))
.filter((m): m is NonNullable<ReturnType<typeof findModelDefinition>> => Boolean(m));
const adapters = chainModels.map((model) => ({
model,
pi: createOpenAICompatiblePiAdapter({
baseUrl: model.base_url || process.env.LLM_BASE_URL || "https://api.minimax.io/v1",
apiKey: resolveModelApiKey(model),
defaultModel: model.model,
maxTokens: model.max_tokens,
}),
}));
reply.raw.writeHead(200, {
"content-type": "text/event-stream; charset=utf-8",
"cache-control": "no-cache, no-transform",
connection: "keep-alive",
});
const userMessage = messages.create({
sessionId: body.sessionId,
userId: user.id,
role: "user",
content: body.message,
metadata: { model: selectedModel.id },
});
if (shouldAutoTitle(session.title)) {
sessions.updateTitle(user.id, body.sessionId, titleFromMessage(body.message));
}
sessions.touch(user.id, body.sessionId);
const docResults = await docs.search(body.message, 5);
const internalDocs = docResults.map(toDocReference);
// Soft usage signal: how often has this user run each webhook in the
// recent past. Surfaced as `usageHint` so the LLM can prefer frequently
// used webhooks when ambiguous, and so the UI can show a "Most used" tag.
const usageSinceDays = Math.max(0, envNumber(process.env.WEBHOOK_USAGE_WINDOW_DAYS, 7));
const usageSince = usageSinceDays > 0
? new Date(Date.now() - usageSinceDays * 86_400_000).toISOString()
: new Date(0).toISOString();
const usageMap = usageSinceDays > 0
? audit.usageForUserSince(usageSince, user.id)
: {};
const formatUsageHint = (webhookId: string): string | null => {
const stats = usageMap[webhookId];
if (!stats || stats.runs === 0) return null;
const successPct = Math.round(stats.successRate * 100);
return `${stats.runs} run${stats.runs === 1 ? "" : "s"} in last ${usageSinceDays}d, ${successPct}% success`;
};
const availableActions: RecommendedAction[] = loadWebhookDefinitions()
.filter((webhook) => canUseWebhook(user.roles, webhook))
.map((webhook) => {
const usageHint = formatUsageHint(webhook.id);
return {
type: "webhook" as const,
id: webhook.id,
confidence: 0,
reason: webhook.description ?? webhook.label,
requires_confirmation: webhook.confirmation_required,
...(usageHint ? { usageHint } : {}),
};
});
sendEvent(reply, { type: "docs", docs: docResults });
try {
const history = messages
.listForSession(user.id, body.sessionId)
.filter((message) => message.id !== userMessage.id)
.slice(-12)
.map((message) => ({ role: message.role, content: message.content }));
const t0 = Date.now();
// Walk the fallback chain. The first adapter that returns ok=true
// wins. If a structured error comes back from any one model we move
// to the next; an exception (network/5xx/timeout) also jumps chain.
let chat: Awaited<ReturnType<typeof adapters[number]["pi"]["chat"]>> | null = null;
let usedModelId = selectedModel.id;
let fallbackAttempts = 0;
const failures: Array<{ model: string; reason: string; kind?: string }> = [];
for (const entry of adapters) {
try {
const result = await entry.pi.chat({
message: body.message,
model: entry.model.model,
docs: internalDocs,
availableActions,
history,
skillPrompts: getEnabledSkillPrompts(),
systemPrompt: session.system_prompt,
});
if (result.ok) {
chat = result;
usedModelId = entry.model.id;
break;
}
// Structured error (no_content / json_parse / schema). Try next.
failures.push({
model: entry.model.id,
kind: result.error.kind,
reason: result.error.kind === "no_content"
? result.error.message
: result.error.reason,
});
fallbackAttempts += 1;
chat = result; // keep last error for the controlled fallback path
usedModelId = entry.model.id;
} catch (error) {
// Transport / timeout / 5xx — also fall through.
failures.push({
model: entry.model.id,
reason: error instanceof Error ? error.message : String(error),
});
fallbackAttempts += 1;
app.log.warn(
{ model: entry.model.id, err: error },
"llm call failed, trying next model in fallback chain",
);
}
}
const durationMs = Date.now() - t0;
if (!chat) {
throw new Error("all fallback models failed");
}
// The adapter may return ok=true (well-formed JSON) or ok=false with
// a structured error + safe fallback. In both cases the fallback
// contains a usable `answer` and (possibly empty) actions; we never
// throw on a parse/schema problem — those are operational signal, not
// request failures.
const result = chat.ok ? chat.result : chat.fallback;
if (!chat.ok) {
app.log.warn(
{
kind: chat.error.kind,
reason: chat.error.kind === "no_content" ? chat.error.message : chat.error.reason,
model: usedModelId,
},
"pi-adapter returned a parse/structured error; using safe fallback",
);
}
// Deterministic safety net: if the LLM forgot to put a relevant webhook
// in `recommended_actions` (common with short user prompts), scan both
// the user's input and the model's answer for any role-allowed webhook
// id (or its first word) and synthesize an action so the user can still
// execute it from the right panel.
const recommendedActions = enrichActionsWithMentions(
body.message,
result.answer,
result.recommended_actions,
availableActions,
);
sendAnswerTokens(reply, result.answer);
sendEvent(reply, { type: "actions", actions: recommendedActions });
const assistantMetadata: Record<string, unknown> = {
model: usedModelId,
docs: result.internal_docs,
actions: result.recommended_actions,
usage: { ...(chat.usage ?? {}), durationMs },
};
if (usedModelId !== selectedModel.id || fallbackAttempts > 0) {
assistantMetadata.requested_model = selectedModel.id;
assistantMetadata.fallback_attempts = fallbackAttempts;
assistantMetadata.fallback_chain = chain;
assistantMetadata.fallback_failures = failures;
}
if (!chat.ok) {
assistantMetadata.error_kind = chat.error.kind;
assistantMetadata.error_reason = chat.error.kind === "no_content"
? chat.error.message
: chat.error.reason;
assistantMetadata.fallback = true;
}
messages.create({
sessionId: body.sessionId,
userId: user.id,
role: "assistant",
content: result.answer,
metadata: assistantMetadata,
});
sessions.touch(user.id, body.sessionId);
sendEvent(reply, { type: "done" });
reply.raw.end();
} catch (error) {
const message = "I could not complete the model response. The user message has been saved.";
app.log.error(error);
messages.create({
sessionId: body.sessionId,
userId: user.id,
role: "assistant",
content: message,
metadata: {
model: selectedModel.id,
error: error instanceof Error ? error.message : "unknown_error",
},
});
sessions.touch(user.id, body.sessionId);
sendEvent(reply, { type: "token", token: message });
sendEvent(reply, { type: "error", error: "llm_error" });
sendEvent(reply, { type: "done" });
reply.raw.end();
}
return reply;
});
};

View File

@@ -0,0 +1,27 @@
import Database from "better-sqlite3";
import { mkdirSync } from "node:fs";
import { dirname, resolve } from "node:path";
export type AppDatabase = Database.Database;
function sqlitePathFromUrl(databaseUrl: string): string {
if (databaseUrl.startsWith("sqlite:///")) {
return databaseUrl.replace("sqlite://", "");
}
if (databaseUrl.startsWith("sqlite://")) {
return databaseUrl.replace("sqlite://", "");
}
return databaseUrl;
}
export function openDatabase(databaseUrl = process.env.DATABASE_URL ?? "sqlite://./data/pi-chat.db"): AppDatabase {
const filename = resolve(sqlitePathFromUrl(databaseUrl));
mkdirSync(dirname(filename), { recursive: true });
const db = new Database(filename);
db.pragma("journal_mode = WAL");
db.pragma("foreign_keys = ON");
return db;
}

View File

@@ -0,0 +1,62 @@
import type { AppDatabase } from "./database.js";
export function migrate(db: AppDatabase): void {
db.exec(`
CREATE TABLE IF NOT EXISTS chat_sessions (
id TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
title TEXT,
system_prompt TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_chat_sessions_user_updated
ON chat_sessions(user_id, updated_at DESC);
CREATE TABLE IF NOT EXISTS chat_messages (
id TEXT PRIMARY KEY,
session_id TEXT NOT NULL,
user_id TEXT NOT NULL,
role TEXT NOT NULL,
content TEXT NOT NULL,
metadata TEXT,
created_at TEXT NOT NULL,
FOREIGN KEY (session_id) REFERENCES chat_sessions(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_chat_messages_session_user_created
ON chat_messages(session_id, user_id, created_at ASC);
CREATE TABLE IF NOT EXISTS webhook_runs (
id TEXT PRIMARY KEY,
webhook_id TEXT NOT NULL,
user_id TEXT NOT NULL,
session_id TEXT NOT NULL,
status TEXT NOT NULL,
request_payload TEXT,
response_status INTEGER,
attempts INTEGER NOT NULL DEFAULT 1,
created_at TEXT NOT NULL,
FOREIGN KEY (session_id) REFERENCES chat_sessions(id) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS idx_webhook_runs_session_user_created
ON webhook_runs(session_id, user_id, created_at DESC);
`);
// Idempotent additive migrations for existing DBs.
const webhookRunColumns = db
.prepare("PRAGMA table_info(webhook_runs)")
.all() as Array<{ name: string }>;
if (!webhookRunColumns.some((column) => column.name === "attempts")) {
db.exec("ALTER TABLE webhook_runs ADD COLUMN attempts INTEGER NOT NULL DEFAULT 1");
}
const sessionColumns = db
.prepare("PRAGMA table_info(chat_sessions)")
.all() as Array<{ name: string }>;
if (!sessionColumns.some((column) => column.name === "system_prompt")) {
db.exec("ALTER TABLE chat_sessions ADD COLUMN system_prompt TEXT");
}
}

View File

@@ -0,0 +1,255 @@
import { readdirSync, readFileSync, statSync } from "node:fs";
import { relative, resolve } from "node:path";
import YAML from "yaml";
import { envString } from "../env.js";
import { loadRagConfig } from "../rag/config.js";
import { getViaRag, isRagRemote, searchViaRag } from "../rag/client.js";
export type KnowledgeDoc = {
id: string;
title: string;
source: string;
tags: string[];
owner?: string;
updated?: string;
headings: string[];
content: string;
};
export type KnowledgeSearchResult = Omit<KnowledgeDoc, "content"> & {
relevance: number;
excerpt: string;
};
const defaultKnowledgeDir = () => resolve(process.cwd(), "../../knowledge");
const normalizePathId = (source: string) =>
source.replace(/\.md$/i, "").split(/[\\/]/g).join(":");
const walkMarkdownFiles = (dir: string): string[] => {
const entries = readdirSync(dir, { withFileTypes: true });
return entries.flatMap((entry) => {
const fullPath = resolve(dir, entry.name);
if (entry.isDirectory()) {
return walkMarkdownFiles(fullPath);
}
if (entry.isFile() && entry.name.endsWith(".md")) {
return [fullPath];
}
return [];
});
};
const parseFrontmatter = (raw: string) => {
if (!raw.startsWith("---")) {
return { metadata: {}, body: raw };
}
const end = raw.indexOf("\n---", 3);
if (end === -1) {
return { metadata: {}, body: raw };
}
const frontmatter = raw.slice(3, end).trim();
const body = raw.slice(end + 4).trim();
const metadata = YAML.parse(frontmatter) ?? {};
return { metadata, body };
};
const extractHeadings = (body: string) =>
body
.split("\n")
.filter((line) => line.startsWith("#"))
.map((line) => line.replace(/^#+\s*/, "").trim())
.filter(Boolean);
const tokenize = (value: string) =>
value
.toLowerCase()
.split(/[^a-z0-9]+/i)
.map((token) => token.trim())
.filter((token) => token.length >= 2);
const scoreDoc = (doc: KnowledgeDoc, query: string) => {
const tokens = tokenize(query);
if (tokens.length === 0) {
return 0.1;
}
const title = doc.title.toLowerCase();
const tags = doc.tags.join(" ").toLowerCase();
const headings = doc.headings.join(" ").toLowerCase();
const source = doc.source.toLowerCase();
const content = doc.content.toLowerCase();
return tokens.reduce((score, token) => {
if (title.includes(token)) score += 5;
if (tags.includes(token)) score += 4;
if (headings.includes(token)) score += 3;
if (source.includes(token)) score += 2;
if (content.includes(token)) score += 1;
return score;
}, 0);
};
const excerptFor = (content: string, query: string) => {
const token = tokenize(query)[0];
const compact = content.replace(/\s+/g, " ").trim();
if (!token) return compact.slice(0, 220);
const index = compact.toLowerCase().indexOf(token);
if (index === -1) return compact.slice(0, 220);
return compact.slice(Math.max(0, index - 80), index + 140);
};
// RAG-driven tag filter: a doc matches if it has at least one of the
// includeTags (if any) and none of the excludeTags.
const passesTagFilter = (
doc: KnowledgeSearchResult,
includeTags: string[],
excludeTags: string[],
): boolean => {
if (includeTags.length > 0) {
const hasIncluded = doc.tags.some((tag) => includeTags.includes(tag));
if (!hasIncluded) return false;
}
if (excludeTags.length > 0) {
const hasExcluded = doc.tags.some((tag) => excludeTags.includes(tag));
if (hasExcluded) return false;
}
return true;
};
export const loadKnowledgeDocs = (knowledgeDir = envString(process.env.KNOWLEDGE_DIR, defaultKnowledgeDir())): KnowledgeDoc[] => {
const root = resolve(knowledgeDir);
try {
statSync(root);
} catch {
return [];
}
return walkMarkdownFiles(root).map((filePath) => {
const source = relative(root, filePath);
const raw = readFileSync(filePath, "utf8");
const { metadata, body } = parseFrontmatter(raw);
const data = metadata as Record<string, unknown>;
const headings = extractHeadings(body);
return {
id: normalizePathId(source),
title: typeof data.title === "string" ? data.title : headings[0] ?? source,
source,
tags: Array.isArray(data.tags) ? data.tags.map(String) : [],
owner: typeof data.owner === "string" ? data.owner : undefined,
updated: typeof data.updated === "string" ? data.updated : undefined,
headings,
content: body,
};
});
};
export const createDocsRepository = () => {
const rag = loadRagConfig();
const useRemote = isRagRemote(rag);
return {
async search(query: string, limit?: number): Promise<KnowledgeSearchResult[]> {
const effectiveLimit = Math.max(1, limit ?? rag.topK);
if (useRemote) {
try {
return await searchViaRag(rag, query, effectiveLimit);
} catch (error) {
if (!rag.fallbackToLocal) throw error;
}
}
// Local fallback: read from knowledge/ and apply token-overlap scoring.
const fullDocs = loadKnowledgeDocs().filter((doc) =>
passesTagFilter(
{
id: doc.id,
title: doc.title,
source: doc.source,
tags: doc.tags,
owner: doc.owner,
updated: doc.updated,
headings: doc.headings,
relevance: 0,
excerpt: "",
},
rag.includeTags,
rag.excludeTags,
),
);
const scored = fullDocs
.map((doc) => ({
...doc,
relevance: scoreDoc(doc, query),
excerpt: excerptFor(doc.content, query),
}))
.filter((doc) => doc.relevance >= rag.minRelevance && doc.relevance > 0);
return scored
.sort((a, b) => b.relevance - a.relevance || a.title.localeCompare(b.title))
.slice(0, effectiveLimit)
.map(({ content: _content, ...doc }) => doc);
},
async get(id: string): Promise<KnowledgeDoc | undefined> {
if (useRemote) {
try {
return await getViaRag(rag, id);
} catch (error) {
if (!rag.fallbackToLocal) throw error;
}
}
return loadKnowledgeDocs().find((doc) => doc.id === id);
},
async list(limit = 500): Promise<KnowledgeSearchResult[]> {
const candidates = (await getAllMetadataLocal())
.filter((doc) => passesTagFilter(doc, rag.includeTags, rag.excludeTags))
.slice(0, limit);
return candidates;
},
async count(): Promise<number> {
const candidates = await getAllMetadataLocal();
return candidates.filter((doc) =>
passesTagFilter(doc, rag.includeTags, rag.excludeTags),
).length;
},
};
};
const getAllMetadataLocal = async (): Promise<KnowledgeSearchResult[]> => {
const rag = loadRagConfig();
const useRemote = isRagRemote(rag);
if (useRemote) {
try {
return await searchViaRag(rag, "", 1000);
} catch (error) {
if (!rag.fallbackToLocal) throw error;
}
}
const docs = loadKnowledgeDocs();
return docs.map((doc) => ({
id: doc.id,
title: doc.title,
source: doc.source,
tags: doc.tags,
owner: doc.owner,
updated: doc.updated,
headings: doc.headings,
relevance: 0,
excerpt: "",
}));
};

View File

@@ -0,0 +1,41 @@
import type { FastifyInstance } from "fastify";
import { z } from "zod";
import { createDocsRepository } from "./repository.js";
const searchQuery = z.object({
q: z.string().trim().default(""),
limit: z.coerce.number().int().min(1).max(20).default(5),
});
const listQuery = z.object({
limit: z.coerce.number().int().min(1).max(1000).default(500),
});
export const registerDocsRoutes = async (app: FastifyInstance) => {
const docs = createDocsRepository();
app.get("/api/docs", async (request) => {
const query = listQuery.parse(request.query);
// Await explicitly so Fastify serializes a real array, not an
// unresolved Promise which would be `{}` in the response body.
const [items, total] = await Promise.all([docs.list(query.limit), docs.count()]);
return { items, total };
});
app.get("/api/docs/search", async (request) => {
const query = searchQuery.parse(request.query);
const items = await docs.search(query.q, query.limit);
return { items };
});
app.get("/api/docs/:id", async (request, reply) => {
const params = z.object({ id: z.string().min(1) }).parse(request.params);
const doc = docs.get(params.id);
if (!doc) {
return reply.code(404).send({ error: "doc_not_found" });
}
return doc;
});
};

20
apps/api/src/env.ts Normal file
View File

@@ -0,0 +1,20 @@
/**
* Small env helpers.
*
* `process.env.X ?? fallback` does NOT fall back on empty strings — only on
* undefined/null. That bites us when dotenv loads `KEY=` (blank value) from
* `.env`. Use `envString` / `envNumber` to get safe fallbacks.
*/
export const envString = (value: string | undefined | null, fallback: string): string => {
if (value === undefined || value === null) return fallback;
return value.length > 0 ? value : fallback;
};
export const envNumber = (value: string | undefined | null, fallback: number): number => {
if (value === undefined || value === null) return fallback;
const trimmed = value.trim();
if (trimmed.length === 0) return fallback;
const parsed = Number(trimmed);
return Number.isFinite(parsed) ? parsed : fallback;
};

106
apps/api/src/mcp/config.ts Normal file
View File

@@ -0,0 +1,106 @@
import { readFileSync } from "node:fs";
import { resolve } from "node:path";
import { parse } from "yaml";
import { envString } from "../env.js";
export type McpToolParameterSchema = {
type: "object";
required?: string[];
properties?: Record<string, unknown>;
};
export type McpToolDefinition = {
id: string;
name: string;
description: string;
server: string | null;
parameters: McpToolParameterSchema;
tags: string[];
enabled: boolean;
};
export type McpServerDefinition = {
id: string;
name: string;
description: string;
endpoint: string;
};
export type PublicMcpToolDefinition = Omit<McpToolDefinition, "server"> & {
server: string | null;
};
export type PublicMcpServerDefinition = McpServerDefinition;
type McpFile = {
mcp_servers?: McpServerDefinition[];
mcp_tools?: McpToolDefinition[];
};
const defaultPath = (): string =>
envString(process.env.MCP_CONFIG_PATH, resolve(process.cwd(), "../../config/mcp.yml"));
const isToolParameterSchema = (value: unknown): value is McpToolParameterSchema => {
if (!value || typeof value !== "object") return false;
const v = value as McpToolParameterSchema;
return v.type === "object";
};
export const loadMcpTools = (
configPath: string = defaultPath(),
): McpToolDefinition[] => {
let raw: string;
try {
raw = readFileSync(configPath, "utf8");
} catch (error) {
const code = (error as NodeJS.ErrnoException).code;
if (code === "ENOENT") return [];
throw error;
}
const parsed = parse(raw) as McpFile | null;
if (!parsed || !Array.isArray(parsed.mcp_tools)) return [];
return parsed.mcp_tools
.filter((tool) => tool && typeof tool === "object" && typeof tool.id === "string")
.map((tool) => ({
id: String(tool.id).trim(),
name: String(tool.name ?? tool.id).trim(),
description: String(tool.description ?? "").trim(),
server: typeof tool.server === "string" ? tool.server : null,
parameters: isToolParameterSchema(tool.parameters)
? tool.parameters
: ({ type: "object", properties: {}, required: [] } satisfies McpToolParameterSchema),
tags: Array.isArray(tool.tags) ? tool.tags.map(String) : [],
enabled: tool.enabled !== false,
}))
.filter((tool) => tool.id.length > 0);
};
export const loadMcpServers = (
configPath: string = defaultPath(),
): McpServerDefinition[] => {
let raw: string;
try {
raw = readFileSync(configPath, "utf8");
} catch (error) {
const code = (error as NodeJS.ErrnoException).code;
if (code === "ENOENT") return [];
throw error;
}
const parsed = parse(raw) as McpFile | null;
if (!parsed || !Array.isArray(parsed.mcp_servers)) return [];
return parsed.mcp_servers
.filter((s) => s && typeof s === "object" && typeof s.id === "string")
.map((s) => ({
id: String(s.id).trim(),
name: String(s.name ?? s.id).trim(),
description: String(s.description ?? "").trim(),
endpoint: String(s.endpoint ?? "").trim(),
}))
.filter((s) => s.id.length > 0);
};
export const enabledMcpTools = (tools: McpToolDefinition[] = loadMcpTools()): McpToolDefinition[] =>
tools.filter((tool) => tool.enabled);
export const toPublicMcpTool = (tool: McpToolDefinition): PublicMcpToolDefinition => ({ ...tool });
export const toPublicMcpServer = (server: McpServerDefinition): PublicMcpServerDefinition => ({ ...server });

View File

@@ -0,0 +1,19 @@
import type { FastifyInstance } from "fastify";
import {
enabledMcpTools,
loadMcpServers,
toPublicMcpServer,
toPublicMcpTool,
} from "./config.js";
export const registerMcpRoutes = async (app: FastifyInstance) => {
app.get("/api/mcp/tools", async () => {
const items = enabledMcpTools().map(toPublicMcpTool);
return { items };
});
app.get("/api/mcp/servers", async () => {
const items = loadMcpServers().map(toPublicMcpServer);
return { items };
});
};

178
apps/api/src/metrics.ts Normal file
View File

@@ -0,0 +1,178 @@
// Tiny in-process metrics. Thread-safe enough for a 5-user MVP — counters
// and sums are only ever incremented under a single-threaded Node event
// loop, no atomic ops required.
export type RouteMetric = {
route: string;
method: string;
status: number;
durationMs: number;
timestamp: number;
};
type Aggregate = {
count: number;
statusBuckets: Map<number, number>;
sumMs: number;
maxMs: number;
p95Slots: number[]; // simple streaming reservoir for a coarse p95
};
const RESERVOIR_SIZE = 200;
const createAggregate = (): Aggregate => ({
count: 0,
statusBuckets: new Map(),
sumMs: 0,
maxMs: 0,
p95Slots: [],
});
const metricsState = {
startedAt: Date.now(),
aggregates: new Map<string, Aggregate>(),
// Last N events for the /metrics JSON inspector. Bounded to avoid leaks.
recent: [] as RouteMetric[],
recentLimit: 50,
errorCounts: new Map<string, number>(),
};
const keyFor = (route: string, method: string) => `${method.toUpperCase()} ${route}`;
export const observeHttp = (metric: RouteMetric) => {
const key = keyFor(metric.route, metric.method);
let agg = metricsState.aggregates.get(key);
if (!agg) {
agg = createAggregate();
metricsState.aggregates.set(key, agg);
}
agg.count += 1;
agg.sumMs += metric.durationMs;
if (metric.durationMs > agg.maxMs) agg.maxMs = metric.durationMs;
const statusBucket = Math.floor(metric.status / 100) * 100;
agg.statusBuckets.set(statusBucket, (agg.statusBuckets.get(statusBucket) ?? 0) + 1);
if (agg.p95Slots.length < RESERVOIR_SIZE) {
agg.p95Slots.push(metric.durationMs);
} else {
// Cheap replacement: evict the current max so the reservoir tracks the slowest N.
let maxIdx = 0;
for (let i = 1; i < agg.p95Slots.length; i++) {
if (agg.p95Slots[i] > agg.p95Slots[maxIdx]) maxIdx = i;
}
if (metric.durationMs < agg.p95Slots[maxIdx]) {
agg.p95Slots[maxIdx] = metric.durationMs;
}
}
if (metric.status >= 500) {
metricsState.errorCounts.set(key, (metricsState.errorCounts.get(key) ?? 0) + 1);
}
metricsState.recent.push(metric);
if (metricsState.recent.length > metricsState.recentLimit) {
metricsState.recent.splice(0, metricsState.recent.length - metricsState.recentLimit);
}
};
const percentile = (sorted: number[], p: number): number => {
if (sorted.length === 0) return 0;
const idx = Math.min(sorted.length - 1, Math.floor((p / 100) * sorted.length));
return sorted[idx];
};
export const snapshotMetrics = () => {
const routes: Array<{
route: string;
method: string;
count: number;
avg_ms: number;
p95_ms: number;
max_ms: number;
status_buckets: Record<string, number>;
}> = [];
for (const [key, agg] of metricsState.aggregates.entries()) {
const [method, ...rest] = key.split(" ");
const route = rest.join(" ");
const sorted = [...agg.p95Slots].sort((a, b) => a - b);
routes.push({
route,
method,
count: agg.count,
avg_ms: agg.count === 0 ? 0 : Math.round(agg.sumMs / agg.count),
p95_ms: Math.round(percentile(sorted, 95)),
max_ms: agg.maxMs,
status_buckets: Object.fromEntries(
[...agg.statusBuckets.entries()].map(([k, v]) => [`${k}_${k + 99}`, v]),
),
});
}
return {
started_at: new Date(metricsState.startedAt).toISOString(),
uptime_seconds: Math.round((Date.now() - metricsState.startedAt) / 1000),
totals: {
requests: [...metricsState.aggregates.values()].reduce((sum, a) => sum + a.count, 0),
errors_5xx: [...metricsState.errorCounts.values()].reduce((sum, n) => sum + n, 0),
},
routes: routes.sort((a, b) => b.count - a.count),
recent: [...metricsState.recent].reverse(),
};
};
// Tiny Prometheus-style exposition. Stable enough for a scraper.
export const renderPrometheusText = (): string => {
const lines: string[] = [];
lines.push("# HELP sic_uptime_seconds Seconds since the API process started");
lines.push("# TYPE sic_uptime_seconds gauge");
lines.push(`sic_uptime_seconds ${Math.round((Date.now() - metricsState.startedAt) / 1000)}`);
lines.push("");
lines.push("# HELP sic_http_requests_total Total HTTP requests, labelled by route, method, status");
lines.push("# TYPE sic_http_requests_total counter");
for (const [key, agg] of metricsState.aggregates.entries()) {
const [method, ...rest] = key.split(" ");
const route = rest.join(" ");
for (const [bucket, count] of agg.statusBuckets.entries()) {
const statusClass = `${bucket}_${bucket + 99}`;
lines.push(
`sic_http_requests_total{route="${route}",method="${method}",status_class="${statusClass}"} ${count}`,
);
}
}
lines.push("");
lines.push("# HELP sic_http_request_duration_ms Request duration in ms");
lines.push("# TYPE sic_http_request_duration_ms summary");
for (const [key, agg] of metricsState.aggregates.entries()) {
const [method, ...rest] = key.split(" ");
const route = rest.join(" ");
const sorted = [...agg.p95Slots].sort((a, b) => a - b);
const avg = agg.count === 0 ? 0 : Math.round(agg.sumMs / agg.count);
lines.push(
`sic_http_request_duration_ms{route="${route}",method="${method}",quantile="0.95"} ${percentile(sorted, 95)}`,
);
lines.push(
`sic_http_request_duration_ms_sum{route="${route}",method="${method}"} ${agg.sumMs}`,
);
lines.push(
`sic_http_request_duration_ms_count{route="${route}",method="${method}"} ${agg.count}`,
);
lines.push(
`sic_http_request_duration_ms_max{route="${route}",method="${method}"} ${agg.maxMs}`,
);
lines.push(
`sic_http_request_duration_ms_avg{route="${route}",method="${method}"} ${avg}`,
);
}
return `${lines.join("\n")}\n`;
};
export const __resetMetricsForTests = () => {
metricsState.startedAt = Date.now();
metricsState.aggregates.clear();
metricsState.recent.length = 0;
metricsState.errorCounts.clear();
};

View File

@@ -0,0 +1,105 @@
import { readFileSync } from "node:fs";
import { resolve } from "node:path";
import { parse } from "yaml";
import { envString } from "../env.js";
export type ModelDefinition = {
id: string;
label: string;
provider: "openai-compatible";
base_url?: string;
api_key_env?: string;
model: string;
max_tokens?: number;
// Ordered list of model ids to try if this one fails (5xx, 429, network
// error, timeout). Each id must resolve to a known model; unknown ids are
// dropped at load time. Use ["mr-auto"] for a single fallback. The
// fallback chain for the chain itself is resolved at request time by
// `resolveFallbackChain`.
fallback?: string[];
};
export type PublicModelDefinition = Pick<ModelDefinition, "id" | "label" | "provider" | "max_tokens">;
// Resolve the API key for a model. Order of precedence:
// 1. Per-model env var (model.api_key_env) — useful when different providers
// use different keys (e.g. local proxy, dedicated self-hosted).
// 2. Global LLM_API_KEY / MINIMAX_API_KEY fallbacks shared by all models.
// 3. Literal "dummy" so OpenAI-compatible servers that don't require auth
// (e.g. local OLLAMA, self-hosted reverse proxy) still work out of the box.
export const resolveModelApiKey = (model: ModelDefinition): string => {
const fromModel = model.api_key_env ? process.env[model.api_key_env] : undefined;
if (fromModel && fromModel.trim().length > 0) return fromModel.trim();
return (
envString(process.env.LLM_API_KEY, envString(process.env.MINIMAX_API_KEY, "dummy"))
);
};
const expandEnv = (value: string | undefined) =>
value?.replace(/\$\{([A-Z0-9_]+)\}/g, (_match, key: string) => process.env[key] ?? "");
export const loadModelDefinitions = (): ModelDefinition[] => {
const configPath = envString(process.env.MODELS_CONFIG_PATH, resolve(process.cwd(), "../../config/models.yml"));
const parsed = parse(readFileSync(configPath, "utf8")) as { models?: ModelDefinition[] } | null;
const known = new Set((parsed?.models ?? []).map((model) => String(model.id ?? "").trim()));
return (parsed?.models ?? []).map((model) => {
const rawFallback = Array.isArray(model.fallback) ? model.fallback : [];
const fallback = rawFallback
.map((id) => String(id).trim())
.filter((id) => id.length > 0 && known.has(id) && id !== model.id);
return {
...model,
base_url: expandEnv(model.base_url),
fallback,
};
});
};
export const getDefaultModelId = () => envString(process.env.DEFAULT_MODEL, "fast");
export const findModelDefinition = (modelId: string) =>
loadModelDefinitions().find((model) => model.id === modelId);
/**
* Resolve the ordered fallback chain starting at `modelId`. Walks each model's
* `fallback` array until exhausted, dedupes by id, and stops if a cycle is
* detected. The starting model is always first. If the model is unknown the
* chain is just `[modelId]` (caller will surface model_not_found).
*/
export const resolveFallbackChain = (modelId: string): string[] => {
const all = loadModelDefinitions();
const byId = new Map(all.map((model) => [model.id, model]));
const chain: string[] = [];
const seen = new Set<string>();
// LLM_FALLBACK_CHAIN (comma-separated) overrides the YAML chain for the
// selected model. Empty / unset means "use the YAML chain".
const override = envString(process.env.LLM_FALLBACK_CHAIN, "")
.split(",")
.map((id) => id.trim())
.filter((id) => id.length > 0);
let cursor: string | undefined = modelId;
let nextCursor: string | undefined = override[0];
while (cursor && !seen.has(cursor)) {
seen.add(cursor);
chain.push(cursor);
const model = byId.get(cursor);
if (nextCursor !== undefined) {
cursor = nextCursor;
nextCursor = undefined;
continue;
}
if (!model || !model.fallback || model.fallback.length === 0) break;
cursor = model.fallback[0];
}
return chain;
};
export const toPublicModel = (model: ModelDefinition): PublicModelDefinition => ({
id: model.id,
label: model.label,
provider: model.provider,
max_tokens: model.max_tokens,
});

View File

@@ -0,0 +1,14 @@
import type { FastifyInstance } from "fastify";
import { getDefaultModelId, loadModelDefinitions, toPublicModel } from "./config.js";
export const registerModelRoutes = async (app: FastifyInstance) => {
app.get("/api/models", async () => {
const defaultModelId = getDefaultModelId();
const items = loadModelDefinitions().map(toPublicModel);
return {
default_model: items.some((model) => model.id === defaultModelId) ? defaultModelId : items[0]?.id,
items,
};
});
};

View File

@@ -0,0 +1,58 @@
import { readFileSync } from "node:fs";
import { resolve } from "node:path";
import { parse } from "yaml";
import { envString } from "../env.js";
export type N8nWorkflowDefinition = {
id: string;
label: string;
description: string;
url: string;
required_roles: string[];
tags: string[];
};
export type PublicN8nWorkflowDefinition = N8nWorkflowDefinition;
type N8nFile = { n8n_workflows?: N8nWorkflowDefinition[] };
const expandEnv = (value: string): string =>
value.replace(/\$\{([A-Z0-9_]+)(?::\?[^}]+)?\}/g, (_match, name: string) => process.env[name] ?? "");
const defaultPath = (): string =>
envString(process.env.N8N_CONFIG_PATH, resolve(process.cwd(), "../../config/n8n-workflows.yml"));
export const loadN8nWorkflows = (
configPath: string = defaultPath(),
): N8nWorkflowDefinition[] => {
let raw: string;
try {
raw = readFileSync(configPath, "utf8");
} catch (error) {
const code = (error as NodeJS.ErrnoException).code;
if (code === "ENOENT") return [];
throw error;
}
const parsed = parse(raw) as N8nFile | null;
if (!parsed || !Array.isArray(parsed.n8n_workflows)) return [];
return parsed.n8n_workflows
.filter((wf) => wf && typeof wf === "object" && typeof wf.id === "string")
.map((wf) => ({
id: String(wf.id).trim(),
label: String(wf.label ?? wf.id).trim(),
description: String(wf.description ?? "").trim(),
url: expandEnv(String(wf.url ?? "").trim()),
required_roles: Array.isArray(wf.required_roles)
? wf.required_roles.map(String)
: [],
tags: Array.isArray(wf.tags) ? wf.tags.map(String) : [],
}))
.filter((wf) => wf.id.length > 0);
};
export const canUseN8nWorkflow = (userRoles: string[], wf: N8nWorkflowDefinition): boolean => {
if (!Array.isArray(wf.required_roles) || wf.required_roles.length === 0) return true;
return wf.required_roles.every((role) => userRoles.includes(role));
};
export const toPublicN8nWorkflow = (wf: N8nWorkflowDefinition): PublicN8nWorkflowDefinition => ({ ...wf });

View File

@@ -0,0 +1,13 @@
import type { FastifyInstance } from "fastify";
import { canUseN8nWorkflow, loadN8nWorkflows, toPublicN8nWorkflow } from "./config.js";
import { getAuthUser } from "../auth/index.js";
export const registerN8nRoutes = async (app: FastifyInstance) => {
app.get("/api/n8n-workflows", async (request) => {
const user = await getAuthUser(request);
const items = loadN8nWorkflows()
.filter((wf) => canUseN8nWorkflow(user.roles, wf))
.map(toPublicN8nWorkflow);
return { items };
});
};

105
apps/api/src/rag/client.ts Normal file
View File

@@ -0,0 +1,105 @@
import type { KnowledgeDoc, KnowledgeSearchResult } from "../docs/repository.js";
import type { RagConfig } from "./config.js";
export type RagSearchResponse = {
items: Array<{
id: string;
title?: string;
source?: string;
tags?: string[];
relevance?: number;
excerpt?: string;
content?: string;
}>;
};
export type RagGetResponse = Partial<KnowledgeDoc> & { id: string };
const ensureTrailing = (url: string) => url.replace(/\/$/, "");
export const isRagRemote = (config: RagConfig): boolean => config.endpoint.trim().length > 0;
const buildHeaders = (config: RagConfig): Record<string, string> => {
const headers: Record<string, string> = {
"content-type": "application/json",
accept: "application/json",
};
if (config.authToken) {
headers.authorization = `Bearer ${config.authToken}`;
}
return headers;
};
export const searchViaRag = async (
config: RagConfig,
query: string,
limit: number,
): Promise<KnowledgeSearchResult[]> => {
const url = `${ensureTrailing(config.endpoint)}/search`;
const controller = new AbortController();
const timeout = setTimeout(() => controller.abort(), config.timeoutMs);
try {
const response = await fetch(url, {
method: "POST",
headers: buildHeaders(config),
body: JSON.stringify({
query,
limit,
min_relevance: config.minRelevance,
include_tags: config.includeTags,
exclude_tags: config.excludeTags,
}),
signal: controller.signal,
});
if (!response.ok) {
throw new Error(`rag_search_failed:${response.status}`);
}
const data = (await response.json()) as RagSearchResponse;
if (!data || !Array.isArray(data.items)) return [];
return data.items.map((item) => ({
id: String(item.id),
title: String(item.title ?? item.id),
source: String(item.source ?? ""),
tags: Array.isArray(item.tags) ? item.tags.map(String) : [],
relevance: Number(item.relevance ?? 0),
excerpt: String(item.excerpt ?? ""),
headings: [],
}));
} finally {
clearTimeout(timeout);
}
};
export const getViaRag = async (
config: RagConfig,
id: string,
): Promise<KnowledgeDoc | undefined> => {
const url = `${ensureTrailing(config.endpoint)}/docs/${encodeURIComponent(id)}`;
const controller = new AbortController();
const timeout = setTimeout(() => controller.abort(), config.timeoutMs);
try {
const response = await fetch(url, {
method: "GET",
headers: buildHeaders(config),
signal: controller.signal,
});
if (response.status === 404) return undefined;
if (!response.ok) {
throw new Error(`rag_get_failed:${response.status}`);
}
const data = (await response.json()) as RagGetResponse;
if (!data || !data.id) return undefined;
return {
id: String(data.id),
title: String(data.title ?? data.id),
source: String(data.source ?? ""),
tags: Array.isArray(data.tags) ? data.tags.map(String) : [],
owner: typeof data.owner === "string" ? data.owner : undefined,
updated: typeof data.updated === "string" ? data.updated : undefined,
headings: Array.isArray(data.headings) ? data.headings.map(String) : [],
content: String(data.content ?? ""),
};
} finally {
clearTimeout(timeout);
}
};

View File

@@ -0,0 +1,80 @@
import { readFileSync } from "node:fs";
import { resolve } from "node:path";
import { parse } from "yaml";
import { envString } from "../env.js";
export type RagChunkStrategy = "heading" | "paragraph" | "fixed";
export type RagConfig = {
endpoint: string;
authToken: string;
timeoutMs: number;
fallbackToLocal: boolean;
chunkStrategy: RagChunkStrategy;
chunkSizeChars: number;
topK: number;
minRelevance: number;
includeTags: string[];
excludeTags: string[];
};
export type PublicRagConfig = Omit<RagConfig, "authToken"> & {
// Never expose the auth token over the public API; show only whether
// one is configured.
hasAuthToken: boolean;
};
const defaultConfig = (): RagConfig => ({
endpoint: "",
authToken: "",
timeoutMs: 10_000,
fallbackToLocal: true,
chunkStrategy: "heading",
chunkSizeChars: 1500,
topK: 5,
minRelevance: 0,
includeTags: [],
excludeTags: [],
});
const expandEnv = (value: string): string =>
value.replace(/\$\{([A-Z0-9_]+):?\}/g, (_m, name: string) => process.env[name] ?? "");
const normalize = (raw: unknown): RagConfig => {
if (!raw || typeof raw !== "object") return defaultConfig();
const r = raw as Partial<RagConfig>;
const chunkStrategy: RagChunkStrategy =
r.chunkStrategy === "paragraph" || r.chunkStrategy === "fixed" ? r.chunkStrategy : "heading";
return {
endpoint: expandEnv(String(r.endpoint ?? "").trim()),
authToken: expandEnv(String(r.authToken ?? "").trim()),
timeoutMs: Math.max(100, Number(r.timeoutMs ?? 10_000)),
fallbackToLocal: r.fallbackToLocal !== false,
chunkStrategy,
chunkSizeChars: Math.max(200, Number(r.chunkSizeChars ?? 1500)),
topK: Math.max(1, Number(r.topK ?? 5)),
minRelevance: Math.max(0, Math.min(1, Number(r.minRelevance ?? 0))),
includeTags: Array.isArray(r.includeTags) ? r.includeTags.map(String) : [],
excludeTags: Array.isArray(r.excludeTags) ? r.excludeTags.map(String) : [],
};
};
const defaultPath = (): string =>
envString(process.env.RAG_CONFIG_PATH, resolve(process.cwd(), "../../config/rag.yml"));
export const loadRagConfig = (configPath: string = defaultPath()): RagConfig => {
try {
const raw = readFileSync(configPath, "utf8");
const parsed = parse(raw) as { rag?: unknown } | null;
return normalize(parsed?.rag);
} catch (error) {
const code = (error as NodeJS.ErrnoException).code;
if (code === "ENOENT") return defaultConfig();
throw error;
}
};
export const toPublicRagConfig = (config: RagConfig): PublicRagConfig => {
const { authToken: _auth, ...rest } = config;
return { ...rest, hasAuthToken: Boolean(config.authToken) };
};

View File

@@ -0,0 +1,6 @@
import type { FastifyInstance } from "fastify";
import { loadRagConfig, toPublicRagConfig } from "./config.js";
export const registerRagRoutes = async (app: FastifyInstance) => {
app.get("/api/rag/config", async () => toPublicRagConfig(loadRagConfig()));
};

View File

@@ -0,0 +1,88 @@
// In-memory token bucket rate limiter keyed by an arbitrary id (user id).
//
// Token bucket semantics:
// - Capacity = `burst` (max tokens the bucket can hold).
// - Refill = `perMinute / 60` tokens per second (lazy: tokens are added
// on `consume` based on elapsed time since the bucket was
// last touched).
// - Each accepted call consumes exactly one token. Calls that find the
// bucket empty are rejected and the caller gets back a Retry-After hint
// in milliseconds.
//
// Stale entries: the map only grows by the number of distinct ids seen.
// For a 5-user MVP this is bounded; for larger installs the caller can call
// `pruneStale(maxIdleMs)` periodically.
import { envNumber } from "./env.js";
export type RateLimiterOptions = {
perMinute: number;
burst: number;
};
export type ConsumeResult =
| { ok: true; remaining: number }
| { ok: false; retryAfterMs: number };
export type RateLimiter = {
consume(id: string, now?: number): ConsumeResult;
size: () => number;
reset: (id?: string) => void;
};
export const createRateLimiter = (
options: RateLimiterOptions,
): RateLimiter => {
const { perMinute, burst } = options;
const refillPerMs = perMinute / 60_000;
const buckets = new Map<string, { tokens: number; lastRefillMs: number }>();
const consume = (id: string, now: number = Date.now()): ConsumeResult => {
let bucket = buckets.get(id);
if (!bucket) {
bucket = { tokens: burst, lastRefillMs: now };
buckets.set(id, bucket);
} else {
const elapsed = Math.max(0, now - bucket.lastRefillMs);
const refilled = elapsed * refillPerMs;
if (refilled > 0) {
bucket.tokens = Math.min(burst, bucket.tokens + refilled);
bucket.lastRefillMs = now;
}
}
if (bucket.tokens >= 1) {
bucket.tokens -= 1;
return { ok: true, remaining: Math.floor(bucket.tokens) };
}
// Time until the bucket has at least 1 token.
const needed = 1 - bucket.tokens;
const retryAfterMs = refillPerMs > 0 ? Math.ceil(needed / refillPerMs) : 60_000;
return { ok: false, retryAfterMs };
};
return {
consume,
size: () => buckets.size,
reset: (id) => {
if (id === undefined) buckets.clear();
else buckets.delete(id);
},
};
};
export const chatRateLimiterFromEnv = (): RateLimiter => {
const perMinute = Math.max(1, envNumber(process.env.CHAT_RATE_LIMIT_PER_MINUTE, 20));
const burst = Math.max(1, envNumber(process.env.CHAT_RATE_LIMIT_BURST, 5));
return createRateLimiter({ perMinute, burst });
};
export const webhookRateLimiterFromEnv = (): RateLimiter => {
// Defaults: 60/min refill, burst 10. Generous on purpose — the goal is
// to stop a runaway loop, not throttle a real operator. Tighten per
// webhook_id in the future if a specific hook becomes a hotspot.
const perMinute = Math.max(1, envNumber(process.env.WEBHOOK_RATE_LIMIT_PER_MINUTE, 60));
const burst = Math.max(1, envNumber(process.env.WEBHOOK_RATE_LIMIT_BURST, 10));
return createRateLimiter({ perMinute, burst });
};

170
apps/api/src/server.ts Normal file
View File

@@ -0,0 +1,170 @@
import Fastify from "fastify";
import cors from "@fastify/cors";
import { ZodError } from "zod";
import { config as loadDotenv } from "dotenv";
import { dirname, resolve } from "node:path";
import { fileURLToPath } from "node:url";
// Load `.env` from the repo root regardless of the cwd the process was
// started from. `import "dotenv/config"` would only look in `process.cwd()`,
// which silently breaks when the API is started from a sub-directory.
const __dirnameApi = dirname(fileURLToPath(import.meta.url));
loadDotenv({ path: resolve(__dirnameApi, "../../../.env") });
import { registerAuthRoutes } from "./auth/routes.js";
import { registerChatRoutes } from "./chat/routes.js";
import { openDatabase } from "./db/database.js";
import { migrate } from "./db/migrate.js";
import { registerDocsRoutes } from "./docs/routes.js";
import { envNumber } from "./env.js";
import { registerMcpRoutes } from "./mcp/routes.js";
import { registerModelRoutes } from "./models/routes.js";
import { registerN8nRoutes } from "./n8n/routes.js";
import { registerRagRoutes } from "./rag/routes.js";
import { registerSessionRoutes } from "./sessions/routes.js";
import { registerSkillRoutes } from "./skills/routes.js";
import { runWebhookAuditPurge, webhookAuditPurgeConfigFromEnv } from "./webhooks/audit.js";
import { registerWebhookRoutes } from "./webhooks/routes.js";
import { observeHttp, renderPrometheusText, snapshotMetrics } from "./metrics.js";
const port = envNumber(process.env.API_PORT, 8787);
const bodyLimit = envNumber(process.env.API_BODY_LIMIT_BYTES, 1_048_576);
const corsOrigin = () => {
const configured = process.env.CORS_ALLOWED_ORIGINS;
if (!configured) return true;
const origins = configured
.split(",")
.map((origin) => origin.trim())
.filter(Boolean);
if (origins.includes("*")) return true;
return origins;
};
const app = Fastify({
logger: true,
bodyLimit,
});
const db = openDatabase();
migrate(db);
await app.register(cors, {
origin: corsOrigin(),
});
// Observability: track every request by route + method + status with duration.
// The route template (e.g. "/api/sessions/:id") is preferred over the raw URL
// so `/api/sessions/abc` and `/api/sessions/def` aggregate into the same bucket.
app.addHook("onResponse", async (request, reply) => {
const route = request.routeOptions?.url ?? request.url ?? "unknown";
observeHttp({
route,
method: request.method,
status: reply.statusCode,
durationMs: Math.round(performance.now() - (request as { sic_startedAt?: number }).sic_startedAt!),
timestamp: Date.now(),
});
});
app.addHook("onRequest", async (request) => {
(request as { sic_startedAt?: number }).sic_startedAt = performance.now();
});
app.addHook("onSend", async (_request, reply, payload) => {
reply.header("x-content-type-options", "nosniff");
reply.header("referrer-policy", "no-referrer");
reply.header("x-frame-options", "DENY");
return payload;
});
app.setErrorHandler((error, _request, reply) => {
const message = error instanceof Error ? error.message : String(error);
if (error instanceof ZodError) {
return reply.code(400).send({
error: "validation_error",
issues: error.issues,
});
}
if (
message.startsWith("auth_") ||
message.startsWith("JWT") ||
message.startsWith("JWKS")
) {
return reply.code(401).send({ error: "unauthorized" });
}
app.log.error(error);
return reply.code(500).send({ error: "internal_error" });
});
app.get("/healthz", async () => ({ status: "ok" }));
app.get("/readyz", async () => {
db.prepare("SELECT 1").get();
return { status: "ready" };
});
app.get("/api/version", async () => ({
name: "pi-chat-api",
version: "0.1.0",
}));
// Observability surface. `/metrics` returns Prometheus text (scraper-friendly);
// `/api/metrics` returns the same data as JSON for humans and the smoke test.
app.get("/metrics", async (_request, reply) => {
reply.header("content-type", "text/plain; version=0.0.4; charset=utf-8");
return renderPrometheusText();
});
app.get("/api/metrics", async () => snapshotMetrics());
await registerSessionRoutes(app, db);
await registerAuthRoutes(app);
await registerDocsRoutes(app);
await registerModelRoutes(app);
await registerRagRoutes(app);
await registerSkillRoutes(app);
await registerN8nRoutes(app);
await registerMcpRoutes(app);
await registerWebhookRoutes(app, db);
await registerChatRoutes(app, db);
// Audit retention: run once on boot, then on a timer. Cheap, idempotent.
const auditPurgeConfig = webhookAuditPurgeConfigFromEnv();
const initialPurge = runWebhookAuditPurge(db, auditPurgeConfig);
if (initialPurge.deletedByAge > 0 || initialPurge.deletedByCap > 0) {
app.log.info(
{ ...initialPurge, config: auditPurgeConfig },
"webhook audit purge (boot)",
);
}
const auditPurgeIntervalMs = Math.max(60_000, envNumber(process.env.WEBHOOK_AUDIT_PURGE_INTERVAL_MS, 3_600_000));
const auditPurgeTimer = setInterval(() => {
try {
const report = runWebhookAuditPurge(db, auditPurgeConfig);
if (report.deletedByAge > 0 || report.deletedByCap > 0) {
app.log.info({ ...report, config: auditPurgeConfig }, "webhook audit purge (timer)");
}
} catch (error) {
app.log.error({ err: error }, "webhook audit purge failed");
}
}, auditPurgeIntervalMs);
auditPurgeTimer.unref?.();
const shutdown = async () => {
app.log.info("shutdown requested");
clearInterval(auditPurgeTimer);
await app.close();
db.close();
};
process.on("SIGINT", shutdown);
process.on("SIGTERM", shutdown);
await app.listen({ port, host: "0.0.0.0" });

View File

@@ -0,0 +1,140 @@
import { randomUUID } from "node:crypto";
import type { AppDatabase } from "../db/database.js";
export type ChatSessionRecord = {
id: string;
user_id: string;
title: string | null;
system_prompt: string | null;
created_at: string;
updated_at: string;
};
export type ChatMessageRecord = {
id: string;
session_id: string;
user_id: string;
role: "user" | "assistant" | "system" | "tool";
content: string;
metadata: string | null;
created_at: string;
};
export function createSessionRepository(db: AppDatabase) {
return {
list(userId: string): ChatSessionRecord[] {
return db
.prepare("SELECT * FROM chat_sessions WHERE user_id = ? ORDER BY updated_at DESC")
.all(userId) as ChatSessionRecord[];
},
create(userId: string, title: string | null): ChatSessionRecord {
const now = new Date().toISOString();
const session: ChatSessionRecord = {
id: randomUUID(),
user_id: userId,
title,
system_prompt: null,
created_at: now,
updated_at: now,
};
db.prepare(
"INSERT INTO chat_sessions (id, user_id, title, created_at, updated_at) VALUES (?, ?, ?, ?, ?)",
).run(session.id, session.user_id, session.title, session.created_at, session.updated_at);
return session;
},
get(userId: string, sessionId: string): ChatSessionRecord | null {
return (
(db.prepare("SELECT * FROM chat_sessions WHERE id = ? AND user_id = ?").get(sessionId, userId) as
| ChatSessionRecord
| undefined) ?? null
);
},
delete(userId: string, sessionId: string): boolean {
const result = db.prepare("DELETE FROM chat_sessions WHERE id = ? AND user_id = ?").run(sessionId, userId);
return result.changes > 0;
},
deleteAllForUser(userId: string): number {
const result = db
.prepare("DELETE FROM chat_sessions WHERE user_id = ?")
.run(userId);
return Number(result.changes ?? 0);
},
touch(userId: string, sessionId: string): void {
db.prepare("UPDATE chat_sessions SET updated_at = ? WHERE id = ? AND user_id = ?").run(
new Date().toISOString(),
sessionId,
userId,
);
},
updateTitle(userId: string, sessionId: string, title: string): void {
db.prepare("UPDATE chat_sessions SET title = ?, updated_at = ? WHERE id = ? AND user_id = ?").run(
title,
new Date().toISOString(),
sessionId,
userId,
);
},
updateSystemPrompt(userId: string, sessionId: string, prompt: string | null): boolean {
const normalized = prompt && prompt.trim().length > 0 ? prompt.trim() : null;
const result = db
.prepare(
"UPDATE chat_sessions SET system_prompt = ?, updated_at = ? WHERE id = ? AND user_id = ?",
)
.run(normalized, new Date().toISOString(), sessionId, userId);
return result.changes > 0;
},
};
}
export function createMessageRepository(db: AppDatabase) {
return {
listForSession(userId: string, sessionId: string): ChatMessageRecord[] {
return db
.prepare("SELECT * FROM chat_messages WHERE session_id = ? AND user_id = ? ORDER BY created_at ASC")
.all(sessionId, userId) as ChatMessageRecord[];
},
create(input: {
sessionId: string;
userId: string;
role: ChatMessageRecord["role"];
content: string;
metadata?: unknown;
}): ChatMessageRecord {
const message: ChatMessageRecord = {
id: randomUUID(),
session_id: input.sessionId,
user_id: input.userId,
role: input.role,
content: input.content,
metadata: input.metadata ? JSON.stringify(input.metadata) : null,
created_at: new Date().toISOString(),
};
db.prepare(
`INSERT INTO chat_messages
(id, session_id, user_id, role, content, metadata, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?)`,
).run(
message.id,
message.session_id,
message.user_id,
message.role,
message.content,
message.metadata,
message.created_at,
);
return message;
},
};
}

View File

@@ -0,0 +1,172 @@
import type { FastifyInstance } from "fastify";
import { z } from "zod";
import { getAuthUser } from "../auth/index.js";
import type { AppDatabase } from "../db/database.js";
import { createMessageRepository, createSessionRepository } from "./repository.js";
const createSessionBody = z.object({
title: z.string().min(1).max(120).optional(),
});
const updateSessionBody = z.object({
title: z.string().trim().min(1).max(120),
});
const updateSystemPromptBody = z.object({
// Empty / whitespace-only strings clear the override; null is a no-op.
system_prompt: z.string().max(8_000).nullable().optional(),
});
export async function registerSessionRoutes(app: FastifyInstance, db: AppDatabase) {
const sessions = createSessionRepository(db);
const messages = createMessageRepository(db);
app.get("/api/sessions", async (request) => {
const user = await getAuthUser(request);
return { items: sessions.list(user.id) };
});
app.post("/api/sessions", async (request, reply) => {
const user = await getAuthUser(request);
const body = createSessionBody.parse(request.body ?? {});
const session = sessions.create(user.id, body.title ?? null);
return reply.code(201).send(session);
});
app.get<{ Params: { id: string } }>("/api/sessions/:id", async (request, reply) => {
const user = await getAuthUser(request);
const session = sessions.get(user.id, request.params.id);
if (!session) {
return reply.code(404).send({ error: "session_not_found" });
}
return {
...session,
messages: messages.listForSession(user.id, session.id),
};
});
app.patch<{ Params: { id: string } }>("/api/sessions/:id", async (request, reply) => {
const user = await getAuthUser(request);
const body = updateSessionBody.parse(request.body);
const session = sessions.get(user.id, request.params.id);
if (!session) {
return reply.code(404).send({ error: "session_not_found" });
}
sessions.updateTitle(user.id, session.id, body.title);
return sessions.get(user.id, session.id);
});
// Per-session system prompt override. Inserted into the chat stream
// immediately after the base identity prompt, before the docs/actions
// context. Use to attach incident-specific context (runbook link, on-call
// names, severity matrix) without polluting the global prompt.
app.patch<{ Params: { id: string } }>(
"/api/sessions/:id/system-prompt",
async (request, reply) => {
const user = await getAuthUser(request);
const body = updateSystemPromptBody.parse(request.body ?? {});
const session = sessions.get(user.id, request.params.id);
if (!session) {
return reply.code(404).send({ error: "session_not_found" });
}
sessions.updateSystemPrompt(user.id, session.id, body.system_prompt ?? null);
return sessions.get(user.id, session.id);
},
);
app.delete<{ Params: { id: string } }>("/api/sessions/:id", async (request, reply) => {
const user = await getAuthUser(request);
const deleted = sessions.delete(user.id, request.params.id);
if (!deleted) {
return reply.code(404).send({ error: "session_not_found" });
}
return reply.code(204).send();
});
// Bulk delete: wipes every session owned by the current user. Cascade
// removes the messages and webhook_runs that point at them. The frontend
// requires the user to type the literal word "delete" before this fires.
app.delete("/api/sessions", async (request, reply) => {
const user = await getAuthUser(request);
const removed = sessions.deleteAllForUser(user.id);
return reply.code(200).send({ deleted: removed });
});
// Export: returns a JSON document with the session metadata and all its
// messages. The shape is a stable contract so a `POST /api/sessions/import`
// can read it back. webhook_runs are intentionally excluded from the
// export — those are operational audit data, not conversation content.
app.get<{ Params: { id: string } }>("/api/sessions/:id/export", async (request, reply) => {
const user = await getAuthUser(request);
const session = sessions.get(user.id, request.params.id);
if (!session) {
return reply.code(404).send({ error: "session_not_found" });
}
return {
version: 1,
exported_at: new Date().toISOString(),
session: {
id: session.id,
title: session.title,
created_at: session.created_at,
updated_at: session.updated_at,
},
messages: messages.listForSession(user.id, session.id),
};
});
// Import: accepts the export document above, creates a new session owned
// by the caller, and writes the messages with fresh ids. Returns the new
// session id and a count of imported messages.
const importSessionBody = z.object({
session: z.object({
title: z.string().max(120).nullable().optional(),
created_at: z.string().optional(),
updated_at: z.string().optional(),
}),
messages: z.array(
z.object({
role: z.enum(["user", "assistant", "system"]),
content: z.string().min(1).max(50_000),
metadata: z.record(z.unknown()).optional(),
// Original created_at is preserved if present; otherwise "now" is
// used. Used only to restore the timeline.
created_at: z.string().optional(),
}),
),
});
app.post("/api/sessions/import", async (request, reply) => {
const user = await getAuthUser(request);
const body = importSessionBody.parse(request.body);
const newSession = sessions.create(user.id, body.session.title ?? null);
let imported = 0;
for (const message of body.messages) {
messages.create({
sessionId: newSession.id,
userId: user.id,
role: message.role,
content: message.content,
metadata: message.metadata,
});
imported += 1;
}
sessions.touch(user.id, newSession.id);
return reply.code(201).send({
session: newSession,
imported_messages: imported,
});
});
}

View File

@@ -0,0 +1,57 @@
import { readFileSync } from "node:fs";
import { resolve } from "node:path";
import { parse } from "yaml";
import { envString } from "../env.js";
export type SkillDefinition = {
id: string;
name: string;
description: string;
enabled: boolean;
prompt: string;
};
export type PublicSkillDefinition = Omit<SkillDefinition, "prompt">;
type SkillsFile = { skills?: SkillDefinition[] };
const defaultPath = (): string => {
// When the API is started from apps/api, the config dir is at ../../config.
// The env var wins so tests / docker setups can override.
return envString(process.env.SKILLS_CONFIG_PATH, resolve(process.cwd(), "../../config/skills.yml"));
};
export const loadSkillDefinitions = (
configPath: string = defaultPath(),
): SkillDefinition[] => {
try {
const raw = readFileSync(configPath, "utf8");
const parsed = parse(raw) as SkillsFile;
if (!parsed || !Array.isArray(parsed.skills)) return [];
return parsed.skills
.filter((skill) => skill && typeof skill === "object")
.map((skill) => ({
id: String(skill.id ?? "").trim(),
name: String(skill.name ?? "").trim(),
description: String(skill.description ?? "").trim(),
enabled: Boolean(skill.enabled),
prompt: String(skill.prompt ?? "").trim(),
}))
.filter((skill) => skill.id.length > 0);
} catch (error) {
// Config is optional: missing file is fine, malformed file should surface.
const code = (error as NodeJS.ErrnoException).code;
if (code === "ENOENT") return [];
throw error;
}
};
export const getEnabledSkillPrompts = (skills: SkillDefinition[] = loadSkillDefinitions()): string[] =>
skills.filter((skill) => skill.enabled && skill.prompt.length > 0).map((skill) => skill.prompt);
export const toPublicSkill = (skill: SkillDefinition): PublicSkillDefinition => ({
id: skill.id,
name: skill.name,
description: skill.description,
enabled: skill.enabled,
});

View File

@@ -0,0 +1,9 @@
import type { FastifyInstance } from "fastify";
import { loadSkillDefinitions, toPublicSkill } from "./config.js";
export const registerSkillRoutes = async (app: FastifyInstance) => {
app.get("/api/skills", async () => {
const items = loadSkillDefinitions().map(toPublicSkill);
return { items };
});
};

View File

@@ -0,0 +1,162 @@
import { randomUUID } from "node:crypto";
import type { AppDatabase } from "../db/database.js";
import { envNumber } from "../env.js";
export type WebhookRunStatus = "success" | "error";
export type WebhookRunRecord = {
id: string;
webhook_id: string;
user_id: string;
session_id: string;
status: WebhookRunStatus;
request_payload: string | null;
response_status: number | null;
attempts: number;
created_at: string;
};
export const createWebhookAuditRepository = (db: AppDatabase) => ({
create(input: {
webhookId: string;
userId: string;
sessionId: string;
status: WebhookRunStatus;
requestPayload?: unknown;
responseStatus?: number | null;
attempts?: number;
createdAt?: string;
}) {
const run = {
id: randomUUID(),
webhook_id: input.webhookId,
user_id: input.userId,
session_id: input.sessionId,
status: input.status,
request_payload: input.requestPayload ? JSON.stringify(input.requestPayload) : null,
response_status: input.responseStatus ?? null,
attempts: input.attempts ?? 1,
created_at: input.createdAt ?? new Date().toISOString(),
};
db.prepare(
`INSERT INTO webhook_runs (id, webhook_id, user_id, session_id, status, request_payload, response_status, attempts, created_at)
VALUES (@id, @webhook_id, @user_id, @session_id, @status, @request_payload, @response_status, @attempts, @created_at)`,
).run(run);
return run;
},
listForSession(userId: string, sessionId: string, limit = 20) {
return db
.prepare(
`SELECT id, webhook_id, user_id, session_id, status, request_payload, response_status, attempts, created_at
FROM webhook_runs
WHERE session_id = ? AND user_id = ?
ORDER BY created_at DESC
LIMIT ?`,
)
.all(sessionId, userId, limit) as WebhookRunRecord[];
},
purgeOlderThan(isoCutoff: string): number {
const result = db
.prepare(`DELETE FROM webhook_runs WHERE created_at < ?`)
.run(isoCutoff);
return Number(result.changes ?? 0);
},
enforcePerUserCap(maxPerUser: number): number {
if (maxPerUser <= 0) return 0;
// Uses SQLite window function (3.25+) to keep the most recent N rows
// per user and delete the rest. user_id is always present in the table.
const result = db
.prepare(
`DELETE FROM webhook_runs
WHERE rowid IN (
SELECT rowid FROM (
SELECT rowid,
ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY created_at DESC, rowid DESC) AS rn
FROM webhook_runs
)
WHERE rn > ?
)`,
)
.run(maxPerUser);
return Number(result.changes ?? 0);
},
/**
* Per-webhook usage stats for a single user over a recent time window.
* `isoSince` should be a UTC ISO string (e.g. now - 7 days).
* Returns a map of webhook_id -> { runs, successes, successRate }.
*/
usageForUserSince(isoSince: string, userId: string): Record<
string,
{ runs: number; successes: number; successRate: number }
> {
const rows = db
.prepare(
`SELECT webhook_id,
COUNT(*) AS runs,
SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) AS successes
FROM webhook_runs
WHERE user_id = ? AND created_at >= ?
GROUP BY webhook_id`,
)
.all(userId, isoSince) as Array<{
webhook_id: string;
runs: number;
successes: number | null;
}>;
const out: Record<string, { runs: number; successes: number; successRate: number }> = {};
for (const row of rows) {
const runs = Number(row.runs ?? 0);
const successes = Number(row.successes ?? 0);
out[row.webhook_id] = {
runs,
successes,
successRate: runs > 0 ? successes / runs : 0,
};
}
return out;
},
});
export type WebhookAuditPurgeConfig = {
retentionDays: number;
maxPerUser: number;
};
export const webhookAuditPurgeConfigFromEnv = (): WebhookAuditPurgeConfig => {
const retentionDays = Math.max(0, envNumber(process.env.WEBHOOK_RUNS_RETENTION_DAYS, 30));
const maxPerUser = Math.max(0, envNumber(process.env.WEBHOOK_RUNS_MAX_PER_USER, 1000));
return { retentionDays, maxPerUser };
};
export type WebhookAuditPurgeReport = {
deletedByAge: number;
deletedByCap: number;
cutoff: string | null;
};
/**
* Run both purge passes (age + per-user cap) against the audit table.
* Returns a small report for logging / health endpoints. Safe to call on
* every boot and on a timer.
*/
export const runWebhookAuditPurge = (
db: AppDatabase,
config: WebhookAuditPurgeConfig = webhookAuditPurgeConfigFromEnv(),
): WebhookAuditPurgeReport => {
const audit = createWebhookAuditRepository(db);
let deletedByAge = 0;
let cutoff: string | null = null;
if (config.retentionDays > 0) {
cutoff = new Date(Date.now() - config.retentionDays * 86_400_000).toISOString();
deletedByAge = audit.purgeOlderThan(cutoff);
}
const deletedByCap = config.maxPerUser > 0 ? audit.enforcePerUserCap(config.maxPerUser) : 0;
return { deletedByAge, deletedByCap, cutoff };
};

View File

@@ -0,0 +1,53 @@
import { readFileSync } from "node:fs";
import { resolve } from "node:path";
import YAML from "yaml";
import { envString } from "../env.js";
export type WebhookMethod = "GET" | "POST" | "PUT" | "PATCH" | "DELETE";
export type WebhookDefinition = {
id: string;
label: string;
description?: string;
method: WebhookMethod;
url: string;
required_roles: string[];
confirmation_required: boolean;
payload_template?: unknown;
};
export type PublicWebhookDefinition = Omit<WebhookDefinition, "url" | "payload_template">;
const defaultConfigPath = () => resolve(process.cwd(), "../../config/webhooks.yml");
const expandEnv = (value: string) =>
value.replace(/\$\{([A-Z0-9_]+)\}/g, (_match, name: string) => process.env[name] ?? "");
export const loadWebhookDefinitions = (
configPath = process.env.WEBHOOKS_CONFIG_PATH && process.env.WEBHOOKS_CONFIG_PATH.length > 0
? process.env.WEBHOOKS_CONFIG_PATH
: defaultConfigPath(),
): WebhookDefinition[] => {
const raw = readFileSync(configPath, "utf8");
const parsed = YAML.parse(raw) as { webhooks?: WebhookDefinition[] } | undefined;
return (parsed?.webhooks ?? []).map((webhook) => ({
...webhook,
method: webhook.method.toUpperCase() as WebhookMethod,
url: expandEnv(webhook.url),
required_roles: webhook.required_roles ?? [],
confirmation_required: webhook.confirmation_required ?? true,
}));
};
export const canUseWebhook = (userRoles: string[], webhook: WebhookDefinition) =>
webhook.required_roles.every((role) => userRoles.includes(role));
export const toPublicWebhook = (webhook: WebhookDefinition): PublicWebhookDefinition => ({
id: webhook.id,
label: webhook.label,
description: webhook.description,
method: webhook.method,
required_roles: webhook.required_roles,
confirmation_required: webhook.confirmation_required,
});

View File

@@ -0,0 +1,288 @@
import type { AuthUser } from "@pi-chat/shared";
import type { FastifyInstance } from "fastify";
import { z } from "zod";
import type { AppDatabase } from "../db/database.js";
import { getAuthUser } from "../auth/index.js";
import { webhookRateLimiterFromEnv } from "../rate-limit.js";
import { createSessionRepository } from "../sessions/repository.js";
import { createWebhookAuditRepository } from "./audit.js";
import { canUseWebhook, loadWebhookDefinitions, toPublicWebhook } from "./config.js";
import { envNumber } from "../env.js";
const runWebhookBody = z.object({
sessionId: z.string().min(1),
confirmed: z.literal(true),
lastUserMessage: z.string().max(envNumber(process.env.CHAT_MESSAGE_MAX_CHARS, 8_000)).default(""),
payload: z.record(z.unknown()).default({}),
});
const webhookRunsQuery = z.object({
sessionId: z.string().min(1),
limit: z.coerce.number().int().min(1).max(50).default(20),
});
const renderTemplate = (template: unknown, context: Record<string, unknown>): unknown => {
if (typeof template === "string") {
return template.replace(/\{\{([a-zA-Z0-9_.]+)\}\}/g, (_match, path: string) => {
const value = path.split(".").reduce<unknown>((current, key) => {
if (current && typeof current === "object" && key in current) {
return (current as Record<string, unknown>)[key];
}
return "";
}, context);
return value == null ? "" : String(value);
});
}
if (Array.isArray(template)) {
return template.map((item) => renderTemplate(item, context));
}
if (template && typeof template === "object") {
return Object.fromEntries(
Object.entries(template).map(([key, value]) => [key, renderTemplate(value, context)]),
);
}
return template;
};
const buildPayload = (template: unknown, input: z.infer<typeof runWebhookBody>, user: AuthUser) => {
const templated = renderTemplate(template ?? {}, {
user,
session: { id: input.sessionId },
chat: { last_user_message: input.lastUserMessage },
});
return {
...(templated && typeof templated === "object" && !Array.isArray(templated) ? templated : {}),
...input.payload,
};
};
const fetchWithTimeout = async (url: string, init: RequestInit, timeoutMs: number) => {
const abortController = new AbortController();
const timeout = setTimeout(() => abortController.abort(), timeoutMs);
return fetch(url, { ...init, signal: abortController.signal }).finally(() => clearTimeout(timeout));
};
const sleep = (ms: number) =>
new Promise<void>((resolve) => {
setTimeout(resolve, ms);
});
const isRetryableStatus = (status: number) => status >= 500 || status === 429;
type RetryPolicy = {
maxAttempts: number;
initialBackoffMs: number;
maxBackoffMs: number;
timeoutMs: number;
};
const retryPolicyFromEnv = (): RetryPolicy => {
const maxAttempts = Math.max(1, envNumber(process.env.WEBHOOK_RETRY_MAX_ATTEMPTS, 3));
const initialBackoffMs = Math.max(0, envNumber(process.env.WEBHOOK_RETRY_INITIAL_BACKOFF_MS, 500));
const maxBackoffMs = Math.max(initialBackoffMs, envNumber(process.env.WEBHOOK_RETRY_MAX_BACKOFF_MS, 5_000));
const timeoutMs = Math.max(1, envNumber(process.env.WEBHOOK_TIMEOUT_MS, 15_000));
return { maxAttempts, initialBackoffMs, maxBackoffMs, timeoutMs };
};
type RunOutcome = {
response: Response | null;
attempts: number;
lastError: unknown;
};
const runWithRetry = async (url: string, init: RequestInit, policy: RetryPolicy): Promise<RunOutcome> => {
let lastError: unknown = null;
let response: Response | null = null;
for (let attempt = 1; attempt <= policy.maxAttempts; attempt++) {
try {
response = await fetchWithTimeout(url, init, policy.timeoutMs);
if (response.ok) {
return { response, attempts: attempt, lastError: null };
}
if (!isRetryableStatus(response.status)) {
// 4xx (non-429): don't retry, surface as-is.
return { response, attempts: attempt, lastError: null };
}
} catch (error) {
lastError = error;
response = null;
}
if (attempt < policy.maxAttempts) {
const backoff = Math.min(
policy.maxBackoffMs,
policy.initialBackoffMs * 2 ** (attempt - 1),
);
await sleep(backoff);
}
}
return { response, attempts: policy.maxAttempts, lastError };
};
export const registerWebhookRoutes = async (app: FastifyInstance, db: AppDatabase) => {
const sessions = createSessionRepository(db);
const audit = createWebhookAuditRepository(db);
const webhookRateLimiter = webhookRateLimiterFromEnv();
app.get("/api/webhooks", async (request) => {
const user = await getAuthUser(request);
const items = loadWebhookDefinitions()
.filter((webhook) => canUseWebhook(user.roles, webhook))
.map(toPublicWebhook);
return { items };
});
app.get("/api/webhooks/usage", async (request) => {
const user = await getAuthUser(request);
const query = z
.object({ days: z.coerce.number().int().min(1).max(365).default(7) })
.parse(request.query ?? {});
const since = new Date(Date.now() - query.days * 86_400_000).toISOString();
const usage = audit.usageForUserSince(since, user.id);
const items = Object.entries(usage).map(([webhook_id, stats]) => ({
webhook_id,
runs: stats.runs,
successes: stats.successes,
success_rate: stats.successRate,
window_days: query.days,
}));
return { window_days: query.days, items };
});
app.get("/api/webhook-runs", async (request, reply) => {
const user = await getAuthUser(request);
const query = webhookRunsQuery.parse(request.query);
const session = sessions.get(user.id, query.sessionId);
if (!session) {
return reply.code(404).send({ error: "session_not_found" });
}
const items = audit.listForSession(user.id, query.sessionId, query.limit).map((run) => ({
id: run.id,
webhook_id: run.webhook_id,
session_id: run.session_id,
status: run.status,
response_status: run.response_status,
attempts: run.attempts,
created_at: run.created_at,
}));
return { items };
});
app.post("/api/webhooks/:id/run", async (request, reply) => {
const user = await getAuthUser(request);
const params = z.object({ id: z.string().min(1) }).parse(request.params);
const body = runWebhookBody.parse(request.body);
const webhook = loadWebhookDefinitions().find((item) => item.id === params.id);
if (!webhook || !canUseWebhook(user.roles, webhook)) {
return reply.code(404).send({ error: "webhook_not_found" });
}
// Per-webhook abuse detection: each webhook_id has its own bucket so a
// runaway loop on one hook doesn't starve the rest. The bucket is shared
// across all users on purpose — that's the abuse signal.
const decision = webhookRateLimiter.consume(webhook.id);
if (!decision.ok) {
const retryAfterSec = Math.max(1, Math.ceil(decision.retryAfterMs / 1000));
app.log.warn(
{ webhook: webhook.id, user: user.id, retryAfterSec },
"webhook rate limit exceeded",
);
return reply
.code(429)
.header("retry-after", String(retryAfterSec))
.header("x-ratelimit-remaining", "0")
.send({
error: "rate_limited",
retry_after_ms: decision.retryAfterMs,
});
}
reply.header("x-ratelimit-remaining", String(decision.remaining));
const session = sessions.get(user.id, body.sessionId);
if (!session) {
return reply.code(404).send({ error: "session_not_found" });
}
const requestPayload = buildPayload(webhook.payload_template, body, user);
if (!webhook.url) {
audit.create({
webhookId: webhook.id,
userId: user.id,
sessionId: body.sessionId,
status: "error",
requestPayload,
attempts: 0,
});
return reply.code(500).send({ error: "webhook_not_configured" });
}
const policy = retryPolicyFromEnv();
const outcome = await runWithRetry(
webhook.url,
{
method: webhook.method,
headers: { "content-type": "application/json" },
body: webhook.method === "GET" ? undefined : JSON.stringify(requestPayload),
},
policy,
);
if (outcome.lastError) {
app.log.error({ err: outcome.lastError, webhook: webhook.id, attempts: outcome.attempts }, "webhook request failed after retries");
} else if (outcome.attempts > 1 && outcome.response) {
app.log.warn(
{ webhook: webhook.id, attempts: outcome.attempts, status: outcome.response.status },
"webhook request retried",
);
}
const response = outcome.response;
const httpOk = response?.ok ?? false;
const isTransportError = !response;
const run = audit.create({
webhookId: webhook.id,
userId: user.id,
sessionId: body.sessionId,
status: httpOk ? "success" : "error",
requestPayload,
responseStatus: response?.status ?? null,
attempts: outcome.attempts,
});
if (isTransportError) {
return reply.code(502).send({
id: run.id,
webhook_id: run.webhook_id,
status: run.status,
response_status: run.response_status,
attempts: run.attempts,
error: "webhook_request_failed",
});
}
return reply.code(httpOk ? 200 : 502).send({
id: run.id,
webhook_id: run.webhook_id,
status: run.status,
response_status: run.response_status,
attempts: run.attempts,
});
});
};