https://github.com/FxRayHughes/responsevent

做 CI/CD 机器人类工具的时候,总会遇到一个问题:核心逻辑用 Go 写得很稳,但每次要加一个新的自动化规则——比如飞书群里回复特定消息、GitLab issue 关闭后自动整理内容——都得改 Go 代码、重新编译、重新部署。改动本身可能只有十行业务判断,但部署链路的摩擦远大于逻辑本身。

这就是 responsevent 要解决的:把"业务判断"和"能力执行"拆开,Go 负责事件调度和能力执行,Node.js/TypeScript 插件只负责判断和返回动作声明。

不是微服务,不是 Webhook,是子进程

第一个要回答的问题是:插件用什么方式跑?

常见选择有三种:HTTP 微服务、嵌入式脚本引擎(goja)、子进程。HTTP 微服务太重,每个插件都要独立部署和服务发现;goja 只支持 ES5,生态割裂严重。而我们的 Docker 镜像本身基于 node:22-slim,Node 运行时已经在那了。

因此 responsevent 选择了子进程模型:一次事件触发一个 Node 子进程,通过 stdin 发 JSON、stdout 收 JSON,进程用完即退。

这个选择的代价是每次事件有 Node 启动开销。但当前场景是飞书消息、GitLab webhook、构建事件,频率不高,启动开销完全可以接受。换来的好处是隔离彻底——插件内存泄漏不影响主进程,超时直接杀进程,升级插件不需要重启服务。

事件流水线的结构

整个系统的数据流是一条单向管道:

事件源 → Dispatcher → 插件链(按依赖拓扑排序)→ Action 执行器
drawing-1780655519784

Go 侧的 Dispatcher 是核心调度器。它内部维护一个有界 channel 作为事件队列,启动固定数量的 worker goroutine 消费事件:

type Dispatcher struct {
    registry    *plugin.Registry
    executor    *action.Executor
    queue       chan types.Event
    concurrency int
}

Emit() 是非阻塞投递——队列满了直接丢弃并记日志,不会阻塞调用方。这一点很关键,因为事件源可能是飞书长连接的回调,阻塞它会丢消息。

func (d *Dispatcher) Emit(evt types.Event) bool {
    select {
    case d.queue <- evt:
        return true
    default:
        log.Printf("[dispatcher] queue full, dropping event %s", evt.ID)
        return false
    }
}

同时还有一个 EmitSync() 用于测试场景——同步跑完整条插件链,返回聚合结果,不经过 channel 和 goroutine。

插件排序:拓扑排序 + 优先级

同一个事件可能被多个插件订阅。执行顺序由两件事决定:depends_on 声明的强依赖,以及 priority 字段。

排序算法是经典的 Kahn 拓扑排序。入度为零的节点按 priority 降序出队;同 priority 按名称字典序,保证结果稳定:

func topoSort(plugins []Plugin) ([]Plugin, error) {
    // 构建入度表和下游邻接表
    // Kahn 算法:入度为零先出队
    // 同层按 priority 降序 + name 升序
    // 输出长度 != 输入长度 → 循环依赖
}

如果依赖图有环,直接拒绝加载并报错。这个检查发生在启动阶段而不是运行时,避免事件进来后才发现链不可用。

举个例子,三个插件订阅了 feishu.message

common-filter (priority=1000, 无依赖)
feishu-auto-reply (priority=100, depends_on=[common-filter])
feishu-ai-summary (priority=50, depends_on=[common-filter])

最终执行顺序:common-filter → feishu-auto-reply → feishu-ai-summary

插件只返回"想做什么",不做真事

这是整个设计里最重要的一个决定。

插件不直接调用飞书 API、不直接请求 AI、不直接触发构建。插件只返回一个 actions 数组,描述它希望系统做什么:

{
  "ok": true,
  "handled": true,
  "actions": [
    {
      "type": "feishu.reply",
      "params": { "messageId": "om_xxx", "text": "收到" }
    }
  ]
}

Go 主进程收到后,先校验权限,再查找对应的 action handler 执行。如果插件声明了 permissions: [feishu] 但尝试返回 build.trigger 类型的 action,直接拒绝:

func (e *Executor) isAllowed(pluginName, actionType string) bool {
    allowed, ok := e.permissions[pluginName]
    if !ok {
        return true // 未配置权限 = 全部允许
    }
    for _, a := range allowed {
        if a == actionType || a == "*" {
            return true
        }
    }
    return false
}

