back to index

用Go实现一个AI对话助手

2026/02/02
loading

本文实现一个基于 Go + Gin 的 AI 会话后端服务,核心目标是把大模型调用能力接入到标准 Web API 中,并提供完整的用户鉴权、会话管理、流式响应、异步持久化、历史恢复能力。项目的设计重点不是单点功能,而是通过分层架构把协议处理、业务编排、模型调用、存储与消息系统明确拆分,形成可扩展、可维护、可演进的工程结构。

整体架构

架构图如图所示:

img

架构分层职责

  • 接入层:负责应用生命周期与全局装配,完成配置加载、数据库连接、消息队列初始化、路由注册、服务启动与优雅退出。
  • 路由层:负责路由编排和访问控制边界,将公开接口与受保护接口分组,并统一挂载鉴权中间件。
  • 控制器层:负责 HTTP 协议处理,包含参数绑定、上下文读取、调用 Service 与统一响应输出。
  • 业务层:负责用例编排,串联会话创建、模型调用、流式推送、历史读取与错误码映射。
  • AI 能力层:负责模型抽象、模型实现、模型工厂、会话上下文缓存与多用户隔离。
  • 数据访问层:通过 Factory + Store Interface 提供存储抽象,并由 MySQL 实现具体读写逻辑。
  • 消息队列层:负责消息发布与消费,将对话主链路和持久化写库链路解耦。

通过分层,保证每一层只处理当前层应处理的问题,避免跨层耦合导致的维护成本放大。业务变更通常局限在业务层和 AI 层,基础设施变更通常局限在接入层和数据访问层,能够保持系统演进时的稳定性。

大模型接入

模型抽象

模型能力定义为统一接口:

1
2
3
4
5
6
7
type StreamCallback func(msg string)

type AIModel interface {
GenerateResponse(ctx context.Context, messages []*schema.Message) (*schema.Message, error)
StreamResponse(ctx context.Context, messages []*schema.Message, cb StreamCallback) (string, error)
GetModelType() string
}

该抽象将模型调用约束为统一协议,业务层永远依赖 AIModel 而不是某个厂商 SDK。设计目标是稳定上层接口,允许底层模型实现自由替换。

OpenAI 与 Ollama 实现

两者结构保持一致,便于统一接入策略。

OpenAI 构造代码如下:

1
2
3
4
5
6
7
8
9
10
11
func NewOpenAIModel(ctx context.Context, config map[string]interface{}) (*OpenAIModel, error) {
llm, err := openai.NewChatModel(ctx, &openai.ChatModelConfig{
BaseURL: config["baseURL"].(string),
Model: config["modelName"].(string),
APIKey: config["apiKey"].(string),
})
if err != nil {
return nil, fmt.Errorf("create openai model failed: %v", err)
}
return &OpenAIModel{llm: llm}, nil
}

流式接收代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
func (o *OpenAIModel) StreamResponse(ctx context.Context, messages []*schema.Message, cb StreamCallback) (string, error) {
stream, err := o.llm.Stream(ctx, messages)
if err != nil {
return "", fmt.Errorf("openai stream failed: %v", err)
}
defer stream.Close()

var fullResp strings.Builder
for {
msg, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return "", fmt.Errorf("openai stream recv failed: %v", err)
}
if len(msg.Content) > 0 {
fullResp.WriteString(msg.Content)
cb(msg.Content)
}
}
return fullResp.String(), nil
}

模型实现对齐统一接口,业务层可无差别调用。流式接口同时返回增量回调与聚合文本,分别用于实时传输与会话落盘。这种设计确保“模型差异存在于模型层”,而不是蔓延到业务层和控制器层。

模型工厂

模型工厂通过注册表维护模型类型与构造函数映射。并根据AIModel进一步创建AIHelper,避免重复初始化和并发下重复注册,保证工厂全局一致

注册模型代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
type ModelCreator func(ctx context.Context, config map[string]interface{}) (AImodel.AIModel, error)

type AIModelFactory struct {
creators map[string]ModelCreator
}

