commit 62728b2200235a31cace57d171ad065bdc39acb2 Author: rikrdo Date: Mon Jun 29 16:20:53 2026 +0200 Initial commit: SIC harness (backend, web, pi-adapter, configs, docs) - pnpm monorepo: apps/api (Fastify + SQLite + SSE), apps/web (React+Vite), packages/shared, packages/pi-adapter - Local auth (admin/webhook-runner roles) + Keycloak JWT ready - Multi-session chat with reliable history (user persisted before LLM, assistant persisted after stream) - Markdown knowledge base with /api/docs/search + /api/docs/:id - YAML webhook catalog with backend-only execution, retry/backoff, audit (webhook_runs), and per-user rate limit - Skills config (sre-on-call, blameless-postmortem, security-incident) injected into LLM system prompt - LLM provider failover chain (config/models.yml fallback + LLM_FALLBACK_CHAIN override) - Context-aware webhooks panel + backend id-mention safety net - Per-message stats (time/duration/tokens/model), Markdown+GFM render, code & table copy/download buttons - Vitest suite, end-to-end smoke test (scripts/smoke.mjs), per-session system prompt override - /metrics Prometheus endpoint + /api/metrics JSON, request-id correlation - dotenv with explicit repo-root path; envString/envNumber helpers (handles empty-string env) - Runbooks + SOPs under knowledge/ in English; README, docs, and INDEX.md in English diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..ede87f5 --- /dev/null +++ b/.env.example @@ -0,0 +1,118 @@ +# ============================================================= +# SIC — Super Incident Commander +# Local development environment. +# +# How to use: +# 1. Copy this file to `.env`: +# cp .env.example .env +# 2. Edit `.env` and fill in the secrets (at minimum MINIMAX_API_KEY). +# 3. Start the API: +# pnpm dev +# +# The API loads `.env` automatically via dotenv at boot. Real shell +# environment variables always win over the file, so production / +# docker setups that inject env vars keep working unchanged. 
+# +# Never commit a real `.env` file — it's gitignored. +# ============================================================= + + +# --------------------------------------------------------------- +# Server +# --------------------------------------------------------------- +API_PORT=8787 +HOST=0.0.0.0 +API_BODY_LIMIT_BYTES=1048576 +CORS_ALLOWED_ORIGINS= +WEB_PORT=3000 +WEB_VITE_API_PROXY=http://localhost:8787 + +# --------------------------------------------------------------- +# Auth +# --------------------------------------------------------------- +# local: dev mode, returns a synthetic `local-user` with admin + webhook-runner roles +# keycloak: validates Authorization: Bearer JWT against OIDC_ISSUER/OIDC_AUDIENCE +AUTH_MODE=local +OIDC_ISSUER=https://auth.rikrdo.com/realms/homelab +OIDC_AUDIENCE=pi-chat + +# --------------------------------------------------------------- +# Persistence +# --------------------------------------------------------------- +DATABASE_URL=sqlite://./data/pi-chat.db + +# --------------------------------------------------------------- +# LLM provider (OpenAI-compatible) +# --------------------------------------------------------------- +# Default provider and base URL. The MiniMax and mr-auto model +# entries in config/models.yml both read these. +LLM_BASE_URL=https://api.minimax.io/v1 + +# Per-model fallback chain. Override the default chain parsed from +# config/models.yml. Comma-separated model ids in the order to try. +# Set to empty to disable and use the YAML-only chain. +# Example: LLM_FALLBACK_CHAIN=balanced,mr-auto +LLM_FALLBACK_CHAIN= +LLM_API_KEY= +DEFAULT_MODEL=fast + +# Backwards-compat alias for the MiniMax key. Either this or LLM_API_KEY works. +# Used by chat routes as a fallback when LLM_API_KEY is empty. +MINIMAX_API_KEY= + +# Per-model API key overrides (config/models.yml -> model.api_key_env). +# Only the mr-auto model needs this; MiniMax shares LLM_API_KEY. 
+MR_AUTO_API_KEY= + +LLM_TIMEOUT_MS=30000 + +# --------------------------------------------------------------- +# Chat input limits +# --------------------------------------------------------------- +CHAT_MESSAGE_MAX_CHARS=8000 + +# --------------------------------------------------------------- +# Rate limits +# --------------------------------------------------------------- +# /api/chat/stream — per authenticated user +CHAT_RATE_LIMIT_PER_MINUTE=20 +CHAT_RATE_LIMIT_BURST=5 + +# POST /api/webhooks/:id/run — per webhook id (across all users) +WEBHOOK_RATE_LIMIT_PER_MINUTE=60 +WEBHOOK_RATE_LIMIT_BURST=10 + +# --------------------------------------------------------------- +# Webhook execution +# --------------------------------------------------------------- +WEBHOOK_TIMEOUT_MS=15000 +WEBHOOK_RETRY_MAX_ATTEMPTS=3 +WEBHOOK_RETRY_INITIAL_BACKOFF_MS=500 +WEBHOOK_RETRY_MAX_BACKOFF_MS=5000 + +# Audit retention +WEBHOOK_RUNS_RETENTION_DAYS=30 +WEBHOOK_RUNS_MAX_PER_USER=1000 +WEBHOOK_AUDIT_PURGE_INTERVAL_MS=3600000 + +# Webhook usage stats window (days) for /api/webhooks/usage +WEBHOOK_USAGE_WINDOW_DAYS=7 + +# --------------------------------------------------------------- +# Config file paths +# --------------------------------------------------------------- +# Each config file can be overridden with an env var. Defaults +# resolve relative to apps/api (where the API is started). +MODELS_CONFIG_PATH= +WEBHOOKS_CONFIG_PATH= +SKILLS_CONFIG_PATH= +KNOWLEDGE_DIR= +N8N_CONFIG_PATH= +RAG_CONFIG_PATH= +MCP_CONFIG_PATH= + +# --------------------------------------------------------------- +# Development +# --------------------------------------------------------------- +# Set to `1` to enable verbose Fastify logging. 
+DEBUG=0 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..b3669a0 --- /dev/null +++ b/.gitignore @@ -0,0 +1,39 @@ +node_modules/ +dist/ +.env +.env.local +data/*.db +data/*.db-* +**/data/*.db +**/data/*.db-* +**/data/*.db-shm +**/data/*.db-wal +*.log + +# Local agent / codebase caches +.atl/ +.codebase-memory/ +INDEX.md +scripts/*.mjs +!scripts/*.mjs.bak + +# Editor / OS +.DS_Store +.idea/ +.vscode/ + +# Build artifacts +apps/api/dist/ +apps/web/dist/ +packages/*/dist/ + +# Logs +logs/ +*.log +*.pid +*.seed +*.pid.lock + +# Coverage +coverage/ +.nyc_output/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..a45a7e0 --- /dev/null +++ b/README.md @@ -0,0 +1,273 @@ +# SIC — Super Incident Commander + +Lightweight web harness to use a centralized `pi.dev` engine from the browser, with independent sessions, reliable history in SQLite, internal Markdown documentation, and webhooks executed only from the backend after explicit user confirmation. + +## MVP scope + +- Expected ceiling: 5 concurrent users. +- Frontend: React + Vite. +- Backend: Node.js + Fastify. +- Initial persistence: SQLite. +- LLM: OpenAI-compatible endpoint via `pi-adapter`. +- Default LLM provider: MiniMax OpenAI-compatible. +- Configuration: YAML + environment variables. +- Initial deploy: Docker Compose. + +## Reliability principle + +Nothing critical lives only in memory. Sessions, messages, and webhook audit are rebuilt from SQLite. + +Every conversation read/write must respect: + +```sql +WHERE session_id = ? +AND user_id = ? 
+``` + +## Structure + +```text +apps/ + api/ # Fastify API, SSE, sessions, webhooks, docs + web/ # React + Vite UI +packages/ + shared/ # Shared types + pi-adapter/ # pi.dev / OpenAI-compatible adapter +config/ # YAML for models, webhooks and docs +knowledge/ # Internal Markdown documentation +deploy/ # Docker Compose and future manifests +docs/ # Definitions, reliable history and agents +scripts/ # End-to-end smoke test + mock LLM +``` + +## API surface + +- `GET /healthz` +- `GET /readyz` +- `GET /api/version` +- `GET /api/me` +- `GET /api/sessions` +- `POST /api/sessions` +- `GET /api/sessions/:id` +- `PATCH /api/sessions/:id` +- `DELETE /api/sessions/:id` +- `GET /api/docs/search?q=vpn` +- `GET /api/docs/:id` +- `GET /api/models` +- `GET /api/webhooks` +- `GET /api/webhook-runs?sessionId=...` +- `POST /api/webhooks/:id/run` +- `GET /api/skills` +- `PATCH /api/sessions/:id/system-prompt` — set per-session context +- `GET /metrics` — Prometheus text +- `GET /api/metrics` — same as JSON +- `POST /api/chat/stream` + +## Chat stream contract + +`POST /api/chat/stream` takes `sessionId`, `message` and optionally `model`. + +Reliability rules: + +1. Validate that the session belongs to the current user. +2. Persist the `user` message before calling the LLM. +3. If the session has no title yet, derive a short one from the first message. +4. Validate the requested model against `config/models.yml`. +5. Search relevant Markdown docs and role-allowed webhooks. +6. Call the OpenAI-compatible endpoint via `pi-adapter`. If the model has a fallback chain, the chat route walks it on structured or transport errors; the first `ok=true` response wins. +7. Emit SSE events: `docs`, `token`, `actions`, `done`. +8. Persist the `assistant` response; if every model in the chain fails, persist a controlled message with error metadata and the full failure trail. + +### Provider fallback + +Each model in `config/models.yml` can declare `fallback: [other-id, ...]`. 
The chat route walks the chain when a model returns `ok=false` (no_content / json_parse / schema) or throws (5xx / 429 / network / timeout). When the assistant metadata is persisted, it includes `requested_model`, `fallback_attempts`, `fallback_chain`, and `fallback_failures` whenever the chain was actually used, so you can see what happened in the chat history. + +Override the chain globally with `LLM_FALLBACK_CHAIN` (comma-separated model ids, tried in order after the requested model). Leave empty to use each model's YAML chain. + +Default chain today (from `config/models.yml`): + +- `fast` → no fallback (it IS the cheap path) +- `balanced` → `mr-auto` +- `reasoning` → no fallback +- `mr-auto` → no fallback + +## MiniMax + +The project is wired to MiniMax via the official OpenAI-compatible endpoint: + +- Base URL: `https://api.minimax.io/v1` +- Chat path used by the adapter: `/chat/completions` +- Auth: `Authorization: Bearer <api-key>` + +Models configured in `config/models.yml`: + +- `fast` → `MiniMax-M2.7-highspeed` +- `balanced` → `MiniMax-M2.7` +- `reasoning` → `MiniMax-M3` + +To run locally, set the key: + +```bash +export MINIMAX_API_KEY="your-key" +export LLM_BASE_URL="https://api.minimax.io/v1" +export LLM_API_KEY="$MINIMAX_API_KEY" +export DEFAULT_MODEL="fast" +``` + +In Docker Compose you only need to export `MINIMAX_API_KEY`; the Compose file maps it to `LLM_API_KEY`. + +## UI MVP + +The React app already consumes the API through the Vite proxy: + +- Loads or creates a local session. +- Loads `GET /api/models` and lets the user pick the model per message. +- Lists persisted sessions and lets the user switch between them. +- Lets the user rename and delete sessions, always through the API with per-user isolation. +- Sends messages to `POST /api/chat/stream` and consumes SSE events. +- Shows recommended documentation and lets the user open the full document via `GET /api/docs/:id`. +- Shows suggested actions in the right panel.
+- Loads `GET /api/webhooks` to show public labels/descriptions for actions. +- Executes webhooks only after user confirmation and always through the backend. +- Shows execution audit per session from `GET /api/webhook-runs`, without exposing URLs or payload templates. +- Can attach a development Bearer token to test `AUTH_MODE=keycloak`; reads from `localStorage` or `VITE_AUTH_TOKEN`. + +## Skills + +Skills are persona/behavior prompt fragments loaded from `config/skills.yml` and injected into the LLM's system prompt at chat time. They are NOT capabilities: the model still only recommends actions and the backend still owns execution. + +Each skill has: `id`, `name`, `description`, `enabled`, `prompt`. Skills with `enabled: true` are injected into the chat system prompt (after the base identity prompt, before the docs/actions context). Skills with `enabled: false` are kept in the file but inactive. The frontend can list them via `GET /api/skills` (no prompt text is exposed publicly — only id, name, description, enabled). + +Edit `config/skills.yml` and restart the API to change the active skill set. The default file ships with `sre-on-call` and `blameless-postmortem` enabled; `security-incident` is shipped disabled as a reference. + +The env var `SKILLS_CONFIG_PATH` overrides the default config path (`../../config/skills.yml` relative to `cwd`). + +## Per-session context + +Every session has an optional `system_prompt` field. When set, it is prepended to every chat turn as a system message (after the base identity prompt and skill prompts, before the docs/actions context). Use it to pin incident id, on-call name, or runbook references that shouldn't drift across the conversation. + +- **Frontend**: each session row has a small circle button (`○` empty, `●` set). Click it to open a modal editor with Save and Clear. +- **API**: `PATCH /api/sessions/:id/system-prompt` with `{ "system_prompt": "..." }`. Send `null` or empty string to clear. 
+- **Limit**: 8000 characters. +- **Persistence**: stored in `chat_sessions.system_prompt`; same `WHERE id = ? AND user_id = ?` ownership rule as every other session operation. + +## Observability + +Two endpoints surface API metrics: + +- `GET /metrics` — Prometheus text exposition (counter / summary), scraper-friendly. Default Prometheus port / scrape target. +- `GET /api/metrics` — same data as JSON for humans and the smoke test. Shape: + + ```json + { + "started_at": "2026-06-29T12:00:00.000Z", + "uptime_seconds": 1234, + "totals": { "requests": 5678, "errors_5xx": 0 }, + "routes": [ + { + "route": "/api/chat/stream", + "method": "POST", + "count": 42, + "avg_ms": 1230, + "p95_ms": 4500, + "max_ms": 8000, + "status_buckets": { "200_299": 42 } + } + ], + "recent": [ + { + "route": "/api/sessions/:id", + "method": "DELETE", + "status": 204, + "durationMs": 4, + "timestamp": 1782727300000 + } + ] + } + ``` + +Routes are aggregated by route **template** (e.g. `/api/sessions/:id`), not by raw URL, so `/api/sessions/abc` and `/api/sessions/def` share a bucket. p95 uses a fixed-size streaming reservoir (200 samples) so memory stays bounded under traffic. In-memory only — counters reset on restart; that's the expected behavior for a 5-user MVP. + +## Auth + +The backend supports two modes: + +- `AUTH_MODE=local`: dev mode, uses `local-user` with roles `admin` and `webhook-runner`. +- `AUTH_MODE=keycloak`: validates `Authorization: Bearer <token>` with remote JWKS from `OIDC_ISSUER` and `OIDC_AUDIENCE`. + +For manual Keycloak testing, the UI lets you paste a JWT in the "Dev token" box. That token is stored in `localStorage` and sent as `Authorization: Bearer <token>` on API and stream calls. Alternatively, Vite can receive `VITE_AUTH_TOKEN` to preconfigure it for the local environment. + +Claims used from Keycloak: + +- `sub` as `user.id`. +- `preferred_username` and `email` for display. +- Roles from `realm_access.roles` and `resource_access[OIDC_AUDIENCE].roles`.
+ +## Basic hardening + +- `API_BODY_LIMIT_BYTES`: global Fastify body limit. Default: `1048576`. +- `CHAT_MESSAGE_MAX_CHARS`: chat message and `lastUserMessage` limit on webhooks. Default: `8000`. +- `CORS_ALLOWED_ORIGINS`: comma-separated list. If unset, open for dev. +- `LLM_TIMEOUT_MS`: OpenAI-compatible call timeout. Default: `30000`. +- `WEBHOOK_TIMEOUT_MS`: backend-only webhook execution timeout. Default: `15000`. +- `WEBHOOK_RETRY_MAX_ATTEMPTS`: retries per webhook on transient errors (5xx, 429, timeout, network). Default: `3`. +- `WEBHOOK_RETRY_INITIAL_BACKOFF_MS`: initial backoff with exponential growth. Default: `500`. +- `WEBHOOK_RETRY_MAX_BACKOFF_MS`: backoff cap. Default: `5000`. +- `WEBHOOK_RUNS_RETENTION_DAYS`: age cutoff for `webhook_runs` rows. Runs older than this are purged on boot and on a timer. Default: `30`. Set to `0` to disable the age pass. +- `WEBHOOK_RUNS_MAX_PER_USER`: keep at most this many most-recent runs per user. The oldest overflow is purged. Default: `1000`. Set to `0` to disable the cap pass. +- `WEBHOOK_AUDIT_PURGE_INTERVAL_MS`: how often the janitor runs while the API is up. Default: `3600000` (1 hour). Minimum: `60000` (1 minute). +- `CHAT_RATE_LIMIT_PER_MINUTE`: per-user rate limit on `POST /api/chat/stream` (token-bucket refill rate). Default: `20`. +- `CHAT_RATE_LIMIT_BURST`: per-user burst size. Default: `5`. Rejected calls return `429` with `retry-after` in seconds and `x-ratelimit-remaining: 0`. +- The API adds basic defensive headers: `x-content-type-options`, `referrer-policy`, `x-frame-options`. + +## End-to-end smoke test + +A smoke script exercises the full API (health, auth, models, docs, webhooks, sessions, SSE stream, message persistence and audit). 
+ +### With a real LLM (MiniMax) + +```bash +# Terminal 1: start the API and the web +export LLM_BASE_URL=https://api.minimax.io/v1 +export LLM_API_KEY="$MINIMAX_API_KEY" +export DEFAULT_MODEL=fast +pnpm dev + +# Terminal 2: smoke test against http://localhost:3000 +pnpm smoke +``` + +### With the mock LLM (no key) + +```bash +# Terminal 1: start the API and the web pointing at the mock +pnpm mock:llm & +export LLM_BASE_URL=http://127.0.0.1:4010/v1 +export LLM_API_KEY=dummy +export DEFAULT_MODEL=fast +pnpm dev + +# Terminal 2 +pnpm smoke + +# or in a single step, the script starts the mock internally: +pnpm smoke:mock +``` + +Steps covered (in order): + +1. `/healthz`, `/readyz` +2. `/api/me` (local auth) +3. `/api/models` +4. `/api/docs/search` + `/api/docs/:id` +5. `/api/webhooks` +6. `POST /api/sessions` + `GET /api/sessions` +7. `POST /api/chat/stream` and SSE event parsing (`docs`, `token`, `actions`, `done`) +8. `GET /api/sessions/:id` to confirm the assistant message was persisted +9. `GET /api/webhook-runs?sessionId=...` to confirm audit listing +10. 
`DELETE /api/sessions/:id` (cleanup) + +Optional flags: + +- `pnpm smoke --api-base http://localhost:4000` to point at a different API +- `pnpm smoke:mock` (alias of `pnpm smoke --mock-llm`) starts the mock inside the script diff --git a/apps/api/package.json b/apps/api/package.json new file mode 100644 index 0000000..69c76a6 --- /dev/null +++ b/apps/api/package.json @@ -0,0 +1,28 @@ +{ + "name": "@pi-chat/api", + "private": true, + "version": "0.1.0", + "type": "module", + "scripts": { + "dev": "tsx watch src/server.ts", + "typecheck": "tsc --noEmit", + "lint": "tsc --noEmit" + }, + "dependencies": { + "@fastify/cors": "^11.0.1", + "@pi-chat/pi-adapter": "workspace:*", + "@pi-chat/shared": "workspace:*", + "better-sqlite3": "^11.10.0", + "dotenv": "^17.4.2", + "fastify": "^5.3.3", + "jose": "^5.10.0", + "yaml": "^2.7.1", + "zod": "^3.25.67" + }, + "devDependencies": { + "@types/better-sqlite3": "^7.6.13", + "@types/node": "^22.15.32", + "tsx": "^4.20.3", + "typescript": "^5.8.3" + } +} diff --git a/apps/api/src/auth/index.ts b/apps/api/src/auth/index.ts new file mode 100644 index 0000000..4ec425b --- /dev/null +++ b/apps/api/src/auth/index.ts @@ -0,0 +1,76 @@ +import type { AuthUser } from "@pi-chat/shared"; +import type { FastifyRequest } from "fastify"; +import { createRemoteJWKSet, jwtVerify } from "jose"; +import { envString } from "../env.js"; + +type KeycloakClaims = { + sub?: string; + preferred_username?: string; + email?: string; + realm_access?: { roles?: string[] }; + resource_access?: Record; +}; + +const authMode = () => envString(process.env.AUTH_MODE, "local"); +const oidcIssuer = () => envString(process.env.OIDC_ISSUER, "https://auth.rikrdo.com/realms/homelab"); +const oidcAudience = () => envString(process.env.OIDC_AUDIENCE, "pi-chat"); + +let jwks: ReturnType | undefined; + +const getLocalUser = (): AuthUser => ({ + id: "local-user", + username: "local-user", + roles: ["admin", "webhook-runner"], +}); + +const bearerTokenFrom = (request: 
/**
 * Collects the caller's roles from verified Keycloak claims.
 *
 * Merges realm-level roles (`realm_access.roles`) with the client roles stored
 * under the configured OIDC audience (`resource_access[audience].roles`) and
 * removes duplicates.
 *
 * The claims object comes from an unchecked cast of the JWT payload, so every
 * `roles` value is validated at runtime: anything that is not an array counts
 * as "no roles" and non-string entries are dropped. Without this, a
 * string-valued claim would be spread into single characters and a
 * non-iterable value would throw a TypeError.
 *
 * @param claims - Token payload that the caller has already verified.
 * @returns De-duplicated role names, realm roles first.
 */
const rolesFromClaims = (claims: KeycloakClaims): string[] => {
  const toRoleList = (value: unknown): string[] =>
    Array.isArray(value) ? value.filter((role): role is string => typeof role === "string") : [];

  const audience = oidcAudience();
  const realmRoles = toRoleList(claims.realm_access?.roles);
  const clientRoles = toRoleList(claims.resource_access?.[audience]?.roles);

  return [...new Set([...realmRoles, ...clientRoles])];
};

/**
 * Authenticates a request in `keycloak` mode.
 *
 * Verifies the request's bearer token with `jwtVerify` against the issuer's
 * remote JWKS, checking the configured issuer and audience, then maps the
 * claims onto an `AuthUser`.
 *
 * @param request - Incoming Fastify request carrying the `Authorization` header.
 * @returns The authenticated user; `id` is the token's `sub` claim.
 * @throws Error `auth_missing_subject` when the verified token has no `sub`.
 *   Errors thrown by `bearerTokenFrom` and `jwtVerify` propagate unchanged.
 */
const getKeycloakUser = async (request: FastifyRequest): Promise<AuthUser> => {
  const issuer = oidcIssuer();
  const audience = oidcAudience();

  // The JWKS resolver is created on first use and then reused for every
  // request. NOTE(review): it is built from the issuer seen on that first
  // call, so a later change of OIDC_ISSUER is not picked up without a restart.
  jwks ??= createRemoteJWKSet(new URL(`${issuer}/protocol/openid-connect/certs`));

  const token = bearerTokenFrom(request);
  const { payload } = await jwtVerify(token, jwks, { issuer, audience });
  const claims = payload as KeycloakClaims;

  if (!claims.sub) {
    throw new Error("auth_missing_subject");
  }

  return {
    id: claims.sub,
    username: claims.preferred_username,
    email: claims.email,
    roles: rolesFromClaims(claims),
  };
};

/**
 * Resolves the authenticated user for a request according to `AUTH_MODE`.
 *
 * The mode is read once per call so the branch taken and the mode reported in
 * the error message always agree.
 *
 * @param request - Incoming Fastify request.
 * @returns The synthetic local user in `local` mode, or the verified Keycloak
 *   user in `keycloak` mode.
 * @throws Error `auth_mode_not_supported:<mode>` for any other mode.
 */
export const getAuthUser = async (request: FastifyRequest): Promise<AuthUser> => {
  const mode = authMode();

  switch (mode) {
    case "local":
      return getLocalUser();
    case "keycloak":
      return getKeycloakUser(request);
    default:
      throw new Error(`auth_mode_not_supported:${mode}`);
  }
};
from "@pi-chat/pi-adapter"; +import type { InternalDocReference, RecommendedAction } from "@pi-chat/shared"; +import type { FastifyInstance, FastifyReply } from "fastify"; +import { z } from "zod"; +import { getAuthUser } from "../auth/index.js"; +import type { AppDatabase } from "../db/database.js"; +import { createDocsRepository, type KnowledgeSearchResult } from "../docs/repository.js"; +import { findModelDefinition, getDefaultModelId, resolveFallbackChain, resolveModelApiKey } from "../models/config.js"; +import { envNumber } from "../env.js"; +import { chatRateLimiterFromEnv } from "../rate-limit.js"; +import { getEnabledSkillPrompts } from "../skills/config.js"; +import { createMessageRepository, createSessionRepository } from "../sessions/repository.js"; +import { createWebhookAuditRepository } from "../webhooks/audit.js"; +import { canUseWebhook, loadWebhookDefinitions } from "../webhooks/config.js"; + +const chatStreamBody = z.object({ + sessionId: z.string().min(1), + message: z.string().trim().min(1).max(envNumber(process.env.CHAT_MESSAGE_MAX_CHARS, 8_000)), + model: z.string().trim().default(getDefaultModelId()), +}); + +const sendEvent = (reply: FastifyReply, event: unknown) => { + reply.raw.write(`data: ${JSON.stringify(event)}\n\n`); +}; + +const sendAnswerTokens = (reply: FastifyReply, answer: string) => { + for (const token of answer.match(/\S+\s*/g) ?? []) { + sendEvent(reply, { type: "token", token }); + } +}; + +const toDocReference = (doc: KnowledgeSearchResult): InternalDocReference => ({ + id: doc.id, + title: doc.title, + source: doc.source, + relevance: doc.relevance, +}); + +const shouldAutoTitle = (title: string | null) => !title || title === "New session"; + +const titleFromMessage = (message: string) => { + const normalized = message.replace(/\s+/g, " ").trim(); + return normalized.length > 48 ? `${normalized.slice(0, 45)}...` : normalized; +}; + +// Match a webhook id against a chunk of text. 
/** Escapes every RegExp metacharacter so `value` can be embedded as a literal. */
const escapeRegExp = (value: string): string => value.replace(/[.*+?^${}()|[\]\\]/g, "\\$&");

/**
 * Match a webhook id against a chunk of text.
 *
 * Accepts the full id (with `-` or `_`) OR its first word (e.g. id
 * "dns-flush" matches "dns" in "flush the dns cache"). Both sides are
 * normalized to lowercase with dashes/underscores turned into spaces.
 *
 * @param text - Free text to scan (user message or model answer).
 * @param id - Webhook id to look for.
 * @returns `true` when the normalized id occurs in the text, or when the id's
 *   first word (3+ characters) occurs in the text as a whole word.
 */
const matchesId = (text: string, id: string): boolean => {
  const normalizedText = text.toLowerCase().replace(/[-_]+/g, " ");
  const normalizedId = id.toLowerCase().replace(/[-_]+/g, " ");

  // An empty (or separator-only) id would "match" any text through
  // `includes("")` / `includes(" ")`; treat it as no match instead.
  if (normalizedId.trim().length === 0) return false;

  if (normalizedText.includes(normalizedId)) return true;

  const [firstWord = ""] = normalizedId.split(" ");
  if (firstWord.length < 3) return false;

  // The id comes from configuration, not from a regex author: escape it so
  // characters such as `+`, `(` or `.` neither throw nor act as wildcards.
  return new RegExp(`\\b${escapeRegExp(firstWord)}\\b`, "i").test(normalizedText);
};
+ confidence: 0.4, + reason: "Mentioned in the conversation", + }); + seen.add(candidate.id); + } + } + return enriched; +}; + +export const registerChatRoutes = async (app: FastifyInstance, db: AppDatabase) => { + const sessions = createSessionRepository(db); + const messages = createMessageRepository(db); + const docs = createDocsRepository(); + const audit = createWebhookAuditRepository(db); + const rateLimiter = chatRateLimiterFromEnv(); + + app.post("/api/chat/stream", async (request, reply) => { + const user = await getAuthUser(request); + + const decision = rateLimiter.consume(user.id); + if (!decision.ok) { + const retryAfterSec = Math.max(1, Math.ceil(decision.retryAfterMs / 1000)); + app.log.warn({ user: user.id, retryAfterSec }, "chat rate limit exceeded"); + return reply + .code(429) + .header("retry-after", String(retryAfterSec)) + .header("x-ratelimit-remaining", "0") + .send({ + error: "rate_limited", + retry_after_ms: decision.retryAfterMs, + }); + } + reply.header("x-ratelimit-remaining", String(decision.remaining)); + + const body = chatStreamBody.parse(request.body); + const session = sessions.get(user.id, body.sessionId); + const selectedModel = findModelDefinition(body.model); + + if (!session) { + return reply.code(404).send({ error: "session_not_found" }); + } + + if (!selectedModel) { + return reply.code(400).send({ error: "model_not_found" }); + } + +const resolvedKey = resolveModelApiKey(selectedModel); + app.log.debug( + { + model: selectedModel.id, + keyLen: resolvedKey.length, + llmKeyLen: (process.env.LLM_API_KEY ?? "").length, + }, + "llm api key resolved", + ); + + // Build the ordered fallback chain starting at the selected model. The + // adapter is built fresh per model because base URL, key and provider + // model name can differ across chain entries. 
+ const chain = resolveFallbackChain(selectedModel.id); + const chainModels = chain + .map((id) => findModelDefinition(id)) + .filter((m): m is NonNullable> => Boolean(m)); + + const adapters = chainModels.map((model) => ({ + model, + pi: createOpenAICompatiblePiAdapter({ + baseUrl: model.base_url || process.env.LLM_BASE_URL || "https://api.minimax.io/v1", + apiKey: resolveModelApiKey(model), + defaultModel: model.model, + maxTokens: model.max_tokens, + }), + })); + + reply.raw.writeHead(200, { + "content-type": "text/event-stream; charset=utf-8", + "cache-control": "no-cache, no-transform", + connection: "keep-alive", + }); + + const userMessage = messages.create({ + sessionId: body.sessionId, + userId: user.id, + role: "user", + content: body.message, + metadata: { model: selectedModel.id }, + }); + + if (shouldAutoTitle(session.title)) { + sessions.updateTitle(user.id, body.sessionId, titleFromMessage(body.message)); + } + + sessions.touch(user.id, body.sessionId); + + const docResults = await docs.search(body.message, 5); + const internalDocs = docResults.map(toDocReference); + + // Soft usage signal: how often has this user run each webhook in the + // recent past. Surfaced as `usageHint` so the LLM can prefer frequently + // used webhooks when ambiguous, and so the UI can show a "Most used" tag. + const usageSinceDays = Math.max(0, envNumber(process.env.WEBHOOK_USAGE_WINDOW_DAYS, 7)); + const usageSince = usageSinceDays > 0 + ? new Date(Date.now() - usageSinceDays * 86_400_000).toISOString() + : new Date(0).toISOString(); + const usageMap = usageSinceDays > 0 + ? audit.usageForUserSince(usageSince, user.id) + : {}; + + const formatUsageHint = (webhookId: string): string | null => { + const stats = usageMap[webhookId]; + if (!stats || stats.runs === 0) return null; + const successPct = Math.round(stats.successRate * 100); + return `${stats.runs} run${stats.runs === 1 ? 
"" : "s"} in last ${usageSinceDays}d, ${successPct}% success`; + }; + + const availableActions: RecommendedAction[] = loadWebhookDefinitions() + .filter((webhook) => canUseWebhook(user.roles, webhook)) + .map((webhook) => { + const usageHint = formatUsageHint(webhook.id); + return { + type: "webhook" as const, + id: webhook.id, + confidence: 0, + reason: webhook.description ?? webhook.label, + requires_confirmation: webhook.confirmation_required, + ...(usageHint ? { usageHint } : {}), + }; + }); + + sendEvent(reply, { type: "docs", docs: docResults }); + + try { + const history = messages + .listForSession(user.id, body.sessionId) + .filter((message) => message.id !== userMessage.id) + .slice(-12) + .map((message) => ({ role: message.role, content: message.content })); + + const t0 = Date.now(); + // Walk the fallback chain. The first adapter that returns ok=true + // wins. If a structured error comes back from any one model we move + // to the next; an exception (network/5xx/timeout) also jumps chain. + let chat: Awaited> | null = null; + let usedModelId = selectedModel.id; + let fallbackAttempts = 0; + const failures: Array<{ model: string; reason: string; kind?: string }> = []; + + for (const entry of adapters) { + try { + const result = await entry.pi.chat({ + message: body.message, + model: entry.model.model, + docs: internalDocs, + availableActions, + history, + skillPrompts: getEnabledSkillPrompts(), + systemPrompt: session.system_prompt, + }); + if (result.ok) { + chat = result; + usedModelId = entry.model.id; + break; + } + // Structured error (no_content / json_parse / schema). Try next. + failures.push({ + model: entry.model.id, + kind: result.error.kind, + reason: result.error.kind === "no_content" + ? result.error.message + : result.error.reason, + }); + fallbackAttempts += 1; + chat = result; // keep last error for the controlled fallback path + usedModelId = entry.model.id; + } catch (error) { + // Transport / timeout / 5xx — also fall through. 
+ failures.push({ + model: entry.model.id, + reason: error instanceof Error ? error.message : String(error), + }); + fallbackAttempts += 1; + app.log.warn( + { model: entry.model.id, err: error }, + "llm call failed, trying next model in fallback chain", + ); + } + } + + const durationMs = Date.now() - t0; + if (!chat) { + throw new Error("all fallback models failed"); + } + + // The adapter may return ok=true (well-formed JSON) or ok=false with + // a structured error + safe fallback. In both cases the fallback + // contains a usable `answer` and (possibly empty) actions; we never + // throw on a parse/schema problem — those are operational signal, not + // request failures. + const result = chat.ok ? chat.result : chat.fallback; + if (!chat.ok) { + app.log.warn( + { + kind: chat.error.kind, + reason: chat.error.kind === "no_content" ? chat.error.message : chat.error.reason, + model: usedModelId, + }, + "pi-adapter returned a parse/structured error; using safe fallback", + ); + } + + // Deterministic safety net: if the LLM forgot to put a relevant webhook + // in `recommended_actions` (common with short user prompts), scan both + // the user's input and the model's answer for any role-allowed webhook + // id (or its first word) and synthesize an action so the user can still + // execute it from the right panel. + const recommendedActions = enrichActionsWithMentions( + body.message, + result.answer, + result.recommended_actions, + availableActions, + ); + + sendAnswerTokens(reply, result.answer); + sendEvent(reply, { type: "actions", actions: recommendedActions }); + + const assistantMetadata: Record = { + model: usedModelId, + docs: result.internal_docs, + actions: result.recommended_actions, + usage: { ...(chat.usage ?? 
{}), durationMs }, + }; + if (usedModelId !== selectedModel.id || fallbackAttempts > 0) { + assistantMetadata.requested_model = selectedModel.id; + assistantMetadata.fallback_attempts = fallbackAttempts; + assistantMetadata.fallback_chain = chain; + assistantMetadata.fallback_failures = failures; + } + if (!chat.ok) { + assistantMetadata.error_kind = chat.error.kind; + assistantMetadata.error_reason = chat.error.kind === "no_content" + ? chat.error.message + : chat.error.reason; + assistantMetadata.fallback = true; + } + + messages.create({ + sessionId: body.sessionId, + userId: user.id, + role: "assistant", + content: result.answer, + metadata: assistantMetadata, + }); + sessions.touch(user.id, body.sessionId); + + sendEvent(reply, { type: "done" }); + reply.raw.end(); + } catch (error) { + const message = "I could not complete the model response. The user message has been saved."; + + app.log.error(error); + messages.create({ + sessionId: body.sessionId, + userId: user.id, + role: "assistant", + content: message, + metadata: { + model: selectedModel.id, + error: error instanceof Error ? 
error.message : "unknown_error", + }, + }); + sessions.touch(user.id, body.sessionId); + + sendEvent(reply, { type: "token", token: message }); + sendEvent(reply, { type: "error", error: "llm_error" }); + sendEvent(reply, { type: "done" }); + reply.raw.end(); + } + + return reply; + }); +}; diff --git a/apps/api/src/db/database.ts b/apps/api/src/db/database.ts new file mode 100644 index 0000000..c7d9c34 --- /dev/null +++ b/apps/api/src/db/database.ts @@ -0,0 +1,27 @@ +import Database from "better-sqlite3"; +import { mkdirSync } from "node:fs"; +import { dirname, resolve } from "node:path"; + +export type AppDatabase = Database.Database; + +function sqlitePathFromUrl(databaseUrl: string): string { + if (databaseUrl.startsWith("sqlite:///")) { + return databaseUrl.replace("sqlite://", ""); + } + + if (databaseUrl.startsWith("sqlite://")) { + return databaseUrl.replace("sqlite://", ""); + } + + return databaseUrl; +} + +export function openDatabase(databaseUrl = process.env.DATABASE_URL ?? 
"sqlite://./data/pi-chat.db"): AppDatabase { + const filename = resolve(sqlitePathFromUrl(databaseUrl)); + mkdirSync(dirname(filename), { recursive: true }); + + const db = new Database(filename); + db.pragma("journal_mode = WAL"); + db.pragma("foreign_keys = ON"); + return db; +} diff --git a/apps/api/src/db/migrate.ts b/apps/api/src/db/migrate.ts new file mode 100644 index 0000000..5026878 --- /dev/null +++ b/apps/api/src/db/migrate.ts @@ -0,0 +1,62 @@ +import type { AppDatabase } from "./database.js"; + +export function migrate(db: AppDatabase): void { + db.exec(` + CREATE TABLE IF NOT EXISTS chat_sessions ( + id TEXT PRIMARY KEY, + user_id TEXT NOT NULL, + title TEXT, + system_prompt TEXT, + created_at TEXT NOT NULL, + updated_at TEXT NOT NULL + ); + + CREATE INDEX IF NOT EXISTS idx_chat_sessions_user_updated + ON chat_sessions(user_id, updated_at DESC); + + CREATE TABLE IF NOT EXISTS chat_messages ( + id TEXT PRIMARY KEY, + session_id TEXT NOT NULL, + user_id TEXT NOT NULL, + role TEXT NOT NULL, + content TEXT NOT NULL, + metadata TEXT, + created_at TEXT NOT NULL, + FOREIGN KEY (session_id) REFERENCES chat_sessions(id) ON DELETE CASCADE + ); + + CREATE INDEX IF NOT EXISTS idx_chat_messages_session_user_created + ON chat_messages(session_id, user_id, created_at ASC); + + CREATE TABLE IF NOT EXISTS webhook_runs ( + id TEXT PRIMARY KEY, + webhook_id TEXT NOT NULL, + user_id TEXT NOT NULL, + session_id TEXT NOT NULL, + status TEXT NOT NULL, + request_payload TEXT, + response_status INTEGER, + attempts INTEGER NOT NULL DEFAULT 1, + created_at TEXT NOT NULL, + FOREIGN KEY (session_id) REFERENCES chat_sessions(id) ON DELETE CASCADE + ); + + CREATE INDEX IF NOT EXISTS idx_webhook_runs_session_user_created + ON webhook_runs(session_id, user_id, created_at DESC); + `); + + // Idempotent additive migrations for existing DBs. 
+ const webhookRunColumns = db + .prepare("PRAGMA table_info(webhook_runs)") + .all() as Array<{ name: string }>; + if (!webhookRunColumns.some((column) => column.name === "attempts")) { + db.exec("ALTER TABLE webhook_runs ADD COLUMN attempts INTEGER NOT NULL DEFAULT 1"); + } + + const sessionColumns = db + .prepare("PRAGMA table_info(chat_sessions)") + .all() as Array<{ name: string }>; + if (!sessionColumns.some((column) => column.name === "system_prompt")) { + db.exec("ALTER TABLE chat_sessions ADD COLUMN system_prompt TEXT"); + } +} diff --git a/apps/api/src/docs/repository.ts b/apps/api/src/docs/repository.ts new file mode 100644 index 0000000..69e6a1f --- /dev/null +++ b/apps/api/src/docs/repository.ts @@ -0,0 +1,255 @@ +import { readdirSync, readFileSync, statSync } from "node:fs"; +import { relative, resolve } from "node:path"; +import YAML from "yaml"; +import { envString } from "../env.js"; +import { loadRagConfig } from "../rag/config.js"; +import { getViaRag, isRagRemote, searchViaRag } from "../rag/client.js"; + +export type KnowledgeDoc = { + id: string; + title: string; + source: string; + tags: string[]; + owner?: string; + updated?: string; + headings: string[]; + content: string; +}; + +export type KnowledgeSearchResult = Omit & { + relevance: number; + excerpt: string; +}; + +const defaultKnowledgeDir = () => resolve(process.cwd(), "../../knowledge"); + +const normalizePathId = (source: string) => + source.replace(/\.md$/i, "").split(/[\\/]/g).join(":"); + +const walkMarkdownFiles = (dir: string): string[] => { + const entries = readdirSync(dir, { withFileTypes: true }); + + return entries.flatMap((entry) => { + const fullPath = resolve(dir, entry.name); + + if (entry.isDirectory()) { + return walkMarkdownFiles(fullPath); + } + + if (entry.isFile() && entry.name.endsWith(".md")) { + return [fullPath]; + } + + return []; + }); +}; + +const parseFrontmatter = (raw: string) => { + if (!raw.startsWith("---")) { + return { metadata: {}, body: raw }; + 
} + + const end = raw.indexOf("\n---", 3); + if (end === -1) { + return { metadata: {}, body: raw }; + } + + const frontmatter = raw.slice(3, end).trim(); + const body = raw.slice(end + 4).trim(); + const metadata = YAML.parse(frontmatter) ?? {}; + + return { metadata, body }; +}; + +const extractHeadings = (body: string) => + body + .split("\n") + .filter((line) => line.startsWith("#")) + .map((line) => line.replace(/^#+\s*/, "").trim()) + .filter(Boolean); + +const tokenize = (value: string) => + value + .toLowerCase() + .split(/[^a-z0-9]+/i) + .map((token) => token.trim()) + .filter((token) => token.length >= 2); + +const scoreDoc = (doc: KnowledgeDoc, query: string) => { + const tokens = tokenize(query); + if (tokens.length === 0) { + return 0.1; + } + + const title = doc.title.toLowerCase(); + const tags = doc.tags.join(" ").toLowerCase(); + const headings = doc.headings.join(" ").toLowerCase(); + const source = doc.source.toLowerCase(); + const content = doc.content.toLowerCase(); + + return tokens.reduce((score, token) => { + if (title.includes(token)) score += 5; + if (tags.includes(token)) score += 4; + if (headings.includes(token)) score += 3; + if (source.includes(token)) score += 2; + if (content.includes(token)) score += 1; + return score; + }, 0); +}; + +const excerptFor = (content: string, query: string) => { + const token = tokenize(query)[0]; + const compact = content.replace(/\s+/g, " ").trim(); + if (!token) return compact.slice(0, 220); + + const index = compact.toLowerCase().indexOf(token); + if (index === -1) return compact.slice(0, 220); + + return compact.slice(Math.max(0, index - 80), index + 140); +}; + +// RAG-driven tag filter: a doc matches if it has at least one of the +// includeTags (if any) and none of the excludeTags. 
+const passesTagFilter = ( + doc: KnowledgeSearchResult, + includeTags: string[], + excludeTags: string[], +): boolean => { + if (includeTags.length > 0) { + const hasIncluded = doc.tags.some((tag) => includeTags.includes(tag)); + if (!hasIncluded) return false; + } + if (excludeTags.length > 0) { + const hasExcluded = doc.tags.some((tag) => excludeTags.includes(tag)); + if (hasExcluded) return false; + } + return true; +}; + +export const loadKnowledgeDocs = (knowledgeDir = envString(process.env.KNOWLEDGE_DIR, defaultKnowledgeDir())): KnowledgeDoc[] => { + const root = resolve(knowledgeDir); + + try { + statSync(root); + } catch { + return []; + } + + return walkMarkdownFiles(root).map((filePath) => { + const source = relative(root, filePath); + const raw = readFileSync(filePath, "utf8"); + const { metadata, body } = parseFrontmatter(raw); + const data = metadata as Record; + const headings = extractHeadings(body); + + return { + id: normalizePathId(source), + title: typeof data.title === "string" ? data.title : headings[0] ?? source, + source, + tags: Array.isArray(data.tags) ? data.tags.map(String) : [], + owner: typeof data.owner === "string" ? data.owner : undefined, + updated: typeof data.updated === "string" ? data.updated : undefined, + headings, + content: body, + }; + }); +}; + +export const createDocsRepository = () => { + const rag = loadRagConfig(); + const useRemote = isRagRemote(rag); + + return { + async search(query: string, limit?: number): Promise { + const effectiveLimit = Math.max(1, limit ?? rag.topK); + + if (useRemote) { + try { + return await searchViaRag(rag, query, effectiveLimit); + } catch (error) { + if (!rag.fallbackToLocal) throw error; + } + } + + // Local fallback: read from knowledge/ and apply token-overlap scoring. 
+ const fullDocs = loadKnowledgeDocs().filter((doc) => + passesTagFilter( + { + id: doc.id, + title: doc.title, + source: doc.source, + tags: doc.tags, + owner: doc.owner, + updated: doc.updated, + headings: doc.headings, + relevance: 0, + excerpt: "", + }, + rag.includeTags, + rag.excludeTags, + ), + ); + const scored = fullDocs + .map((doc) => ({ + ...doc, + relevance: scoreDoc(doc, query), + excerpt: excerptFor(doc.content, query), + })) + .filter((doc) => doc.relevance >= rag.minRelevance && doc.relevance > 0); + + return scored + .sort((a, b) => b.relevance - a.relevance || a.title.localeCompare(b.title)) + .slice(0, effectiveLimit) + .map(({ content: _content, ...doc }) => doc); + }, + + async get(id: string): Promise { + if (useRemote) { + try { + return await getViaRag(rag, id); + } catch (error) { + if (!rag.fallbackToLocal) throw error; + } + } + return loadKnowledgeDocs().find((doc) => doc.id === id); + }, + + async list(limit = 500): Promise { + const candidates = (await getAllMetadataLocal()) + .filter((doc) => passesTagFilter(doc, rag.includeTags, rag.excludeTags)) + .slice(0, limit); + return candidates; + }, + + async count(): Promise { + const candidates = await getAllMetadataLocal(); + return candidates.filter((doc) => + passesTagFilter(doc, rag.includeTags, rag.excludeTags), + ).length; + }, + }; +}; + +const getAllMetadataLocal = async (): Promise => { + const rag = loadRagConfig(); + const useRemote = isRagRemote(rag); + if (useRemote) { + try { + return await searchViaRag(rag, "", 1000); + } catch (error) { + if (!rag.fallbackToLocal) throw error; + } + } + const docs = loadKnowledgeDocs(); + return docs.map((doc) => ({ + id: doc.id, + title: doc.title, + source: doc.source, + tags: doc.tags, + owner: doc.owner, + updated: doc.updated, + headings: doc.headings, + relevance: 0, + excerpt: "", + })); +}; diff --git a/apps/api/src/docs/routes.ts b/apps/api/src/docs/routes.ts new file mode 100644 index 0000000..f204e97 --- /dev/null +++ 
import type { FastifyInstance } from "fastify";
import { z } from "zod";
import { createDocsRepository } from "./repository.js";

/** Query string accepted by `GET /api/docs/search`. */
const searchQuery = z.object({
  q: z.string().trim().default(""),
  limit: z.coerce.number().int().min(1).max(20).default(5),
});

/** Query string accepted by `GET /api/docs`. */
const listQuery = z.object({
  limit: z.coerce.number().int().min(1).max(1000).default(500),
});

/** Path parameters accepted by `GET /api/docs/:id`. */
const docParams = z.object({ id: z.string().min(1) });

/**
 * Registers the knowledge-base routes:
 *
 * - `GET /api/docs` — list of docs plus the total count.
 * - `GET /api/docs/search` — search by `q`, capped by `limit`.
 * - `GET /api/docs/:id` — a single doc, or 404 `doc_not_found`.
 *
 * Query/params that fail schema validation make the zod `parse` call throw.
 *
 * @param app - Fastify instance to attach the routes to.
 */
export const registerDocsRoutes = async (app: FastifyInstance): Promise<void> => {
  const docs = createDocsRepository();

  app.get("/api/docs", async (request) => {
    const query = listQuery.parse(request.query);
    // Await explicitly so Fastify serializes a real array, not an
    // unresolved Promise which would be `{}` in the response body.
    const [items, total] = await Promise.all([docs.list(query.limit), docs.count()]);
    return { items, total };
  });

  app.get("/api/docs/search", async (request) => {
    const query = searchQuery.parse(request.query);
    const items = await docs.search(query.q, query.limit);
    return { items };
  });

  app.get("/api/docs/:id", async (request, reply) => {
    const params = docParams.parse(request.params);
    // `docs.get` is async. Without the await, `doc` is a Promise (always
    // truthy), so the 404 branch below could never run for a missing doc.
    const doc = await docs.get(params.id);

    if (!doc) {
      return reply.code(404).send({ error: "doc_not_found" });
    }

    return doc;
  });
};
value : fallback; +}; + +export const envNumber = (value: string | undefined | null, fallback: number): number => { + if (value === undefined || value === null) return fallback; + const trimmed = value.trim(); + if (trimmed.length === 0) return fallback; + const parsed = Number(trimmed); + return Number.isFinite(parsed) ? parsed : fallback; +}; diff --git a/apps/api/src/mcp/config.ts b/apps/api/src/mcp/config.ts new file mode 100644 index 0000000..8fe172c --- /dev/null +++ b/apps/api/src/mcp/config.ts @@ -0,0 +1,106 @@ +import { readFileSync } from "node:fs"; +import { resolve } from "node:path"; +import { parse } from "yaml"; +import { envString } from "../env.js"; + +export type McpToolParameterSchema = { + type: "object"; + required?: string[]; + properties?: Record; +}; + +export type McpToolDefinition = { + id: string; + name: string; + description: string; + server: string | null; + parameters: McpToolParameterSchema; + tags: string[]; + enabled: boolean; +}; + +export type McpServerDefinition = { + id: string; + name: string; + description: string; + endpoint: string; +}; + +export type PublicMcpToolDefinition = Omit & { + server: string | null; +}; + +export type PublicMcpServerDefinition = McpServerDefinition; + +type McpFile = { + mcp_servers?: McpServerDefinition[]; + mcp_tools?: McpToolDefinition[]; +}; + +const defaultPath = (): string => + envString(process.env.MCP_CONFIG_PATH, resolve(process.cwd(), "../../config/mcp.yml")); + +const isToolParameterSchema = (value: unknown): value is McpToolParameterSchema => { + if (!value || typeof value !== "object") return false; + const v = value as McpToolParameterSchema; + return v.type === "object"; +}; + +export const loadMcpTools = ( + configPath: string = defaultPath(), +): McpToolDefinition[] => { + let raw: string; + try { + raw = readFileSync(configPath, "utf8"); + } catch (error) { + const code = (error as NodeJS.ErrnoException).code; + if (code === "ENOENT") return []; + throw error; + } + const 
parsed = parse(raw) as McpFile | null; + if (!parsed || !Array.isArray(parsed.mcp_tools)) return []; + return parsed.mcp_tools + .filter((tool) => tool && typeof tool === "object" && typeof tool.id === "string") + .map((tool) => ({ + id: String(tool.id).trim(), + name: String(tool.name ?? tool.id).trim(), + description: String(tool.description ?? "").trim(), + server: typeof tool.server === "string" ? tool.server : null, + parameters: isToolParameterSchema(tool.parameters) + ? tool.parameters + : ({ type: "object", properties: {}, required: [] } satisfies McpToolParameterSchema), + tags: Array.isArray(tool.tags) ? tool.tags.map(String) : [], + enabled: tool.enabled !== false, + })) + .filter((tool) => tool.id.length > 0); +}; + +export const loadMcpServers = ( + configPath: string = defaultPath(), +): McpServerDefinition[] => { + let raw: string; + try { + raw = readFileSync(configPath, "utf8"); + } catch (error) { + const code = (error as NodeJS.ErrnoException).code; + if (code === "ENOENT") return []; + throw error; + } + const parsed = parse(raw) as McpFile | null; + if (!parsed || !Array.isArray(parsed.mcp_servers)) return []; + return parsed.mcp_servers + .filter((s) => s && typeof s === "object" && typeof s.id === "string") + .map((s) => ({ + id: String(s.id).trim(), + name: String(s.name ?? s.id).trim(), + description: String(s.description ?? "").trim(), + endpoint: String(s.endpoint ?? 
"").trim(), + })) + .filter((s) => s.id.length > 0); +}; + +export const enabledMcpTools = (tools: McpToolDefinition[] = loadMcpTools()): McpToolDefinition[] => + tools.filter((tool) => tool.enabled); + +export const toPublicMcpTool = (tool: McpToolDefinition): PublicMcpToolDefinition => ({ ...tool }); +export const toPublicMcpServer = (server: McpServerDefinition): PublicMcpServerDefinition => ({ ...server }); diff --git a/apps/api/src/mcp/routes.ts b/apps/api/src/mcp/routes.ts new file mode 100644 index 0000000..494bed1 --- /dev/null +++ b/apps/api/src/mcp/routes.ts @@ -0,0 +1,19 @@ +import type { FastifyInstance } from "fastify"; +import { + enabledMcpTools, + loadMcpServers, + toPublicMcpServer, + toPublicMcpTool, +} from "./config.js"; + +export const registerMcpRoutes = async (app: FastifyInstance) => { + app.get("/api/mcp/tools", async () => { + const items = enabledMcpTools().map(toPublicMcpTool); + return { items }; + }); + + app.get("/api/mcp/servers", async () => { + const items = loadMcpServers().map(toPublicMcpServer); + return { items }; + }); +}; diff --git a/apps/api/src/metrics.ts b/apps/api/src/metrics.ts new file mode 100644 index 0000000..fd49b86 --- /dev/null +++ b/apps/api/src/metrics.ts @@ -0,0 +1,178 @@ +// Tiny in-process metrics. Thread-safe enough for a 5-user MVP — counters +// and sums are only ever incremented under a single-threaded Node event +// loop, no atomic ops required. 
/** One observed HTTP request, as passed to {@link observeHttp}. */
export type RouteMetric = {
  route: string;
  method: string;
  status: number;
  durationMs: number;
  timestamp: number;
};

/** Running totals for one `METHOD route` key. */
type Aggregate = {
  count: number;
  /** Request count per status class, keyed by 100, 200, 300, 400, 500. */
  statusBuckets: Map<number, number>;
  sumMs: number;
  maxMs: number;
  /** Sliding window of the most recent durations, used for a coarse p95. */
  p95Slots: number[];
};

/** Maximum number of duration samples kept per route for the p95 estimate. */
const RESERVOIR_SIZE = 200;

const createAggregate = (): Aggregate => ({
  count: 0,
  statusBuckets: new Map<number, number>(),
  sumMs: 0,
  maxMs: 0,
  p95Slots: [],
});

const metricsState = {
  startedAt: Date.now(),
  aggregates: new Map<string, Aggregate>(),
  // Last N events for the /metrics JSON inspector. Bounded to avoid leaks.
  recent: [] as RouteMetric[],
  recentLimit: 50,
  errorCounts: new Map<string, number>(),
};

/** Builds the aggregate key, e.g. `GET /api/docs`. */
const keyFor = (route: string, method: string): string => `${method.toUpperCase()} ${route}`;

/**
 * Records one finished HTTP request.
 *
 * Updates the per-route aggregate (count, sum, max, status-class bucket and
 * duration window), the 5xx error counter, and the bounded list of recent
 * events.
 *
 * @param metric - The request to record.
 */
export const observeHttp = (metric: RouteMetric): void => {
  const key = keyFor(metric.route, metric.method);
  let agg = metricsState.aggregates.get(key);
  if (!agg) {
    agg = createAggregate();
    metricsState.aggregates.set(key, agg);
  }

  agg.count += 1;
  agg.sumMs += metric.durationMs;
  if (metric.durationMs > agg.maxMs) agg.maxMs = metric.durationMs;

  const statusBucket = Math.floor(metric.status / 100) * 100;
  agg.statusBuckets.set(statusBucket, (agg.statusBuckets.get(statusBucket) ?? 0) + 1);

  // Ring buffer over the latest RESERVOIR_SIZE samples. While the window is
  // still filling, the index equals the current length (plain append); after
  // that the oldest sample is overwritten. The previous strategy replaced the
  // largest stored sample with any smaller one, which kept only the fastest
  // requests and drove the reported p95 towards the minimum.
  agg.p95Slots[(agg.count - 1) % RESERVOIR_SIZE] = metric.durationMs;

  if (metric.status >= 500) {
    metricsState.errorCounts.set(key, (metricsState.errorCounts.get(key) ?? 0) + 1);
  }

  metricsState.recent.push(metric);
  if (metricsState.recent.length > metricsState.recentLimit) {
    metricsState.recent.splice(0, metricsState.recent.length - metricsState.recentLimit);
  }
};

/**
 * Returns the value at percentile `p` of an ascending-sorted array.
 *
 * @param sorted - Samples sorted in ascending order.
 * @param p - Percentile in the 0–100 range.
 * @returns The selected sample, or 0 when there are no samples.
 */
const percentile = (sorted: number[], p: number): number => {
  if (sorted.length === 0) return 0;
  const idx = Math.min(sorted.length - 1, Math.floor((p / 100) * sorted.length));
  return sorted[idx] ?? 0;
};

/**
 * Builds a JSON-friendly snapshot of everything recorded so far.
 *
 * @returns Process start time and uptime, request/5xx totals, per-route
 *   statistics sorted by request count (descending), and the most recent
 *   events, newest first.
 */
export const snapshotMetrics = () => {
  const routes: Array<{
    route: string;
    method: string;
    count: number;
    avg_ms: number;
    p95_ms: number;
    max_ms: number;
    status_buckets: Record<string, number>;
  }> = [];

  for (const [key, agg] of metricsState.aggregates.entries()) {
    // Keys are `METHOD route`; the route itself may contain spaces.
    const [method = "", ...rest] = key.split(" ");
    const route = rest.join(" ");
    const sorted = [...agg.p95Slots].sort((a, b) => a - b);
    routes.push({
      route,
      method,
      count: agg.count,
      avg_ms: agg.count === 0 ? 0 : Math.round(agg.sumMs / agg.count),
      p95_ms: Math.round(percentile(sorted, 95)),
      max_ms: agg.maxMs,
      status_buckets: Object.fromEntries(
        [...agg.statusBuckets.entries()].map(([k, v]) => [`${k}_${k + 99}`, v]),
      ),
    });
  }

  return {
    started_at: new Date(metricsState.startedAt).toISOString(),
    uptime_seconds: Math.round((Date.now() - metricsState.startedAt) / 1000),
    totals: {
      requests: [...metricsState.aggregates.values()].reduce((sum, a) => sum + a.count, 0),
      errors_5xx: [...metricsState.errorCounts.values()].reduce((sum, n) => sum + n, 0),
    },
    routes: routes.sort((a, b) => b.count - a.count),
    recent: [...metricsState.recent].reverse(),
  };
};

// Tiny Prometheus-style exposition. Stable enough for a scraper.
+export const renderPrometheusText = (): string => { + const lines: string[] = []; + lines.push("# HELP sic_uptime_seconds Seconds since the API process started"); + lines.push("# TYPE sic_uptime_seconds gauge"); + lines.push(`sic_uptime_seconds ${Math.round((Date.now() - metricsState.startedAt) / 1000)}`); + lines.push(""); + + lines.push("# HELP sic_http_requests_total Total HTTP requests, labelled by route, method, status"); + lines.push("# TYPE sic_http_requests_total counter"); + for (const [key, agg] of metricsState.aggregates.entries()) { + const [method, ...rest] = key.split(" "); + const route = rest.join(" "); + for (const [bucket, count] of agg.statusBuckets.entries()) { + const statusClass = `${bucket}_${bucket + 99}`; + lines.push( + `sic_http_requests_total{route="${route}",method="${method}",status_class="${statusClass}"} ${count}`, + ); + } + } + lines.push(""); + + lines.push("# HELP sic_http_request_duration_ms Request duration in ms"); + lines.push("# TYPE sic_http_request_duration_ms summary"); + for (const [key, agg] of metricsState.aggregates.entries()) { + const [method, ...rest] = key.split(" "); + const route = rest.join(" "); + const sorted = [...agg.p95Slots].sort((a, b) => a - b); + const avg = agg.count === 0 ? 
0 : Math.round(agg.sumMs / agg.count); + lines.push( + `sic_http_request_duration_ms{route="${route}",method="${method}",quantile="0.95"} ${percentile(sorted, 95)}`, + ); + lines.push( + `sic_http_request_duration_ms_sum{route="${route}",method="${method}"} ${agg.sumMs}`, + ); + lines.push( + `sic_http_request_duration_ms_count{route="${route}",method="${method}"} ${agg.count}`, + ); + lines.push( + `sic_http_request_duration_ms_max{route="${route}",method="${method}"} ${agg.maxMs}`, + ); + lines.push( + `sic_http_request_duration_ms_avg{route="${route}",method="${method}"} ${avg}`, + ); + } + + return `${lines.join("\n")}\n`; +}; + +export const __resetMetricsForTests = () => { + metricsState.startedAt = Date.now(); + metricsState.aggregates.clear(); + metricsState.recent.length = 0; + metricsState.errorCounts.clear(); +}; \ No newline at end of file diff --git a/apps/api/src/models/config.ts b/apps/api/src/models/config.ts new file mode 100644 index 0000000..0f615f5 --- /dev/null +++ b/apps/api/src/models/config.ts @@ -0,0 +1,105 @@ +import { readFileSync } from "node:fs"; +import { resolve } from "node:path"; +import { parse } from "yaml"; +import { envString } from "../env.js"; + +export type ModelDefinition = { + id: string; + label: string; + provider: "openai-compatible"; + base_url?: string; + api_key_env?: string; + model: string; + max_tokens?: number; + // Ordered list of model ids to try if this one fails (5xx, 429, network + // error, timeout). Each id must resolve to a known model; unknown ids are + // dropped at load time. Use ["mr-auto"] for a single fallback. The + // fallback chain for the chain itself is resolved at request time by + // `resolveFallbackChain`. + fallback?: string[]; +}; + +export type PublicModelDefinition = Pick; + +// Resolve the API key for a model. Order of precedence: +// 1. Per-model env var (model.api_key_env) — useful when different providers +// use different keys (e.g. local proxy, dedicated self-hosted). +// 2. 
Global LLM_API_KEY / MINIMAX_API_KEY fallbacks shared by all models. +// 3. Literal "dummy" so OpenAI-compatible servers that don't require auth +// (e.g. local OLLAMA, self-hosted reverse proxy) still work out of the box. +export const resolveModelApiKey = (model: ModelDefinition): string => { + const fromModel = model.api_key_env ? process.env[model.api_key_env] : undefined; + if (fromModel && fromModel.trim().length > 0) return fromModel.trim(); + return ( + envString(process.env.LLM_API_KEY, envString(process.env.MINIMAX_API_KEY, "dummy")) + ); +}; + +const expandEnv = (value: string | undefined) => + value?.replace(/\$\{([A-Z0-9_]+)\}/g, (_match, key: string) => process.env[key] ?? ""); + +export const loadModelDefinitions = (): ModelDefinition[] => { + const configPath = envString(process.env.MODELS_CONFIG_PATH, resolve(process.cwd(), "../../config/models.yml")); + const parsed = parse(readFileSync(configPath, "utf8")) as { models?: ModelDefinition[] } | null; + const known = new Set((parsed?.models ?? []).map((model) => String(model.id ?? "").trim())); + + return (parsed?.models ?? []).map((model) => { + const rawFallback = Array.isArray(model.fallback) ? model.fallback : []; + const fallback = rawFallback + .map((id) => String(id).trim()) + .filter((id) => id.length > 0 && known.has(id) && id !== model.id); + return { + ...model, + base_url: expandEnv(model.base_url), + fallback, + }; + }); +}; + +export const getDefaultModelId = () => envString(process.env.DEFAULT_MODEL, "fast"); + +export const findModelDefinition = (modelId: string) => + loadModelDefinitions().find((model) => model.id === modelId); + +/** + * Resolve the ordered fallback chain starting at `modelId`. Walks each model's + * `fallback` array until exhausted, dedupes by id, and stops if a cycle is + * detected. The starting model is always first. If the model is unknown the + * chain is just `[modelId]` (caller will surface model_not_found). 
+ */ +export const resolveFallbackChain = (modelId: string): string[] => { + const all = loadModelDefinitions(); + const byId = new Map(all.map((model) => [model.id, model])); + const chain: string[] = []; + const seen = new Set(); + + // LLM_FALLBACK_CHAIN (comma-separated) overrides the YAML chain for the + // selected model. Empty / unset means "use the YAML chain". + const override = envString(process.env.LLM_FALLBACK_CHAIN, "") + .split(",") + .map((id) => id.trim()) + .filter((id) => id.length > 0); + + let cursor: string | undefined = modelId; + let nextCursor: string | undefined = override[0]; + while (cursor && !seen.has(cursor)) { + seen.add(cursor); + chain.push(cursor); + const model = byId.get(cursor); + if (nextCursor !== undefined) { + cursor = nextCursor; + nextCursor = undefined; + continue; + } + if (!model || !model.fallback || model.fallback.length === 0) break; + cursor = model.fallback[0]; + } + return chain; +}; + +export const toPublicModel = (model: ModelDefinition): PublicModelDefinition => ({ + id: model.id, + label: model.label, + provider: model.provider, + max_tokens: model.max_tokens, +}); diff --git a/apps/api/src/models/routes.ts b/apps/api/src/models/routes.ts new file mode 100644 index 0000000..5c7104f --- /dev/null +++ b/apps/api/src/models/routes.ts @@ -0,0 +1,14 @@ +import type { FastifyInstance } from "fastify"; +import { getDefaultModelId, loadModelDefinitions, toPublicModel } from "./config.js"; + +export const registerModelRoutes = async (app: FastifyInstance) => { + app.get("/api/models", async () => { + const defaultModelId = getDefaultModelId(); + const items = loadModelDefinitions().map(toPublicModel); + + return { + default_model: items.some((model) => model.id === defaultModelId) ? 
defaultModelId : items[0]?.id, + items, + }; + }); +}; diff --git a/apps/api/src/n8n/config.ts b/apps/api/src/n8n/config.ts new file mode 100644 index 0000000..b2eb44d --- /dev/null +++ b/apps/api/src/n8n/config.ts @@ -0,0 +1,58 @@ +import { readFileSync } from "node:fs"; +import { resolve } from "node:path"; +import { parse } from "yaml"; +import { envString } from "../env.js"; + +export type N8nWorkflowDefinition = { + id: string; + label: string; + description: string; + url: string; + required_roles: string[]; + tags: string[]; +}; + +export type PublicN8nWorkflowDefinition = N8nWorkflowDefinition; + +type N8nFile = { n8n_workflows?: N8nWorkflowDefinition[] }; + +const expandEnv = (value: string): string => + value.replace(/\$\{([A-Z0-9_]+)(?::\?[^}]+)?\}/g, (_match, name: string) => process.env[name] ?? ""); + +const defaultPath = (): string => + envString(process.env.N8N_CONFIG_PATH, resolve(process.cwd(), "../../config/n8n-workflows.yml")); + +export const loadN8nWorkflows = ( + configPath: string = defaultPath(), +): N8nWorkflowDefinition[] => { + let raw: string; + try { + raw = readFileSync(configPath, "utf8"); + } catch (error) { + const code = (error as NodeJS.ErrnoException).code; + if (code === "ENOENT") return []; + throw error; + } + const parsed = parse(raw) as N8nFile | null; + if (!parsed || !Array.isArray(parsed.n8n_workflows)) return []; + return parsed.n8n_workflows + .filter((wf) => wf && typeof wf === "object" && typeof wf.id === "string") + .map((wf) => ({ + id: String(wf.id).trim(), + label: String(wf.label ?? wf.id).trim(), + description: String(wf.description ?? "").trim(), + url: expandEnv(String(wf.url ?? "").trim()), + required_roles: Array.isArray(wf.required_roles) + ? wf.required_roles.map(String) + : [], + tags: Array.isArray(wf.tags) ? 
wf.tags.map(String) : [], + })) + .filter((wf) => wf.id.length > 0); +}; + +export const canUseN8nWorkflow = (userRoles: string[], wf: N8nWorkflowDefinition): boolean => { + if (!Array.isArray(wf.required_roles) || wf.required_roles.length === 0) return true; + return wf.required_roles.every((role) => userRoles.includes(role)); +}; + +export const toPublicN8nWorkflow = (wf: N8nWorkflowDefinition): PublicN8nWorkflowDefinition => ({ ...wf }); diff --git a/apps/api/src/n8n/routes.ts b/apps/api/src/n8n/routes.ts new file mode 100644 index 0000000..86040cd --- /dev/null +++ b/apps/api/src/n8n/routes.ts @@ -0,0 +1,13 @@ +import type { FastifyInstance } from "fastify"; +import { canUseN8nWorkflow, loadN8nWorkflows, toPublicN8nWorkflow } from "./config.js"; +import { getAuthUser } from "../auth/index.js"; + +export const registerN8nRoutes = async (app: FastifyInstance) => { + app.get("/api/n8n-workflows", async (request) => { + const user = await getAuthUser(request); + const items = loadN8nWorkflows() + .filter((wf) => canUseN8nWorkflow(user.roles, wf)) + .map(toPublicN8nWorkflow); + return { items }; + }); +}; diff --git a/apps/api/src/rag/client.ts b/apps/api/src/rag/client.ts new file mode 100644 index 0000000..b2a8af5 --- /dev/null +++ b/apps/api/src/rag/client.ts @@ -0,0 +1,105 @@ +import type { KnowledgeDoc, KnowledgeSearchResult } from "../docs/repository.js"; +import type { RagConfig } from "./config.js"; + +export type RagSearchResponse = { + items: Array<{ + id: string; + title?: string; + source?: string; + tags?: string[]; + relevance?: number; + excerpt?: string; + content?: string; + }>; +}; + +export type RagGetResponse = Partial & { id: string }; + +const ensureTrailing = (url: string) => url.replace(/\/$/, ""); + +export const isRagRemote = (config: RagConfig): boolean => config.endpoint.trim().length > 0; + +const buildHeaders = (config: RagConfig): Record => { + const headers: Record = { + "content-type": "application/json", + accept: 
"application/json", + }; + if (config.authToken) { + headers.authorization = `Bearer ${config.authToken}`; + } + return headers; +}; + +export const searchViaRag = async ( + config: RagConfig, + query: string, + limit: number, +): Promise => { + const url = `${ensureTrailing(config.endpoint)}/search`; + const controller = new AbortController(); + const timeout = setTimeout(() => controller.abort(), config.timeoutMs); + try { + const response = await fetch(url, { + method: "POST", + headers: buildHeaders(config), + body: JSON.stringify({ + query, + limit, + min_relevance: config.minRelevance, + include_tags: config.includeTags, + exclude_tags: config.excludeTags, + }), + signal: controller.signal, + }); + if (!response.ok) { + throw new Error(`rag_search_failed:${response.status}`); + } + const data = (await response.json()) as RagSearchResponse; + if (!data || !Array.isArray(data.items)) return []; + return data.items.map((item) => ({ + id: String(item.id), + title: String(item.title ?? item.id), + source: String(item.source ?? ""), + tags: Array.isArray(item.tags) ? item.tags.map(String) : [], + relevance: Number(item.relevance ?? 0), + excerpt: String(item.excerpt ?? ""), + headings: [], + })); + } finally { + clearTimeout(timeout); + } +}; + +export const getViaRag = async ( + config: RagConfig, + id: string, +): Promise => { + const url = `${ensureTrailing(config.endpoint)}/docs/${encodeURIComponent(id)}`; + const controller = new AbortController(); + const timeout = setTimeout(() => controller.abort(), config.timeoutMs); + try { + const response = await fetch(url, { + method: "GET", + headers: buildHeaders(config), + signal: controller.signal, + }); + if (response.status === 404) return undefined; + if (!response.ok) { + throw new Error(`rag_get_failed:${response.status}`); + } + const data = (await response.json()) as RagGetResponse; + if (!data || !data.id) return undefined; + return { + id: String(data.id), + title: String(data.title ?? 
data.id), + source: String(data.source ?? ""), + tags: Array.isArray(data.tags) ? data.tags.map(String) : [], + owner: typeof data.owner === "string" ? data.owner : undefined, + updated: typeof data.updated === "string" ? data.updated : undefined, + headings: Array.isArray(data.headings) ? data.headings.map(String) : [], + content: String(data.content ?? ""), + }; + } finally { + clearTimeout(timeout); + } +}; diff --git a/apps/api/src/rag/config.ts b/apps/api/src/rag/config.ts new file mode 100644 index 0000000..f15f391 --- /dev/null +++ b/apps/api/src/rag/config.ts @@ -0,0 +1,80 @@ +import { readFileSync } from "node:fs"; +import { resolve } from "node:path"; +import { parse } from "yaml"; +import { envString } from "../env.js"; + +export type RagChunkStrategy = "heading" | "paragraph" | "fixed"; + +export type RagConfig = { + endpoint: string; + authToken: string; + timeoutMs: number; + fallbackToLocal: boolean; + chunkStrategy: RagChunkStrategy; + chunkSizeChars: number; + topK: number; + minRelevance: number; + includeTags: string[]; + excludeTags: string[]; +}; + +export type PublicRagConfig = Omit & { + // Never expose the auth token over the public API; show only whether + // one is configured. + hasAuthToken: boolean; +}; + +const defaultConfig = (): RagConfig => ({ + endpoint: "", + authToken: "", + timeoutMs: 10_000, + fallbackToLocal: true, + chunkStrategy: "heading", + chunkSizeChars: 1500, + topK: 5, + minRelevance: 0, + includeTags: [], + excludeTags: [], +}); + +const expandEnv = (value: string): string => + value.replace(/\$\{([A-Z0-9_]+):?\}/g, (_m, name: string) => process.env[name] ?? ""); + +const normalize = (raw: unknown): RagConfig => { + if (!raw || typeof raw !== "object") return defaultConfig(); + const r = raw as Partial; + const chunkStrategy: RagChunkStrategy = + r.chunkStrategy === "paragraph" || r.chunkStrategy === "fixed" ? r.chunkStrategy : "heading"; + return { + endpoint: expandEnv(String(r.endpoint ?? 
"").trim()), + authToken: expandEnv(String(r.authToken ?? "").trim()), + timeoutMs: Math.max(100, Number(r.timeoutMs ?? 10_000)), + fallbackToLocal: r.fallbackToLocal !== false, + chunkStrategy, + chunkSizeChars: Math.max(200, Number(r.chunkSizeChars ?? 1500)), + topK: Math.max(1, Number(r.topK ?? 5)), + minRelevance: Math.max(0, Math.min(1, Number(r.minRelevance ?? 0))), + includeTags: Array.isArray(r.includeTags) ? r.includeTags.map(String) : [], + excludeTags: Array.isArray(r.excludeTags) ? r.excludeTags.map(String) : [], + }; +}; + +const defaultPath = (): string => + envString(process.env.RAG_CONFIG_PATH, resolve(process.cwd(), "../../config/rag.yml")); + +export const loadRagConfig = (configPath: string = defaultPath()): RagConfig => { + try { + const raw = readFileSync(configPath, "utf8"); + const parsed = parse(raw) as { rag?: unknown } | null; + return normalize(parsed?.rag); + } catch (error) { + const code = (error as NodeJS.ErrnoException).code; + if (code === "ENOENT") return defaultConfig(); + throw error; + } +}; + +export const toPublicRagConfig = (config: RagConfig): PublicRagConfig => { + const { authToken: _auth, ...rest } = config; + return { ...rest, hasAuthToken: Boolean(config.authToken) }; +}; diff --git a/apps/api/src/rag/routes.ts b/apps/api/src/rag/routes.ts new file mode 100644 index 0000000..0075aee --- /dev/null +++ b/apps/api/src/rag/routes.ts @@ -0,0 +1,6 @@ +import type { FastifyInstance } from "fastify"; +import { loadRagConfig, toPublicRagConfig } from "./config.js"; + +export const registerRagRoutes = async (app: FastifyInstance) => { + app.get("/api/rag/config", async () => toPublicRagConfig(loadRagConfig())); +}; diff --git a/apps/api/src/rate-limit.ts b/apps/api/src/rate-limit.ts new file mode 100644 index 0000000..245cb75 --- /dev/null +++ b/apps/api/src/rate-limit.ts @@ -0,0 +1,88 @@ +// In-memory token bucket rate limiter keyed by an arbitrary id (user id). 
+// +// Token bucket semantics: +// - Capacity = `burst` (max tokens the bucket can hold). +// - Refill = `perMinute / 60` tokens per second (lazy: tokens are added +// on `consume` based on elapsed time since the bucket was +// last touched). +// - Each accepted call consumes exactly one token. Calls that find the +// bucket empty are rejected and the caller gets back a Retry-After hint +// in milliseconds. +// +// Stale entries: the map only grows by the number of distinct ids seen. +// For a 5-user MVP this is bounded; for larger installs the caller can call +// `pruneStale(maxIdleMs)` periodically. + +import { envNumber } from "./env.js"; + +export type RateLimiterOptions = { + perMinute: number; + burst: number; +}; + +export type ConsumeResult = + | { ok: true; remaining: number } + | { ok: false; retryAfterMs: number }; + +export type RateLimiter = { + consume(id: string, now?: number): ConsumeResult; + size: () => number; + reset: (id?: string) => void; +}; + +export const createRateLimiter = ( + options: RateLimiterOptions, +): RateLimiter => { + const { perMinute, burst } = options; + const refillPerMs = perMinute / 60_000; + const buckets = new Map(); + + const consume = (id: string, now: number = Date.now()): ConsumeResult => { + let bucket = buckets.get(id); + if (!bucket) { + bucket = { tokens: burst, lastRefillMs: now }; + buckets.set(id, bucket); + } else { + const elapsed = Math.max(0, now - bucket.lastRefillMs); + const refilled = elapsed * refillPerMs; + if (refilled > 0) { + bucket.tokens = Math.min(burst, bucket.tokens + refilled); + bucket.lastRefillMs = now; + } + } + + if (bucket.tokens >= 1) { + bucket.tokens -= 1; + return { ok: true, remaining: Math.floor(bucket.tokens) }; + } + + // Time until the bucket has at least 1 token. + const needed = 1 - bucket.tokens; + const retryAfterMs = refillPerMs > 0 ? 
Math.ceil(needed / refillPerMs) : 60_000; + return { ok: false, retryAfterMs }; + }; + + return { + consume, + size: () => buckets.size, + reset: (id) => { + if (id === undefined) buckets.clear(); + else buckets.delete(id); + }, + }; +}; + +export const chatRateLimiterFromEnv = (): RateLimiter => { + const perMinute = Math.max(1, envNumber(process.env.CHAT_RATE_LIMIT_PER_MINUTE, 20)); + const burst = Math.max(1, envNumber(process.env.CHAT_RATE_LIMIT_BURST, 5)); + return createRateLimiter({ perMinute, burst }); +}; + +export const webhookRateLimiterFromEnv = (): RateLimiter => { + // Defaults: 60/min refill, burst 10. Generous on purpose — the goal is + // to stop a runaway loop, not throttle a real operator. Tighten per + // webhook_id in the future if a specific hook becomes a hotspot. + const perMinute = Math.max(1, envNumber(process.env.WEBHOOK_RATE_LIMIT_PER_MINUTE, 60)); + const burst = Math.max(1, envNumber(process.env.WEBHOOK_RATE_LIMIT_BURST, 10)); + return createRateLimiter({ perMinute, burst }); +}; diff --git a/apps/api/src/server.ts b/apps/api/src/server.ts new file mode 100644 index 0000000..68c0036 --- /dev/null +++ b/apps/api/src/server.ts @@ -0,0 +1,170 @@ +import Fastify from "fastify"; +import cors from "@fastify/cors"; +import { ZodError } from "zod"; +import { config as loadDotenv } from "dotenv"; +import { dirname, resolve } from "node:path"; +import { fileURLToPath } from "node:url"; + +// Load `.env` from the repo root regardless of the cwd the process was +// started from. `import "dotenv/config"` would only look in `process.cwd()`, +// which silently breaks when the API is started from a sub-directory. 
const __dirnameApi = dirname(fileURLToPath(import.meta.url));
loadDotenv({ path: resolve(__dirnameApi, "../../../.env") });

// NOTE: ES module `import` declarations are hoisted, so the modules below are
// evaluated BEFORE the `loadDotenv` call above runs, whatever the textual
// order. They must only read `process.env` lazily (inside functions), never
// at module top level, or they will miss values that come from `.env`.
import { registerAuthRoutes } from "./auth/routes.js";
import { registerChatRoutes } from "./chat/routes.js";
import { openDatabase } from "./db/database.js";
import { migrate } from "./db/migrate.js";
import { registerDocsRoutes } from "./docs/routes.js";
import { envNumber } from "./env.js";
import { registerMcpRoutes } from "./mcp/routes.js";
import { registerModelRoutes } from "./models/routes.js";
import { registerN8nRoutes } from "./n8n/routes.js";
import { registerRagRoutes } from "./rag/routes.js";
import { registerSessionRoutes } from "./sessions/routes.js";
import { registerSkillRoutes } from "./skills/routes.js";
import { runWebhookAuditPurge, webhookAuditPurgeConfigFromEnv } from "./webhooks/audit.js";
import { registerWebhookRoutes } from "./webhooks/routes.js";
import { observeHttp, renderPrometheusText, snapshotMetrics } from "./metrics.js";

const port = envNumber(process.env.API_PORT, 8787);
const bodyLimit = envNumber(process.env.API_BODY_LIMIT_BYTES, 1_048_576);

/**
 * Translate CORS_ALLOWED_ORIGINS into the `origin` option of @fastify/cors.
 *
 * @returns `true` (reflect any origin) when the variable is unset, empty, or
 *   contains `*`; otherwise the list of configured origins.
 */
const corsOrigin = () => {
  const configured = process.env.CORS_ALLOWED_ORIGINS;
  if (!configured) return true;

  const origins = configured
    .split(",")
    .map((origin) => origin.trim())
    .filter(Boolean);

  if (origins.includes("*")) return true;
  return origins;
};

/**
 * Return the error's own HTTP status when it is a client error (4xx).
 *
 * Fastify attaches `statusCode` to the errors it raises itself (malformed
 * JSON body, body larger than `bodyLimit`, unsupported media type, ...).
 *
 * @returns The 4xx status, or `undefined` for anything else.
 */
const clientErrorStatus = (error: unknown): number | undefined => {
  if (typeof error !== "object" || error === null) return undefined;
  const { statusCode } = error as { statusCode?: unknown };
  return typeof statusCode === "number" && statusCode >= 400 && statusCode < 500
    ? statusCode
    : undefined;
};

const app = Fastify({
  logger: true,
  bodyLimit,
});

const db = openDatabase();
migrate(db);

await app.register(cors, {
  origin: corsOrigin(),
});

// Observability: track every request by route + method + status with duration.
// The route template (e.g. "/api/sessions/:id") is preferred over the raw URL
// so `/api/sessions/abc` and `/api/sessions/def` aggregate into the same bucket.
app.addHook("onResponse", async (request, reply) => {
  const route = request.routeOptions?.url ?? request.url ?? "unknown";
  // Set by the onRequest hook below. Guarded instead of asserted so a missing
  // timestamp records 0 ms rather than NaN.
  const startedAt = (request as { sic_startedAt?: number }).sic_startedAt;
  observeHttp({
    route,
    method: request.method,
    status: reply.statusCode,
    durationMs: startedAt === undefined ? 0 : Math.round(performance.now() - startedAt),
    timestamp: Date.now(),
  });
});

app.addHook("onRequest", async (request) => {
  (request as { sic_startedAt?: number }).sic_startedAt = performance.now();
});

// Baseline security headers on every response.
app.addHook("onSend", async (_request, reply, payload) => {
  reply.header("x-content-type-options", "nosniff");
  reply.header("referrer-policy", "no-referrer");
  reply.header("x-frame-options", "DENY");

  return payload;
});

// Central error mapping:
//   ZodError                      -> 400 validation_error (with issues)
//   auth_* / JWT* / JWKS* message -> 401 unauthorized
//   error carrying a 4xx status   -> that status, request_error
//   anything else                 -> 500 internal_error (logged)
app.setErrorHandler((error, _request, reply) => {
  const message = error instanceof Error ? error.message : String(error);

  if (error instanceof ZodError) {
    return reply.code(400).send({
      error: "validation_error",
      issues: error.issues,
    });
  }

  if (
    message.startsWith("auth_") ||
    message.startsWith("JWT") ||
    message.startsWith("JWKS")
  ) {
    return reply.code(401).send({ error: "unauthorized" });
  }

  // Client mistakes (bad JSON, oversized body, ...) must keep their 4xx status
  // instead of being reported, logged, and counted as server failures.
  const clientStatus = clientErrorStatus(error);
  if (clientStatus !== undefined) {
    return reply.code(clientStatus).send({ error: "request_error" });
  }

  app.log.error(error);
  return reply.code(500).send({ error: "internal_error" });
});

// Liveness: the process is up.
app.get("/healthz", async () => ({ status: "ok" }));
// Readiness: the database answers a trivial query.
app.get("/readyz", async () => {
  db.prepare("SELECT 1").get();
  return { status: "ready" };
});

app.get("/api/version", async () => ({
  name: "pi-chat-api",
  version: "0.1.0",
}));

// Observability surface. `/metrics` returns Prometheus text (scraper-friendly);
// `/api/metrics` returns the same data as JSON for humans and the smoke test.
+app.get("/metrics", async (_request, reply) => { + reply.header("content-type", "text/plain; version=0.0.4; charset=utf-8"); + return renderPrometheusText(); +}); + +app.get("/api/metrics", async () => snapshotMetrics()); + +await registerSessionRoutes(app, db); +await registerAuthRoutes(app); +await registerDocsRoutes(app); +await registerModelRoutes(app); +await registerRagRoutes(app); +await registerSkillRoutes(app); +await registerN8nRoutes(app); +await registerMcpRoutes(app); +await registerWebhookRoutes(app, db); +await registerChatRoutes(app, db); + +// Audit retention: run once on boot, then on a timer. Cheap, idempotent. +const auditPurgeConfig = webhookAuditPurgeConfigFromEnv(); +const initialPurge = runWebhookAuditPurge(db, auditPurgeConfig); +if (initialPurge.deletedByAge > 0 || initialPurge.deletedByCap > 0) { + app.log.info( + { ...initialPurge, config: auditPurgeConfig }, + "webhook audit purge (boot)", + ); +} + +const auditPurgeIntervalMs = Math.max(60_000, envNumber(process.env.WEBHOOK_AUDIT_PURGE_INTERVAL_MS, 3_600_000)); +const auditPurgeTimer = setInterval(() => { + try { + const report = runWebhookAuditPurge(db, auditPurgeConfig); + if (report.deletedByAge > 0 || report.deletedByCap > 0) { + app.log.info({ ...report, config: auditPurgeConfig }, "webhook audit purge (timer)"); + } + } catch (error) { + app.log.error({ err: error }, "webhook audit purge failed"); + } +}, auditPurgeIntervalMs); +auditPurgeTimer.unref?.(); + +const shutdown = async () => { + app.log.info("shutdown requested"); + clearInterval(auditPurgeTimer); + await app.close(); + db.close(); +}; + +process.on("SIGINT", shutdown); +process.on("SIGTERM", shutdown); + +await app.listen({ port, host: "0.0.0.0" }); diff --git a/apps/api/src/sessions/repository.ts b/apps/api/src/sessions/repository.ts new file mode 100644 index 0000000..98d7c12 --- /dev/null +++ b/apps/api/src/sessions/repository.ts @@ -0,0 +1,140 @@ +import { randomUUID } from "node:crypto"; +import type { 
AppDatabase } from "../db/database.js"; + +export type ChatSessionRecord = { + id: string; + user_id: string; + title: string | null; + system_prompt: string | null; + created_at: string; + updated_at: string; +}; + +export type ChatMessageRecord = { + id: string; + session_id: string; + user_id: string; + role: "user" | "assistant" | "system" | "tool"; + content: string; + metadata: string | null; + created_at: string; +}; + +export function createSessionRepository(db: AppDatabase) { + return { + list(userId: string): ChatSessionRecord[] { + return db + .prepare("SELECT * FROM chat_sessions WHERE user_id = ? ORDER BY updated_at DESC") + .all(userId) as ChatSessionRecord[]; + }, + + create(userId: string, title: string | null): ChatSessionRecord { + const now = new Date().toISOString(); + const session: ChatSessionRecord = { + id: randomUUID(), + user_id: userId, + title, + system_prompt: null, + created_at: now, + updated_at: now, + }; + + db.prepare( + "INSERT INTO chat_sessions (id, user_id, title, created_at, updated_at) VALUES (?, ?, ?, ?, ?)", + ).run(session.id, session.user_id, session.title, session.created_at, session.updated_at); + + return session; + }, + + get(userId: string, sessionId: string): ChatSessionRecord | null { + return ( + (db.prepare("SELECT * FROM chat_sessions WHERE id = ? AND user_id = ?").get(sessionId, userId) as + | ChatSessionRecord + | undefined) ?? null + ); + }, + + delete(userId: string, sessionId: string): boolean { + const result = db.prepare("DELETE FROM chat_sessions WHERE id = ? AND user_id = ?").run(sessionId, userId); + return result.changes > 0; + }, + + deleteAllForUser(userId: string): number { + const result = db + .prepare("DELETE FROM chat_sessions WHERE user_id = ?") + .run(userId); + return Number(result.changes ?? 0); + }, + + touch(userId: string, sessionId: string): void { + db.prepare("UPDATE chat_sessions SET updated_at = ? WHERE id = ? 
AND user_id = ?").run( + new Date().toISOString(), + sessionId, + userId, + ); + }, + + updateTitle(userId: string, sessionId: string, title: string): void { + db.prepare("UPDATE chat_sessions SET title = ?, updated_at = ? WHERE id = ? AND user_id = ?").run( + title, + new Date().toISOString(), + sessionId, + userId, + ); + }, + + updateSystemPrompt(userId: string, sessionId: string, prompt: string | null): boolean { + const normalized = prompt && prompt.trim().length > 0 ? prompt.trim() : null; + const result = db + .prepare( + "UPDATE chat_sessions SET system_prompt = ?, updated_at = ? WHERE id = ? AND user_id = ?", + ) + .run(normalized, new Date().toISOString(), sessionId, userId); + return result.changes > 0; + }, + }; +} + +export function createMessageRepository(db: AppDatabase) { + return { + listForSession(userId: string, sessionId: string): ChatMessageRecord[] { + return db + .prepare("SELECT * FROM chat_messages WHERE session_id = ? AND user_id = ? ORDER BY created_at ASC") + .all(sessionId, userId) as ChatMessageRecord[]; + }, + + create(input: { + sessionId: string; + userId: string; + role: ChatMessageRecord["role"]; + content: string; + metadata?: unknown; + }): ChatMessageRecord { + const message: ChatMessageRecord = { + id: randomUUID(), + session_id: input.sessionId, + user_id: input.userId, + role: input.role, + content: input.content, + metadata: input.metadata ? 
JSON.stringify(input.metadata) : null, + created_at: new Date().toISOString(), + }; + + db.prepare( + `INSERT INTO chat_messages + (id, session_id, user_id, role, content, metadata, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?)`, + ).run( + message.id, + message.session_id, + message.user_id, + message.role, + message.content, + message.metadata, + message.created_at, + ); + + return message; + }, + }; +} diff --git a/apps/api/src/sessions/routes.ts b/apps/api/src/sessions/routes.ts new file mode 100644 index 0000000..35413af --- /dev/null +++ b/apps/api/src/sessions/routes.ts @@ -0,0 +1,172 @@ +import type { FastifyInstance } from "fastify"; +import { z } from "zod"; +import { getAuthUser } from "../auth/index.js"; +import type { AppDatabase } from "../db/database.js"; +import { createMessageRepository, createSessionRepository } from "./repository.js"; + +const createSessionBody = z.object({ + title: z.string().min(1).max(120).optional(), +}); + +const updateSessionBody = z.object({ + title: z.string().trim().min(1).max(120), +}); + +const updateSystemPromptBody = z.object({ + // Empty / whitespace-only strings clear the override; null is a no-op. + system_prompt: z.string().max(8_000).nullable().optional(), +}); + +export async function registerSessionRoutes(app: FastifyInstance, db: AppDatabase) { + const sessions = createSessionRepository(db); + const messages = createMessageRepository(db); + + app.get("/api/sessions", async (request) => { + const user = await getAuthUser(request); + return { items: sessions.list(user.id) }; + }); + + app.post("/api/sessions", async (request, reply) => { + const user = await getAuthUser(request); + const body = createSessionBody.parse(request.body ?? {}); + const session = sessions.create(user.id, body.title ?? 
null); + return reply.code(201).send(session); + }); + + app.get<{ Params: { id: string } }>("/api/sessions/:id", async (request, reply) => { + const user = await getAuthUser(request); + const session = sessions.get(user.id, request.params.id); + + if (!session) { + return reply.code(404).send({ error: "session_not_found" }); + } + + return { + ...session, + messages: messages.listForSession(user.id, session.id), + }; + }); + + app.patch<{ Params: { id: string } }>("/api/sessions/:id", async (request, reply) => { + const user = await getAuthUser(request); + const body = updateSessionBody.parse(request.body); + const session = sessions.get(user.id, request.params.id); + + if (!session) { + return reply.code(404).send({ error: "session_not_found" }); + } + + sessions.updateTitle(user.id, session.id, body.title); + return sessions.get(user.id, session.id); + }); + + // Per-session system prompt override. Inserted into the chat stream + // immediately after the base identity prompt, before the docs/actions + // context. Use to attach incident-specific context (runbook link, on-call + // names, severity matrix) without polluting the global prompt. + app.patch<{ Params: { id: string } }>( + "/api/sessions/:id/system-prompt", + async (request, reply) => { + const user = await getAuthUser(request); + const body = updateSystemPromptBody.parse(request.body ?? {}); + const session = sessions.get(user.id, request.params.id); + + if (!session) { + return reply.code(404).send({ error: "session_not_found" }); + } + + sessions.updateSystemPrompt(user.id, session.id, body.system_prompt ?? 
null); + return sessions.get(user.id, session.id); + }, + ); + + app.delete<{ Params: { id: string } }>("/api/sessions/:id", async (request, reply) => { + const user = await getAuthUser(request); + const deleted = sessions.delete(user.id, request.params.id); + + if (!deleted) { + return reply.code(404).send({ error: "session_not_found" }); + } + + return reply.code(204).send(); + }); + + // Bulk delete: wipes every session owned by the current user. Cascade + // removes the messages and webhook_runs that point at them. The frontend + // requires the user to type the literal word "delete" before this fires. + app.delete("/api/sessions", async (request, reply) => { + const user = await getAuthUser(request); + const removed = sessions.deleteAllForUser(user.id); + return reply.code(200).send({ deleted: removed }); + }); + + // Export: returns a JSON document with the session metadata and all its + // messages. The shape is a stable contract so a `POST /api/sessions/import` + // can read it back. webhook_runs are intentionally excluded from the + // export — those are operational audit data, not conversation content. + app.get<{ Params: { id: string } }>("/api/sessions/:id/export", async (request, reply) => { + const user = await getAuthUser(request); + const session = sessions.get(user.id, request.params.id); + + if (!session) { + return reply.code(404).send({ error: "session_not_found" }); + } + + return { + version: 1, + exported_at: new Date().toISOString(), + session: { + id: session.id, + title: session.title, + created_at: session.created_at, + updated_at: session.updated_at, + }, + messages: messages.listForSession(user.id, session.id), + }; + }); + + // Import: accepts the export document above, creates a new session owned + // by the caller, and writes the messages with fresh ids. Returns the new + // session id and a count of imported messages. 
+ const importSessionBody = z.object({ + session: z.object({ + title: z.string().max(120).nullable().optional(), + created_at: z.string().optional(), + updated_at: z.string().optional(), + }), + messages: z.array( + z.object({ + role: z.enum(["user", "assistant", "system"]), + content: z.string().min(1).max(50_000), + metadata: z.record(z.unknown()).optional(), + // Accepted so export documents validate, but currently ignored: the + // import handler below never passes it to messages.create(). + created_at: z.string().optional(), + }), + ), + }); + + app.post("/api/sessions/import", async (request, reply) => { + const user = await getAuthUser(request); + const body = importSessionBody.parse(request.body); + + const newSession = sessions.create(user.id, body.session.title ?? null); + let imported = 0; + for (const message of body.messages) { + messages.create({ + sessionId: newSession.id, + userId: user.id, + role: message.role, + content: message.content, + metadata: message.metadata, + }); + imported += 1; + } + sessions.touch(user.id, newSession.id); + + return reply.code(201).send({ + session: newSession, + imported_messages: imported, + }); + }); +} diff --git a/apps/api/src/skills/config.ts b/apps/api/src/skills/config.ts new file mode 100644 index 0000000..a2c9edb --- /dev/null +++ b/apps/api/src/skills/config.ts @@ -0,0 +1,57 @@ +import { readFileSync } from "node:fs"; +import { resolve } from "node:path"; +import { parse } from "yaml"; +import { envString } from "../env.js"; + +export type SkillDefinition = { + id: string; + name: string; + description: string; + enabled: boolean; + prompt: string; +}; + +export type PublicSkillDefinition = Omit; + +type SkillsFile = { skills?: SkillDefinition[] }; + +const defaultPath = (): string => { + // When the API is started from apps/api, the config dir is at ../../config. + // The env var wins so tests / docker setups can override.
+ return envString(process.env.SKILLS_CONFIG_PATH, resolve(process.cwd(), "../../config/skills.yml")); +}; + +export const loadSkillDefinitions = ( + configPath: string = defaultPath(), +): SkillDefinition[] => { + try { + const raw = readFileSync(configPath, "utf8"); + const parsed = parse(raw) as SkillsFile; + if (!parsed || !Array.isArray(parsed.skills)) return []; + return parsed.skills + .filter((skill) => skill && typeof skill === "object") + .map((skill) => ({ + id: String(skill.id ?? "").trim(), + name: String(skill.name ?? "").trim(), + description: String(skill.description ?? "").trim(), + enabled: Boolean(skill.enabled), + prompt: String(skill.prompt ?? "").trim(), + })) + .filter((skill) => skill.id.length > 0); + } catch (error) { + // Config is optional: missing file is fine, malformed file should surface. + const code = (error as NodeJS.ErrnoException).code; + if (code === "ENOENT") return []; + throw error; + } +}; + +export const getEnabledSkillPrompts = (skills: SkillDefinition[] = loadSkillDefinitions()): string[] => + skills.filter((skill) => skill.enabled && skill.prompt.length > 0).map((skill) => skill.prompt); + +export const toPublicSkill = (skill: SkillDefinition): PublicSkillDefinition => ({ + id: skill.id, + name: skill.name, + description: skill.description, + enabled: skill.enabled, +}); diff --git a/apps/api/src/skills/routes.ts b/apps/api/src/skills/routes.ts new file mode 100644 index 0000000..d75c618 --- /dev/null +++ b/apps/api/src/skills/routes.ts @@ -0,0 +1,9 @@ +import type { FastifyInstance } from "fastify"; +import { loadSkillDefinitions, toPublicSkill } from "./config.js"; + +export const registerSkillRoutes = async (app: FastifyInstance) => { + app.get("/api/skills", async () => { + const items = loadSkillDefinitions().map(toPublicSkill); + return { items }; + }); +}; diff --git a/apps/api/src/webhooks/audit.ts b/apps/api/src/webhooks/audit.ts new file mode 100644 index 0000000..7b961c9 --- /dev/null +++ 
b/apps/api/src/webhooks/audit.ts @@ -0,0 +1,162 @@ +import { randomUUID } from "node:crypto"; +import type { AppDatabase } from "../db/database.js"; +import { envNumber } from "../env.js"; + +export type WebhookRunStatus = "success" | "error"; + +export type WebhookRunRecord = { + id: string; + webhook_id: string; + user_id: string; + session_id: string; + status: WebhookRunStatus; + request_payload: string | null; + response_status: number | null; + attempts: number; + created_at: string; +}; + +export const createWebhookAuditRepository = (db: AppDatabase) => ({ + create(input: { + webhookId: string; + userId: string; + sessionId: string; + status: WebhookRunStatus; + requestPayload?: unknown; + responseStatus?: number | null; + attempts?: number; + createdAt?: string; + }) { + const run = { + id: randomUUID(), + webhook_id: input.webhookId, + user_id: input.userId, + session_id: input.sessionId, + status: input.status, + request_payload: input.requestPayload ? JSON.stringify(input.requestPayload) : null, + response_status: input.responseStatus ?? null, + attempts: input.attempts ?? 1, + created_at: input.createdAt ?? new Date().toISOString(), + }; + + db.prepare( + `INSERT INTO webhook_runs (id, webhook_id, user_id, session_id, status, request_payload, response_status, attempts, created_at) + VALUES (@id, @webhook_id, @user_id, @session_id, @status, @request_payload, @response_status, @attempts, @created_at)`, + ).run(run); + + return run; + }, + + listForSession(userId: string, sessionId: string, limit = 20) { + return db + .prepare( + `SELECT id, webhook_id, user_id, session_id, status, request_payload, response_status, attempts, created_at + FROM webhook_runs + WHERE session_id = ? AND user_id = ? 
+ ORDER BY created_at DESC + LIMIT ?`, + ) + .all(sessionId, userId, limit) as WebhookRunRecord[]; + }, + + purgeOlderThan(isoCutoff: string): number { + const result = db + .prepare(`DELETE FROM webhook_runs WHERE created_at < ?`) + .run(isoCutoff); + return Number(result.changes ?? 0); + }, + + enforcePerUserCap(maxPerUser: number): number { + if (maxPerUser <= 0) return 0; + // Uses SQLite window function (3.25+) to keep the most recent N rows + // per user and delete the rest. user_id is always present in the table. + const result = db + .prepare( + `DELETE FROM webhook_runs + WHERE rowid IN ( + SELECT rowid FROM ( + SELECT rowid, + ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY created_at DESC, rowid DESC) AS rn + FROM webhook_runs + ) + WHERE rn > ? + )`, + ) + .run(maxPerUser); + return Number(result.changes ?? 0); + }, + + /** + * Per-webhook usage stats for a single user over a recent time window. + * `isoSince` should be a UTC ISO string (e.g. now - 7 days). + * Returns a map of webhook_id -> { runs, successes, successRate }. + */ + usageForUserSince(isoSince: string, userId: string): Record< + string, + { runs: number; successes: number; successRate: number } + > { + const rows = db + .prepare( + `SELECT webhook_id, + COUNT(*) AS runs, + SUM(CASE WHEN status = 'success' THEN 1 ELSE 0 END) AS successes + FROM webhook_runs + WHERE user_id = ? AND created_at >= ? + GROUP BY webhook_id`, + ) + .all(userId, isoSince) as Array<{ + webhook_id: string; + runs: number; + successes: number | null; + }>; + + const out: Record = {}; + for (const row of rows) { + const runs = Number(row.runs ?? 0); + const successes = Number(row.successes ?? 0); + out[row.webhook_id] = { + runs, + successes, + successRate: runs > 0 ? 
successes / runs : 0, + }; + } + return out; + }, +}); + +export type WebhookAuditPurgeConfig = { + retentionDays: number; + maxPerUser: number; +}; + +export const webhookAuditPurgeConfigFromEnv = (): WebhookAuditPurgeConfig => { + const retentionDays = Math.max(0, envNumber(process.env.WEBHOOK_RUNS_RETENTION_DAYS, 30)); + const maxPerUser = Math.max(0, envNumber(process.env.WEBHOOK_RUNS_MAX_PER_USER, 1000)); + return { retentionDays, maxPerUser }; +}; + +export type WebhookAuditPurgeReport = { + deletedByAge: number; + deletedByCap: number; + cutoff: string | null; +}; + +/** + * Run both purge passes (age + per-user cap) against the audit table. + * Returns a small report for logging / health endpoints. Safe to call on + * every boot and on a timer. + */ +export const runWebhookAuditPurge = ( + db: AppDatabase, + config: WebhookAuditPurgeConfig = webhookAuditPurgeConfigFromEnv(), +): WebhookAuditPurgeReport => { + const audit = createWebhookAuditRepository(db); + let deletedByAge = 0; + let cutoff: string | null = null; + if (config.retentionDays > 0) { + cutoff = new Date(Date.now() - config.retentionDays * 86_400_000).toISOString(); + deletedByAge = audit.purgeOlderThan(cutoff); + } + const deletedByCap = config.maxPerUser > 0 ? 
audit.enforcePerUserCap(config.maxPerUser) : 0; + return { deletedByAge, deletedByCap, cutoff }; +}; diff --git a/apps/api/src/webhooks/config.ts b/apps/api/src/webhooks/config.ts new file mode 100644 index 0000000..b3d389e --- /dev/null +++ b/apps/api/src/webhooks/config.ts @@ -0,0 +1,53 @@ +import { readFileSync } from "node:fs"; +import { resolve } from "node:path"; +import YAML from "yaml"; +import { envString } from "../env.js"; + +export type WebhookMethod = "GET" | "POST" | "PUT" | "PATCH" | "DELETE"; + +export type WebhookDefinition = { + id: string; + label: string; + description?: string; + method: WebhookMethod; + url: string; + required_roles: string[]; + confirmation_required: boolean; + payload_template?: unknown; +}; + +export type PublicWebhookDefinition = Omit; + +const defaultConfigPath = () => resolve(process.cwd(), "../../config/webhooks.yml"); + +const expandEnv = (value: string) => + value.replace(/\$\{([A-Z0-9_]+)\}/g, (_match, name: string) => process.env[name] ?? ""); + +export const loadWebhookDefinitions = ( + configPath = process.env.WEBHOOKS_CONFIG_PATH && process.env.WEBHOOKS_CONFIG_PATH.length > 0 + ? process.env.WEBHOOKS_CONFIG_PATH + : defaultConfigPath(), +): WebhookDefinition[] => { + const raw = readFileSync(configPath, "utf8"); + const parsed = YAML.parse(raw) as { webhooks?: WebhookDefinition[] } | undefined; + + return (parsed?.webhooks ?? []).map((webhook) => ({ + ...webhook, + method: webhook.method.toUpperCase() as WebhookMethod, + url: expandEnv(webhook.url), + required_roles: webhook.required_roles ?? [], + confirmation_required: webhook.confirmation_required ?? 
true, + })); +}; + +export const canUseWebhook = (userRoles: string[], webhook: WebhookDefinition) => + webhook.required_roles.every((role) => userRoles.includes(role)); + +export const toPublicWebhook = (webhook: WebhookDefinition): PublicWebhookDefinition => ({ + id: webhook.id, + label: webhook.label, + description: webhook.description, + method: webhook.method, + required_roles: webhook.required_roles, + confirmation_required: webhook.confirmation_required, +}); diff --git a/apps/api/src/webhooks/routes.ts b/apps/api/src/webhooks/routes.ts new file mode 100644 index 0000000..7588fe8 --- /dev/null +++ b/apps/api/src/webhooks/routes.ts @@ -0,0 +1,288 @@ +import type { AuthUser } from "@pi-chat/shared"; +import type { FastifyInstance } from "fastify"; +import { z } from "zod"; +import type { AppDatabase } from "../db/database.js"; +import { getAuthUser } from "../auth/index.js"; +import { webhookRateLimiterFromEnv } from "../rate-limit.js"; +import { createSessionRepository } from "../sessions/repository.js"; +import { createWebhookAuditRepository } from "./audit.js"; +import { canUseWebhook, loadWebhookDefinitions, toPublicWebhook } from "./config.js"; +import { envNumber } from "../env.js"; + +const runWebhookBody = z.object({ + sessionId: z.string().min(1), + confirmed: z.literal(true), + lastUserMessage: z.string().max(envNumber(process.env.CHAT_MESSAGE_MAX_CHARS, 8_000)).default(""), + payload: z.record(z.unknown()).default({}), +}); + +const webhookRunsQuery = z.object({ + sessionId: z.string().min(1), + limit: z.coerce.number().int().min(1).max(50).default(20), +}); + +const renderTemplate = (template: unknown, context: Record): unknown => { + if (typeof template === "string") { + return template.replace(/\{\{([a-zA-Z0-9_.]+)\}\}/g, (_match, path: string) => { + const value = path.split(".").reduce((current, key) => { + if (current && typeof current === "object" && key in current) { + return (current as Record)[key]; + } + return ""; + }, context); + + 
return value == null ? "" : String(value); + }); + } + + if (Array.isArray(template)) { + return template.map((item) => renderTemplate(item, context)); + } + + if (template && typeof template === "object") { + return Object.fromEntries( + Object.entries(template).map(([key, value]) => [key, renderTemplate(value, context)]), + ); + } + + return template; +}; + +const buildPayload = (template: unknown, input: z.infer, user: AuthUser) => { + const templated = renderTemplate(template ?? {}, { + user, + session: { id: input.sessionId }, + chat: { last_user_message: input.lastUserMessage }, + }); + + return { + ...(templated && typeof templated === "object" && !Array.isArray(templated) ? templated : {}), + ...input.payload, + }; +}; + +const fetchWithTimeout = async (url: string, init: RequestInit, timeoutMs: number) => { + const abortController = new AbortController(); + const timeout = setTimeout(() => abortController.abort(), timeoutMs); + + return fetch(url, { ...init, signal: abortController.signal }).finally(() => clearTimeout(timeout)); +}; + +const sleep = (ms: number) => + new Promise((resolve) => { + setTimeout(resolve, ms); + }); + +const isRetryableStatus = (status: number) => status >= 500 || status === 429; + +type RetryPolicy = { + maxAttempts: number; + initialBackoffMs: number; + maxBackoffMs: number; + timeoutMs: number; +}; + +const retryPolicyFromEnv = (): RetryPolicy => { + const maxAttempts = Math.max(1, envNumber(process.env.WEBHOOK_RETRY_MAX_ATTEMPTS, 3)); + const initialBackoffMs = Math.max(0, envNumber(process.env.WEBHOOK_RETRY_INITIAL_BACKOFF_MS, 500)); + const maxBackoffMs = Math.max(initialBackoffMs, envNumber(process.env.WEBHOOK_RETRY_MAX_BACKOFF_MS, 5_000)); + const timeoutMs = Math.max(1, envNumber(process.env.WEBHOOK_TIMEOUT_MS, 15_000)); + + return { maxAttempts, initialBackoffMs, maxBackoffMs, timeoutMs }; +}; + +type RunOutcome = { + response: Response | null; + attempts: number; + lastError: unknown; +}; + +const runWithRetry = async 
(url: string, init: RequestInit, policy: RetryPolicy): Promise => { + let lastError: unknown = null; + let response: Response | null = null; + + for (let attempt = 1; attempt <= policy.maxAttempts; attempt++) { + try { + response = await fetchWithTimeout(url, init, policy.timeoutMs); + if (response.ok) { + return { response, attempts: attempt, lastError: null }; + } + if (!isRetryableStatus(response.status)) { + // 4xx (non-429): don't retry, surface as-is. + return { response, attempts: attempt, lastError: null }; + } + } catch (error) { + lastError = error; + response = null; + } + + if (attempt < policy.maxAttempts) { + const backoff = Math.min( + policy.maxBackoffMs, + policy.initialBackoffMs * 2 ** (attempt - 1), + ); + await sleep(backoff); + } + } + + return { response, attempts: policy.maxAttempts, lastError }; +}; + +export const registerWebhookRoutes = async (app: FastifyInstance, db: AppDatabase) => { + const sessions = createSessionRepository(db); + const audit = createWebhookAuditRepository(db); + const webhookRateLimiter = webhookRateLimiterFromEnv(); + + app.get("/api/webhooks", async (request) => { + const user = await getAuthUser(request); + const items = loadWebhookDefinitions() + .filter((webhook) => canUseWebhook(user.roles, webhook)) + .map(toPublicWebhook); + + return { items }; + }); + + app.get("/api/webhooks/usage", async (request) => { + const user = await getAuthUser(request); + const query = z + .object({ days: z.coerce.number().int().min(1).max(365).default(7) }) + .parse(request.query ?? 
{}); + const since = new Date(Date.now() - query.days * 86_400_000).toISOString(); + const usage = audit.usageForUserSince(since, user.id); + const items = Object.entries(usage).map(([webhook_id, stats]) => ({ + webhook_id, + runs: stats.runs, + successes: stats.successes, + success_rate: stats.successRate, + window_days: query.days, + })); + return { window_days: query.days, items }; + }); + + app.get("/api/webhook-runs", async (request, reply) => { + const user = await getAuthUser(request); + const query = webhookRunsQuery.parse(request.query); + const session = sessions.get(user.id, query.sessionId); + + if (!session) { + return reply.code(404).send({ error: "session_not_found" }); + } + + const items = audit.listForSession(user.id, query.sessionId, query.limit).map((run) => ({ + id: run.id, + webhook_id: run.webhook_id, + session_id: run.session_id, + status: run.status, + response_status: run.response_status, + attempts: run.attempts, + created_at: run.created_at, + })); + + return { items }; + }); + + app.post("/api/webhooks/:id/run", async (request, reply) => { + const user = await getAuthUser(request); + const params = z.object({ id: z.string().min(1) }).parse(request.params); + const body = runWebhookBody.parse(request.body); + const webhook = loadWebhookDefinitions().find((item) => item.id === params.id); + + if (!webhook || !canUseWebhook(user.roles, webhook)) { + return reply.code(404).send({ error: "webhook_not_found" }); + } + + // Per-webhook abuse detection: each webhook_id has its own bucket so a + // runaway loop on one hook doesn't starve the rest. The bucket is shared + // across all users on purpose — that's the abuse signal. 
+ const decision = webhookRateLimiter.consume(webhook.id); + if (!decision.ok) { + const retryAfterSec = Math.max(1, Math.ceil(decision.retryAfterMs / 1000)); + app.log.warn( + { webhook: webhook.id, user: user.id, retryAfterSec }, + "webhook rate limit exceeded", + ); + return reply + .code(429) + .header("retry-after", String(retryAfterSec)) + .header("x-ratelimit-remaining", "0") + .send({ + error: "rate_limited", + retry_after_ms: decision.retryAfterMs, + }); + } + reply.header("x-ratelimit-remaining", String(decision.remaining)); + + const session = sessions.get(user.id, body.sessionId); + if (!session) { + return reply.code(404).send({ error: "session_not_found" }); + } + + const requestPayload = buildPayload(webhook.payload_template, body, user); + + if (!webhook.url) { + audit.create({ + webhookId: webhook.id, + userId: user.id, + sessionId: body.sessionId, + status: "error", + requestPayload, + attempts: 0, + }); + + return reply.code(500).send({ error: "webhook_not_configured" }); + } + + const policy = retryPolicyFromEnv(); + const outcome = await runWithRetry( + webhook.url, + { + method: webhook.method, + headers: { "content-type": "application/json" }, + body: webhook.method === "GET" ? undefined : JSON.stringify(requestPayload), + }, + policy, + ); + + if (outcome.lastError) { + app.log.error({ err: outcome.lastError, webhook: webhook.id, attempts: outcome.attempts }, "webhook request failed after retries"); + } else if (outcome.attempts > 1 && outcome.response) { + app.log.warn( + { webhook: webhook.id, attempts: outcome.attempts, status: outcome.response.status }, + "webhook request retried", + ); + } + + const response = outcome.response; + const httpOk = response?.ok ?? false; + const isTransportError = !response; + + const run = audit.create({ + webhookId: webhook.id, + userId: user.id, + sessionId: body.sessionId, + status: httpOk ? "success" : "error", + requestPayload, + responseStatus: response?.status ?? 
null, + attempts: outcome.attempts, + }); + + if (isTransportError) { + return reply.code(502).send({ + id: run.id, + webhook_id: run.webhook_id, + status: run.status, + response_status: run.response_status, + attempts: run.attempts, + error: "webhook_request_failed", + }); + } + + return reply.code(httpOk ? 200 : 502).send({ + id: run.id, + webhook_id: run.webhook_id, + status: run.status, + response_status: run.response_status, + attempts: run.attempts, + }); + }); +}; diff --git a/apps/api/test/integration.test.ts b/apps/api/test/integration.test.ts new file mode 100644 index 0000000..6fb9aae --- /dev/null +++ b/apps/api/test/integration.test.ts @@ -0,0 +1,151 @@ +import { afterEach, beforeEach, describe, expect, it } from "vitest"; +import { existsSync, mkdirSync, rmSync } from "node:fs"; +import { tmpdir } from "node:os"; +import { join } from "node:path"; +import Database from "better-sqlite3"; +import type { AppDatabase } from "../src/db/database.js"; +import { migrate } from "../src/db/migrate.js"; +import { createSessionRepository, createMessageRepository } from "../src/sessions/repository.js"; +import { createWebhookAuditRepository } from "../src/webhooks/audit.js"; +import { runWebhookAuditPurge } from "../src/webhooks/audit.js"; + +let db: AppDatabase; +let dbPath: string; + +beforeEach(() => { + const dir = join(tmpdir(), `sic-test-${Date.now()}-${Math.random().toString(36).slice(2)}`); + mkdirSync(dir, { recursive: true }); + dbPath = join(dir, "test.db"); + db = new Database(dbPath); + db.pragma("foreign_keys = ON"); + migrate(db); +}); + +afterEach(() => { + db.close(); + if (existsSync(dbPath)) rmSync(dbPath, { force: true }); +}); + +describe("session isolation", () => { + it("never returns sessions/messages from another user", () => { + const sessions = createSessionRepository(db); + const messages = createMessageRepository(db); + + const a = sessions.create("user-a", "A session"); + const b = sessions.create("user-b", "B session"); + + 
messages.create({ sessionId: a.id, userId: "user-a", role: "user", content: "a msg" }); + messages.create({ sessionId: b.id, userId: "user-b", role: "user", content: "b msg" }); + + // List filter + expect(sessions.list("user-a").map((s) => s.id)).toEqual([a.id]); + expect(sessions.list("user-b").map((s) => s.id)).toEqual([b.id]); + + // get() requires matching user_id + expect(sessions.get("user-a", b.id)).toBeFalsy(); + expect(sessions.get("user-b", a.id)).toBeFalsy(); + + // Messages filter by both session_id and user_id + const bMessages = messages.listForSession("user-a", b.id); + expect(bMessages).toEqual([]); + const aMessages = messages.listForSession("user-a", a.id); + expect(aMessages).toHaveLength(1); + expect(aMessages[0]?.content).toBe("a msg"); + }); + + it("delete cascades to messages", () => { + const sessions = createSessionRepository(db); + const messages = createMessageRepository(db); + + const s = sessions.create("user-a", null); + const m = messages.create({ + sessionId: s.id, + userId: "user-a", + role: "user", + content: "will be cascaded", + }); + + sessions.delete("user-a", s.id); + expect(messages.listForSession("user-a", s.id)).toEqual([]); + // Direct DB check that the message row is gone (not just hidden) + const row = db.prepare("SELECT id FROM chat_messages WHERE id = ?").get(m.id); + expect(row).toBeUndefined(); + }); + + it("updateTitle only affects the owner's session", () => { + const sessions = createSessionRepository(db); + const a = sessions.create("user-a", "Old"); + sessions.updateTitle("user-a", a.id, "New"); + expect(sessions.get("user-a", a.id)?.title).toBe("New"); + }); +}); + +describe("webhook audit + retention", () => { + it("usageForUserSince aggregates per webhook", () => { + const sessions = createSessionRepository(db); + const audit = createWebhookAuditRepository(db); + const s1 = sessions.create("user-a", "test"); + const s2 = sessions.create("user-b", "test"); + const now = Date.now(); + const fresh = new 
Date(now - 1_000).toISOString(); + const old = new Date(now - 100 * 86_400_000).toISOString(); + + audit.create({ webhookId: "dns-flush", userId: "user-a", sessionId: s1.id, status: "success", createdAt: fresh }); + audit.create({ webhookId: "dns-flush", userId: "user-a", sessionId: s1.id, status: "success", createdAt: fresh }); + audit.create({ webhookId: "dns-flush", userId: "user-a", sessionId: s1.id, status: "error", createdAt: fresh }); + audit.create({ webhookId: "dns-flush", userId: "user-a", sessionId: s1.id, status: "success", createdAt: old }); + audit.create({ webhookId: "other-hook", userId: "user-b", sessionId: s2.id, status: "success", createdAt: fresh }); + + const since = new Date(now - 7 * 86_400_000).toISOString(); + const usage = audit.usageForUserSince(since, "user-a"); + expect(usage["dns-flush"]?.runs).toBe(3); + expect(usage["dns-flush"]?.successes).toBe(2); + expect(usage["dns-flush"]?.successRate).toBeCloseTo(2 / 3); + expect(usage["other-hook"]).toBeUndefined(); + }); + + it("retention purge deletes old rows but keeps recent ones", () => { + const sessions = createSessionRepository(db); + const audit = createWebhookAuditRepository(db); + const s = sessions.create("user-a", "test"); + const now = Date.now(); + const fresh = new Date(now - 60_000).toISOString(); + const stale = new Date(now - 100 * 86_400_000).toISOString(); + audit.create({ webhookId: "w", userId: "user-a", sessionId: s.id, status: "success", createdAt: fresh }); + audit.create({ webhookId: "w", userId: "user-a", sessionId: s.id, status: "success", createdAt: stale }); + audit.create({ webhookId: "w", userId: "user-a", sessionId: s.id, status: "success", createdAt: stale }); + + const report = runWebhookAuditPurge(db, { retentionDays: 30, maxPerUser: 0 }); + expect(report.deletedByAge).toBe(2); + + const remaining = db.prepare("SELECT COUNT(*) as n FROM webhook_runs").get() as { n: number }; + expect(remaining.n).toBe(1); + }); + + it("per-user cap keeps the most recent N", 
() => { + const sessions = createSessionRepository(db); + const audit = createWebhookAuditRepository(db); + const s = sessions.create("user-a", "test"); + const now = Date.now(); + for (let i = 0; i < 8; i++) { + const ts = new Date(now - i * 1000).toISOString(); + audit.create({ webhookId: "w", userId: "user-a", sessionId: s.id, status: "success", createdAt: ts }); + } + const report = runWebhookAuditPurge(db, { retentionDays: 0, maxPerUser: 3 }); + expect(report.deletedByCap).toBe(5); + const remaining = db.prepare("SELECT COUNT(*) as n FROM webhook_runs").get() as { n: number }; + expect(remaining.n).toBe(3); + }); + + it("listForSession enforces user_id", () => { + const sessions = createSessionRepository(db); + const audit = createWebhookAuditRepository(db); + const sa = sessions.create("user-a", "test"); + const sb = sessions.create("user-b", "test"); + audit.create({ webhookId: "w", userId: "user-a", sessionId: sa.id, status: "success" }); + audit.create({ webhookId: "w", userId: "user-b", sessionId: sb.id, status: "success" }); + expect(audit.listForSession("user-a", sa.id)).toHaveLength(1); + expect(audit.listForSession("user-b", sb.id)).toHaveLength(1); + expect(audit.listForSession("user-a", sa.id)[0]?.user_id).toBe("user-a"); + }); +}); diff --git a/apps/api/test/rag-client.test.ts b/apps/api/test/rag-client.test.ts new file mode 100644 index 0000000..323113e --- /dev/null +++ b/apps/api/test/rag-client.test.ts @@ -0,0 +1,118 @@ +import { afterEach, beforeEach, describe, expect, it } from "vitest"; +import { createServer, type Server } from "node:http"; + +let server: Server; +let port = 0; + +beforeEach(async () => { + await new Promise((resolve) => { + server = createServer((req, res) => { + const url = new URL(req.url ?? "/", `http://${req.headers.host}`); + // Decode the path so ids with reserved characters (e.g. "runbooks:vpn") + // match whether the client encoded the colon as %3A or not. 
+ const pathname = decodeURIComponent(url.pathname); + if (req.method === "POST" && pathname === "/search") { + let body = ""; + req.on("data", (c) => (body += c)); + req.on("end", () => { + res.writeHead(200, { "content-type": "application/json" }); + res.end( + JSON.stringify({ + items: [ + { id: "remote:1", title: "Remote doc", source: "remote", tags: ["remote"], relevance: 0.9, excerpt: "x" }, + ], + }), + ); + }); + return; + } + if (req.method === "GET" && pathname === "/docs/remote:1") { + res.writeHead(200, { "content-type": "application/json" }); + res.end( + JSON.stringify({ + id: "remote:1", + title: "Remote doc", + source: "remote", + tags: ["remote"], + headings: ["Section"], + content: "Full remote content", + }), + ); + return; + } + res.writeHead(404).end(); + }); + server.listen(0, "127.0.0.1", () => { + const a = server.address(); + port = typeof a === "object" && a ? a.port : 0; + resolve(); + }); + }); +}); + +afterEach(async () => { + await new Promise((resolve) => server.close(() => resolve())); +}); + +describe("rag client", () => { + it("searches via the configured endpoint when set", async () => { + const { searchViaRag } = await import("../src/rag/client.js"); + const items = await searchViaRag( + { + endpoint: `http://127.0.0.1:${port}`, + authToken: "", + timeoutMs: 5000, + fallbackToLocal: false, + chunkStrategy: "heading", + chunkSizeChars: 1500, + topK: 5, + minRelevance: 0, + includeTags: [], + excludeTags: [], + }, + "anything", + 3, + ); + expect(items).toHaveLength(1); + expect(items[0]?.id).toBe("remote:1"); + expect(items[0]?.relevance).toBe(0.9); + }); + + it("fetches a single doc via the endpoint", async () => { + const { getViaRag } = await import("../src/rag/client.js"); + const doc = await getViaRag( + { + endpoint: `http://127.0.0.1:${port}`, + authToken: "secret", + timeoutMs: 5000, + fallbackToLocal: false, + chunkStrategy: "heading", + chunkSizeChars: 1500, + topK: 5, + minRelevance: 0, + includeTags: [], + excludeTags: 
[], + }, + "remote:1", + ); + expect(doc?.id).toBe("remote:1"); + expect(doc?.content).toBe("Full remote content"); + }); + + it("isRagRemote returns true when endpoint is set, false otherwise", async () => { + const { isRagRemote } = await import("../src/rag/client.js"); + const base = { + authToken: "", + timeoutMs: 1000, + fallbackToLocal: true, + chunkStrategy: "heading" as const, + chunkSizeChars: 1500, + topK: 5, + minRelevance: 0, + includeTags: [], + excludeTags: [], + }; + expect(isRagRemote({ ...base, endpoint: "" })).toBe(false); + expect(isRagRemote({ ...base, endpoint: "http://x" })).toBe(true); + }); +}); diff --git a/apps/api/test/rate-limit.test.ts b/apps/api/test/rate-limit.test.ts new file mode 100644 index 0000000..111e6bf --- /dev/null +++ b/apps/api/test/rate-limit.test.ts @@ -0,0 +1,57 @@ +import { describe, expect, it } from "vitest"; +import { createRateLimiter } from "../src/rate-limit.js"; + +describe("rate-limit", () => { + it("accepts up to burst then rejects", () => { + const lim = createRateLimiter({ perMinute: 60, burst: 3 }); + expect(lim.consume("u1", 0)).toEqual({ ok: true, remaining: 2 }); + expect(lim.consume("u1", 0)).toEqual({ ok: true, remaining: 1 }); + expect(lim.consume("u1", 0)).toEqual({ ok: true, remaining: 0 }); + const denied = lim.consume("u1", 0); + expect(denied.ok).toBe(false); + if (!denied.ok) { + expect(denied.retryAfterMs).toBeGreaterThan(0); + expect(denied.retryAfterMs).toBeLessThanOrEqual(1000); + } + }); + + it("refills tokens over time", () => { + const lim = createRateLimiter({ perMinute: 60, burst: 2 }); + expect(lim.consume("u1", 0).ok).toBe(true); + expect(lim.consume("u1", 0).ok).toBe(true); + expect(lim.consume("u1", 0).ok).toBe(false); + // 1 second later, 1 token refilled + expect(lim.consume("u1", 1000).ok).toBe(true); + expect(lim.consume("u1", 1000).ok).toBe(false); + }); + + it("isolates buckets per id", () => { + const lim = createRateLimiter({ perMinute: 60, burst: 1 }); + 
expect(lim.consume("u1", 0).ok).toBe(true); + expect(lim.consume("u1", 0).ok).toBe(false); + // u2 has its own bucket + expect(lim.consume("u2", 0).ok).toBe(true); + expect(lim.consume("u2", 0).ok).toBe(false); + }); + + it("caps refill at burst", () => { + const lim = createRateLimiter({ perMinute: 60, burst: 2 }); + // Wait a long time, tokens should still be capped at 2 + const result = lim.consume("u1", 60_000); + expect(result).toEqual({ ok: true, remaining: 1 }); + expect(lim.consume("u1", 60_000).ok).toBe(true); + expect(lim.consume("u1", 60_000).ok).toBe(false); + }); + + it("reset clears a single bucket or all", () => { + const lim = createRateLimiter({ perMinute: 60, burst: 1 }); + lim.consume("u1", 0); + lim.consume("u2", 0); + expect(lim.size()).toBe(2); + lim.reset("u1"); + expect(lim.size()).toBe(1); + expect(lim.consume("u1", 0).ok).toBe(true); + lim.reset(); + expect(lim.size()).toBe(0); + }); +}); diff --git a/apps/api/tsconfig.json b/apps/api/tsconfig.json new file mode 100644 index 0000000..5a24989 --- /dev/null +++ b/apps/api/tsconfig.json @@ -0,0 +1,8 @@ +{ + "extends": "../../tsconfig.base.json", + "compilerOptions": { + "outDir": "dist", + "rootDir": "src" + }, + "include": ["src"] +} diff --git a/apps/web/index.html b/apps/web/index.html new file mode 100644 index 0000000..93cdd4e --- /dev/null +++ b/apps/web/index.html @@ -0,0 +1,28 @@ + + + + + + + SIC — Super Incident Commander + + + + +
+ + + diff --git a/apps/web/package.json b/apps/web/package.json new file mode 100644 index 0000000..2b563c8 --- /dev/null +++ b/apps/web/package.json @@ -0,0 +1,25 @@ +{ + "name": "@pi-chat/web", + "private": true, + "version": "0.1.0", + "type": "module", + "scripts": { + "dev": "vite --host 0.0.0.0", + "typecheck": "tsc --noEmit", + "lint": "tsc --noEmit" + }, + "dependencies": { + "@pi-chat/shared": "workspace:*", + "@vitejs/plugin-react": "^4.5.2", + "react": "^19.1.0", + "react-dom": "^19.1.0", + "react-markdown": "^10.1.0", + "remark-gfm": "^4.0.1", + "vite": "^6.3.5" + }, + "devDependencies": { + "@types/react": "^19.1.8", + "@types/react-dom": "^19.1.6", + "typescript": "^5.8.3" + } +} diff --git a/apps/web/public/agent.png b/apps/web/public/agent.png new file mode 100644 index 0000000..5582056 Binary files /dev/null and b/apps/web/public/agent.png differ diff --git a/apps/web/public/favicon-dark.png b/apps/web/public/favicon-dark.png new file mode 100644 index 0000000..c41a40d Binary files /dev/null and b/apps/web/public/favicon-dark.png differ diff --git a/apps/web/public/favicon-light.png b/apps/web/public/favicon-light.png new file mode 100644 index 0000000..35c5b9c Binary files /dev/null and b/apps/web/public/favicon-light.png differ diff --git a/apps/web/public/logo-dark.png b/apps/web/public/logo-dark.png new file mode 100644 index 0000000..4a68af4 Binary files /dev/null and b/apps/web/public/logo-dark.png differ diff --git a/apps/web/public/logo-light.png b/apps/web/public/logo-light.png new file mode 100644 index 0000000..504cef9 Binary files /dev/null and b/apps/web/public/logo-light.png differ diff --git a/apps/web/src/DocModal.tsx b/apps/web/src/DocModal.tsx new file mode 100644 index 0000000..9cfb478 --- /dev/null +++ b/apps/web/src/DocModal.tsx @@ -0,0 +1,85 @@ +import { useEffect } from "react"; +import { MarkdownView } from "./code-block"; + +export type KnowledgeDoc = { + id: string; + title: string; + source: string; + tags: string[]; + 
owner?: string; + updated?: string; + headings: string[]; + content: string; +}; + +type DocModalProps = { + doc: KnowledgeDoc; + onClose: () => void; + labels: { + close: string; + tags: string; + owner: string; + updated: string; + }; +}; + +const DocModal = ({ doc, onClose, labels }: DocModalProps) => { + useEffect(() => { + const onKey = (event: KeyboardEvent) => { + if (event.key === "Escape") onClose(); + }; + window.addEventListener("keydown", onKey); + return () => window.removeEventListener("keydown", onKey); + }, [onClose]); + + return ( +
+
event.stopPropagation()}> +
+
+

{doc.title}

+ {doc.source} +
+ +
+
+ {doc.tags.length > 0 ? ( +
+
{labels.tags}
+
{doc.tags.join(", ")}
+
+ ) : null} + {doc.owner ? ( +
+
{labels.owner}
+
{doc.owner}
+
+ ) : null} + {doc.updated ? ( +
+
{labels.updated}
+
{doc.updated}
+
+ ) : null} +
+
+ +
+
+
+ ); +}; + +export default DocModal; diff --git a/apps/web/src/ErrorBoundary.tsx b/apps/web/src/ErrorBoundary.tsx new file mode 100644 index 0000000..737fff0 --- /dev/null +++ b/apps/web/src/ErrorBoundary.tsx @@ -0,0 +1,54 @@ +import { Component, type ReactNode } from "react"; + +type ErrorBoundaryProps = { + children: ReactNode; +}; + +type ErrorBoundaryState = { + error: Error | null; +}; + +export class ErrorBoundary extends Component { + state: ErrorBoundaryState = { error: null }; + + static getDerivedStateFromError(error: Error): ErrorBoundaryState { + return { error }; + } + + componentDidCatch(error: Error, info: { componentStack?: string }): void { + // Surface to the dev console; production telemetry would hook in here. + // eslint-disable-next-line no-console + console.error("[SIC] uncaught render error", error, info.componentStack); + } + + private handleReload = () => { + window.location.reload(); + }; + + private handleReset = () => { + this.setState({ error: null }); + }; + + render() { + const { error } = this.state; + if (!error) return this.props.children; + + return ( +
+
+ {`Unrecoverable UI error`} +

Something went wrong while rendering SIC. Your sessions and messages are still saved on the server.

+
{error.message}
+
+ + +
+
+
+ ); + } +} diff --git a/apps/web/src/WebhookFormTab.tsx b/apps/web/src/WebhookFormTab.tsx new file mode 100644 index 0000000..8757be4 --- /dev/null +++ b/apps/web/src/WebhookFormTab.tsx @@ -0,0 +1,231 @@ +import { useEffect, useMemo, useState } from "react"; +import type { FormEvent } from "react"; +import { authorizedHeaders, authTokenFromStorage, jsonHeaders, api } from "./api"; + +type PublicWebhook = { + id: string; + label: string; + description?: string; + method: string; + required_roles: string[]; + confirmation_required: boolean; +}; + +type RunResult = + | { kind: "idle" } + | { kind: "running" } + | { kind: "success"; responseStatus: number | null; runId: string } + | { kind: "error"; message: string }; + +type WebhookFormTabProps = { + webhookId: string; + sessionId: string; + onBack: () => void; +}; + +const labelsEn = { + title: "Run webhook", + description: "Description", + requiredRoles: "Required roles", + confirmation: "Requires confirmation", + method: "Method", + payload: "Payload (optional JSON)", + payloadHelp: "These fields are merged with the backend template. 
Available variables: {user}, {session}, {message}.", + run: "Run", + running: "Running...", + resultOk: "Webhook executed", + resultErr: "Failed to execute", + httpStatus: "HTTP", + runId: "Audit ID", + back: "Back to chat", + notFound: "Webhook not found or insufficient permissions", + loading: "Loading webhook...", + user: "User", + session: "Session", +}; + +const detectLanguage = (): "en" => "en"; + +const WebhookFormTabInner = ({ webhookId, sessionId, onBack }: WebhookFormTabProps) => { + const [labels] = useState(() => labelsEn); + const [webhook, setWebhook] = useState(null); + const [payload, setPayload] = useState("{}"); + const [result, setResult] = useState({ kind: "idle" }); + const [error, setError] = useState(null); + + useEffect(() => { + void (async () => { + try { + const data = await api<{ items: PublicWebhook[] }>("/api/webhooks"); + const found = data.items.find((item) => item.id === webhookId); + if (!found) { + setError(labels.notFound); + return; + } + setWebhook(found); + } catch (err) { + console.error(err); + setError(labels.notFound); + } + })(); + }, [webhookId, labels.notFound]); + + const submit = async (event: FormEvent) => { + event.preventDefault(); + if (!webhook) return; + + if (webhook.confirmation_required) { + const ok = window.confirm(`Run ${webhook.label}?`); + if (!ok) return; + } + + let parsed: Record = {}; + if (payload.trim().length > 0) { + try { + const value = JSON.parse(payload); + if (value && typeof value === "object" && !Array.isArray(value)) { + parsed = value as Record; + } + } catch { + setResult({ kind: "error", message: "Payload is not valid JSON" }); + return; + } + } + + setResult({ kind: "running" }); + try { + const response = await fetch(`/api/webhooks/${webhook.id}/run`, { + method: "POST", + headers: jsonHeaders(), + body: JSON.stringify({ + sessionId, + confirmed: true, + lastUserMessage: undefined, + payload: parsed, + }), + }); + if (!response.ok) { + const detail = await response.text().catch(() => 
""); + throw new Error(`http_${response.status}: ${detail.slice(0, 200)}`); + } + const body = (await response.json()) as { id: string; response_status: number | null }; + setResult({ kind: "success", responseStatus: body.response_status, runId: body.id }); + } catch (err) { + console.error(err); + const message = err instanceof Error ? err.message : "error"; + setResult({ kind: "error", message }); + } + }; + + const tokenInfo = useMemo(() => { + const t = authTokenFromStorage(); + return t ? `${t.slice(0, 12)}…` : labels.notFound; + }, [labels.notFound]); + + if (error) { + return ( +
+

{labels.title}

+

{error}

+ +
+ ); + } + + if (!webhook) { + return ( +
+

{labels.title}

+

{labels.loading}

+
+ ); + } + + return ( +
+
+
+ SIC +

{webhook.label}

+
+ +
+ +
+ {webhook.description ? ( +
+
{labels.description}
+
{webhook.description}
+
+ ) : null} +
+
{labels.method}
+
{webhook.method}
+
+
+
{labels.requiredRoles}
+
{webhook.required_roles.join(", ")}
+
+
+
{labels.confirmation}
+
{webhook.confirmation_required ? "Yes" : "No"}
+
+
+
{labels.session}
+
{sessionId.slice(0, 8)}…
+
+
+
{labels.user}
+
{tokenInfo}
+
+
+ +
+