开发指南
Monstrum 插件开发指南
构建、测试和分发 Monstrum 插件的完整参考文档。
目录
- 架构概览
- 设计哲学
- 权限模型
- 快速开始
- 插件结构
- 插件清单 (
monstrum.yaml) - 执行器实现
- 国际化 (i18n)
- PluginClient — 跨插件组合
- Platform SDK
- 插件信任与安全模型
- 测试
- 常见陷阱
- 打包与分发
- 完整参考:GitHub 插件
- API 参考
架构概览
Monstrum 是一个 AI Agent 管控平台。Bot 的每一次工具调用都经过严格的权限管道:
用户 → Gateway → Session → LLM → ToolResolver → Guardian → Executor → Auditor
(LLM 之前) (LLM 之后) (你的代码)
插件向平台添加一个新的 ResourceType。ResourceType 是完整的声明式契约:
| 声明 | 用途 | 消费方 |
|---|---|---|
tools[] | LLM 可调用的工具定义 | ToolCatalog → LLM |
scope_dimensions[] | 权限检查规则 | Guardian(自动检查) |
auth_methods[] | 支持的凭据获取方式 | 前端(自动渲染 UI) |
credential_schema[] | 凭据字段定义 | 前端(自动渲染表单) |
config_schema[] | 资源配置字段定义 | 前端(自动渲染表单) |
平台基于这些声明驱动所有行为。你只需编写一个 Executor 类实现实际的 API 调用;其他一切——权限执行、UI 渲染、审计日志、凭据加密——都由平台处理。
三层资源模型
ResourceType → Resource (+Credential) → Bot
(你的插件) (管理员配置) (通过 BotResource 授权访问)
- ResourceType:你的插件是什么——工具定义、权限、认证方式。
- Resource:一个具体实例——例如”公司的 GitHub”,包含 API URL 和凭据。
- Bot:一个 AI Agent,通过 BotResource 绑定到 Resource,带有权限约束。
设计哲学
为什么是这个架构?
Monstrum 的插件模型建立在一个核心信念之上:平台强制执行权限,而不是靠 AI 自律。LLM 不能被信任自行管理工具使用。平台必须保证,无论 LLM 生成什么,权限检查、审计追踪和 Scope 约束都一定会发生。
这导出了三个设计原则:
1. 声明式优于命令式
插件声明需要什么——工具、权限、认证方式——平台决定如何执行。在清单中添加一条 scope_dimensions 就足以获得参数级权限检查,你不需要写任何授权代码。这消除了一整类 Bug:插件作者忘记检查权限,或者权限检查实现有误。
2. 数据平面与控制平面分离
你的执行器处理数据平面:发起 API 调用、解析响应、返回结果。平台处理控制平面:Bot 能看到哪些工具、参数是否在 Scope 内、注入哪些凭据、记录什么日志。你的代码永远不会直接看到凭据——它们被预注入到 ExecuteRequest.credential_fields 中。你的代码永远不会执行权限检查——Guardian 在你的 handler 被调用之前就已经做了。
3. 约定驱动的集成
一个插件就是一个包含 monstrum.yaml 和 executor.py 的目录。清单驱动一切:前端从 credential_schema 自动渲染凭据表单,ToolCatalog 从 tools[] 索引工具,Guardian 从 scope_dimensions[] 评估权限。这意味着添加一个新的集成不需要修改平台代码——清单就是完整的契约。
插件是什么(和不是什么)
插件是:
- 外部 API 与平台执行模型之间的一层薄适配器
- 描述集成能力和约束的声明式清单
- 将
ExecuteRequest转换为 API 调用并返回ExecuteResult的无状态请求处理器
插件不是:
- 通用 Python 应用程序(没有后台线程、没有启动钩子、没有全局状态)
- 负责安全或审计的(平台处理两者)
- 感知 LLM、用户或会话的(你的 handler 只看到当前的工具调用)
权限模型
在构建插件之前理解权限模型至关重要,因为它决定了你在清单中声明什么、在执行器代码中可以省略什么。
RBAC + 声明式 ABAC 混合模型
Monstrum 使用混合授权模型:
-
RBAC(基于角色的访问控制) 管理 Bot 可以执行哪些操作。管理员为 BotResource 绑定分配角色,角色的
allowed_operations和allowed_tools决定 Bot 能看到哪些工具。这是粗粒度的门控——Bot 要么有issue.read的权限,要么没有。 -
声明式 ABAC(基于属性的访问控制) 管理在已授权操作中哪些参数值是允许的。这就是
scope_dimensions发挥作用的地方。即使 Bot 被授权执行issue.read,它的 Scope 可能被限制为repos: ["myorg/*"],意味着它只能读取匹配该模式的仓库中的 Issue。
对插件开发者的关键启示:你声明 ABAC 规则,平台执行它们。你的 scope_dimensions 条目定义属性、匹配模式和错误模板。Guardian 自动评估——你不需要自己调用 check_scope()。
三层工具权限
第一层:ToolResolver(LLM 之前)
在 LLM 看到任何工具之前,ToolResolver 根据 Bot 的 BotResource 绑定过滤工具列表。LLM 只看到 Bot 被授权使用的工具。
由以下控制:RolePermissions.allowed_operations(glob 模式如 issue.*)和 RolePermissions.allowed_tools(glob 模式如 github_*)。
第二层:Guardian(LLM 之后)
LLM 选择工具并提供参数后,Guardian 根据 scope_dimensions 声明和 RolePermissions.scope_constraints 验证参数。
这就是你的 scope_dimensions 发挥作用的地方。Guardian 调用 check_scope_declarative():
- 遍历你的
scope_dimensions - 对每个维度,通过
param_paths提取参数值 - 检查该值是否匹配
scope_constraints[key]中的任何条目 - 不匹配则返回 Scope 违规
第三层:委派 Scope(Bot 间)
当 Bot A 调用 Bot B(通过 BotExecutor)时,BotResource 绑定可以携带 DelegateConstraints,进一步限制 Bot B 能做什么:
class DelegateConstraints:
allowed_tools: list[str] | None # LLM 之前:fnmatch 工具过滤
scope_constraints: dict[str, list[str]] | None # LLM 之后:与自身 Scope 取交集
这防止了混淆代理攻击:即使 Bot B 有广泛的 GitHub 访问权限,来自 Bot A 绑定的委派约束也可以将 Bot B 限制为只能使用 github_list_* 工具,且只能访问 public-org/* 仓库。
声明式 Scope 检查
对于插件,Scope 检查是完全声明式的——在清单中声明 scope_dimensions,平台处理其余一切:
# 示例:按项目和 Issue 类型限制
scope_dimensions:
- key: projects
param_paths: [project, project_key]
match_mode: pattern
error_template: "项目 {value} 不在授权范围内"
- key: issue_types
param_paths: [issue_type]
match_mode: exact
operation_filter: "issue.write"
error_template: "Issue 类型 {value} 不被允许"
管理员然后配置 BotResource:
{
"scope_constraints": {
"projects": ["PROJ-*", "INFRA"],
"issue_types": ["Bug", "Task"]
}
}
Bot 只能访问匹配 PROJ-* 或 INFRA 的项目,且只能创建 Bug 或 Task 类型的 Issue。
委派 Scope(Bot 间)
当你的插件允许 Bot 间通信时,委派 Scope 防止混淆代理攻击:
{
"delegate": {
"allowed_tools": ["github_list_*"],
"scope_constraints": {
"repos": ["public-org/*"]
}
}
}
这意味着:当此 Bot 调用另一个 Bot 时,被调用的 Bot 只能使用 GitHub 列表类工具,且只能操作 public-org/* 仓库,无论其自身权限如何。
快速开始
用 3 个文件创建一个 Jira 集成插件:
1. 目录结构
plugins/
└── jira/
├── monstrum.yaml # 插件清单
├── executor.py # 执行器实现
└── locales/
├── en-US.json # 英文翻译
└── zh-CN.json # 中文翻译
2. monstrum.yaml
name: jira
version: 1.0.0
description: Jira integration plugin — issues, projects, and transitions
author: Your Name
license: MIT
resource_type:
id: jira
name: Jira
mode: plugin
tool_discovery: static
auth_flow: manual
credential_schema:
- field: api_token
type: secret
required: true
description: "Jira API Token"
- field: email
type: string
required: true
description: "Jira account email"
config_schema:
- field: api_base
type: url
required: true
description: "Jira instance URL (e.g., https://yourcompany.atlassian.net)"
auth_methods:
- method: api_key
label: API Token
description: "Authenticate with email + API token"
credential_schema:
- field: api_token
type: secret
required: true
- field: email
type: string
required: true
tools:
- name: jira_list_issues
description: "List issues from a Jira project."
operation: issue.read
input_schema:
type: object
properties:
project:
type: string
description: "Project key (e.g., PROJ)"
status:
type: string
description: "Filter by status"
max_results:
type: integer
default: 50
required: [project]
- name: jira_create_issue
description: "Create a new Jira issue."
operation: issue.write
input_schema:
type: object
properties:
project:
type: string
description: "Project key"
summary:
type: string
description: "Issue summary"
description:
type: string
description: "Issue description"
issue_type:
type: string
default: Task
description: "Issue type (Task, Bug, Story, etc.)"
required: [project, summary]
scope_dimensions:
- key: projects
param_paths: [project]
match_mode: pattern
error_template: "Project {value} is not authorized"
executor:
module: executor
class_name: JiraExecutor
3. executor.py
from __future__ import annotations
import logging
import httpx
from monstrum_sdk import ExecuteRequest, ExecuteResult, HttpExecutorBase
logger = logging.getLogger(__name__)
class JiraExecutor(HttpExecutorBase):
resource_type = "jira"
default_api_base = "" # 通过 config_schema 按资源设置
default_headers = {
"Accept": "application/json",
"Content-Type": "application/json",
}
supported_operations = ["issue.read", "issue.write"]
OPERATION_HANDLERS = {
"issue.read": "_handle_issue_read",
"issue.write": "_handle_issue_write",
}
# ── 认证覆盖(Jira 使用 Basic Auth,不是 Bearer)──
def _build_auth_headers(self, request: ExecuteRequest) -> dict[str, str]:
import base64
headers = dict(self.default_headers)
if request.credential_fields:
email = request.credential_fields.get("email", "")
token = request.credential_fields.get("api_token", "")
if email and token:
encoded = base64.b64encode(f"{email}:{token}".encode()).decode()
headers["Authorization"] = f"Basic {encoded}"
return headers
# ── 错误处理 ──
async def handle_execute_error(
self, request: ExecuteRequest, error: Exception
) -> ExecuteResult:
if isinstance(error, httpx.HTTPStatusError):
logger.error(f"Jira API error: {error}")
return ExecuteResult.error_result(
f"Jira API error: {error.response.status_code}"
)
return await super().handle_execute_error(request, error)
# ── Handler ──
async def _handle_issue_read(
self, request: ExecuteRequest
) -> ExecuteResult:
project = request.params.get("project", "")
status = request.params.get("status")
max_results = request.params.get("max_results", 50)
jql = f"project = {project}"
if status:
jql += f" AND status = \"{status}\""
data = await self._http_get(
request,
"/rest/api/3/search",
params={"jql": jql, "maxResults": max_results},
)
return ExecuteResult.success_result(data)
async def _handle_issue_write(
self, request: ExecuteRequest
) -> ExecuteResult:
project = request.params.get("project", "")
summary = request.params.get("summary", "")
description = request.params.get("description", "")
issue_type = request.params.get("issue_type", "Task")
data = await self._http_post(
request,
"/rest/api/3/issue",
json={
"fields": {
"project": {"key": project},
"summary": summary,
"description": {
"type": "doc",
"version": 1,
"content": [{
"type": "paragraph",
"content": [{"type": "text", "text": description}],
}],
},
"issuetype": {"name": issue_type},
},
},
)
return ExecuteResult.success_result(data)
搞定。将 jira/ 目录放到 plugins/ 下,平台在启动时会自动发现并注册。
插件结构
plugins/{plugin_name}/
├── monstrum.yaml # 必需:插件清单
├── executor.py # 必需:执行器实现
├── __init__.py # 可选:包初始化
├── locales/ # 可选:i18n 翻译
│ ├── en-US.json
│ └── zh-CN.json
└── requirements.txt # 可选:额外 pip 依赖
自动发现
启动时,PluginManager.scan_and_load_all() 扫描 plugins/ 目录,查找包含 monstrum.yaml 的子目录。对每个插件执行:
- 解析并验证清单
- 将
ResourceType写入数据库(工具、Scope、认证方式、Schema) - 通过
importlib加载执行器类 - 实例化执行器并注册到
ExecutorRegistry - 重新加载
ToolCatalog,使工具对 LLM 可用
支持通过 PluginManager.reload_plugin(name) 热重载。
插件清单 (monstrum.yaml)
顶层字段
| 字段 | 类型 | 必需 | 描述 |
|---|---|---|---|
name | string | 是 | 唯一插件名(小写字母、数字和连字符) |
version | string | 是 | 语义化版本号(如 1.0.0) |
description | string | 是 | 可读描述 |
author | string | 是 | 作者名 |
license | string | 否 | 许可证标识(默认 MIT) |
tags | list[string] | 否 | 可搜索标签 |
homepage | string | 否 | 项目主页 URL |
repository | string | 否 | 源代码仓库 URL |
locales_dir | string | 否 | 翻译文件目录(默认 locales) |
resource_type | object | 是 | ResourceType 声明(见下文) |
executor | object | 是 | 执行器加载配置(见下文) |
resource_type — ResourceType 声明
| 字段 | 类型 | 必需 | 默认值 | 描述 |
|---|---|---|---|---|
id | string | 是 | — | 唯一类型标识符(如 github、jira),必须与 executor.resource_type 匹配 |
name | string | 是 | — | UI 中显示的名称 |
mode | string | 否 | plugin | plugin、endpoint 或 system |
description | string | 否 | "" | 描述 |
icon | string | 否 | "" | 前端图标标识 |
auth_flow | string | 否 | manual | oauth、manual 或 none |
tool_discovery | string | 否 | static | static、dynamic 或 configured(见 tool_discovery 模式) |
tools | list[ToolDef] | 否 | [] | 工具定义 |
scope_dimensions | list[ScopeDimension] | 否 | [] | 权限维度 |
auth_methods | list[AuthMethodDef] | 否 | [] | 支持的认证方式 |
config_schema | list[FieldDef] | 否 | [] | 资源配置字段 |
credential_schema | list[FieldDef] | 否 | [] | 凭据字段 |
tools — 工具定义
tools 列表中的每个条目定义一个 LLM 可调用的工具:
tools:
- name: github_list_issues # 全局唯一的工具名
description: "List issues..." # 展示给 LLM 的描述
operation: issue.read # 映射到 OPERATION_HANDLERS 的键
input_schema: # 参数的 JSON Schema
type: object
properties:
repo:
type: string
description: "owner/repo format"
required: [repo]
output_schema: null # 可选:输出的 JSON Schema
cost: # 可选:计费信息
tokens: 0
credits: 0.0
关键规则:
name必须全局唯一。命名约定:{resource_type}_{action}(如jira_list_issues)。operation映射到执行器OPERATION_HANDLERS字典的键。- 多个工具可以共享同一个
operation——在 handler 中使用request.tool_name区分(见 tool_name 路由)。 input_schema遵循 JSON Schema 规范,传递给 LLM 用于函数调用。
scope_dimensions — 权限维度
Scope 维度定义哪些参数需要接受权限检查。Guardian 在 LLM 选择工具后自动评估——你不需要编写任何权限检查代码。
scope_dimensions:
- key: repos # Scope 字典中的键
param_paths: [repo, owner_repo] # 要提取值的参数名
match_mode: pattern # 匹配策略
operation_filter: "issue.*" # 只对这些操作生效(glob)
error_template: "仓库 {value} 不在授权范围内"
| 字段 | 类型 | 必需 | 默认值 | 描述 |
|---|---|---|---|---|
key | string | 是 | — | Scope 约束字典中的键(如 repos、projects、domains) |
param_paths | list[string] | 是 | — | 要提取值的参数名(按顺序尝试) |
match_mode | string | 否 | pattern | pattern(fnmatch glob)、path(文件系统路径)、exact(字符串相等) |
operation_filter | string | 否 | null | 限制此维度适用于哪些操作的 glob 模式 |
error_template | string | 否 | "" | 含 {value} 占位符的错误消息 |
匹配模式:
pattern— fnmatch glob 匹配。"myorg/*"匹配"myorg/repo","*"匹配一切。path— 文件系统路径匹配。支持**(递归)、/*(单级)、前缀匹配。exact— 仅字符串相等。
运行时工作流程:
- 管理员配置 BotResource 的 scope:
{"repos": ["myorg/*", "otherorg/public-*"]} - Bot 调用
github_list_issues(repo="myorg/myrepo") - Guardian 通过
param_paths提取repo的值 - Guardian 根据
match_mode将"myorg/myrepo"与["myorg/*", "otherorg/public-*"]匹配 - 匹配成功 → 工具调用继续。不匹配 → Scope 违规返回给 LLM。
auth_methods — 认证方式
auth_methods:
- method: oauth2_auth_code
label: OAuth Login
description: "Authorize via GitHub OAuth"
credential_schema:
- field: access_token
type: secret
required: true
oauth_config:
authorization_url: "https://github.com/login/oauth/authorize"
token_url: "https://github.com/login/oauth/access_token"
scopes: [repo, "read:org"]
pkce_required: false
- method: token
label: Personal Access Token
description: "Enter a PAT manually"
credential_schema:
- field: access_token
type: secret
required: true
支持的 method 值:
| 方法 | 描述 |
|---|---|
oauth2_auth_code | OAuth 2.0 授权码(浏览器重定向) |
oauth2_client_creds | OAuth 2.0 客户端凭据(M2M) |
oauth2_device_code | OAuth 2.0 设备码(CLI/IoT) |
api_key | API Key 认证 |
token | Bearer Token |
ssh_key | SSH 密钥对 |
basic | HTTP Basic 认证 |
none | 无需认证 |
前端根据你的 auth_methods 声明自动渲染相应的凭据表单。对于 OAuth 方式,平台处理完整的流程(重定向、Token 交换、刷新)。
oauth_config 字段:
| 字段 | 类型 | 描述 |
|---|---|---|
authorization_url | string | OAuth /authorize 端点 |
token_url | string | OAuth /token 端点 |
scopes | list[string] | 默认请求的 Scope |
pkce_required | bool | 是否强制 PKCE |
device_authorization_url | string | 设备码流端点 |
credential_schema / config_schema — 字段定义
两者使用相同的 FieldDef 结构:
credential_schema:
- field: access_token
type: secret
required: true
description: "API access token"
config_schema:
- field: api_base
type: url
required: false
default: "https://api.example.com"
description: "API base URL"
- field: region
type: enum
required: true
enum_values: [us, eu, ap]
description: "API region"
| 字段 | 类型 | 必需 | 默认值 | 描述 |
|---|---|---|---|---|
field | string | 是 | — | 字段名(用作 credential_fields / resource_config 中的键) |
type | string | 是 | — | string、integer、secret、url、enum |
required | bool | 否 | true | 是否必填 |
default | any | 否 | null | 默认值 |
enum_values | list[string] | 否 | null | 有效值(当 type: enum 时) |
description | string | 否 | "" | UI 中显示的帮助文本 |
credential_schema字段在数据库中使用 加密存储,永远不会暴露给 Bot。config_schema字段以明文存储(用于非敏感配置如 API URL)。
executor — 执行器加载配置
executor:
module: executor # Python 模块名(相对于插件目录)
class_name: GitHubExecutor # 可选:要加载的类(不指定则自动检测)
如果省略 class_name,加载器会扫描模块中第一个 ExecutorBase 子类。
tool_discovery 模式
tool_discovery 字段控制工具如何注册:
static(默认) — 工具在清单中定义,启动时加载。适用于绝大多数插件。ToolCatalog 在加载时一次性索引你的 tools[]。
dynamic — 工具在运行时按资源实例注册。清单不声明 tools[];工具通过 ToolCatalog.register_dynamic_tools(resource_id, tools) 动态注册和注销。
动态工具的执行流程:
- 外部实体连接并为特定
resource_id注册工具 ToolCatalog.register_dynamic_tools(resource_id, tool_defs)存储工具- ToolResolver 检测到
tool_discovery: dynamic,调用catalog.get_resource_tools(resource_id)而非catalog.get_type_tools(type_id) - 每个资源实例拥有独立的工具列表
- 实体断开连接时,
ToolCatalog.unregister_dynamic_tools(resource_id)移除工具
两个内置执行器使用动态发现:
monstrum-agent— Monstrum Agent — 外部 Agent 通过 WebSocket(/ws/agent)连接,使用 API Key 认证后注册工具定义。平台将这些工具提供给绑定到该 Agent 资源的 Bot。Agent 断开连接时工具自动注销。mcp— 平台在启动时(以及资源/凭据创建或更新时)自动从 MCP 服务器发现工具。每个 MCP 服务器暴露各自的工具集,按资源实例独立注册。发现的工具持久化到凭据记录(discovered_tools_json、discovery_status、last_discovery_at),重启后无需重连即可恢复。
对于两种动态类型,Bot 绑定使用 allowed_tools(glob 模式)做逐工具权限控制。前端在绑定时显示发现的工具复选框,而非操作复选框。
何时使用 dynamic: 仅当每个资源实例在运行时暴露不同的工具集时。如果你的工具是固定的,使用 static。
configured — 工具按资源配置定义(存储在 resource config 中)。插件不常用。
执行器实现
ExecutorBase — 抽象基类
每个执行器都继承 ExecutorBase(或 HttpExecutorBase)。从 SDK 导入:
from monstrum_sdk import ExecutorBase, ExecuteRequest, ExecuteResult, ExecuteStatus
需要设置的类变量:
class MyExecutor(ExecutorBase):
resource_type = "my_plugin" # 必须与清单中的 resource_type.id 匹配
supported_operations = [ # 此执行器处理的操作
"data.read",
"data.write",
]
OPERATION_HANDLERS = { # operation → handler 方法名
"data.read": "_handle_read",
"data.write": "_handle_write",
}
HttpExecutorBase — HTTP API 基类
对于调用 REST API 的插件(绝大多数),改为继承 HttpExecutorBase。它提供:
- 自动认证头 — 从凭据中获取 Bearer Token
- 401 自动刷新 — Token 过期时调用
credential_refresh()并重试 - HTTP 便捷方法 —
_http_get、_http_post、_http_patch、_http_delete - 分页 — 通过
_paginate()支持 GitHub 风格的 Link header 分页
from monstrum_sdk import HttpExecutorBase
class MyExecutor(HttpExecutorBase):
resource_type = "my_plugin"
default_api_base = "https://api.example.com" # 所有请求的基础 URL
default_headers = {"Accept": "application/json"} # 合并到每个请求
default_timeout = 30.0 # httpx 超时(秒)
HTTP 方法:
# GET — 返回解析后的 JSON
data = await self._http_get(request, "/endpoint", params={"key": "val"})
# POST — 返回解析后的 JSON
data = await self._http_post(request, "/endpoint", json={"key": "val"})
# PATCH — 返回解析后的 JSON
data = await self._http_patch(request, "/endpoint", json={"key": "val"})
# DELETE — 返回原始 httpx.Response
resp = await self._http_delete(request, "/endpoint")
# 分页(GitHub 风格 Link header)— 返回扁平列表
all_items = await self._paginate(request, "/items", per_page=100)
# 底层方法(完全控制)
result = await self._http_request(
request,
method="PUT",
path="/endpoint",
json={"key": "val"},
raw_response=False, # True → 返回 httpx.Response 而非 JSON
)
覆盖认证头:
如果你的 API 不使用 Bearer Token(如 Basic Auth、Header 中的 API Key),覆盖 _build_auth_headers():
def _build_auth_headers(self, request: ExecuteRequest) -> dict[str, str]:
headers = dict(self.default_headers)
if request.credential_fields:
api_key = request.credential_fields.get("api_key", "")
headers["X-API-Key"] = api_key
return headers
覆盖 API 基础 URL:
API 基础 URL 优先从 request.resource_config["api_base"] 获取,否则使用 default_api_base。覆盖 _get_api_base() 实现自定义逻辑:
def _get_api_base(self, request: ExecuteRequest) -> str:
if request.resource_config:
region = request.resource_config.get("region", "us")
return f"https://api.{region}.example.com"
return self.default_api_base
Web3ExecutorBase — EVM 链上操作基类
如果你的插件需要与 EVM 兼容链(Ethereum、Polygon、Base、Arbitrum 等)交互,请继承 Web3ExecutorBase。它提供:
- Web3 实例管理 — 按 RPC URL 缓存,从 resource config 自动配置
- 账户管理 — 私钥处理(永远不暴露给 LLM)
- ERC20 标准 ABI — 内置常用 Token 操作 ABI
- Gas 价格守卫 — 可配置最大 gas 价格上限
- 异步包装 — 所有同步 web3.py 调用通过
asyncio.to_thread()包装
from monstrum_sdk import Web3ExecutorBase
class MyDeFiExecutor(Web3ExecutorBase):
resource_type = "my_defi"
supported_operations = ["swap", "provide_liquidity"]
async def _handle_swap(self, request):
w3 = self._w3(request) # 缓存的 Web3 实例
account = self._get_account(request) # 从凭据 private_key 构建
# 使用 w3 和 account 执行 DeFi 操作...
原语方法(均为异步,内部使用 asyncio.to_thread):
| 方法 | 说明 |
|---|---|
_get_balance(request, address, token_address?) | 原生或 ERC20 Token 余额 |
_transfer(request, to, value) | 原生代币转账(value 单位为 ether) |
_call_contract(request, contract, abi, function, args?) | 只读合约调用 |
_send_transaction(request, contract, abi, function, args?, value?) | 写合约调用 |
_get_transaction(request, tx_hash) | 交易详情 + 回执 |
_read_events(request, contract, abi, event, from_block, to_block) | 事件日志读取 |
_estimate_gas(request, to, value?, data?) | Gas 估算 |
_wait_for_receipt(request, tx_hash, timeout?) | 等待交易确认 |
ExecuteRequest — 请求对象
每个 handler 接收一个包含所有上下文的 ExecuteRequest:
@dataclass
class ExecuteRequest:
request_id: str # 唯一请求 ID(用于审计追踪)
bot_id: str # 执行操作的 Bot
task_id: str # 任务 ID(用于分组关联调用)
operation: str # 操作名(如 "issue.read")
params: dict[str, Any] # LLM 提供的工具参数
# 凭据(对 Bot 不可见——由平台注入)
credential_value: str | None # 遗留:纯字符串凭据
credential_fields: dict[str, str] | None # 结构化凭据字段
# Scope 和配置
scope: dict[str, Any] | None # 权限 Scope 约束
resource_config: dict[str, Any] | None # 资源配置
# 高级
credential_refresh: Any # async () -> dict | None(OAuth 刷新)
resource_id: str | None # 资源 ID(多资源路由)
tool_name: str # 原始工具名(同 operation 内分发)
delegate: Any # DelegateConstraints(Bot 间委派)
ExecuteResult — 结果对象
Handler 返回 ExecuteResult。使用工厂方法:
# 成功 — 数据返回给 LLM
return ExecuteResult.success_result({"issues": [...]})
# 错误 — 错误消息返回给 LLM
return ExecuteResult.error_result("Repository not found")
# Scope 违规 — 作为权限拒绝处理
return ExecuteResult.scope_violation("Domain not in allowed list")
错误语义
平台区分三种结果类型,LLM 对每种看到不同的反馈:
| 结果类型 | 工厂方法 | LLM 反馈 | 审计状态 | 何时使用 |
|---|---|---|---|---|
| 成功 | ExecuteResult.success_result(data) | 工具结果数据 | SUCCESS | 正常成功执行 |
| 执行错误 | ExecuteResult.error_result(msg) | 错误消息字符串 | FAILURE | API 失败、无效输入、运行时错误 |
| Scope 违规 | ExecuteResult.scope_violation(reason) | "Scope violation: {reason}" | FAILURE | 参数超出授权 Scope |
还有第四种类型,发生在你的执行器被调用之前:
| 结果类型 | 来源 | LLM 反馈 | 何时 |
|---|---|---|---|
| 权限拒绝 | Guardian(执行前) | "Permission denied: {reason}" | 操作/工具未被角色授权 |
error_result 和 scope_violation 的选择:
仅在参数违反管理员配置的 Scope 约束时使用 scope_violation()——它表示授权问题。其他情况一律使用 error_result():API 错误、无效输入、资源未找到、网络故障。
实际上,大多数插件只使用 error_result(),因为声明式 scope_dimensions 已经通过 Guardian 自动处理 Scope 违规。你只在自定义 validate_scope() 覆盖中使用 scope_violation(),处理无法声明式表达的复杂逻辑。
LLM 对错误的行为: 当 LLM 收到错误或 Scope 违规时,它通常会调整策略——用不同参数重试、告知用户限制、或选择替代工具。平台不会自动重试工具调用;由 LLM 决定下一步。
模板方法:execute()
基类提供默认的 execute(),通过 OPERATION_HANDLERS 分发到你的 handler。你通常不需要覆盖 execute()——只需定义 handler 映射和方法:
class MyExecutor(HttpExecutorBase):
OPERATION_HANDLERS = {
"data.read": "_handle_read",
"data.write": "_handle_write",
}
async def _handle_read(self, request: ExecuteRequest) -> ExecuteResult:
data = await self._http_get(request, "/data")
return ExecuteResult.success_result(data)
async def _handle_write(self, request: ExecuteRequest) -> ExecuteResult:
data = await self._http_post(request, "/data", json=request.params)
return ExecuteResult.success_result(data)
默认 execute() 流程:
execute(request)
├── 1. 从 OPERATION_HANDLERS 查找 handler
│ → 未找到则返回 "Unknown operation" 错误
├── 2. pre_execute(request)
│ → 返回 ExecuteResult 则短路
├── 3. validate_scope(operation, params, scope)
│ → 返回错误字符串则 Scope 违规
├── 4. handler(request)
│ → 你的 handler 方法
└── 5. 异常时:handle_execute_error(request, error)
生命周期钩子
覆盖这些钩子来自定义行为,无需替换 execute():
pre_execute(request) → ExecuteResult | None
在 Scope 验证之前调用。返回 ExecuteResult 可短路(如依赖检查),返回 None 则正常继续。
async def pre_execute(self, request: ExecuteRequest) -> ExecuteResult | None:
if not self._api_client:
return ExecuteResult.error_result("API client not configured")
return None
handle_execute_error(request, error) → ExecuteResult
handler 抛出异常时调用。覆盖以做 API 特定的错误映射:
async def handle_execute_error(
self, request: ExecuteRequest, error: Exception
) -> ExecuteResult:
if isinstance(error, httpx.HTTPStatusError):
status = error.response.status_code
body = error.response.text[:200]
return ExecuteResult.error_result(f"API error {status}: {body}")
return await super().handle_execute_error(request, error)
Scope 校验
对于大多数插件,你完全不需要实现 Scope 校验。只需在清单中声明 scope_dimensions,Guardian 会声明式地处理一切。
仅在复杂校验逻辑无法声明式表达时覆盖 validate_scope():
def validate_scope(
self,
operation: str,
params: dict[str, Any],
scope: dict[str, Any] | None,
) -> str | None:
"""Scope 校验失败返回错误消息,通过返回 None。"""
if not scope:
return None
# 自定义:检查 URL scheme
url = params.get("url", "")
if url:
from urllib.parse import urlparse
parsed = urlparse(url)
if parsed.scheme not in ("http", "https"):
return f"不支持的 URL scheme: {parsed.scheme}"
return None
凭据访问
平台将凭据注入 ExecuteRequest——Bot 和 LLM 永远看不到。
使用 _get_token()(推荐用于 Bearer 风格认证):
token = self._get_token(request) # 读取 credential_fields["access_token"]
token = self._get_token(request, field="api_key") # 自定义字段名
优先尝试 credential_fields[field],回退到 credential_value。
访问多个凭据字段:
if request.credential_fields:
email = request.credential_fields.get("email", "")
api_key = request.credential_fields.get("api_key", "")
OAuth Token 刷新:
如果 request.credential_refresh 已设置,平台处理自动 Token 刷新。HttpExecutorBase 在 401 响应时自动调用。对于自定义执行器:
if response.status_code == 401 and request.credential_refresh:
new_fields = await request.credential_refresh()
if new_fields:
request.credential_fields = new_fields
# 用新凭据重试
tool_name 路由
当多个工具共享同一个 operation 时,使用 request.tool_name 区分:
# 在 monstrum.yaml 中
tools:
- name: github_add_labels
operation: issue.label.write # 相同 operation
...
- name: github_remove_labels
operation: issue.label.write # 相同 operation
...
# 在 executor.py 中
OPERATION_HANDLERS = {
"issue.label.write": "_handle_label_write",
}
async def _handle_label_write(self, request: ExecuteRequest) -> ExecuteResult:
if request.tool_name == "github_remove_labels":
# 删除逻辑
...
else:
# 添加逻辑(默认)
...
并发与无状态
执行器是单例的。 平台在插件加载时创建一个执行器类的实例,该单一实例在进程生命周期内处理所有并发请求。这有重要的影响:
不要在 self 上存储请求状态。 每个请求作为独立的 ExecuteRequest 对象到达。所有逐调用的数据——凭据、参数、Scope、资源配置——都在请求上,不在执行器实例上。
# 错误 — 跨并发请求共享状态
class BadExecutor(HttpExecutorBase):
async def _handle_read(self, request: ExecuteRequest) -> ExecuteResult:
self.current_token = request.credential_fields["access_token"] # 竞态条件!
data = await self._http_get(request, "/data")
return ExecuteResult.success_result(data)
# 正确 — 所有状态都是请求范围的
class GoodExecutor(HttpExecutorBase):
async def _handle_read(self, request: ExecuteRequest) -> ExecuteResult:
data = await self._http_get(request, "/data") # 凭据通过 request 流转
return ExecuteResult.success_result(data)
实例状态仅用于初始化。 常量、配置和可复用的客户端(如 httpx 连接池)可以放在 self 上。逐请求的数据不可以。
Handler 是 async 的。 所有 handler 方法使用 async def,应使用 await 进行 I/O。永远不要使用阻塞调用(requests.get、time.sleep)——它们会阻塞事件循环,使所有并发请求停滞。
SDK 函数
将执行器的能力作为独立函数暴露,供程序化使用(不仅仅是 LLM 工具调用):
def get_sdk_functions(self) -> dict[str, Any]:
return {
"search": self._sdk_search,
"create": self._sdk_create,
}
async def _sdk_search(self, *, query: str, max_results: int = 10) -> dict:
"""搜索项目。可通过 platform.my_plugin.search(...) 调用"""
# 实现...
return {"results": [...]}
这些函数通过 Platform SDK 访问:
from monstrum_sdk import platform
results = await platform.my_plugin.search(query="bug", max_results=5)
重要: SDK 函数绕过 Guardian 权限管道。它们是对执行器方法的直接调用,没有 Scope 检查或审计日志。参见 PluginClient vs Platform SDK 了解各自的使用场景。
国际化 (i18n)
在 locales/ 目录中创建 JSON 文件(可通过清单中的 locales_dir 配置):
locales/en-US.json:
{
"description": "Jira integration plugin",
"tools.jira_list_issues.description": "List issues from a Jira project.",
"tools.jira_create_issue.description": "Create a new Jira issue.",
"scope_dimensions.projects.error_template": "Project {value} is not authorized",
"auth_methods.0.label": "API Token",
"auth_methods.0.description": "Authenticate with email + API token"
}
locales/zh-CN.json:
{
"description": "Jira 集成插件",
"tools.jira_list_issues.description": "列出 Jira 项目的 Issue。",
"tools.jira_create_issue.description": "创建新的 Jira Issue。",
"scope_dimensions.projects.error_template": "项目 {value} 不在授权范围内",
"auth_methods.0.label": "API 令牌",
"auth_methods.0.description": "使用邮箱 + API 令牌认证"
}
键命名约定:
| 键模式 | 覆盖内容 |
|---|---|
description | 插件描述 |
tools.{tool_name}.description | 工具描述 |
scope_dimensions.{key}.error_template | Scope 错误消息 |
auth_methods.{index}.label | 认证方式显示名 |
auth_methods.{index}.description | 认证方式帮助文本 |
平台根据用户的语言偏好应用翻译。
关于 auth_methods 键的注意事项: 认证方式的翻译使用基于索引的键(auth_methods.0.label、auth_methods.1.label),而不是基于方法名的键。这意味着在清单中重新排列 auth_methods 的顺序会破坏翻译映射。翻译发布后请保持 auth_methods 的顺序稳定,或同步更新 locale 文件。
PluginClient — 跨插件组合
PluginClient 通过完整的权限管道调用工具(ToolExecutor → Guardian → Executor → Auditor)。当需要权限执行和审计日志时,这是插件、工作流和技能调用其他插件工具的正确方式。
from monstrum_sdk import get_plugin_client, PluginError
# 创建绑定到特定 Bot 和任务的客户端
github = get_plugin_client(
"github",
bot_id="bot-123",
task_id="task-456",
workspace_id="ws-789",
)
# 用短名调用工具(前缀自动添加)
try:
issues = await github.list_issues(repo="myorg/myrepo", state="open")
# 内部调用工具 "github_list_issues"
# 经过:Guardian Scope 检查 → GitHubExecutor → Auditor
await github.create_issue(
repo="myorg/myrepo",
title="Bug: login fails",
body="Steps to reproduce...",
)
except PluginError as e:
print(f"Failed: {e.message} (status: {e.status})")
PluginClient vs Platform SDK:治理边界
这个区别至关重要:
| 方面 | PluginClient | Platform SDK |
|---|---|---|
| 权限执行 | 完整 Guardian 检查 | 无 |
| 审计日志 | 有 | 无 |
| 凭据解析 | Bot 特定的绑定 | 显式提供或无 |
| Scope 约束 | 评估并执行 | 绕过 |
| 使用场景 | Bot 上下文中的跨插件工具调用 | 基础设施代码的直接执行器访问 |
| 导入 | get_plugin_client() | platform.{type}.{fn}() |
经验法则: 如果调用来自 Bot 的执行上下文(工具 handler、工作流步骤、技能),使用 PluginClient。Bot 的权限、Scope 约束和委派限制都会生效。如果调用是在任何 Bot 上下文之外运行的平台基础设施代码(调度器、系统维护任务),使用 Platform SDK。
当你应该使用 PluginClient 却使用了 Platform SDK 时,会产生治理漏洞:调用绕过所有权限检查、Scope 约束和审计日志。在多租户环境中,这意味着 Bot 可能访问未被授权的资源。
Platform SDK
platform 单例提供对内置执行器能力和横切基础设施的访问。这些调用绕过 Guardian 权限管道——它们直接调用执行器方法,没有 Scope 检查或审计日志。
from monstrum_sdk import platform
platform.oauth — OAuth Token 管理
# 列出为资源类型配置的 OAuth Provider
providers = await platform.oauth.list_providers(
resource_type_id="github",
workspace_id="ws-123",
)
# 返回:[{"id", "name", "resource_type_id", "client_id", "is_active"}, ...]
# 获取当前有效的 OAuth Token
token_info = await platform.oauth.get_token(credential_id="cred-456")
# 返回:{"access_token", "token_type", "expires_at", "scope"}
platform.events — 事件系统
事件系统允许插件发出事件并订阅平台范围的事件。
发出自定义事件:
result = await platform.events.emit(
"deploy.completed", # 事件名
data={"version": "2.1.0", "env": "prod"}, # 载荷
workspace_id="ws-123",
bot_id="bot-456",
)
# 返回:{"event_id": "...", "event_type": "custom.deploy.completed"}
事件名规则:字母数字、点、下划线、冒号、连字符。最长 128 字符。平台自动添加 custom. 前缀。
订阅事件:
sub = await platform.events.subscribe(
"task.*", # fnmatch 模式
bot_id="bot-456",
workspace_id="ws-123",
instruction="A task event occurred: {event_type}. Data: {data}",
)
# 返回:{"subscription_id": "...", "pattern": "task.*"}
instruction 字段是匹配事件触发时发送给 Bot 的模板。支持的占位符:{event_type}、{source_type}、{source_id}、{data}、{metadata}。
取消订阅:
result = await platform.events.unsubscribe(
"sub-789",
bot_id="bot-456", # 所有权验证
)
# 返回:{"subscription_id": "sub-789", "removed": true}
列出订阅:
subs = await platform.events.get_subscriptions(bot_id="bot-456")
# 返回:[{"subscription_id", "pattern", "instruction", "active", "created_at"}, ...]
事件→工作流触发器:
事件也可以直接触发工作流(无需经过 Bot)。使用工作流触发器 REST API:
POST /api/workflows/{workflow_id}/triggers — 创建触发器(event_pattern + instruction)
GET /api/workflows/{workflow_id}/triggers — 列出触发器
DELETE /api/workflows/{workflow_id}/triggers/{trigger_id} — 删除触发器
当匹配的事件发生时,平台自动以事件数据作为输入执行关联的工作流。触发器持久化到 workflow_triggers 表中,启动时自动加载到 EventDispatcher。
内置事件类型:
| 模式 | 来源 | 描述 |
|---|---|---|
task.completed | AgentRuntime | 任务成功完成 |
task.failed | AgentRuntime | 任务失败 |
task.cancelled | AgentRuntime | 任务被取消 |
workflow.completed | WorkflowExecutor | 工作流完成 |
workflow.failed | WorkflowExecutor | 工作流失败 |
schedule.fired | SchedulerService | 定时事件触发 |
session.created | SessionManager | 新会话开始 |
session.expired | SessionManager | 会话超时 |
custom.* | Bot 通过 emit | 自定义事件 |
内置执行器命名空间
直接访问内置执行器能力。注意:这些绕过 Guardian,不记录审计日志。
# SSH
result = await platform.ssh.run(
host="prod-01",
command="df -h",
credential="ssh-key-content",
timeout=30,
)
# MCP (Model Context Protocol — 仅 HTTP 传输)
tools = await platform.mcp.list_tools(
server="calculator",
url="https://mcp.example.com/sse",
)
result = await platform.mcp.call_tool(
server="calculator",
tool="add",
arguments={"a": 1, "b": 2},
)
# Bot(跨 Bot 调用)
task = await platform.bot.execute_task(
target_bot_id="bot-789",
instruction="Summarize today's issues",
params={"project": "PROJ"},
)
answer = await platform.bot.query(
target_bot_id="bot-789",
question="What is the current sprint velocity?",
)
status = await platform.bot.status(target_bot_id="bot-789")
# Web
results = await platform.web.search(query="Monstrum docs", max_results=5)
page = await platform.web.fetch(url="https://example.com", extract_mode="markdown")
# Web3(EVM 链上操作)
balance = await platform.web3.get_balance(
resource_id="res-123",
config={"rpc_url": "https://mainnet.infura.io/v3/KEY", "chain_id": 1},
address="0x742d35Cc6634C0532925a3b844Bc9e7595f2bD18",
)
tx = await platform.web3.transfer(
resource_id="res-123",
config={"rpc_url": "https://mainnet.infura.io/v3/KEY"},
credential_fields={"private_key": "0x..."},
to="0x...",
value="0.1",
)
插件信任与安全模型
理解插件与平台之间的信任边界对插件开发者和平台管理员都很重要。
信任假设
插件在进程内与平台一起运行。没有沙箱、没有代码签名、没有对插件可执行的 Python 代码的运行时限制。平台信任:
- 插件来自可信来源。 管理员控制
plugins/目录中安装什么,或通过.mst包导入什么。 - 插件遵循 ExecutorBase 契约。 加载器验证执行器类是
ExecutorBase的子类,但不限制 handler 方法内运行什么代码。 - 插件不篡改平台内部。 插件可以直接导入和调用平台服务,但这样做会绕过所有安全保证。
平台保证什么
尽管在进程内运行,平台在你的插件代码周围提供这些保证:
- 凭据隔离:你的 handler 通过
ExecuteRequest.credential_fields接收凭据。平台注入它们;Bot 和 LLM 永远看不到。凭据在静态时使用 加密。 - Scope 执行:Guardian 在你的 handler 被调用之前评估
scope_dimensions。如果 Scope 检查失败,你的 handler 不会执行。 - 审计追踪:每次工具调用——包括失败的——都被 Auditor 记录,包含请求 ID、Bot ID、操作、参数和结果状态。
- 内置类型保护:平台阻止插件覆盖内置类型(
ssh、mcp、bot)。
平台不保证什么
- 无代码沙箱:插件代码可以完全访问 Python 运行时、文件系统和网络。
- 无导入限制:插件可以导入任何 Python 模块,包括平台内部模块。
- 无运行时资源限制:没有对插件执行的 CPU、内存或网络配额。
- 无代码审查或签名:
.mst导入验证清单结构,不验证代码安全性。
对插件开发者的影响
- 你的执行器与所有其他插件和平台本身在同一进程中运行。handler 中的崩溃可能影响整个平台。
- 不要直接访问平台数据库或内部状态。使用
PluginClient或 Platform SDK。 - 不要启动后台线程或长时间运行的进程。执行器处理单个请求;平台管理生命周期。
- 将凭据字段视为敏感数据——不要记录、缓存或在预期 API 调用之外传输它们。
对管理员的影响
- 只安装来自可信来源的插件。部署前审查执行器代码。
- 使用 Scope 约束限制任何 Bot 通过插件能做什么,无论插件代码允许什么。
- 监控审计日志,关注可能表明插件行为异常的意外工具调用模式。
测试
单元测试执行器
import pytest
from monstrum_sdk import ExecuteRequest, ExecuteResult
def _make_request(operation, params=None, **kwargs):
return ExecuteRequest(
request_id="test-req",
bot_id="test-bot",
task_id="test-task",
operation=operation,
params=params or {},
**kwargs,
)
class TestMyExecutor:
@pytest.fixture
def executor(self):
from plugins.my_plugin.executor import MyExecutor
return MyExecutor()
async def test_read_success(self, executor, httpx_mock):
httpx_mock.add_response(
url="https://api.example.com/data",
json={"items": [1, 2, 3]},
)
request = _make_request(
"data.read",
params={"query": "test"},
credential_fields={"access_token": "test-token"},
)
result = await executor.execute(request)
assert result.success
assert result.data["items"] == [1, 2, 3]
async def test_unknown_operation(self, executor):
request = _make_request("invalid.op")
result = await executor.execute(request)
assert not result.success
assert "Unknown operation" in result.error
async def test_scope_validation(self, executor):
error = executor.validate_scope(
"data.read",
{"project": "SECRET"},
{"projects": ["PUBLIC-*"]},
)
assert error is not None
平台集成测试
from unittest.mock import AsyncMock, MagicMock, patch
async def test_plugin_client_integration():
mock_tool_executor = AsyncMock()
mock_tool_executor.execute.return_value = MagicMock(
success=True,
result={"issues": []},
)
with patch("services.runner.state.get_runner_state") as mock_state:
mock_state.return_value.tool_executor = mock_tool_executor
from monstrum_sdk import get_plugin_client
client = get_plugin_client("github", bot_id="b1", task_id="t1")
result = await client.list_issues(repo="org/repo")
assert result == {"issues": []}
运行测试
# 运行插件测试
pytest tests/plugins/my_plugin/ -v
# 运行完整测试套件检查回归
pytest tests/ -x -q
# Lint
ruff check plugins/my_plugin/
常见陷阱
1. 在 self 上存储请求状态
执行器是单例的——同一个实例处理所有并发请求。在 self 上存储逐请求的数据会导致竞态条件:
# 错误
self.current_user = request.credential_fields["email"]
data = await self._http_get(request, "/data") # 另一个请求覆盖了 self.current_user
# 正确 — 使用请求范围的数据
data = await self._http_get(request, "/data") # 凭据通过 request 对象流转
2. 使用阻塞 I/O
所有 handler 都是 async 的。阻塞调用会使整个事件循环停滞:
# 错误 — 阻塞事件循环
import requests
response = requests.get("https://api.example.com/data")
# 正确 — 使用 async HTTP
data = await self._http_get(request, "/data")
3. 在执行器中实现权限检查
Scope 检查属于 scope_dimensions,不属于 handler 代码:
# 错误 — 在 handler 中手动检查权限
async def _handle_read(self, request: ExecuteRequest) -> ExecuteResult:
repo = request.params["repo"]
if not self._is_repo_allowed(repo, request.scope): # 重新发明 Guardian
return ExecuteResult.error_result("Not allowed")
...
# 正确 — 在清单中声明,Guardian 处理
# scope_dimensions:
# - key: repos
# param_paths: [repo]
# match_mode: pattern
4. 对非 Scope 错误使用 scope_violation()
scope_violation() 表示授权问题。不要用于 API 错误:
# 错误 — API 404 不是 Scope 违规
if response.status_code == 404:
return ExecuteResult.scope_violation("Repository not found")
# 正确 — 这是执行错误
if response.status_code == 404:
return ExecuteResult.error_result("Repository not found")
5. 忘记 resource_type 必须与清单 id 匹配
执行器中的 resource_type 类变量必须与 monstrum.yaml 中的 resource_type.id 完全匹配:
# monstrum.yaml
resource_type:
id: my-plugin # 这个字符串...
# executor.py
class MyExecutor(HttpExecutorBase):
resource_type = "my-plugin" # ...必须与这个匹配
6. 发布翻译后重排 auth_methods
认证方式的翻译键是基于索引的(auth_methods.0.label、auth_methods.1.label)。重排清单中 auth_methods 数组的顺序会破坏翻译映射。如需重排,请同步更新 locale 文件。
7. 需要治理时使用 Platform SDK
如果你的代码在 Bot 上下文中运行且调用另一个插件,使用 PluginClient——不是 Platform SDK。Platform SDK 绕过所有权限检查和审计日志:
# 错误 — 在 Bot handler 中绕过治理
async def _handle_deploy(self, request: ExecuteRequest) -> ExecuteResult:
await platform.ssh.run(host="prod", command="deploy.sh", ...) # 没有 Scope 检查!
# 正确 — 使用 PluginClient 进行受治理的访问
async def _handle_deploy(self, request: ExecuteRequest) -> ExecuteResult:
ssh = get_plugin_client("ssh", bot_id=request.bot_id, task_id=request.task_id)
await ssh.run(host="prod", command="deploy.sh") # Guardian 执行 Scope 检查
8. 记录凭据值
永远不要记录凭据字段。它们包含密钥(API Key、Token、密码):
# 错误
logger.info(f"Calling API with token: {request.credential_fields}")
# 正确
logger.info(f"Calling API for bot={request.bot_id}, operation={request.operation}")
打包与分发
.mst 文件格式
插件可以打包为 .mst 文件(ZIP 格式)进行分发:
cd plugins/
zip -r my_plugin.mst my_plugin/
通过 CLI 安装:
monstrum plugin install my_plugin
monstrum plugin install my_plugin@1.0.0 # 指定版本
插件生命周期
# 安装
monstrum plugin install <package>[@version]
# 卸载
monstrum plugin uninstall <package>
# 列出已安装的插件
monstrum plugin list
# 搜索插件
monstrum plugin search <query>
# 查看插件详情
monstrum plugin info <package>
# 更新插件
monstrum plugin update [package]
完整参考:GitHub 插件
GitHub 插件是标准参考实现。研究它以了解最佳实践。
文件:plugins/github/monstrum.yaml
name: github
version: 1.0.0
description: GitHub integration plugin — issues, comments, labels, and repository info
author: Monstrum
license: MIT
tags: [scm, github, issues]
repository: https://github.com/MonstrumAI/monstrum
resource_type:
id: github
name: GitHub
mode: plugin
tool_discovery: static
description: "GitHub 代码托管平台"
icon: github # 语义名称(映射 Ant Design 图标)或文件名(如 icon.svg)
auth_flow: oauth
credential_schema:
- field: access_token
type: secret
required: true
description: "GitHub Access Token"
config_schema:
- field: api_base
type: url
required: false
default: "https://api.github.com"
description: "GitHub API base URL (customize for GitHub Enterprise)"
auth_methods:
- method: oauth2_auth_code
label: OAuth Login
description: "Authorize via GitHub OAuth"
credential_schema:
- field: access_token
type: secret
required: true
description: "OAuth Access Token (obtained automatically)"
oauth_config:
authorization_url: "https://github.com/login/oauth/authorize"
token_url: "https://github.com/login/oauth/access_token"
scopes: [repo, "read:org"]
- method: token
label: Personal Access Token
description: "Configure manually with a GitHub PAT"
credential_schema:
- field: access_token
type: secret
required: true
description: "GitHub Personal Access Token"
tools:
- name: github_list_issues
description: "List issues from a GitHub repository."
operation: issue.read
input_schema:
type: object
properties:
repo: { type: string, description: "owner/repo format" }
state: { type: string, enum: [open, closed, all], default: open }
labels: { type: array, items: { type: string } }
since: { type: string, description: "ISO 8601 timestamp" }
per_page: { type: integer, default: 30 }
required: [repo]
- name: github_create_issue
description: "Create a new issue in a GitHub repository."
operation: issue.write
input_schema:
type: object
properties:
repo: { type: string }
title: { type: string }
body: { type: string }
labels: { type: array, items: { type: string } }
assignees: { type: array, items: { type: string } }
required: [repo, title]
- name: github_update_issue
description: "Update an existing GitHub issue."
operation: issue.write
input_schema:
type: object
properties:
repo: { type: string }
issue_number: { type: integer }
title: { type: string }
body: { type: string }
state: { type: string, enum: [open, closed] }
labels: { type: array, items: { type: string } }
assignees: { type: array, items: { type: string } }
required: [repo, issue_number]
- name: github_add_labels
description: "Add labels to a GitHub issue."
operation: issue.label.write
input_schema:
type: object
properties:
repo: { type: string }
issue_number: { type: integer }
labels: { type: array, items: { type: string } }
required: [repo, issue_number, labels]
- name: github_remove_labels
description: "Remove labels from a GitHub issue."
operation: issue.label.write
input_schema:
type: object
properties:
repo: { type: string }
issue_number: { type: integer }
labels: { type: array, items: { type: string } }
required: [repo, issue_number, labels]
- name: github_add_comment
description: "Add a comment to a GitHub issue."
operation: issue.comment.write
input_schema:
type: object
properties:
repo: { type: string }
issue_number: { type: integer }
body: { type: string }
required: [repo, issue_number, body]
- name: github_list_comments
description: "List comments on a GitHub issue."
operation: issue.comment.read
input_schema:
type: object
properties:
repo: { type: string }
issue_number: { type: integer }
required: [repo, issue_number]
- name: github_get_repo
description: "Get information about a GitHub repository."
operation: repo.read
input_schema:
type: object
properties:
repo: { type: string }
required: [repo]
scope_dimensions:
- key: repos
param_paths: [repo, owner_repo]
match_mode: pattern
error_template: "Repository {value} is not authorized"
executor:
module: executor
class_name: GitHubExecutor
文件:plugins/github/executor.py
from __future__ import annotations
import logging
import httpx
from monstrum_sdk import ExecuteRequest, ExecuteResult, HttpExecutorBase
logger = logging.getLogger(__name__)
class GitHubExecutor(HttpExecutorBase):
resource_type = "github"
default_api_base = "https://api.github.com"
default_headers = {
"Accept": "application/vnd.github+json",
"X-GitHub-Api-Version": "2022-11-28",
}
supported_operations = [
"repo.read",
"issue.read",
"issue.write",
"issue.comment.read",
"issue.comment.write",
"issue.label.write",
]
OPERATION_HANDLERS = {
"repo.read": "_handle_repo_read",
"issue.read": "_handle_issue_read",
"issue.write": "_handle_issue_write",
"issue.comment.read": "_handle_comment_read",
"issue.comment.write": "_handle_comment_write",
"issue.label.write": "_handle_label_write",
}
async def handle_execute_error(
self, request: ExecuteRequest, error: Exception
) -> ExecuteResult:
if isinstance(error, httpx.HTTPStatusError):
logger.error(f"GitHub API error: {error}")
return ExecuteResult.error_result(
f"GitHub API error: {error.response.status_code} "
f"- {error.response.text[:200]}"
)
if isinstance(error, httpx.RequestError):
logger.error(f"GitHub request error: {error}")
return ExecuteResult.error_result(
f"GitHub request error: {str(error)}"
)
logger.exception(f"GitHub execution error: {error}")
return ExecuteResult.error_result(f"Execution error: {str(error)}")
# ── Helpers ──
@staticmethod
def _parse_repo(params: dict) -> tuple[str, str] | None:
repo = params.get("repo", "")
if not repo or "/" not in repo:
return None
return tuple(repo.split("/", 1))
# ── Handlers ──
async def _handle_repo_read(self, request: ExecuteRequest) -> ExecuteResult:
parsed = self._parse_repo(request.params)
if not parsed:
return ExecuteResult.error_result("Invalid repo format, expected 'owner/repo'")
owner, name = parsed
data = await self._http_get(request, f"/repos/{owner}/{name}")
return ExecuteResult.success_result(data)
async def _handle_issue_read(self, request: ExecuteRequest) -> ExecuteResult:
parsed = self._parse_repo(request.params)
if not parsed:
return ExecuteResult.error_result("Invalid repo format")
owner, name = parsed
issue_number = request.params.get("issue_number")
if issue_number:
data = await self._http_get(request, f"/repos/{owner}/{name}/issues/{issue_number}")
return ExecuteResult.success_result(data)
params = {}
if state := request.params.get("state"):
params["state"] = state
if labels := request.params.get("labels"):
params["labels"] = ",".join(labels) if isinstance(labels, list) else labels
if since := request.params.get("since"):
params["since"] = since
if per_page := request.params.get("per_page"):
params["per_page"] = per_page
data = await self._http_get(request, f"/repos/{owner}/{name}/issues", params=params or None)
return ExecuteResult.success_result(data)
async def _handle_issue_write(self, request: ExecuteRequest) -> ExecuteResult:
parsed = self._parse_repo(request.params)
if not parsed:
return ExecuteResult.error_result("Invalid repo format")
owner, name = parsed
issue_number = request.params.get("issue_number")
body = {}
for key in ("title", "body", "labels", "assignees", "state"):
if val := request.params.get(key if key != "body" else "body"):
body[key] = val
if issue_number:
data = await self._http_patch(
request, f"/repos/{owner}/{name}/issues/{issue_number}", json=body,
)
else:
data = await self._http_post(
request, f"/repos/{owner}/{name}/issues", json=body,
)
return ExecuteResult.success_result(data)
async def _handle_comment_read(self, request: ExecuteRequest) -> ExecuteResult:
parsed = self._parse_repo(request.params)
if not parsed:
return ExecuteResult.error_result("Invalid repo format")
issue_number = request.params.get("issue_number")
if not issue_number:
return ExecuteResult.error_result("issue_number is required")
owner, name = parsed
data = await self._http_get(
request, f"/repos/{owner}/{name}/issues/{issue_number}/comments",
)
return ExecuteResult.success_result(data)
async def _handle_comment_write(self, request: ExecuteRequest) -> ExecuteResult:
parsed = self._parse_repo(request.params)
if not parsed:
return ExecuteResult.error_result("Invalid repo format")
issue_number = request.params.get("issue_number")
body = request.params.get("body")
if not issue_number:
return ExecuteResult.error_result("issue_number is required")
if not body:
return ExecuteResult.error_result("Comment body is required")
owner, name = parsed
data = await self._http_post(
request, f"/repos/{owner}/{name}/issues/{issue_number}/comments",
json={"body": body},
)
return ExecuteResult.success_result(data)
async def _handle_label_write(self, request: ExecuteRequest) -> ExecuteResult:
"""添加或删除标签。通过 tool_name 区分。"""
parsed = self._parse_repo(request.params)
if not parsed:
return ExecuteResult.error_result("Invalid repo format")
issue_number = request.params.get("issue_number")
labels = request.params.get("labels", [])
if not issue_number:
return ExecuteResult.error_result("issue_number is required")
if not labels:
return ExecuteResult.error_result("labels are required")
owner, name = parsed
is_remove = request.tool_name == "github_remove_labels"
if is_remove:
results = []
for label in labels:
resp = await self._http_delete(
request, f"/repos/{owner}/{name}/issues/{issue_number}/labels/{label}",
)
results.append({"label": label, "removed": resp.status_code == 200})
return ExecuteResult.success_result(results)
data = await self._http_post(
request, f"/repos/{owner}/{name}/issues/{issue_number}/labels",
json={"labels": labels},
)
return ExecuteResult.success_result(data)
API 参考
monstrum_sdk 导出
from monstrum_sdk import (
# 执行器基类
ExecutorBase, # 所有执行器的抽象基类
HttpExecutorBase, # HTTP API 执行器基类
Web3ExecutorBase, # EVM 链上操作执行器基类
ExecuteRequest, # 请求数据类
ExecuteResult, # 结果数据类
ExecuteStatus, # 枚举:SUCCESS, ERROR, SCOPE_VIOLATION
# 资源模型
ToolDef, # 工具定义
ScopeDimension, # 权限维度
FieldDef, # 字段定义(凭据/配置)
AuthMethod, # 枚举:OAUTH2_AUTH_CODE, API_KEY, TOKEN, ...
AuthMethodDef, # 认证方式声明
OAuthProviderConfig, # OAuth 端点配置
# 插件清单
PluginManifest, # 完整插件清单
PluginResourceType, # 清单中的 ResourceType
PluginExecutorDef, # 执行器加载配置
# PluginClient(通过 Guardian 的工具级调用)
PluginClient, # 高级工具调用器
PluginError, # 工具调用失败异常
get_plugin_client, # 工厂函数
# Platform SDK(内置执行器能力)
Platform, # 能力入口
PlatformError, # 能力调用失败异常
platform, # 全局单例
)
ExecutorBase 方法
| 方法 | 签名 | 描述 |
|---|---|---|
supports_operation | (operation: str) → bool | 检查是否支持操作(支持 glob 通配符) |
validate_scope | (operation, params, scope) → str | None | 自定义 Scope 校验;返回错误或 None |
_get_token | (request, field="access_token") → str | 从凭据获取 Token |
get_sdk_functions | () → dict[str, Callable] | 暴露 SDK 函数给 Platform SDK |
execute | (request) → ExecuteResult | 主入口(模板方法) |
pre_execute | (request) → ExecuteResult | None | 分发前钩子 |
handle_execute_error | (request, error) → ExecuteResult | 错误处理钩子 |
HttpExecutorBase 方法
| 方法 | 签名 | 描述 |
|---|---|---|
_build_auth_headers | (request) → dict[str, str] | 构建含认证的请求头 |
_get_api_base | (request) → str | 解析 API 基础 URL |
_http_get | (request, path, params=None) → Any | HTTP GET → JSON |
_http_post | (request, path, json=None) → Any | HTTP POST → JSON |
_http_patch | (request, path, json=None) → Any | HTTP PATCH → JSON |
_http_delete | (request, path) → httpx.Response | HTTP DELETE → Response |
_http_request | (request, method, path, ...) → Any | 底层 HTTP(支持 raw_response) |
_paginate | (request, path, params=None, per_page=30) → list | 通过 Link header 分页 |
Web3ExecutorBase 方法
| 方法 | 签名 | 描述 |
|---|---|---|
_w3 | (request) → Web3 | 从 resource config 获取/创建缓存的 Web3 实例 |
_get_account | (request) → Account | 从凭据 private_key 构建账户 |
_get_balance | (request, address, token_address?) → dict | 原生或 ERC20 余额 |
_transfer | (request, to, value) → dict | 原生代币转账 |
_call_contract | (request, contract, abi, function, args?) → dict | 只读合约调用 |
_send_transaction | (request, contract, abi, function, args?, value?) → dict | 写合约调用 |
_get_transaction | (request, tx_hash) → dict | 交易详情 + 回执 |
_read_events | (request, contract, abi, event, from_block, to_block) → dict | 事件日志 |
_estimate_gas | (request, to, value?, data?) → dict | Gas 估算 |
_wait_for_receipt | (request, tx_hash, timeout?) → dict | 等待交易确认 |
_check_gas_price | (request, w3) → None | 检查 gas 价格是否超过上限 |
_native_symbol | (request) → str | 从配置获取原生代币符号 |
_tx_link | (request, tx_hash) → str | None | 构建区块浏览器链接 |
Platform SDK 命名空间
| 命名空间 | 方法 |
|---|---|
platform.oauth | list_providers(resource_type_id, workspace_id), get_token(credential_id) |
platform.events | emit(name, data, ...), subscribe(pattern, bot_id, ...), unsubscribe(sub_id, ...), get_subscriptions(bot_id) |
platform.ssh | run(host, command, credential, ...) |
platform.mcp | list_tools(server, ...), call_tool(server, tool, arguments, ...) |
platform.bot | execute_task(...), query(...), status(...) |
platform.web | search(query, ...), fetch(url, ...) |
platform.web3 | get_balance(...), transfer(...), call_contract(...), send_transaction(...), get_transaction(...), read_events(...) |
platform.{your_plugin} | 来自 get_sdk_functions() 的函数 |
模式匹配工具
shared.utils.matching 模块提供平台内统一使用的模式匹配函数。插件开发者可将其用于自定义 Scope 校验:
from shared.utils.matching import match_glob, match_any, match_path, match_any_path
| 函数 | 签名 | 描述 |
|---|---|---|
match_glob | (value, pattern) → bool | fnmatch glob 匹配 |
match_any | (value, patterns, *, allow_regex=False) → bool | 匹配任一模式 |
match_path | (path, pattern) → bool | 文件系统路径匹配(**、/*) |
match_any_path | (path, patterns) → bool | 路径匹配任一模式 |
示例:
match_glob("issue.read", "issue.*") # True
match_glob("anything", "*") # True
match_any("issue.read", ["issue.*", "pr.*"]) # True
match_any("ls -la", ["^ls.*"], allow_regex=True) # True
match_path("/home/user/docs/file.txt", "/home/user/**") # True
match_path("/tmp/file.txt", "/tmp/*") # True
match_any_path("/tmp/file", ["/home/**", "/tmp/*"]) # True