func (f *AIModelFactory) CreateAIModel(ctx context.Context, modelType string, config map[string]interface{}) (AImodel.AIModel, error) {
creator, ok := f.creators[modelType]
if !ok {
return nil, fmt.Errorf("unknown modelType")
}
return creator(ctx, config)
}
// 根据模型创建AIHelper
func (f *AIModelFactory) CreateAIHelper(ctx context.Context, modelType, SessionID string, config map[string]interface{}) (*AIHelper, error) {
model, err := f.CreateAIModel(ctx, modelType, config)
if err != nil {
return nil, fmt.Errorf("created AIModel err : %v", err)
}
return NewAIHelper(model, SessionID), nil
}

模型选择逻辑集中在工厂,避免业务层出现大量 if/else 或 switch。工厂只负责创建,不负责业务流程。新模型接入仅需注册 creator,模型能力升级不会影响核心业务流程结构。

会话上下文管理

AIHelper 是单会话执行核心,一个会话对应一个 Helper 实例。

核心结构如下:

1
2
3
4
5
6
7
type AIHelper struct {
model AImodel.AIModel
messages []*model.Message
mu sync.RWMutex
SessionID string
saveFunc func(*model.Message) (*model.Message, error)
}
  • model:当前使用的 AI 模型实例
  • messages:历史消息队列
  • mu:读写锁,保护消息队列并发安全
  • SessionID:会话唯一 ID
  • saveFunc:消息持久化回调函数

AIHelper封装了单个会话的 上下文管理,把消息记录、模型调用、流式/非流式回复、消息持久化整合在其中。

AddMessage

addMessage构造一条消息并加入当前会话历史,按需持久化。

1
2
3
4
5
6
7
8
9
10
11
12
13
func (a *AIHelper) AddMessage(content string, username string, isUser bool, save bool) {
userMsg := model.Message{
SessionId: a.SessionID,
Content: content,
Username: username,
IsUser: isUser,
}
a.messages = append(a.messages, &userMsg)

if save {
a.saveFunc(&userMsg)
}
}

StreamResponse

StreamResponse先记录用户输入,转换历史消息为schema格式,通过cb向外推送增量内容,流结束后得到完整 content,落库并加入历史。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
func (a *AIHelper) StreamResponse(ctx context.Context, username string, cb AImodel.StreamCallback, input string) (*model.Message, error) {
a.AddMessage(input, username, true, save)

a.mu.RLock()
messages := ConvertToSchemaMessages(a.messages)
a.mu.RUnlock()

content, err := a.model.StreamResponse(ctx, messages, cb)
if err != nil {
return nil, err
}
modelMsg := &model.Message{
SessionId: a.SessionID,
Username: username,
Content: content,
IsUser: false,
}
//调用存储函数
a.AddMessage(modelMsg.Content, username, false, true)

return modelMsg, nil
}

上下文维护、模型调用、持久化触发聚合在单组件,把“会话状态”与“会话行为”封装在一起,形成对话域内聚模型,减少跨模块状态分散。

ConvertToSchemaMessages

该函数用于将系统自定义消息类型转换为schema.Message类型,并将消息内容和对应角色填充到新的消息切片中,与模型库中要求消息接口进行适配

1
2
3
4
5
6
7
8
9
10
11
12
13
14
func ConvertToSchemaMessages(msgs []*model.Message) []*schema.Message {
schemaMsgs := make([]*schema.Message, 0, len(msgs))
for _, m := range msgs {
role := schema.Assistant
if m.IsUser {
role = schema.User
}
schemaMsgs = append(schemaMsgs, &schema.Message{
Role: role,
Content: m.Content,
})
}
return schemaMsgs
}

多用户多会话隔离

Manager作为AI 会话助手的内存注册中心,通过两级映射管理会话实例,实现用户维度和会话维度双重隔离,避免上下文串扰。

1
2
3
4
type Manager struct {
helpers map[string]map[string]*AIHelper
mu sync.RWMutex
}

Manager中复用已存在 Helper,减少重复创建成本,创建或复用逻辑如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
func (m *Manager) CreateAIHelperOr(ctx context.Context, username string, sessionID string, modelType string, config map[string]interface{}) (*AIHelper, error) {
m.mu.Lock()
defer m.mu.Unlock()

userHelpers, exists := m.helpers[username]
if !exists {
userHelpers = make(map[string]*AIHelper)
m.helpers[username] = userHelpers
}
helper, exists := userHelpers[sessionID]
if exists {
return helper, nil
}

factory := GetAIModelFactoryOr()
helper, err := factory.CreateAIHelper(ctx, modelType, sessionID, config)
if err != nil {
return nil, err
}
userHelpers[sessionID] = helper
return helper, nil
}

