文章目录
系列导航
数据流步骤总览
- 用户在输入框输入内容并按下回车或点击发送。
- 前端
ChatInput调用sendMessage,将用户消息写入 store。 - store 立即追加一条 assistant 占位消息,状态为
streaming。 - 前端向后端
POST /api/chat发起 SSE 请求,并附带中断信号AbortSignal。 - 后端先把用户消息和 assistant 占位消息写入 PostgreSQL。
- 后端先回传一条
meta事件,告知前端本次 assistant 的消息 ID。 - 后端调用 DeepSeek 流式接口,持续接收增量文本(delta)。
- 每收到一段 delta,后端通过 SSE 回传;前端按消息 ID 增量更新
content。 - 流式结束后,后端更新数据库中 assistant 消息状态为
completed,并写入 token 用量。 - 后端发送
done(可附带usage)事件,前端将消息状态更新为完成。
共享类型:packages/shared/types/chat.ts
与概览文档的差异:同目录 ai_assistant.md 中示例曾同时使用 messages[] 与单独的 streamingContent 字段表示流式内容,容易造成双源不一致(例如完成时忘记把 streamingContent 合并回消息)。推荐做法:流式过程中只更新 messages 里对应 assistant 条目的 content 与 status,全局不再维护平行的 streamingContent。
export type MessageRole = "user" | "assistant" | "system";
export type MessageStatus =
| "pending"
| "streaming"
| "completed"
| "error"
| "aborted";
export interface Message {
id: string;
sessionId: string;
role: MessageRole;
content: string;
/** 可选:用于搜索/摘要,可由 marked strip 或纯文本缓存 */
contentText?: string;
createdAt: number;
status: MessageStatus;
tokenUsage?: number;
/** 重新生成版本树:指向被替换的上一条 assistant */
parentId?: string | null;
/** 幂等键:与后端 request_id 对齐 */
requestId?: string;
error?: { code: string; message: string; retryable?: boolean };
abortReason?: string;
}
export interface SessionMeta {
tokenUsage: number;
messageCount: number;
lastMessageRole: MessageRole;
}
export interface Session {
id: string;
title: string;
model: string;
createdAt: number;
updatedAt: number;
messages: Message[];
meta: SessionMeta;
}
/** 后端 SSE 单帧(第二版协议,便于扩展) */
export type ServerStreamEvent =
| { type: "meta"; assistantMessageId: string; requestId: string }
| { type: "delta"; text: string }
| { type: "usage"; totalTokens?: number }
| { type: "done" }
| { type: "error"; code: string; message: string; retryable?: boolean };
请求体协议:ChatRequest
前端 POST /api/chat:
export interface ChatRequest {
sessionId: string;
messages: Array<{ role: MessageRole; content: string }>;
stream: true;
model: string;
temperature: number;
max_tokens?: number;
/** 幂等与去重,整链路透传 */
requestId: string;
/** regenerate:目标 assistant id,后端可据此删占位或更新同 id */
regenerateAssistantId?: string;
}
后端校验:sessionId 存在、messages 最后一条须为 user(或你约定的「工具结果」角色)、requestId 唯一。
前端状态机
- 初始状态:
idle(空闲,可发送)。 idle -> submitting:用户触发sendMessage,开始提交请求。submitting -> streaming:收到首包(首字节或meta事件),进入流式接收。streaming -> completed:收到done事件,生成完成。streaming -> error:收到错误事件或请求异常。streaming -> aborted:用户主动停止,触发中断。error -> idle:用户关闭错误提示或点击重试后回到空闲态。aborted -> idle:用户确认中断结果后回到空闲态。completed -> idle:本轮对话完成,等待下一次输入。
会话级可再维护 phase: 'idle' | 'submitting' | 'streaming';消息级用 Message.status 表达单条 assistant 的生命周期更准确。
zustand Store
apps/web/src/store/chatStore.ts 核心思路:按 slice 拆分文件也可,此处合并便于阅读。
import { create } from "zustand";
import type { Message, MessageRole, Session } from "@ai-chat/shared";
export interface ChatSettings {
model: string;
temperature: number;
maxTokens: number;
theme: "light" | "dark" | "auto";
}
interface ChatState {
sessions: Session[];
currentSessionId: string | null;
messages: Message[];
settings: ChatSettings;
phase: "idle" | "submitting" | "streaming";
setPhase: (p: ChatState["phase"]) => void;
appendUserMessage: (m: Omit<Message, "status"> & { status?: Message["status"] }) => void;
appendAssistantPlaceholder: (m: Message) => void;
patchMessage: (id: string, patch: Partial<Message>) => void;
setCurrentSession: (id: string | null) => void;
}
export const useChatStore = create<ChatState>((set, get) => ({
sessions: [],
currentSessionId: null,
messages: [],
phase: "idle",
settings: {
model: "deepseek-chat",
temperature: 0.7,
maxTokens: 4096,
theme: "dark",
},
setPhase: (phase) => set({ phase }),
appendUserMessage: (m) =>
set((s) => ({
messages: [...s.messages, { ...m, status: m.status ?? "completed" }],
})),
appendAssistantPlaceholder: (m) =>
set((s) => ({
messages: [...s.messages, { ...m, status: "streaming", content: "" }],
})),
patchMessage: (id, patch) =>
set((s) => ({
messages: s.messages.map((x) => (x.id === id ? { ...x, ...patch } : x)),
})),
setCurrentSession: (currentSessionId) => set({ currentSessionId }),
}));
发送一条消息的时序(伪代码,真实 fetch 见第三篇解析器):
setPhase('submitting')appendUserMessage({ id, sessionId, role:'user', content, createdAt, status:'completed' })appendAssistantPlaceholder({ id: assistantId, sessionId, role:'assistant', ... })setPhase('streaming'),收到meta后若后端返回的assistantMessageId与本地占位 id 不一致,用patchMessage对齐(少见,一般预先由前端生成 id 并由后端INSERT同 id)- 每个
delta:patchMessage(assistantId, { content: prev + text })—— 仅改 messages,不用第二个 state 字段 done:patchMessage(assistantId, { status:'completed', tokenUsage });setPhase('idle')
chatService.ts:建立 SSE 请求
用 fetch + AbortController(停止与重试在第四篇展开)。此处展示请求建立与事件分派骨架:
import { v4 as uuid } from "uuid";
import type { ChatRequest, ServerStreamEvent } from "@ai-chat/shared";
import { useChatStore } from "./chatStore";
export async function streamChat(
body: Omit<ChatRequest, "requestId"> & { requestId?: string },
opts: {
signal: AbortSignal;
onEvent: (e: ServerStreamEvent) => void;
}
) {
const requestId = body.requestId ?? uuid();
const res = await fetch("/api/chat", {
method: "POST",
headers: { "Content-Type": "application/json" },
signal: opts.signal,
body: JSON.stringify({ ...body, requestId, stream: true }),
});
if (!res.ok) throw new Error(`HTTP ${res.status}`);
if (!res.body) throw new Error("No body");
const reader = res.body.getReader();
const decoder = new TextDecoder();
let buf = "";
while (true) {
const { done, value } = await reader.read();
if (done) break;
buf += decoder.decode(value, { stream: true });
const parts = buf.split("\n\n");
buf = parts.pop() ?? "";
for (const block of parts) {
for (const line of block.split("\n")) {
const t = line.trim();
if (!t || t.startsWith(":")) continue;
if (!t.startsWith("data:")) continue;
const json = t.slice(5).trim();
try {
opts.onEvent(JSON.parse(json) as ServerStreamEvent);
} catch {
/* 非 JSON 可记录 */
}
}
}
}
}
regenerate:从 messages 中找到目标 assistant,截断其后消息,取上一条 user 内容,带上 regenerateAssistantId 再次调用同一接口;前端可复用原 assistant id 并先 patchMessage(id, { content:'', status:'streaming' })。
后端:占位消息与数据库节奏
推荐顺序
- 校验
ChatRequest,INSERTuser 消息(若尚未持久化该条)。 INSERTassistant 占位行:status='streaming',content='',request_id=requestId。- 打开 SSE,首包发送:
{"type":"meta","assistantMessageId":"<uuid>","requestId":"<same>"}
- 循环
deepseekChatStream,每 token{"type":"delta","text":"..."}。 - 结束后
UPDATE messages SET content=$1, status='completed', token_usage=$2 WHERE id=$3。 - 发送
{"type":"usage",...}与{"type":"done"}。
异常时 flushPartial
客户端断开或上游错误时:
UPDATE messages
SET content = $1,
status = CASE WHEN $2 THEN 'error' ELSE 'aborted' END
WHERE id = $3;
避免库里永远停在 streaming 的僵尸行(第四篇写定时修复任务亦可)。
历史会话列表与分页
GET /api/sessions?limit=20&cursor=<updated_at_iso>:
SELECT id, title, model, updated_at
FROM sessions
WHERE ($1::timestamptz IS NULL OR updated_at < $1::timestamptz)
ORDER BY updated_at DESC
LIMIT $2;
cursor 传上一页最后一条的 updated_at。前端首次进入拉第一页;打开某会话再 GET /api/sessions/:id/messages?before=<message_created_at> 做消息向上分页。
客户端缓存策略(可选)
- localStorage:仅存
currentSessionId+ 最近 N 条消息 id 列表,正文仍以服务端为准,避免泄露与大体积。 - Stale-while-revalidate:先渲染缓存列表,后台静默
GET刷新sessions。
小结
- 单一数据源:流式文本只存在于对应
Message.content,不要并行streamingContent。 - 协议:
meta→delta*→usage?→done/error,前后端类型用ServerStreamEvent锁死。 - 数据库:user 与 assistant 占位尽早入库,结束或异常都要落库终态。
下一篇:流式渲染与增量解析。