这个模型的好处:API key 不暴露给插件进程,所有外部调用由 Go 统一限流和审计,一个出错的插件不会直接对外发消息。

handled 语义

插件链里有个特殊字段 handled。语义是:如果任一插件返回 handled: true,整条链跑完后告诉调用方"这个事件已经被处理了,不需要走内置命令"。

handled 不会中断后续插件。事件仍然经过所有订阅插件——这是设计意图,不是疏忽。原因是插件可能做旁路观察(记日志、统计),它们需要看到每一个事件。

for _, p := range plugins {
    pResult, _ := p.Handle(ctx, evt, result.PluginResults)
    result.PluginResults[p.Name()] = pResult
    if pResult.Handled {
        result.Handled = true
    }
    result.Actions = append(result.Actions, pResult.Actions...)
}

Node.js 插件的 stdio 协议

Go 启动 Node 子进程时,通过 stdin 把事件上下文写进去:

{
  "event": { "id": "evt_001", "type": "feishu.message", "payload": {...} },
  "config": { "pluginName": "feishu-auto-reply", "dataDir": "/app/extensions/feishu-auto-reply/data" },
  "previous": {}
}

Node 插件的入口是一个 onEvent 函数,处理完后把结果 JSON 写到 stdout。Go 读取 stdout、解析为 PluginResult,超时则直接杀进程。

previous 字段是前置插件的只读结果快照。如果 feishu-auto-reply 依赖 common-filter,它能在 ctx.previous["common-filter"] 里读到过滤结果。

RPC:插件调用受控能力

插件如果需要 AI 对话或 GitLab 查询,不是自己发 HTTP 请求,而是通过 runner 向 Go 发 RPC:

{ "kind": "rpc", "id": "rpc_001", "method": "ai.chat", "params": { "prompt": "..." } }

Go 校验插件权限、执行调用、返回结果。这样做的效果是:即使插件代码被第三方编写,它也拿不到真实的 API key,所有调用都在 Go 侧统一记录和限流。

TypeScript:类型擦除,不编译

Node.js 22 原生支持运行 TypeScript,但只做类型擦除,不做完整编译。因此插件规范有几条约束:

  • 使用 .ts,ESM 格式。

  • 不用 enumnamespace、decorator。

  • 不用 TS path alias。

  • 类型检查交给 IDE 或 tsc --noEmit

这样就不需要在运行时引入 esbuild 或 tsx,减少一个依赖层。

安全边界

Node 子进程不是强沙箱。但通过几层约束可以把风险控制在合理范围:

  • 插件只能读写自己的 data/ 目录。

  • 启动时开启 Node --permission 模型,显式限制文件系统访问范围。

  • 不暴露飞书、GitLab、AI 的 token 给插件进程。

  • 所有危险操作通过 Go action handler 执行,权限在 plugin.yml 中声明。

不是完美隔离,但对于内部工具、可信维护者场景,这个力度足够。

并发粒度

并发放在事件级别,不是插件级别。不同事件可以被不同 goroutine 并行处理;同一事件内的插件链串行执行。

event A: filter → reply → summary  ← goroutine 1
event B: filter → reply → summary  ← goroutine 2
event C: filter → notify           ← goroutine 3

这样同时满足了吞吐量和依赖顺序。max_concurrency 控制最大并行事件数,queue_size 控制背压上限。

整体编排:App 把一切串起来

最顶层是 App 结构体,把所有注册表和调度器组装到一起:

app, _ := app.New("config.yaml",
    app.WithConcurrency(4),
    app.WithQueueSize(128),
)
app.RegisterPlugin(myPlugin)
app.RegisterAction("feishu.reply", feishuReplyHandler)
app.RegisterSource(feishuSource)
app.RegisterChannel(feishuChannel)
app.Serve(ctx)

Serve() 启动 dispatcher worker、scheduler、所有 channel 和事件源,然后阻塞等 SIGTERM。收到信号后按顺序关闭:先停事件源(不再产生新事件)、再停 channel、停 scheduler、最后 drain dispatcher 队列。

要知其然也要知其所以然——这个框架的核心不是某个具体实现,而是一个原则:让插件只做判断,让主进程掌握执行权