func (m *Manager) GetAIHelper(username,sessionId string)(*AIHelper, error) {
//根据按“用户+会话”获取已存在的 helper...
}

func (m *Manager) GetUserSessions(username string) []string {
// 根据用户名获取所有对话ID...
}

创建动作统一收口在 Manager,Manage使用单例模式,便于集中加锁与生命周期管理。将会话隔离与资源复用之间的平衡,满足多用户并发对话的基础要求。

消息队列模块

队列天然提供可扩展事件通道,后续可承载审计、统计、风控等异步任务。

在该系统中,我们把“聊天消息入库”从主业务链路里异步拆出去,让 AI 对话处理更轻、响应更快。

1
2
3
4
5
6
type RabbitMQ struct {
conn *amqp.Connection // 与 RabbitMQ 服务建立的连接
channel *amqp.Channel // 用于消息发布和消费的 AMQP 通道
Exchange string // 交换机名称
Key string // 路由键(或队列名)
}

发布代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
func (r *RabbitMQ) Publish(message []byte) error {
_, err := r.channel.QueueDeclare(r.Key, false, false, false, false, nil)
if err != nil {
return err
}
return r.channel.Publish(r.Exchange, r.Key, false, false,
amqp.Publishing{
ContentType: "text/plain",
Body: message,
},
)
}

消费代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func MQMessage(msg *amqp.Delivery) error {
var param MessageMQParam
err := json.Unmarshal(msg.Body, &param)
if err != nil {
return err
}
newMsg := &model.Message{
SessionId: param.SessionId,
Content: param.Content,
Username: param.Username,
IsUser: param.IsUser,
}
store.Client().Messages().Create(newMsg)
return nil
}

初始化代码如下:

1
2
3
4
func InitRabbitMQ() {
RMQMessage = NewWorkRabbitMQ("Message")
go RMQMessage.Consume(MQMessage)
}

对话请求只负责发布消息,不阻塞等待数据库写入,降低接口耗时。持久化通过消费者异步完成,提升系统吞吐能力与抗峰值能力。

鉴权模块

鉴权模块采用策略模式实现,一个鉴权方法对应一种策略。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
type AuthStrategy interface {
AuthFunc() gin.HandlerFunc
}

type AuthOperator struct {
strategy AuthStrategy
}

func (operator *AuthOperator) SetStrategy(strategy AuthStrategy) {
operator.strategy = strategy
}

func (operator *AuthOperator) AuthFunc() gin.HandlerFunc {
return operator.strategy.AuthFunc()
}
  • AuthStrategy:行为规范接口,定义了 AuthFunc 方法用于返回 gin 的中间件处理函数;
  • AuthOperator :上下文环境类,通过组合 AuthStrategy 接口成员,可以在运行时动态指定具体的认证策略;
  • SetStrategy:切换策略

在该系统中我们基于gin-jwt中间件实现JWT鉴权,首先实现策略接口

1
2
3
4
5
6
7
8
9
10
11
12
13
type JWTStrategy struct {
ginjwt.GinJWTMiddleware
}

var _ AuthStrategy = &JWTStrategy{}

func NewJWTStrategy(gjwt ginjwt.GinJWTMiddleware) JWTStrategy {
return JWTStrategy{gjwt}
}

func (j JWTStrategy) AuthFunc() gin.HandlerFunc {
return j.MiddlewareFunc()
}

AuthFunc中返回ginjwt的中间件处理方法。

newJWTAuth中创建GinJWTMiddleware实例,注册一系列的回调函数,实现认证核心流程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
func newJWTAuth() auth.AuthStrategy {
ginjwt, _ := jwt.New(&jwt.GinJWTMiddleware{
Realm: viper.GetString("jwt.Realm"),
Key: []byte(viper.GetString("jwt.key")),
Timeout: viper.GetDuration("jwt.timeout"),
MaxRefresh: viper.GetDuration("jwt.max-refresh"),
Authenticator: authenticator(),
PayloadFunc: payloadFunc(),
IdentityKey: UsernameKey,
Authorizator: authorizator(),
TokenLookup: "header: Authorization,query: token, cookie: jwt",
TokenHeadName: "Bearer",
})
return auth.NewJWTStrategy(*ginjwt)
}

