做 CI/CD 机器人类工具的时候,总会遇到一个问题:核心逻辑用 Go 写得很稳,但每次要加一个新的自动化规则——比如飞书群里回复特定消息、GitLab issue 关闭后自动整理内容——都得改 Go 代码、重新编译、重新部署。改动本身可能只有十行业务判断,但部署链路的摩擦远大于逻辑本身。
这就是 responsevent 要解决的:把"业务判断"和"能力执行"拆开,Go 负责事件调度和能力执行,Node.js/TypeScript 插件只负责判断和返回动作声明。
不是微服务,不是 Webhook,是子进程
第一个要回答的问题是:插件用什么方式跑?
常见选择有三种:HTTP 微服务、嵌入式脚本引擎(goja)、子进程。HTTP 微服务太重,每个插件都要独立部署和服务发现;goja 只支持 ES5,生态割裂严重。而我们的 Docker 镜像本身基于 node:22-slim,Node 运行时已经在那了。
因此 responsevent 选择了子进程模型:一次事件触发一个 Node 子进程,通过 stdin 发 JSON、stdout 收 JSON,进程用完即退。
这个选择的代价是每次事件有 Node 启动开销。但当前场景是飞书消息、GitLab webhook、构建事件,频率不高,启动开销完全可以接受。换来的好处是隔离彻底——插件内存泄漏不影响主进程,超时直接杀进程,升级插件不需要重启服务。
事件流水线的结构
整个系统的数据流是一条单向管道:
事件源 → Dispatcher → 插件链(按依赖拓扑排序)→ Action 执行器
Go 侧的 Dispatcher 是核心调度器。它内部维护一个有界 channel 作为事件队列,启动固定数量的 worker goroutine 消费事件:
type Dispatcher struct {
registry *plugin.Registry
executor *action.Executor
queue chan types.Event
concurrency int
}
Emit() 是非阻塞投递——队列满了直接丢弃并记日志,不会阻塞调用方。这一点很关键,因为事件源可能是飞书长连接的回调,阻塞它会丢消息。
func (d *Dispatcher) Emit(evt types.Event) bool {
select {
case d.queue <- evt:
return true
default:
log.Printf("[dispatcher] queue full, dropping event %s", evt.ID)
return false
}
}
同时还有一个 EmitSync() 用于测试场景——同步跑完整条插件链,返回聚合结果,不经过 channel 和 goroutine。
插件排序:拓扑排序 + 优先级
同一个事件可能被多个插件订阅。执行顺序由两件事决定:depends_on 声明的强依赖,以及 priority 字段。
排序算法是经典的 Kahn 拓扑排序。入度为零的节点按 priority 降序出队;同 priority 按名称字典序,保证结果稳定:
func topoSort(plugins []Plugin) ([]Plugin, error) {
// 构建入度表和下游邻接表
// Kahn 算法:入度为零先出队
// 同层按 priority 降序 + name 升序
// 输出长度 != 输入长度 → 循环依赖
}
如果依赖图有环,直接拒绝加载并报错。这个检查发生在启动阶段而不是运行时,避免事件进来后才发现链不可用。
举个例子,三个插件订阅了 feishu.message:
common-filter (priority=1000, 无依赖)
feishu-auto-reply (priority=100, depends_on=[common-filter])
feishu-ai-summary (priority=50, depends_on=[common-filter])
最终执行顺序:common-filter → feishu-auto-reply → feishu-ai-summary。
插件只返回"想做什么",不做真事
这是整个设计里最重要的一个决定。
插件不直接调用飞书 API、不直接请求 AI、不直接触发构建。插件只返回一个 actions 数组,描述它希望系统做什么:
{
"ok": true,
"handled": true,
"actions": [
{
"type": "feishu.reply",
"params": { "messageId": "om_xxx", "text": "收到" }
}
]
}
Go 主进程收到后,先校验权限,再查找对应的 action handler 执行。如果插件声明了 permissions: [feishu] 但尝试返回 build.trigger 类型的 action,直接拒绝:
func (e *Executor) isAllowed(pluginName, actionType string) bool {
allowed, ok := e.permissions[pluginName]
if !ok {
return true // 未配置权限 = 全部允许
}
for _, a := range allowed {
if a == actionType || a == "*" {
return true
}
}
return false
}
这个模型的好处:API key 不暴露给插件进程,所有外部调用由 Go 统一限流和审计,一个出错的插件不会直接对外发消息。
handled 语义
插件链里有个特殊字段 handled。语义是:如果任一插件返回 handled: true,整条链跑完后告诉调用方"这个事件已经被处理了,不需要走内置命令"。
但 handled 不会中断后续插件。事件仍然经过所有订阅插件——这是设计意图,不是疏忽。原因是插件可能做旁路观察(记日志、统计),它们需要看到每一个事件。
for _, p := range plugins {
pResult, _ := p.Handle(ctx, evt, result.PluginResults)
result.PluginResults[p.Name()] = pResult
if pResult.Handled {
result.Handled = true
}
result.Actions = append(result.Actions, pResult.Actions...)
}
Node.js 插件的 stdio 协议
Go 启动 Node 子进程时,通过 stdin 把事件上下文写进去:
{
"event": { "id": "evt_001", "type": "feishu.message", "payload": {...} },
"config": { "pluginName": "feishu-auto-reply", "dataDir": "/app/extensions/feishu-auto-reply/data" },
"previous": {}
}
Node 插件的入口是一个 onEvent 函数,处理完后把结果 JSON 写到 stdout。Go 读取 stdout、解析为 PluginResult,超时则直接杀进程。
previous 字段是前置插件的只读结果快照。如果 feishu-auto-reply 依赖 common-filter,它能在 ctx.previous["common-filter"] 里读到过滤结果。
RPC:插件调用受控能力
插件如果需要 AI 对话或 GitLab 查询,不是自己发 HTTP 请求,而是通过 runner 向 Go 发 RPC:
{ "kind": "rpc", "id": "rpc_001", "method": "ai.chat", "params": { "prompt": "..." } }
Go 校验插件权限、执行调用、返回结果。这样做的效果是:即使插件代码被第三方编写,它也拿不到真实的 API key,所有调用都在 Go 侧统一记录和限流。
TypeScript:类型擦除,不编译
Node.js 22 原生支持运行 TypeScript,但只做类型擦除,不做完整编译。因此插件规范有几条约束:
使用
.ts,ESM 格式。不用
enum、namespace、decorator。不用 TS path alias。
类型检查交给 IDE 或
tsc --noEmit。
这样就不需要在运行时引入 esbuild 或 tsx,减少一个依赖层。
安全边界
Node 子进程不是强沙箱。但通过几层约束可以把风险控制在合理范围:
插件只能读写自己的
data/目录。启动时开启 Node
--permission模型,显式限制文件系统访问范围。不暴露飞书、GitLab、AI 的 token 给插件进程。
所有危险操作通过 Go action handler 执行,权限在
plugin.yml中声明。
不是完美隔离,但对于内部工具、可信维护者场景,这个力度足够。
并发粒度
并发放在事件级别,不是插件级别。不同事件可以被不同 goroutine 并行处理;同一事件内的插件链串行执行。
event A: filter → reply → summary ← goroutine 1
event B: filter → reply → summary ← goroutine 2
event C: filter → notify ← goroutine 3
这样同时满足了吞吐量和依赖顺序。max_concurrency 控制最大并行事件数,queue_size 控制背压上限。
整体编排:App 把一切串起来
最顶层是 App 结构体,把所有注册表和调度器组装到一起:
app, _ := app.New("config.yaml",
app.WithConcurrency(4),
app.WithQueueSize(128),
)
app.RegisterPlugin(myPlugin)
app.RegisterAction("feishu.reply", feishuReplyHandler)
app.RegisterSource(feishuSource)
app.RegisterChannel(feishuChannel)
app.Serve(ctx)
Serve() 启动 dispatcher worker、scheduler、所有 channel 和事件源,然后阻塞等 SIGTERM。收到信号后按顺序关闭:先停事件源(不再产生新事件)、再停 channel、停 scheduler、最后 drain dispatcher 队列。
要知其然也要知其所以然——这个框架的核心不是某个具体实现,而是一个原则:让插件只做判断,让主进程掌握执行权。

参与讨论
(Participate in the discussion)
参与讨论