开发指南

Monstrum 插件开发指南

构建、测试和分发 Monstrum 插件的完整参考文档。


目录


架构概览

Monstrum 是一个 AI Agent 管控平台。Bot 的每一次工具调用都经过严格的权限管道:

用户 → Gateway → Session → LLM → ToolResolver → Guardian → Executor → Auditor
                                   (LLM 之前)     (LLM 之后)  (你的代码)

插件向平台添加一个新的 ResourceType。ResourceType 是完整的声明式契约:

声明用途消费方
tools[]LLM 可调用的工具定义ToolCatalog → LLM
scope_dimensions[]权限检查规则Guardian(自动检查)
auth_methods[]支持的凭据获取方式前端(自动渲染 UI)
credential_schema[]凭据字段定义前端(自动渲染表单)
config_schema[]资源配置字段定义前端(自动渲染表单)

平台基于这些声明驱动所有行为。你只需编写一个 Executor 类实现实际的 API 调用;其他一切——权限执行、UI 渲染、审计日志、凭据加密——都由平台处理。

三层资源模型

ResourceType  →  Resource (+Credential)  →  Bot
 (你的插件)       (管理员配置)              (通过 BotResource 授权访问)
  • ResourceType:你的插件是什么——工具定义、权限、认证方式。
  • Resource:一个具体实例——例如”公司的 GitHub”,包含 API URL 和凭据。
  • Bot:一个 AI Agent,通过 BotResource 绑定到 Resource,带有权限约束。

设计哲学

为什么是这个架构?

Monstrum 的插件模型建立在一个核心信念之上:平台强制执行权限,而不是靠 AI 自律。LLM 不能被信任自行管理工具使用。平台必须保证,无论 LLM 生成什么,权限检查、审计追踪和 Scope 约束都一定会发生。

这导出了三个设计原则:

1. 声明式优于命令式

插件声明需要什么——工具、权限、认证方式——平台决定如何执行。在清单中添加一条 scope_dimensions 就足以获得参数级权限检查,你不需要写任何授权代码。这消除了一整类 Bug:插件作者忘记检查权限,或者权限检查实现有误。

2. 数据平面与控制平面分离

你的执行器处理数据平面:发起 API 调用、解析响应、返回结果。平台处理控制平面:Bot 能看到哪些工具、参数是否在 Scope 内、注入哪些凭据、记录什么日志。你的代码永远不会直接看到凭据——它们被预注入到 ExecuteRequest.credential_fields 中。你的代码永远不会执行权限检查——Guardian 在你的 handler 被调用之前就已经做了。

3. 约定驱动的集成

一个插件就是一个包含 monstrum.yamlexecutor.py 的目录。清单驱动一切:前端从 credential_schema 自动渲染凭据表单,ToolCatalog 从 tools[] 索引工具,Guardian 从 scope_dimensions[] 评估权限。这意味着添加一个新的集成不需要修改平台代码——清单就是完整的契约。

插件是什么(和不是什么)

插件

  • 外部 API 与平台执行模型之间的一层薄适配器
  • 描述集成能力和约束的声明式清单
  • ExecuteRequest 转换为 API 调用并返回 ExecuteResult 的无状态请求处理器

插件不是

  • 通用 Python 应用程序(没有后台线程、没有启动钩子、没有全局状态)
  • 负责安全或审计的(平台处理两者)
  • 感知 LLM、用户或会话的(你的 handler 只看到当前的工具调用)

权限模型

在构建插件之前理解权限模型至关重要,因为它决定了你在清单中声明什么、在执行器代码中可以省略什么。

RBAC + 声明式 ABAC 混合模型

Monstrum 使用混合授权模型:

  • RBAC(基于角色的访问控制) 管理 Bot 可以执行哪些操作。管理员为 BotResource 绑定分配角色,角色的 allowed_operationsallowed_tools 决定 Bot 能看到哪些工具。这是粗粒度的门控——Bot 要么有 issue.read 的权限,要么没有。

  • 声明式 ABAC(基于属性的访问控制) 管理在已授权操作中哪些参数值是允许的。这就是 scope_dimensions 发挥作用的地方。即使 Bot 被授权执行 issue.read,它的 Scope 可能被限制为 repos: ["myorg/*"],意味着它只能读取匹配该模式的仓库中的 Issue。

对插件开发者的关键启示:你声明 ABAC 规则,平台执行它们。你的 scope_dimensions 条目定义属性、匹配模式和错误模板。Guardian 自动评估——你不需要自己调用 check_scope()

三层工具权限

第一层:ToolResolver(LLM 之前)

在 LLM 看到任何工具之前,ToolResolver 根据 Bot 的 BotResource 绑定过滤工具列表。LLM 只看到 Bot 被授权使用的工具。

由以下控制:RolePermissions.allowed_operations(glob 模式如 issue.*)和 RolePermissions.allowed_tools(glob 模式如 github_*)。

第二层:Guardian(LLM 之后)

LLM 选择工具并提供参数后,Guardian 根据 scope_dimensions 声明和 RolePermissions.scope_constraints 验证参数。

这就是你的 scope_dimensions 发挥作用的地方。Guardian 调用 check_scope_declarative()

  1. 遍历你的 scope_dimensions
  2. 对每个维度,通过 param_paths 提取参数值
  3. 检查该值是否匹配 scope_constraints[key] 中的任何条目
  4. 不匹配则返回 Scope 违规

第三层:委派 Scope(Bot 间)

当 Bot A 调用 Bot B(通过 BotExecutor)时,BotResource 绑定可以携带 DelegateConstraints,进一步限制 Bot B 能做什么:

class DelegateConstraints:
    allowed_tools: list[str] | None    # LLM 之前:fnmatch 工具过滤
    scope_constraints: dict[str, list[str]] | None  # LLM 之后:与自身 Scope 取交集