其中Authenticator用于登录时验证用户身份:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
func authenticator() func(c *gin.Context) (interface{}, error) {
return func(c *gin.Context) (interface{}, error) {
login, err := parseWithBody(c)
if err != nil {
return "", werrors.WithCode(code.ErrBind, err.Error())
}
user, err := store.Client().Users().Get(c, login.Username)
if err != nil {
wlog.Errorf("get user information failed: %s", err.Error())
return "", werrors.WithCode(code.ErrUserNotFound, "user not found")
}
if err := user.Compare(login.Password); err != nil {
return "", jwt.ErrFailedAuthentication
}
return user, nil
}
}

业务层

业务层会话服务承担了模型调用流程的主编排职责。业务层采用面向接口编程,将控制层/路由对接与底层具体实现细节解耦,便于后期进行扩展和维护

1
2
3
4
5
6
7
type SessionSrv interface {
Get(ctx context.Context, username string) ([]model.SessionInfo, error)
Create(ctx context.Context, username string, title string) (string, error)
Send(ctx context.Context, sessionID string, username string, input string, modelType string) (*model.Message, error)
StreamSend(ctx context.Context, sessionID, username, input, modelType string, writer http.ResponseWriter) error
GetHistory(username, sessionID string) ([]model.History, error)
}

sessionService 结构体实现了 SessionSrv 接口,内部持有对数据持久层 store.Factory 的依赖,通过依赖注入方式将存储操作委托给数据访问层,实现业务逻辑与数据存储解耦。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
type sessionService struct {
store store.Factory
}

var _ SessionSrv = (*sessionService)(nil)

func (s *sessionService) Create(ctx context.Context, username string, title string) (string, error) {
//1:创建一个新的会话
newSession := &model.Session{
ID: uuid.New().String(),
Username: username,
Title: title,
}
createdSession, err := s.store.Sessions().Create(ctx, newSession)
if err != nil {
wlog.Errorf("failed to createSession: %s", err.Error())
return "-1", werrors.WithCode(code.ErrDatabase, err.Error())
}
return createdSession.ID, nil
}

业务层承担流程编排职责,控制器只做协议适配,分层边界清晰。业务层统一封装错误码映射,保证系统行为一致性。该设计使业务语义集中,后续扩展限流、配额、审计、重试逻辑时只需改业务层,不会向上游控制器扩散。

数据访问层

该层通过工厂统一暴露用户、会话、消息三个 Store 接口,将接口抽象与 MySQL 实现分离

工厂接口如下:

1
2
3
4
5
type Factory interface {
Users() UserStore
Sessions() SessionStore
Messages() MessageStore
}

SQL实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
type datastore struct {
db *gorm.DB
}

func (ds *datastore) Users() store.UserStore {
return newUsers(ds)
}

type users struct {
db *gorm.DB
}

func newUsers(ds *datastore) *users {
return &users{ds.db}
}

func (u *users) Create(ctx context.Context, user *model.User) error {
return u.db.Create(&user).Error
}
// ...else

上层只依赖接口,不依赖具体 ORM,实现解耦。按领域拆分 Store,职责清晰,接口粒度稳定。查询逻辑在数据层收口,业务层无需关心 SQL/ORM 细节。该设计使存储策略具备替换能力,后续切换数据库或新增缓存层时改动范围可控。

控制器层

控制器层负责是把 HTTP 请求转换为业务层可处理的参数,并把业务结果统一回写给客户端。

1
2
3
4
5
6
7
type SessionController struct {
srv service.Service
}

func NewSessionController(store store.Factory) *SessionController {
return &SessionController{srv: service.NewService(store)}
}
  • SessionController 结构体用于处理与会话相关的路由和请求,通过组合 service.Service 实例,实现对会话业务逻辑的操作。通过传出service实例实现控制层与业务逻辑层的解耦。
  • NewSessionController SessionController的构造函数,接收一个数据存储工厂,创建Controller实例,
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
func (s *SessionController) StreamSend(c *gin.Context) {
wlog.Info("send streamMsg func called")
var req SendReq
username := c.GetString("username")
if err := c.ShouldBindJSON(&req); err != nil {
core.WriteResponse(c, werrors.WithCode(code.ErrBind, "ErrBind SendReq"), nil)
return
}
c.Header("Content-Type", "text/event-stream")
c.Header("Cache-Control", "no-cache")
c.Header("Connection", "keep-alive")
c.Header("Access-Control-Allow-Origin", "*")
c.Header("X-Accel-Buffering", "no")
err := s.srv.Sessions().StreamSend(c, req.SessionID, username, req.Input, req.ModelType, http.ResponseWriter(c.Writer))
if err != nil {
core.WriteResponse(c, err, nil)
return
}
core.WriteResponse(c, nil, "success")
}

StreamSend用于处理用户的流式对话请求,通过SSE实时返回AI生成的信息流。并根据统一回复返回消息。

接入层

该模块完成整个系统从静态代码到运行态实例的装配过程。把运行态依赖收拢到单入口统一管理,简化模块初始化复杂度,减少模块内部初始化导致的隐性耦合。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
func Run() {
gin.SetMode(gin.DebugMode)
// viper init...

var mysqlOpts options.MySQLOptions
if err := viper.UnmarshalKey("mysql_options", &mysqlOpts); err != nil {
panic(fmt.Errorf("failed reading mysqlOpts: %v", err))
}
r := gin.Default()
storeIns, _ := mysql.GetMySQLFactoryOr(&mysqlOpts)
store.SetClient(storeIns)

GlobalManager = aihelper.GetGlobalManager()
ModelConfig = map[string]interface{}{
"apiKey": viper.GetString("apiKey"),
"baseURL": viper.GetString("baseURL"),
"modelName": viper.GetString("modelName"),
}
initManager()
initRouter(r)
rabbitmq.InitRabbitMQ()
srv := &http.Server{
Addr: ":8088",
Handler: r,
}
go func() {
if err := srv.ListenAndServe(); err != nil && err != http.ErrServerClosed {
wlog.Fatalf("listen: %s", err)
}
}()

quit := make(chan os.Signal)
signal.Notify(quit, os.Interrupt)

<-quit
wlog.Info("Shutdown Server ...")
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := srv.Shutdown(ctx); err != nil {
wlog.Fatal("Server Shutdown: ", err)
}
}
  1. 先读取配置,再初始化依赖,再注册路由,最后启动服务,保证系统启动顺序符合依赖关系。
  2. store.SetClient 将存储工厂注入全局,保证业务层通过统一入口访问数据层。
  3. initManager 在服务启动时回放历史消息,恢复会话上下文,保证重启后对话连续性。
  4. rabbitmq.InitRabbitMQ 在启动阶段注册消费者,保证消息异步链路开机即可用。

项目功能

当前项目已实现的功能有:

  1. 用户身份建立:注册、登录、token 刷新、受保护接口访问。
  2. 会话生命周期管理:创建会话、查询会话、按会话维度隔离上下文。
  3. 模型调用能力:统一模型接口、工厂化构建、支持多模型切换。
  4. 输出模式支持:非流式完整回复与 SSE 流式回复并行提供。
  5. 数据持久化能力:消息通过 MQ 异步写库,会话与用户状态存储到 MySQL。
  6. 启动恢复能力:重启后通过历史消息回放恢复会话上下文。
CATALOG
  1. 1. 整体架构
    1. 1.1. 架构分层职责
  2. 2. 大模型接入
    1. 2.1. 模型抽象
    2. 2.2. OpenAI 与 Ollama 实现
  3. 3. 模型工厂
  4. 4. 会话上下文管理
    1. 4.1. AddMessage
    2. 4.2. StreamResponse
    3. 4.3. ConvertToSchemaMessages
  5. 5. 多用户多会话隔离
  6. 6. 消息队列模块
  7. 7. 鉴权模块
  8. 8. 业务层
  9. 9. 数据访问层
  10. 10. 控制器层
  11. 11. 接入层
  12. 12. 项目功能