这防止了混淆代理攻击:即使 Bot B 有广泛的 GitHub 访问权限,来自 Bot A 绑定的委派约束也可以将 Bot B 限制为只能使用 github_list_* 工具,且只能访问 public-org/* 仓库。

声明式 Scope 检查

对于插件,Scope 检查是完全声明式的——在清单中声明 scope_dimensions,平台处理其余一切:

# 示例:按项目和 Issue 类型限制
scope_dimensions:
  - key: projects
    param_paths: [project, project_key]
    match_mode: pattern
    error_template: "项目 {value} 不在授权范围内"

  - key: issue_types
    param_paths: [issue_type]
    match_mode: exact
    operation_filter: "issue.write"
    error_template: "Issue 类型 {value} 不被允许"

管理员然后配置 BotResource:

{
  "scope_constraints": {
    "projects": ["PROJ-*", "INFRA"],
    "issue_types": ["Bug", "Task"]
  }
}

Bot 只能访问匹配 PROJ-*INFRA 的项目,且只能创建 BugTask 类型的 Issue。

委派 Scope(Bot 间)

当你的插件允许 Bot 间通信时,委派 Scope 防止混淆代理攻击:

{
  "delegate": {
    "allowed_tools": ["github_list_*"],
    "scope_constraints": {
      "repos": ["public-org/*"]
    }
  }
}

这意味着:当此 Bot 调用另一个 Bot 时,被调用的 Bot 只能使用 GitHub 列表类工具,且只能操作 public-org/* 仓库,无论其自身权限如何。


快速开始

用 3 个文件创建一个 Jira 集成插件:

1. 目录结构

plugins/
└── jira/
    ├── monstrum.yaml      # 插件清单
    ├── executor.py         # 执行器实现
    └── locales/
        ├── en-US.json      # 英文翻译
        └── zh-CN.json      # 中文翻译

2. monstrum.yaml

name: jira
version: 1.0.0
description: Jira integration plugin — issues, projects, and transitions
author: Your Name
license: MIT

resource_type:
  id: jira
  name: Jira
  mode: plugin
  tool_discovery: static
  auth_flow: manual

  credential_schema:
    - field: api_token
      type: secret
      required: true
      description: "Jira API Token"
    - field: email
      type: string
      required: true
      description: "Jira account email"

  config_schema:
    - field: api_base
      type: url
      required: true
      description: "Jira instance URL (e.g., https://yourcompany.atlassian.net)"

  auth_methods:
    - method: api_key
      label: API Token
      description: "Authenticate with email + API token"
      credential_schema:
        - field: api_token
          type: secret
          required: true
        - field: email
          type: string
          required: true

  tools:
    - name: jira_list_issues
      description: "List issues from a Jira project."
      operation: issue.read
      input_schema:
        type: object
        properties:
          project:
            type: string
            description: "Project key (e.g., PROJ)"
          status:
            type: string
            description: "Filter by status"
          max_results:
            type: integer
            default: 50
        required: [project]

    - name: jira_create_issue
      description: "Create a new Jira issue."
      operation: issue.write
      input_schema:
        type: object
        properties:
          project:
            type: string
            description: "Project key"
          summary:
            type: string
            description: "Issue summary"
          description:
            type: string
            description: "Issue description"
          issue_type:
            type: string
            default: Task
            description: "Issue type (Task, Bug, Story, etc.)"
        required: [project, summary]

  scope_dimensions:
    - key: projects
      param_paths: [project]
      match_mode: pattern
      error_template: "Project {value} is not authorized"

executor:
  module: executor
  class_name: JiraExecutor

3. executor.py

from __future__ import annotations

import logging

import httpx

from monstrum_sdk import ExecuteRequest, ExecuteResult, HttpExecutorBase

logger = logging.getLogger(__name__)


class JiraExecutor(HttpExecutorBase):
    resource_type = "jira"
    default_api_base = ""  # 通过 config_schema 按资源设置
    default_headers = {
        "Accept": "application/json",
        "Content-Type": "application/json",
    }
    supported_operations = ["issue.read", "issue.write"]

    OPERATION_HANDLERS = {
        "issue.read": "_handle_issue_read",
        "issue.write": "_handle_issue_write",
    }

    # ── 认证覆盖(Jira 使用 Basic Auth,不是 Bearer)──

    def _build_auth_headers(self, request: ExecuteRequest) -> dict[str, str]:
        import base64
        headers = dict(self.default_headers)
        if request.credential_fields:
            email = request.credential_fields.get("email", "")
            token = request.credential_fields.get("api_token", "")
            if email and token:
                encoded = base64.b64encode(f"{email}:{token}".encode()).decode()
                headers["Authorization"] = f"Basic {encoded}"
        return headers

    # ── 错误处理 ──

    async def handle_execute_error(
        self, request: ExecuteRequest, error: Exception
    ) -> ExecuteResult:
        if isinstance(error, httpx.HTTPStatusError):
            logger.error(f"Jira API error: {error}")
            return ExecuteResult.error_result(
                f"Jira API error: {error.response.status_code}"
            )
        return await super().handle_execute_error(request, error)

    # ── Handler ──

    async def _handle_issue_read(
        self, request: ExecuteRequest
    ) -> ExecuteResult:
        project = request.params.get("project", "")
        status = request.params.get("status")
        max_results = request.params.get("max_results", 50)

        jql = f"project = {project}"
        if status:
            jql += f" AND status = \"{status}\""

        data = await self._http_get(
            request,
            "/rest/api/3/search",
            params={"jql": jql, "maxResults": max_results},
        )
        return ExecuteResult.success_result(data)

    async def _handle_issue_write(
        self, request: ExecuteRequest
    ) -> ExecuteResult:
        project = request.params.get("project", "")
        summary = request.params.get("summary", "")
        description = request.params.get("description", "")
        issue_type = request.params.get("issue_type", "Task")

        data = await self._http_post(
            request,
            "/rest/api/3/issue",
            json={
                "fields": {
                    "project": {"key": project},
                    "summary": summary,
                    "description": {
                        "type": "doc",
                        "version": 1,
                        "content": [{
                            "type": "paragraph",
                            "content": [{"type": "text", "text": description}],
                        }],
                    },
                    "issuetype": {"name": issue_type},
                },
            },
        )
        return ExecuteResult.success_result(data)

搞定。将 jira/ 目录放到 plugins/ 下,平台在启动时会自动发现并注册。


插件结构

plugins/{plugin_name}/
├── monstrum.yaml          # 必需:插件清单
├── executor.py            # 必需:执行器实现
├── __init__.py            # 可选:包初始化
├── locales/               # 可选:i18n 翻译
│   ├── en-US.json
│   └── zh-CN.json
└── requirements.txt       # 可选:额外 pip 依赖

自动发现

启动时,PluginManager.scan_and_load_all() 扫描 plugins/ 目录,查找包含 monstrum.yaml 的子目录。对每个插件执行:

  1. 解析并验证清单
  2. ResourceType 写入数据库(工具、Scope、认证方式、Schema)
  3. 通过 importlib 加载执行器类
  4. 实例化执行器并注册到 ExecutorRegistry
  5. 重新加载 ToolCatalog,使工具对 LLM 可用

支持通过 PluginManager.reload_plugin(name) 热重载。


插件清单 (monstrum.yaml)

顶层字段

字段类型必需描述
namestring唯一插件名(小写字母、数字和连字符)
versionstring语义化版本号(如 1.0.0
descriptionstring可读描述
authorstring作者名
licensestring许可证标识(默认 MIT
tagslist[string]可搜索标签
homepagestring项目主页 URL
repositorystring源代码仓库 URL
locales_dirstring翻译文件目录(默认 locales
resource_typeobjectResourceType 声明(见下文)
executorobject执行器加载配置(见下文)

resource_type — ResourceType 声明

字段类型必需默认值描述
idstring唯一类型标识符(如 githubjira),必须与 executor.resource_type 匹配
namestringUI 中显示的名称
modestringpluginpluginendpointsystem
descriptionstring""描述
iconstring""前端图标标识
auth_flowstringmanualoauthmanualnone
tool_discoverystringstaticstaticdynamicconfigured(见 tool_discovery 模式
toolslist[ToolDef][]工具定义
scope_dimensionslist[ScopeDimension][]权限维度
auth_methodslist[AuthMethodDef][]支持的认证方式
config_schemalist[FieldDef][]资源配置字段
credential_schemalist[FieldDef][]凭据字段

tools — 工具定义

tools 列表中的每个条目定义一个 LLM 可调用的工具:

tools:
  - name: github_list_issues          # 全局唯一的工具名
    description: "List issues..."      # 展示给 LLM 的描述
    operation: issue.read              # 映射到 OPERATION_HANDLERS 的键
    input_schema:                      # 参数的 JSON Schema
      type: object
      properties:
        repo:
          type: string
          description: "owner/repo format"
      required: [repo]
    output_schema: null                # 可选:输出的 JSON Schema
    cost:                              # 可选:计费信息
      tokens: 0
      credits: 0.0

关键规则:

  • name 必须全局唯一。命名约定:{resource_type}_{action}(如 jira_list_issues)。
  • operation 映射到执行器 OPERATION_HANDLERS 字典的键。
  • 多个工具可以共享同一个 operation——在 handler 中使用 request.tool_name 区分(见 tool_name 路由)。
  • input_schema 遵循 JSON Schema 规范,传递给 LLM 用于函数调用。

scope_dimensions — 权限维度

Scope 维度定义哪些参数需要接受权限检查。Guardian 在 LLM 选择工具后自动评估——你不需要编写任何权限检查代码。

scope_dimensions:
  - key: repos                         # Scope 字典中的键
    param_paths: [repo, owner_repo]    # 要提取值的参数名
    match_mode: pattern                # 匹配策略
    operation_filter: "issue.*"        # 只对这些操作生效(glob)
    error_template: "仓库 {value} 不在授权范围内"
字段类型必需默认值描述
keystringScope 约束字典中的键(如 reposprojectsdomains
param_pathslist[string]要提取值的参数名(按顺序尝试)
match_modestringpatternpattern(fnmatch glob)、path(文件系统路径)、exact(字符串相等)
operation_filterstringnull限制此维度适用于哪些操作的 glob 模式
error_templatestring""{value} 占位符的错误消息

匹配模式:

  • pattern — fnmatch glob 匹配。"myorg/*" 匹配 "myorg/repo""*" 匹配一切。
  • path — 文件系统路径匹配。支持 **(递归)、/*(单级)、前缀匹配。
  • exact — 仅字符串相等。

运行时工作流程:

  1. 管理员配置 BotResource 的 scope:{"repos": ["myorg/*", "otherorg/public-*"]}
  2. Bot 调用 github_list_issues(repo="myorg/myrepo")
  3. Guardian 通过 param_paths 提取 repo 的值
  4. Guardian 根据 match_mode"myorg/myrepo"["myorg/*", "otherorg/public-*"] 匹配
  5. 匹配成功 → 工具调用继续。不匹配 → Scope 违规返回给 LLM。

auth_methods — 认证方式

auth_methods:
  - method: oauth2_auth_code
    label: OAuth Login
    description: "Authorize via GitHub OAuth"
    credential_schema:
      - field: access_token
        type: secret
        required: true
    oauth_config:
      authorization_url: "https://github.com/login/oauth/authorize"
      token_url: "https://github.com/login/oauth/access_token"
      scopes: [repo, "read:org"]
      pkce_required: false

  - method: token
    label: Personal Access Token
    description: "Enter a PAT manually"
    credential_schema:
      - field: access_token
        type: secret
        required: true

支持的 method 值:

方法描述
oauth2_auth_codeOAuth 2.0 授权码(浏览器重定向)
oauth2_client_credsOAuth 2.0 客户端凭据(M2M)
oauth2_device_codeOAuth 2.0 设备码(CLI/IoT)
api_keyAPI Key 认证
tokenBearer Token
ssh_keySSH 密钥对
basicHTTP Basic 认证
none无需认证

前端根据你的 auth_methods 声明自动渲染相应的凭据表单。对于 OAuth 方式,平台处理完整的流程(重定向、Token 交换、刷新)。

oauth_config 字段:

字段类型描述
authorization_urlstringOAuth /authorize 端点
token_urlstringOAuth /token 端点
scopeslist[string]默认请求的 Scope
pkce_requiredbool是否强制 PKCE
device_authorization_urlstring设备码流端点

credential_schema / config_schema — 字段定义

两者使用相同的 FieldDef 结构:

credential_schema:
  - field: access_token
    type: secret
    required: true
    description: "API access token"

config_schema:
  - field: api_base
    type: url
    required: false
    default: "https://api.example.com"
    description: "API base URL"
  - field: region
    type: enum
    required: true
    enum_values: [us, eu, ap]
    description: "API region"
字段类型必需默认值描述
fieldstring字段名(用作 credential_fields / resource_config 中的键)
typestringstringintegersecreturlenum
requiredbooltrue是否必填
defaultanynull默认值
enum_valueslist[string]null有效值(当 type: enum 时)
descriptionstring""UI 中显示的帮助文本
  • credential_schema 字段在数据库中使用 加密存储,永远不会暴露给 Bot。
  • config_schema 字段以明文存储(用于非敏感配置如 API URL)。

executor — 执行器加载配置

executor:
  module: executor           # Python 模块名(相对于插件目录)
  class_name: GitHubExecutor # 可选:要加载的类(不指定则自动检测)

如果省略 class_name,加载器会扫描模块中第一个 ExecutorBase 子类。

tool_discovery 模式

tool_discovery 字段控制工具如何注册:

static(默认) — 工具在清单中定义,启动时加载。适用于绝大多数插件。ToolCatalog 在加载时一次性索引你的 tools[]

dynamic — 工具在运行时按资源实例注册。清单不声明 tools[];工具通过 ToolCatalog.register_dynamic_tools(resource_id, tools) 动态注册和注销。

动态工具的执行流程:

  1. 外部实体连接并为特定 resource_id 注册工具
  2. ToolCatalog.register_dynamic_tools(resource_id, tool_defs) 存储工具
  3. ToolResolver 检测到 tool_discovery: dynamic,调用 catalog.get_resource_tools(resource_id) 而非 catalog.get_type_tools(type_id)
  4. 每个资源实例拥有独立的工具列表
  5. 实体断开连接时,ToolCatalog.unregister_dynamic_tools(resource_id) 移除工具

两个内置执行器使用动态发现:

  • monstrum-agent — Monstrum Agent — 外部 Agent 通过 WebSocket(/ws/agent)连接,使用 API Key 认证后注册工具定义。平台将这些工具提供给绑定到该 Agent 资源的 Bot。Agent 断开连接时工具自动注销。
  • mcp — 平台在启动时(以及资源/凭据创建或更新时)自动从 MCP 服务器发现工具。每个 MCP 服务器暴露各自的工具集,按资源实例独立注册。发现的工具持久化到凭据记录(discovered_tools_jsondiscovery_statuslast_discovery_at),重启后无需重连即可恢复。

对于两种动态类型,Bot 绑定使用 allowed_tools(glob 模式)做逐工具权限控制。前端在绑定时显示发现的工具复选框,而非操作复选框。

何时使用 dynamic: 仅当每个资源实例在运行时暴露不同的工具集时。如果你的工具是固定的,使用 static

configured — 工具按资源配置定义(存储在 resource config 中)。插件不常用。


执行器实现

ExecutorBase — 抽象基类

每个执行器都继承 ExecutorBase(或 HttpExecutorBase)。从 SDK 导入:

from monstrum_sdk import ExecutorBase, ExecuteRequest, ExecuteResult, ExecuteStatus

需要设置的类变量:

class MyExecutor(ExecutorBase):
    resource_type = "my_plugin"      # 必须与清单中的 resource_type.id 匹配
    supported_operations = [          # 此执行器处理的操作
        "data.read",
        "data.write",
    ]
    OPERATION_HANDLERS = {            # operation → handler 方法名
        "data.read": "_handle_read",
        "data.write": "_handle_write",
    }

HttpExecutorBase — HTTP API 基类

对于调用 REST API 的插件(绝大多数),改为继承 HttpExecutorBase。它提供:

  • 自动认证头 — 从凭据中获取 Bearer Token
  • 401 自动刷新 — Token 过期时调用 credential_refresh() 并重试
  • HTTP 便捷方法_http_get_http_post_http_patch_http_delete
  • 分页 — 通过 _paginate() 支持 GitHub 风格的 Link header 分页
from monstrum_sdk import HttpExecutorBase

class MyExecutor(HttpExecutorBase):
    resource_type = "my_plugin"
    default_api_base = "https://api.example.com"   # 所有请求的基础 URL
    default_headers = {"Accept": "application/json"}  # 合并到每个请求
    default_timeout = 30.0                          # httpx 超时(秒)

HTTP 方法:

# GET — 返回解析后的 JSON
data = await self._http_get(request, "/endpoint", params={"key": "val"})

# POST — 返回解析后的 JSON
data = await self._http_post(request, "/endpoint", json={"key": "val"})

# PATCH — 返回解析后的 JSON
data = await self._http_patch(request, "/endpoint", json={"key": "val"})

# DELETE — 返回原始 httpx.Response
resp = await self._http_delete(request, "/endpoint")

# 分页(GitHub 风格 Link header)— 返回扁平列表
all_items = await self._paginate(request, "/items", per_page=100)

# 底层方法(完全控制)
result = await self._http_request(
    request,
    method="PUT",
    path="/endpoint",
    json={"key": "val"},
    raw_response=False,  # True → 返回 httpx.Response 而非 JSON
)

覆盖认证头:

如果你的 API 不使用 Bearer Token(如 Basic Auth、Header 中的 API Key),覆盖 _build_auth_headers()

def _build_auth_headers(self, request: ExecuteRequest) -> dict[str, str]:
    headers = dict(self.default_headers)
    if request.credential_fields:
        api_key = request.credential_fields.get("api_key", "")
        headers["X-API-Key"] = api_key
    return headers

覆盖 API 基础 URL:

API 基础 URL 优先从 request.resource_config["api_base"] 获取,否则使用 default_api_base。覆盖 _get_api_base() 实现自定义逻辑:

def _get_api_base(self, request: ExecuteRequest) -> str:
    if request.resource_config:
        region = request.resource_config.get("region", "us")
        return f"https://api.{region}.example.com"
    return self.default_api_base

Web3ExecutorBase — EVM 链上操作基类

如果你的插件需要与 EVM 兼容链(Ethereum、Polygon、Base、Arbitrum 等)交互,请继承 Web3ExecutorBase。它提供:

  • Web3 实例管理 — 按 RPC URL 缓存,从 resource config 自动配置
  • 账户管理 — 私钥处理(永远不暴露给 LLM)
  • ERC20 标准 ABI — 内置常用 Token 操作 ABI
  • Gas 价格守卫 — 可配置最大 gas 价格上限
  • 异步包装 — 所有同步 web3.py 调用通过 asyncio.to_thread() 包装
from monstrum_sdk import Web3ExecutorBase

class MyDeFiExecutor(Web3ExecutorBase):
    resource_type = "my_defi"
    supported_operations = ["swap", "provide_liquidity"]

    async def _handle_swap(self, request):
        w3 = self._w3(request)           # 缓存的 Web3 实例
        account = self._get_account(request)  # 从凭据 private_key 构建
        # 使用 w3 和 account 执行 DeFi 操作...

原语方法(均为异步,内部使用 asyncio.to_thread):

方法说明
_get_balance(request, address, token_address?)原生或 ERC20 Token 余额
_transfer(request, to, value)原生代币转账(value 单位为 ether)
_call_contract(request, contract, abi, function, args?)只读合约调用
_send_transaction(request, contract, abi, function, args?, value?)写合约调用
_get_transaction(request, tx_hash)交易详情 + 回执
_read_events(request, contract, abi, event, from_block, to_block)事件日志读取
_estimate_gas(request, to, value?, data?)Gas 估算
_wait_for_receipt(request, tx_hash, timeout?)等待交易确认

ExecuteRequest — 请求对象

每个 handler 接收一个包含所有上下文的 ExecuteRequest

@dataclass
class ExecuteRequest:
    request_id: str           # 唯一请求 ID(用于审计追踪)
    bot_id: str               # 执行操作的 Bot
    task_id: str              # 任务 ID(用于分组关联调用)
    operation: str            # 操作名(如 "issue.read")
    params: dict[str, Any]    # LLM 提供的工具参数

    # 凭据(对 Bot 不可见——由平台注入)
    credential_value: str | None       # 遗留:纯字符串凭据
    credential_fields: dict[str, str] | None  # 结构化凭据字段

    # Scope 和配置
    scope: dict[str, Any] | None          # 权限 Scope 约束
    resource_config: dict[str, Any] | None  # 资源配置

    # 高级
    credential_refresh: Any        # async () -> dict | None(OAuth 刷新)
    resource_id: str | None        # 资源 ID(多资源路由)
    tool_name: str                 # 原始工具名(同 operation 内分发)
    delegate: Any                  # DelegateConstraints(Bot 间委派)

ExecuteResult — 结果对象

Handler 返回 ExecuteResult。使用工厂方法:

# 成功 — 数据返回给 LLM
return ExecuteResult.success_result({"issues": [...]})

# 错误 — 错误消息返回给 LLM
return ExecuteResult.error_result("Repository not found")

# Scope 违规 — 作为权限拒绝处理
return ExecuteResult.scope_violation("Domain not in allowed list")

错误语义

平台区分三种结果类型,LLM 对每种看到不同的反馈:

结果类型工厂方法LLM 反馈审计状态何时使用
成功ExecuteResult.success_result(data)工具结果数据SUCCESS正常成功执行
执行错误ExecuteResult.error_result(msg)错误消息字符串FAILUREAPI 失败、无效输入、运行时错误
Scope 违规ExecuteResult.scope_violation(reason)"Scope violation: {reason}"FAILURE参数超出授权 Scope

还有第四种类型,发生在你的执行器被调用之前

结果类型来源LLM 反馈何时
权限拒绝Guardian(执行前)"Permission denied: {reason}"操作/工具未被角色授权

error_resultscope_violation 的选择:

仅在参数违反管理员配置的 Scope 约束时使用 scope_violation()——它表示授权问题。其他情况一律使用 error_result():API 错误、无效输入、资源未找到、网络故障。

实际上,大多数插件只使用 error_result(),因为声明式 scope_dimensions 已经通过 Guardian 自动处理 Scope 违规。你只在自定义 validate_scope() 覆盖中使用 scope_violation(),处理无法声明式表达的复杂逻辑。

LLM 对错误的行为: 当 LLM 收到错误或 Scope 违规时,它通常会调整策略——用不同参数重试、告知用户限制、或选择替代工具。平台不会自动重试工具调用;由 LLM 决定下一步。

模板方法:execute()

基类提供默认的 execute(),通过 OPERATION_HANDLERS 分发到你的 handler。你通常不需要覆盖 execute()——只需定义 handler 映射和方法:

class MyExecutor(HttpExecutorBase):
    OPERATION_HANDLERS = {
        "data.read": "_handle_read",
        "data.write": "_handle_write",
    }

    async def _handle_read(self, request: ExecuteRequest) -> ExecuteResult:
        data = await self._http_get(request, "/data")
        return ExecuteResult.success_result(data)

    async def _handle_write(self, request: ExecuteRequest) -> ExecuteResult:
        data = await self._http_post(request, "/data", json=request.params)
        return ExecuteResult.success_result(data)

默认 execute() 流程:

execute(request)
  ├── 1. 从 OPERATION_HANDLERS 查找 handler
  │      → 未找到则返回 "Unknown operation" 错误
  ├── 2. pre_execute(request)
  │      → 返回 ExecuteResult 则短路
  ├── 3. validate_scope(operation, params, scope)
  │      → 返回错误字符串则 Scope 违规
  ├── 4. handler(request)
  │      → 你的 handler 方法
  └── 5. 异常时:handle_execute_error(request, error)

生命周期钩子

覆盖这些钩子来自定义行为,无需替换 execute()

pre_execute(request) → ExecuteResult | None

在 Scope 验证之前调用。返回 ExecuteResult 可短路(如依赖检查),返回 None 则正常继续。

async def pre_execute(self, request: ExecuteRequest) -> ExecuteResult | None:
    if not self._api_client:
        return ExecuteResult.error_result("API client not configured")
    return None

handle_execute_error(request, error) → ExecuteResult

handler 抛出异常时调用。覆盖以做 API 特定的错误映射:

async def handle_execute_error(
    self, request: ExecuteRequest, error: Exception
) -> ExecuteResult:
    if isinstance(error, httpx.HTTPStatusError):
        status = error.response.status_code
        body = error.response.text[:200]
        return ExecuteResult.error_result(f"API error {status}: {body}")
    return await super().handle_execute_error(request, error)

Scope 校验

对于大多数插件,你完全不需要实现 Scope 校验。只需在清单中声明 scope_dimensions,Guardian 会声明式地处理一切。

仅在复杂校验逻辑无法声明式表达时覆盖 validate_scope()

def validate_scope(
    self,
    operation: str,
    params: dict[str, Any],
    scope: dict[str, Any] | None,
) -> str | None:
    """Scope 校验失败返回错误消息,通过返回 None。"""
    if not scope:
        return None

    # 自定义:检查 URL scheme
    url = params.get("url", "")
    if url:
        from urllib.parse import urlparse
        parsed = urlparse(url)
        if parsed.scheme not in ("http", "https"):
            return f"不支持的 URL scheme: {parsed.scheme}"

    return None

凭据访问

平台将凭据注入 ExecuteRequest——Bot 和 LLM 永远看不到。

使用 _get_token()(推荐用于 Bearer 风格认证):

token = self._get_token(request)  # 读取 credential_fields["access_token"]
token = self._get_token(request, field="api_key")  # 自定义字段名

优先尝试 credential_fields[field],回退到 credential_value

访问多个凭据字段:

if request.credential_fields:
    email = request.credential_fields.get("email", "")
    api_key = request.credential_fields.get("api_key", "")

OAuth Token 刷新:

如果 request.credential_refresh 已设置,平台处理自动 Token 刷新。HttpExecutorBase 在 401 响应时自动调用。对于自定义执行器:

if response.status_code == 401 and request.credential_refresh:
    new_fields = await request.credential_refresh()
    if new_fields:
        request.credential_fields = new_fields
        # 用新凭据重试

tool_name 路由

当多个工具共享同一个 operation 时,使用 request.tool_name 区分:

# 在 monstrum.yaml 中
tools:
  - name: github_add_labels
    operation: issue.label.write    # 相同 operation
    ...
  - name: github_remove_labels
    operation: issue.label.write    # 相同 operation
    ...
# 在 executor.py 中
OPERATION_HANDLERS = {
    "issue.label.write": "_handle_label_write",
}

async def _handle_label_write(self, request: ExecuteRequest) -> ExecuteResult:
    if request.tool_name == "github_remove_labels":
        # 删除逻辑
        ...
    else:
        # 添加逻辑(默认)
        ...

并发与无状态

执行器是单例的。 平台在插件加载时创建一个执行器类的实例,该单一实例在进程生命周期内处理所有并发请求。这有重要的影响:

不要在 self 上存储请求状态。 每个请求作为独立的 ExecuteRequest 对象到达。所有逐调用的数据——凭据、参数、Scope、资源配置——都在请求上,不在执行器实例上。

# 错误 — 跨并发请求共享状态
class BadExecutor(HttpExecutorBase):
    async def _handle_read(self, request: ExecuteRequest) -> ExecuteResult:
        self.current_token = request.credential_fields["access_token"]  # 竞态条件!
        data = await self._http_get(request, "/data")
        return ExecuteResult.success_result(data)

# 正确 — 所有状态都是请求范围的
class GoodExecutor(HttpExecutorBase):
    async def _handle_read(self, request: ExecuteRequest) -> ExecuteResult:
        data = await self._http_get(request, "/data")  # 凭据通过 request 流转
        return ExecuteResult.success_result(data)

实例状态仅用于初始化。 常量、配置和可复用的客户端(如 httpx 连接池)可以放在 self 上。逐请求的数据不可以。

Handler 是 async 的。 所有 handler 方法使用 async def,应使用 await 进行 I/O。永远不要使用阻塞调用(requests.gettime.sleep)——它们会阻塞事件循环,使所有并发请求停滞。

SDK 函数

将执行器的能力作为独立函数暴露,供程序化使用(不仅仅是 LLM 工具调用):

def get_sdk_functions(self) -> dict[str, Any]:
    return {
        "search": self._sdk_search,
        "create": self._sdk_create,
    }

async def _sdk_search(self, *, query: str, max_results: int = 10) -> dict:
    """搜索项目。可通过 platform.my_plugin.search(...) 调用"""
    # 实现...
    return {"results": [...]}

这些函数通过 Platform SDK 访问:

from monstrum_sdk import platform

results = await platform.my_plugin.search(query="bug", max_results=5)

重要: SDK 函数绕过 Guardian 权限管道。它们是对执行器方法的直接调用,没有 Scope 检查或审计日志。参见 PluginClient vs Platform SDK 了解各自的使用场景。


国际化 (i18n)

locales/ 目录中创建 JSON 文件(可通过清单中的 locales_dir 配置):

locales/en-US.json

{
  "description": "Jira integration plugin",
  "tools.jira_list_issues.description": "List issues from a Jira project.",
  "tools.jira_create_issue.description": "Create a new Jira issue.",
  "scope_dimensions.projects.error_template": "Project {value} is not authorized",
  "auth_methods.0.label": "API Token",
  "auth_methods.0.description": "Authenticate with email + API token"
}

locales/zh-CN.json

{
  "description": "Jira 集成插件",
  "tools.jira_list_issues.description": "列出 Jira 项目的 Issue。",
  "tools.jira_create_issue.description": "创建新的 Jira Issue。",
  "scope_dimensions.projects.error_template": "项目 {value} 不在授权范围内",
  "auth_methods.0.label": "API 令牌",
  "auth_methods.0.description": "使用邮箱 + API 令牌认证"
}

键命名约定:

键模式覆盖内容
description插件描述
tools.{tool_name}.description工具描述
scope_dimensions.{key}.error_templateScope 错误消息
auth_methods.{index}.label认证方式显示名
auth_methods.{index}.description认证方式帮助文本

平台根据用户的语言偏好应用翻译。

关于 auth_methods 键的注意事项: 认证方式的翻译使用基于索引的键(auth_methods.0.labelauth_methods.1.label),而不是基于方法名的键。这意味着在清单中重新排列 auth_methods 的顺序会破坏翻译映射。翻译发布后请保持 auth_methods 的顺序稳定,或同步更新 locale 文件。


PluginClient — 跨插件组合

PluginClient 通过完整的权限管道调用工具(ToolExecutor → Guardian → Executor → Auditor)。当需要权限执行和审计日志时,这是插件、工作流和技能调用其他插件工具的正确方式。

from monstrum_sdk import get_plugin_client, PluginError

# 创建绑定到特定 Bot 和任务的客户端
github = get_plugin_client(
    "github",
    bot_id="bot-123",
    task_id="task-456",
    workspace_id="ws-789",
)

# 用短名调用工具(前缀自动添加)
try:
    issues = await github.list_issues(repo="myorg/myrepo", state="open")
    # 内部调用工具 "github_list_issues"
    # 经过:Guardian Scope 检查 → GitHubExecutor → Auditor

    await github.create_issue(
        repo="myorg/myrepo",
        title="Bug: login fails",
        body="Steps to reproduce...",
    )
except PluginError as e:
    print(f"Failed: {e.message} (status: {e.status})")

PluginClient vs Platform SDK:治理边界

这个区别至关重要:

方面PluginClientPlatform SDK
权限执行完整 Guardian 检查
审计日志
凭据解析Bot 特定的绑定显式提供或无
Scope 约束评估并执行绕过
使用场景Bot 上下文中的跨插件工具调用基础设施代码的直接执行器访问
导入get_plugin_client()platform.{type}.{fn}()

经验法则: 如果调用来自 Bot 的执行上下文(工具 handler、工作流步骤、技能),使用 PluginClient。Bot 的权限、Scope 约束和委派限制都会生效。如果调用是在任何 Bot 上下文之外运行的平台基础设施代码(调度器、系统维护任务),使用 Platform SDK。

当你应该使用 PluginClient 却使用了 Platform SDK 时,会产生治理漏洞:调用绕过所有权限检查、Scope 约束和审计日志。在多租户环境中,这意味着 Bot 可能访问未被授权的资源。


Platform SDK

platform 单例提供对内置执行器能力和横切基础设施的访问。这些调用绕过 Guardian 权限管道——它们直接调用执行器方法,没有 Scope 检查或审计日志。

from monstrum_sdk import platform

platform.oauth — OAuth Token 管理

# 列出为资源类型配置的 OAuth Provider
providers = await platform.oauth.list_providers(
    resource_type_id="github",
    workspace_id="ws-123",
)
# 返回:[{"id", "name", "resource_type_id", "client_id", "is_active"}, ...]

# 获取当前有效的 OAuth Token
token_info = await platform.oauth.get_token(credential_id="cred-456")
# 返回:{"access_token", "token_type", "expires_at", "scope"}

platform.events — 事件系统

事件系统允许插件发出事件并订阅平台范围的事件。

发出自定义事件:

result = await platform.events.emit(
    "deploy.completed",                    # 事件名
    data={"version": "2.1.0", "env": "prod"},  # 载荷
    workspace_id="ws-123",
    bot_id="bot-456",
)
# 返回:{"event_id": "...", "event_type": "custom.deploy.completed"}

事件名规则:字母数字、点、下划线、冒号、连字符。最长 128 字符。平台自动添加 custom. 前缀。

订阅事件:

sub = await platform.events.subscribe(
    "task.*",                              # fnmatch 模式
    bot_id="bot-456",
    workspace_id="ws-123",
    instruction="A task event occurred: {event_type}. Data: {data}",
)
# 返回:{"subscription_id": "...", "pattern": "task.*"}

instruction 字段是匹配事件触发时发送给 Bot 的模板。支持的占位符:{event_type}{source_type}{source_id}{data}{metadata}

取消订阅:

result = await platform.events.unsubscribe(
    "sub-789",
    bot_id="bot-456",  # 所有权验证
)
# 返回:{"subscription_id": "sub-789", "removed": true}

列出订阅:

subs = await platform.events.get_subscriptions(bot_id="bot-456")
# 返回:[{"subscription_id", "pattern", "instruction", "active", "created_at"}, ...]

事件→工作流触发器:

事件也可以直接触发工作流(无需经过 Bot)。使用工作流触发器 REST API:

POST   /api/workflows/{workflow_id}/triggers   — 创建触发器(event_pattern + instruction)
GET    /api/workflows/{workflow_id}/triggers    — 列出触发器
DELETE /api/workflows/{workflow_id}/triggers/{trigger_id} — 删除触发器

当匹配的事件发生时,平台自动以事件数据作为输入执行关联的工作流。触发器持久化到 workflow_triggers 表中,启动时自动加载到 EventDispatcher。

内置事件类型:

模式来源描述
task.completedAgentRuntime任务成功完成
task.failedAgentRuntime任务失败
task.cancelledAgentRuntime任务被取消
workflow.completedWorkflowExecutor工作流完成
workflow.failedWorkflowExecutor工作流失败
schedule.firedSchedulerService定时事件触发
session.createdSessionManager新会话开始
session.expiredSessionManager会话超时
custom.*Bot 通过 emit自定义事件

内置执行器命名空间

直接访问内置执行器能力。注意:这些绕过 Guardian,不记录审计日志。

# SSH
result = await platform.ssh.run(
    host="prod-01",
    command="df -h",
    credential="ssh-key-content",
    timeout=30,
)

# MCP (Model Context Protocol — 仅 HTTP 传输)
tools = await platform.mcp.list_tools(
    server="calculator",
    url="https://mcp.example.com/sse",
)
result = await platform.mcp.call_tool(
    server="calculator",
    tool="add",
    arguments={"a": 1, "b": 2},
)

# Bot(跨 Bot 调用)
task = await platform.bot.execute_task(
    target_bot_id="bot-789",
    instruction="Summarize today's issues",
    params={"project": "PROJ"},
)
answer = await platform.bot.query(
    target_bot_id="bot-789",
    question="What is the current sprint velocity?",
)
status = await platform.bot.status(target_bot_id="bot-789")

# Web
results = await platform.web.search(query="Monstrum docs", max_results=5)
page = await platform.web.fetch(url="https://example.com", extract_mode="markdown")

# Web3(EVM 链上操作)
balance = await platform.web3.get_balance(
    resource_id="res-123",
    config={"rpc_url": "https://mainnet.infura.io/v3/KEY", "chain_id": 1},
    address="0x742d35Cc6634C0532925a3b844Bc9e7595f2bD18",
)
tx = await platform.web3.transfer(
    resource_id="res-123",
    config={"rpc_url": "https://mainnet.infura.io/v3/KEY"},
    credential_fields={"private_key": "0x..."},
    to="0x...",
    value="0.1",
)

插件信任与安全模型

理解插件与平台之间的信任边界对插件开发者和平台管理员都很重要。

信任假设

插件在进程内与平台一起运行。没有沙箱、没有代码签名、没有对插件可执行的 Python 代码的运行时限制。平台信任:

  1. 插件来自可信来源。 管理员控制 plugins/ 目录中安装什么,或通过 .mst 包导入什么。
  2. 插件遵循 ExecutorBase 契约。 加载器验证执行器类是 ExecutorBase 的子类,但不限制 handler 方法内运行什么代码。
  3. 插件不篡改平台内部。 插件可以直接导入和调用平台服务,但这样做会绕过所有安全保证。

平台保证什么

尽管在进程内运行,平台在你的插件代码周围提供这些保证:

  • 凭据隔离:你的 handler 通过 ExecuteRequest.credential_fields 接收凭据。平台注入它们;Bot 和 LLM 永远看不到。凭据在静态时使用 加密。
  • Scope 执行:Guardian 在你的 handler 被调用之前评估 scope_dimensions。如果 Scope 检查失败,你的 handler 不会执行。
  • 审计追踪:每次工具调用——包括失败的——都被 Auditor 记录,包含请求 ID、Bot ID、操作、参数和结果状态。
  • 内置类型保护:平台阻止插件覆盖内置类型(sshmcpbot)。

平台不保证什么

  • 无代码沙箱:插件代码可以完全访问 Python 运行时、文件系统和网络。
  • 无导入限制:插件可以导入任何 Python 模块,包括平台内部模块。
  • 无运行时资源限制:没有对插件执行的 CPU、内存或网络配额。
  • 无代码审查或签名.mst 导入验证清单结构,不验证代码安全性。

对插件开发者的影响

  • 你的执行器与所有其他插件和平台本身在同一进程中运行。handler 中的崩溃可能影响整个平台。
  • 不要直接访问平台数据库或内部状态。使用 PluginClient 或 Platform SDK。
  • 不要启动后台线程或长时间运行的进程。执行器处理单个请求;平台管理生命周期。
  • 将凭据字段视为敏感数据——不要记录、缓存或在预期 API 调用之外传输它们。

对管理员的影响

  • 只安装来自可信来源的插件。部署前审查执行器代码。
  • 使用 Scope 约束限制任何 Bot 通过插件能做什么,无论插件代码允许什么。
  • 监控审计日志,关注可能表明插件行为异常的意外工具调用模式。

测试

单元测试执行器

import pytest
from monstrum_sdk import ExecuteRequest, ExecuteResult


def _make_request(operation, params=None, **kwargs):
    return ExecuteRequest(
        request_id="test-req",
        bot_id="test-bot",
        task_id="test-task",
        operation=operation,
        params=params or {},
        **kwargs,
    )


class TestMyExecutor:
    @pytest.fixture
    def executor(self):
        from plugins.my_plugin.executor import MyExecutor
        return MyExecutor()

    async def test_read_success(self, executor, httpx_mock):
        httpx_mock.add_response(
            url="https://api.example.com/data",
            json={"items": [1, 2, 3]},
        )
        request = _make_request(
            "data.read",
            params={"query": "test"},
            credential_fields={"access_token": "test-token"},
        )
        result = await executor.execute(request)
        assert result.success
        assert result.data["items"] == [1, 2, 3]

    async def test_unknown_operation(self, executor):
        request = _make_request("invalid.op")
        result = await executor.execute(request)
        assert not result.success
        assert "Unknown operation" in result.error

    async def test_scope_validation(self, executor):
        error = executor.validate_scope(
            "data.read",
            {"project": "SECRET"},
            {"projects": ["PUBLIC-*"]},
        )
        assert error is not None

平台集成测试

from unittest.mock import AsyncMock, MagicMock, patch


async def test_plugin_client_integration():
    mock_tool_executor = AsyncMock()
    mock_tool_executor.execute.return_value = MagicMock(
        success=True,
        result={"issues": []},
    )

    with patch("services.runner.state.get_runner_state") as mock_state:
        mock_state.return_value.tool_executor = mock_tool_executor
        from monstrum_sdk import get_plugin_client

        client = get_plugin_client("github", bot_id="b1", task_id="t1")
        result = await client.list_issues(repo="org/repo")
        assert result == {"issues": []}

运行测试

# 运行插件测试
pytest tests/plugins/my_plugin/ -v

# 运行完整测试套件检查回归
pytest tests/ -x -q

# Lint
ruff check plugins/my_plugin/

常见陷阱

1. 在 self 上存储请求状态

执行器是单例的——同一个实例处理所有并发请求。在 self 上存储逐请求的数据会导致竞态条件:

# 错误
self.current_user = request.credential_fields["email"]
data = await self._http_get(request, "/data")  # 另一个请求覆盖了 self.current_user

# 正确 — 使用请求范围的数据
data = await self._http_get(request, "/data")  # 凭据通过 request 对象流转

2. 使用阻塞 I/O

所有 handler 都是 async 的。阻塞调用会使整个事件循环停滞:

# 错误 — 阻塞事件循环
import requests
response = requests.get("https://api.example.com/data")

# 正确 — 使用 async HTTP
data = await self._http_get(request, "/data")

3. 在执行器中实现权限检查

Scope 检查属于 scope_dimensions,不属于 handler 代码:

# 错误 — 在 handler 中手动检查权限
async def _handle_read(self, request: ExecuteRequest) -> ExecuteResult:
    repo = request.params["repo"]
    if not self._is_repo_allowed(repo, request.scope):  # 重新发明 Guardian
        return ExecuteResult.error_result("Not allowed")
    ...

# 正确 — 在清单中声明,Guardian 处理
# scope_dimensions:
#   - key: repos
#     param_paths: [repo]
#     match_mode: pattern

4. 对非 Scope 错误使用 scope_violation()

scope_violation() 表示授权问题。不要用于 API 错误:

# 错误 — API 404 不是 Scope 违规
if response.status_code == 404:
    return ExecuteResult.scope_violation("Repository not found")

# 正确 — 这是执行错误
if response.status_code == 404:
    return ExecuteResult.error_result("Repository not found")

5. 忘记 resource_type 必须与清单 id 匹配

执行器中的 resource_type 类变量必须与 monstrum.yaml 中的 resource_type.id 完全匹配:

# monstrum.yaml
resource_type:
  id: my-plugin  # 这个字符串...
# executor.py
class MyExecutor(HttpExecutorBase):
    resource_type = "my-plugin"  # ...必须与这个匹配

6. 发布翻译后重排 auth_methods

认证方式的翻译键是基于索引的(auth_methods.0.labelauth_methods.1.label)。重排清单中 auth_methods 数组的顺序会破坏翻译映射。如需重排,请同步更新 locale 文件。

7. 需要治理时使用 Platform SDK

如果你的代码在 Bot 上下文中运行且调用另一个插件,使用 PluginClient——不是 Platform SDK。Platform SDK 绕过所有权限检查和审计日志:

# 错误 — 在 Bot handler 中绕过治理
async def _handle_deploy(self, request: ExecuteRequest) -> ExecuteResult:
    await platform.ssh.run(host="prod", command="deploy.sh", ...)  # 没有 Scope 检查!

# 正确 — 使用 PluginClient 进行受治理的访问
async def _handle_deploy(self, request: ExecuteRequest) -> ExecuteResult:
    ssh = get_plugin_client("ssh", bot_id=request.bot_id, task_id=request.task_id)
    await ssh.run(host="prod", command="deploy.sh")  # Guardian 执行 Scope 检查

8. 记录凭据值

永远不要记录凭据字段。它们包含密钥(API Key、Token、密码):

# 错误
logger.info(f"Calling API with token: {request.credential_fields}")

# 正确
logger.info(f"Calling API for bot={request.bot_id}, operation={request.operation}")

打包与分发

.mst 文件格式

插件可以打包为 .mst 文件(ZIP 格式)进行分发:

cd plugins/
zip -r my_plugin.mst my_plugin/

通过 CLI 安装:

monstrum plugin install my_plugin
monstrum plugin install my_plugin@1.0.0  # 指定版本

插件生命周期

# 安装
monstrum plugin install <package>[@version]

# 卸载
monstrum plugin uninstall <package>

# 列出已安装的插件
monstrum plugin list

# 搜索插件
monstrum plugin search <query>

# 查看插件详情
monstrum plugin info <package>

# 更新插件
monstrum plugin update [package]

完整参考:GitHub 插件

GitHub 插件是标准参考实现。研究它以了解最佳实践。

文件:plugins/github/monstrum.yaml

name: github
version: 1.0.0
description: GitHub integration plugin — issues, comments, labels, and repository info
author: Monstrum
license: MIT
tags: [scm, github, issues]
repository: https://github.com/MonstrumAI/monstrum

resource_type:
  id: github
  name: GitHub
  mode: plugin
  tool_discovery: static
  description: "GitHub 代码托管平台"
  icon: github          # 语义名称(映射 Ant Design 图标)或文件名(如 icon.svg)
  auth_flow: oauth

  credential_schema:
    - field: access_token
      type: secret
      required: true
      description: "GitHub Access Token"

  config_schema:
    - field: api_base
      type: url
      required: false
      default: "https://api.github.com"
      description: "GitHub API base URL (customize for GitHub Enterprise)"

  auth_methods:
    - method: oauth2_auth_code
      label: OAuth Login
      description: "Authorize via GitHub OAuth"
      credential_schema:
        - field: access_token
          type: secret
          required: true
          description: "OAuth Access Token (obtained automatically)"
      oauth_config:
        authorization_url: "https://github.com/login/oauth/authorize"
        token_url: "https://github.com/login/oauth/access_token"
        scopes: [repo, "read:org"]
    - method: token
      label: Personal Access Token
      description: "Configure manually with a GitHub PAT"
      credential_schema:
        - field: access_token
          type: secret
          required: true
          description: "GitHub Personal Access Token"

  tools:
    - name: github_list_issues
      description: "List issues from a GitHub repository."
      operation: issue.read
      input_schema:
        type: object
        properties:
          repo: { type: string, description: "owner/repo format" }
          state: { type: string, enum: [open, closed, all], default: open }
          labels: { type: array, items: { type: string } }
          since: { type: string, description: "ISO 8601 timestamp" }
          per_page: { type: integer, default: 30 }
        required: [repo]

    - name: github_create_issue
      description: "Create a new issue in a GitHub repository."
      operation: issue.write
      input_schema:
        type: object
        properties:
          repo: { type: string }
          title: { type: string }
          body: { type: string }
          labels: { type: array, items: { type: string } }
          assignees: { type: array, items: { type: string } }
        required: [repo, title]

    - name: github_update_issue
      description: "Update an existing GitHub issue."
      operation: issue.write
      input_schema:
        type: object
        properties:
          repo: { type: string }
          issue_number: { type: integer }
          title: { type: string }
          body: { type: string }
          state: { type: string, enum: [open, closed] }
          labels: { type: array, items: { type: string } }
          assignees: { type: array, items: { type: string } }
        required: [repo, issue_number]

    - name: github_add_labels
      description: "Add labels to a GitHub issue."
      operation: issue.label.write
      input_schema:
        type: object
        properties:
          repo: { type: string }
          issue_number: { type: integer }
          labels: { type: array, items: { type: string } }
        required: [repo, issue_number, labels]

    - name: github_remove_labels
      description: "Remove labels from a GitHub issue."
      operation: issue.label.write
      input_schema:
        type: object
        properties:
          repo: { type: string }
          issue_number: { type: integer }
          labels: { type: array, items: { type: string } }
        required: [repo, issue_number, labels]

    - name: github_add_comment
      description: "Add a comment to a GitHub issue."
      operation: issue.comment.write
      input_schema:
        type: object
        properties:
          repo: { type: string }
          issue_number: { type: integer }
          body: { type: string }
        required: [repo, issue_number, body]

    - name: github_list_comments
      description: "List comments on a GitHub issue."
      operation: issue.comment.read
      input_schema:
        type: object
        properties:
          repo: { type: string }
          issue_number: { type: integer }
        required: [repo, issue_number]

    - name: github_get_repo
      description: "Get information about a GitHub repository."
      operation: repo.read
      input_schema:
        type: object
        properties:
          repo: { type: string }
        required: [repo]

  scope_dimensions:
    - key: repos
      param_paths: [repo, owner_repo]
      match_mode: pattern
      error_template: "Repository {value} is not authorized"

executor:
  module: executor
  class_name: GitHubExecutor

文件:plugins/github/executor.py

from __future__ import annotations

import logging

import httpx

from monstrum_sdk import ExecuteRequest, ExecuteResult, HttpExecutorBase

logger = logging.getLogger(__name__)


class GitHubExecutor(HttpExecutorBase):
    resource_type = "github"
    default_api_base = "https://api.github.com"
    default_headers = {
        "Accept": "application/vnd.github+json",
        "X-GitHub-Api-Version": "2022-11-28",
    }
    supported_operations = [
        "repo.read",
        "issue.read",
        "issue.write",
        "issue.comment.read",
        "issue.comment.write",
        "issue.label.write",
    ]

    OPERATION_HANDLERS = {
        "repo.read": "_handle_repo_read",
        "issue.read": "_handle_issue_read",
        "issue.write": "_handle_issue_write",
        "issue.comment.read": "_handle_comment_read",
        "issue.comment.write": "_handle_comment_write",
        "issue.label.write": "_handle_label_write",
    }

    async def handle_execute_error(
        self, request: ExecuteRequest, error: Exception
    ) -> ExecuteResult:
        if isinstance(error, httpx.HTTPStatusError):
            logger.error(f"GitHub API error: {error}")
            return ExecuteResult.error_result(
                f"GitHub API error: {error.response.status_code} "
                f"- {error.response.text[:200]}"
            )
        if isinstance(error, httpx.RequestError):
            logger.error(f"GitHub request error: {error}")
            return ExecuteResult.error_result(
                f"GitHub request error: {str(error)}"
            )
        logger.exception(f"GitHub execution error: {error}")
        return ExecuteResult.error_result(f"Execution error: {str(error)}")

    # ── Helpers ──

    @staticmethod
    def _parse_repo(params: dict) -> tuple[str, str] | None:
        repo = params.get("repo", "")
        if not repo or "/" not in repo:
            return None
        return tuple(repo.split("/", 1))

    # ── Handlers ──

    async def _handle_repo_read(self, request: ExecuteRequest) -> ExecuteResult:
        parsed = self._parse_repo(request.params)
        if not parsed:
            return ExecuteResult.error_result("Invalid repo format, expected 'owner/repo'")
        owner, name = parsed
        data = await self._http_get(request, f"/repos/{owner}/{name}")
        return ExecuteResult.success_result(data)

    async def _handle_issue_read(self, request: ExecuteRequest) -> ExecuteResult:
        parsed = self._parse_repo(request.params)
        if not parsed:
            return ExecuteResult.error_result("Invalid repo format")
        owner, name = parsed
        issue_number = request.params.get("issue_number")

        if issue_number:
            data = await self._http_get(request, f"/repos/{owner}/{name}/issues/{issue_number}")
            return ExecuteResult.success_result(data)

        params = {}
        if state := request.params.get("state"):
            params["state"] = state
        if labels := request.params.get("labels"):
            params["labels"] = ",".join(labels) if isinstance(labels, list) else labels
        if since := request.params.get("since"):
            params["since"] = since
        if per_page := request.params.get("per_page"):
            params["per_page"] = per_page

        data = await self._http_get(request, f"/repos/{owner}/{name}/issues", params=params or None)
        return ExecuteResult.success_result(data)

    async def _handle_issue_write(self, request: ExecuteRequest) -> ExecuteResult:
        parsed = self._parse_repo(request.params)
        if not parsed:
            return ExecuteResult.error_result("Invalid repo format")
        owner, name = parsed
        issue_number = request.params.get("issue_number")

        body = {}
        for key in ("title", "body", "labels", "assignees", "state"):
            if val := request.params.get(key if key != "body" else "body"):
                body[key] = val

        if issue_number:
            data = await self._http_patch(
                request, f"/repos/{owner}/{name}/issues/{issue_number}", json=body,
            )
        else:
            data = await self._http_post(
                request, f"/repos/{owner}/{name}/issues", json=body,
            )
        return ExecuteResult.success_result(data)

    async def _handle_comment_read(self, request: ExecuteRequest) -> ExecuteResult:
        parsed = self._parse_repo(request.params)
        if not parsed:
            return ExecuteResult.error_result("Invalid repo format")
        issue_number = request.params.get("issue_number")
        if not issue_number:
            return ExecuteResult.error_result("issue_number is required")
        owner, name = parsed
        data = await self._http_get(
            request, f"/repos/{owner}/{name}/issues/{issue_number}/comments",
        )
        return ExecuteResult.success_result(data)

    async def _handle_comment_write(self, request: ExecuteRequest) -> ExecuteResult:
        parsed = self._parse_repo(request.params)
        if not parsed:
            return ExecuteResult.error_result("Invalid repo format")
        issue_number = request.params.get("issue_number")
        body = request.params.get("body")
        if not issue_number:
            return ExecuteResult.error_result("issue_number is required")
        if not body:
            return ExecuteResult.error_result("Comment body is required")
        owner, name = parsed
        data = await self._http_post(
            request, f"/repos/{owner}/{name}/issues/{issue_number}/comments",
            json={"body": body},
        )
        return ExecuteResult.success_result(data)

    async def _handle_label_write(self, request: ExecuteRequest) -> ExecuteResult:
        """添加或删除标签。通过 tool_name 区分。"""
        parsed = self._parse_repo(request.params)
        if not parsed:
            return ExecuteResult.error_result("Invalid repo format")
        issue_number = request.params.get("issue_number")
        labels = request.params.get("labels", [])
        if not issue_number:
            return ExecuteResult.error_result("issue_number is required")
        if not labels:
            return ExecuteResult.error_result("labels are required")
        owner, name = parsed
        is_remove = request.tool_name == "github_remove_labels"

        if is_remove:
            results = []
            for label in labels:
                resp = await self._http_delete(
                    request, f"/repos/{owner}/{name}/issues/{issue_number}/labels/{label}",
                )
                results.append({"label": label, "removed": resp.status_code == 200})
            return ExecuteResult.success_result(results)

        data = await self._http_post(
            request, f"/repos/{owner}/{name}/issues/{issue_number}/labels",
            json={"labels": labels},
        )
        return ExecuteResult.success_result(data)

API 参考

monstrum_sdk 导出

from monstrum_sdk import (
    # 执行器基类
    ExecutorBase,         # 所有执行器的抽象基类
    HttpExecutorBase,     # HTTP API 执行器基类
    Web3ExecutorBase,     # EVM 链上操作执行器基类
    ExecuteRequest,       # 请求数据类
    ExecuteResult,        # 结果数据类
    ExecuteStatus,        # 枚举:SUCCESS, ERROR, SCOPE_VIOLATION

    # 资源模型
    ToolDef,              # 工具定义
    ScopeDimension,       # 权限维度
    FieldDef,             # 字段定义(凭据/配置)
    AuthMethod,           # 枚举:OAUTH2_AUTH_CODE, API_KEY, TOKEN, ...
    AuthMethodDef,        # 认证方式声明
    OAuthProviderConfig,  # OAuth 端点配置

    # 插件清单
    PluginManifest,       # 完整插件清单
    PluginResourceType,   # 清单中的 ResourceType
    PluginExecutorDef,    # 执行器加载配置

    # PluginClient(通过 Guardian 的工具级调用)
    PluginClient,         # 高级工具调用器
    PluginError,          # 工具调用失败异常
    get_plugin_client,    # 工厂函数

    # Platform SDK(内置执行器能力)
    Platform,             # 能力入口
    PlatformError,        # 能力调用失败异常
    platform,             # 全局单例
)

ExecutorBase 方法

方法签名描述
supports_operation(operation: str) → bool检查是否支持操作(支持 glob 通配符)
validate_scope(operation, params, scope) → str | None自定义 Scope 校验;返回错误或 None
_get_token(request, field="access_token") → str从凭据获取 Token
get_sdk_functions() → dict[str, Callable]暴露 SDK 函数给 Platform SDK
execute(request) → ExecuteResult主入口(模板方法)
pre_execute(request) → ExecuteResult | None分发前钩子
handle_execute_error(request, error) → ExecuteResult错误处理钩子

HttpExecutorBase 方法

方法签名描述
_build_auth_headers(request) → dict[str, str]构建含认证的请求头
_get_api_base(request) → str解析 API 基础 URL
_http_get(request, path, params=None) → AnyHTTP GET → JSON
_http_post(request, path, json=None) → AnyHTTP POST → JSON
_http_patch(request, path, json=None) → AnyHTTP PATCH → JSON
_http_delete(request, path) → httpx.ResponseHTTP DELETE → Response
_http_request(request, method, path, ...) → Any底层 HTTP(支持 raw_response)
_paginate(request, path, params=None, per_page=30) → list通过 Link header 分页

Web3ExecutorBase 方法

方法签名描述
_w3(request) → Web3从 resource config 获取/创建缓存的 Web3 实例
_get_account(request) → Account从凭据 private_key 构建账户
_get_balance(request, address, token_address?) → dict原生或 ERC20 余额
_transfer(request, to, value) → dict原生代币转账
_call_contract(request, contract, abi, function, args?) → dict只读合约调用
_send_transaction(request, contract, abi, function, args?, value?) → dict写合约调用
_get_transaction(request, tx_hash) → dict交易详情 + 回执
_read_events(request, contract, abi, event, from_block, to_block) → dict事件日志
_estimate_gas(request, to, value?, data?) → dictGas 估算
_wait_for_receipt(request, tx_hash, timeout?) → dict等待交易确认
_check_gas_price(request, w3) → None检查 gas 价格是否超过上限
_native_symbol(request) → str从配置获取原生代币符号
_tx_link(request, tx_hash) → str | None构建区块浏览器链接

Platform SDK 命名空间

命名空间方法
platform.oauthlist_providers(resource_type_id, workspace_id), get_token(credential_id)
platform.eventsemit(name, data, ...), subscribe(pattern, bot_id, ...), unsubscribe(sub_id, ...), get_subscriptions(bot_id)
platform.sshrun(host, command, credential, ...)
platform.mcplist_tools(server, ...), call_tool(server, tool, arguments, ...)
platform.botexecute_task(...), query(...), status(...)
platform.websearch(query, ...), fetch(url, ...)
platform.web3get_balance(...), transfer(...), call_contract(...), send_transaction(...), get_transaction(...), read_events(...)
platform.{your_plugin}来自 get_sdk_functions() 的函数

模式匹配工具

shared.utils.matching 模块提供平台内统一使用的模式匹配函数。插件开发者可将其用于自定义 Scope 校验:

from shared.utils.matching import match_glob, match_any, match_path, match_any_path
函数签名描述
match_glob(value, pattern) → boolfnmatch glob 匹配
match_any(value, patterns, *, allow_regex=False) → bool匹配任一模式
match_path(path, pattern) → bool文件系统路径匹配(**/*
match_any_path(path, patterns) → bool路径匹配任一模式

示例:

match_glob("issue.read", "issue.*")    # True
match_glob("anything", "*")            # True

match_any("issue.read", ["issue.*", "pr.*"])          # True
match_any("ls -la", ["^ls.*"], allow_regex=True)      # True

match_path("/home/user/docs/file.txt", "/home/user/**")  # True
match_path("/tmp/file.txt", "/tmp/*")                     # True

match_any_path("/tmp/file", ["/home/**", "/tmp/*"])    # True