diff --git a/.gitignore b/.gitignore index 19d4128..8058323 100644 --- a/.gitignore +++ b/.gitignore @@ -105,3 +105,4 @@ venv.bak/ # mypy .mypy_cache/ +.idea diff --git a/README.md b/README.md index 6fa9a1c..7699332 100644 --- a/README.md +++ b/README.md @@ -101,16 +101,16 @@ All the slaves must build all docker images present in the `config.json` file (i ## Todo -- pylint -- add more docs about `config.json` format -- Extend instance feature -- Display connection string (ex: ssh -p ..., http://host:port, nc host port, ...) -- Better admin panel - - Add challenge host to HTML table - - Monitoring on each hosts - - Search/Select actions filter on HTML table -- Show internal ip: boolean by challenges -- Migrate to FastAPI + React +- [x] pylint +- [ ] add more docs about `config.json` format +- [ ] Extend instance feature +- [x] Display connection string (ex: ssh -p ..., http://host:port, nc host port, ...) +- [ ] Better admin panel + - [ ] Add challenge host to HTML table + - [ ] Monitoring on each hosts + - [ ] Search/Select actions filter on HTML table +- [ ] Show internal ip: boolean by challenges +- [ ] Migrate to FastAPI + React ## Made with diff --git a/app.py b/app.py index 15293ad..f116303 100644 --- a/app.py +++ b/app.py @@ -1,91 +1,102 @@ #!/usr/bin/env python3 -from os import getenv -from datetime import datetime, timedelta +import datetime +import typing +import flask from flask import ( current_app, + flash, + jsonify, + redirect, render_template, - session, request, - redirect, + session, url_for, - jsonify, - flash ) from flask_recaptcha import ReCaptcha +from app.app import create_app +from app.auth import admin_required, login_required from app.config import ( - WEBSITE_TITLE, - CTFD_URL, + ADMIN_ONLY, CHALLENGES, + CTFD_URL, MAX_INSTANCE_COUNT, MAX_INSTANCE_DURATION, MAX_INSTANCE_PER_TEAM, - ADMIN_ONLY + WEBSITE_TITLE, ) +from app.models import Instances from app.utils import ( - get_total_instance_count, - get_challenge_count_per_team, - check_challenge_name, check_access_key, + check_challenge_name, + create_instances, + get_challenge_count_per_team, get_challenge_info, + get_total_instance_count, remove_all_instances, remove_container_by_id, - create_instances, + remove_old_instances, remove_user_running_instance, - remove_old_instances ) -from app.auth import login_required, admin_required -from app.models import Instances -from app.app import create_app app = create_app() recaptcha = ReCaptcha(app) -recaptcha.theme = 'dark' +recaptcha.theme = "dark" -def render(template, **kwargs): - """ - Shortcut for the render_template flask function. - """ - return render_template(template, title=WEBSITE_TITLE, ctfd_url=CTFD_URL, - max_instance_duration=MAX_INSTANCE_DURATION, challenges_option=CHALLENGES, - instances_count=get_total_instance_count(), **kwargs) +def render(template: str, **kwargs: typing.Any) -> str: + """Shortcut for the render_template flask function.""" + return render_template( + template, + title=WEBSITE_TITLE, + ctfd_url=CTFD_URL, + max_instance_duration=MAX_INSTANCE_DURATION, + challenges_option=CHALLENGES, + instances_count=get_total_instance_count(), + **kwargs, + ) -@app.route("/admin", methods=['GET']) +@app.route("/admin", methods=["GET"]) @admin_required -def admin(): - """ - Admin dashboard with all instances. - """ +def admin() -> str: + """Admin dashboard with all instances.""" return render("admin.html") -@app.route("/login", methods=['GET', 'POST']) -def login(): - """ - Handle login process and form. - """ +@app.route("/login", methods=["GET", "POST"]) +def login() -> str | flask.Response: # noqa: PLR0911 + """Handle login process and form.""" + template_name = "login.html" + if session and session["verified"]: return redirect(url_for("index")) if request.method == "GET": - return render("login.html") + return render(template_name) if request.method == "POST": - access_key = request.form['access_key'] if 'access_key' in request.form else None + access_key = request.form.get("access_key") if not access_key: - return render('login.html', error=True, message="Please provide an access key.") + return render( + template_name, + error=True, + message="Please provide an access key.", + ) success, message, user = check_access_key(access_key) if not success: - return render('login.html', error=True, message=message) + return render(template_name, error=True, message=message) if ADMIN_ONLY and not user["is_admin"]: - return render('login.html', error=True, message="You need to be an administrator to login.") + return render( + template_name, + error=True, + message="You need to be an administrator to login.", + ) session["verified"] = True session["user_id"] = user["user_id"] @@ -98,14 +109,14 @@ def login(): return redirect(url_for("login")) -@app.route("/", methods=['GET']) +@app.route("/", methods=["GET"]) @login_required -def index(): +def index() -> str: """ - Display running instances of your team and allows you to submit new instances. + Display running instances of your team and allows you to submit new + instances. """ instances = Instances.query.filter_by(team_id=session["team_id"]).all() - if instances: challenges_info = {} @@ -113,85 +124,103 @@ def index(): for instance in instances: if instance.network_name not in challenges_info: challenges_info[instance.network_name] = [] - - remaining = timedelta(minutes=MAX_INSTANCE_DURATION) - (datetime.utcnow() - instance.creation_date) - if remaining > timedelta(seconds=0): - remaining = f"{remaining.seconds // 60:02d}m{remaining.seconds % 60:02d}s" + + remaining = datetime.timedelta(minutes=MAX_INSTANCE_DURATION) - ( + datetime.datetime.now(datetime.UTC) + - instance.creation_date.replace(tzinfo=datetime.UTC) + ) + if remaining > datetime.timedelta(seconds=0): + remaining = ( + f"{remaining.seconds // 60:02d}m" + f"{remaining.seconds % 60:02d}s" + ) else: remaining = "This instance will be deleted shortly..." - challenges_info[instance.network_name].append({ - "name": instance.challenge_name, - "host": instance.host_domain, - "hostname": instance.hostname, - "ip_address": instance.ip_address, - "ports": instance.ports, - "user_name": instance.user_name, - "time_remaining": remaining - }) - - return render('index.html', challenges=CHALLENGES, captcha=recaptcha, challenges_info=challenges_info) - return render('index.html', challenges=CHALLENGES, captcha=recaptcha) - - -@app.route("/container/all", methods=['GET']) + challenges_info[instance.network_name].append( + { + "id": instance.id, + "name": instance.challenge_name, + "host": instance.host_domain, + "hostname": instance.hostname, + "ip_address": instance.ip_address, + "ports": instance.ports, + "user_name": instance.user_name, + "time_remaining": remaining, + } + ) + + return render( + "index.html", + challenges=CHALLENGES, + captcha=recaptcha, + challenges_info=challenges_info, + ) + return render("index.html", challenges=CHALLENGES, captcha=recaptcha) + + +@app.route("/container/all", methods=["GET"]) @admin_required -def get_all_containers(): - """ - Admin restricted function to retrieve all containers. - """ - return jsonify({'success': True, 'data': - [{ - "id": instance.id, - "team": instance.team_name, - "username": instance.user_name, - "image": instance.docker_image, - "ports": instance.ports, - "instance_name": instance.instance_name, - "date": instance.creation_date - } for instance in Instances.query.all()] - }) - - -@app.route("/container/all", methods=['DELETE']) +def get_all_containers() -> flask.Response: + """Admin restricted function to retrieve all containers.""" + return jsonify( + { + "success": True, + "data": [ + { + "id": instance.id, + "team": instance.team_name, + "username": instance.user_name, + "image": instance.docker_image, + "ports": instance.ports, + "instance_name": instance.instance_name, + "date": instance.creation_date, + } + for instance in Instances.query.all() + ], + } + ) + + +@app.route("/container/all", methods=["DELETE"]) @admin_required -def remove_containers(): - """ - Admin restricted function to remove all containers. - """ +def remove_containers() -> flask.Response: + """Admin restricted function to remove all containers.""" remove_all_instances() - return jsonify({'success': True, 'message': 'Instances removed successfully.'}) + return jsonify( + {"success": True, "message": "Instances removed successfully."} + ) -@app.route("/container/", methods=['DELETE']) +@app.route("/container/", methods=["DELETE"]) @admin_required -def remove_container(container_id=None): - """ - Admin restricted function to remove a container with its ID. - """ +def remove_container(container_id: int) -> flask.Response: + """Admin restricted function to remove a container with its ID.""" remove_container_by_id(container_id) - return jsonify({'success': True, 'message': 'Instances removed successfully.'}) + return jsonify( + {"success": True, "message": "Instances removed successfully."} + ) -@app.route("/remove/me", methods=['GET']) +@app.route("/remove/me", methods=["GET"]) @login_required -def remove_me(): - """ - Allow a user to remove their current instance. - """ +def remove_me() -> flask.Response: + """Allow a user to remove their current instance.""" if remove_user_running_instance(session["user_id"]): - return jsonify({'success': True, 'message': 'Instance removed successfully.'}) + return jsonify( + {"success": True, "message": "Instance removed successfully."} + ) - return jsonify({'success': False, 'message': 'Unable to find an instance to remove.'}) + return jsonify( + {"success": False, "message": "Unable to find an instance to remove."} + ) -@app.route("/logout", methods=['GET']) -def logout(): - """ - Logout the user. - """ +@app.route("/logout", methods=["GET"]) +def logout() -> flask.Response: + """Logout the user.""" keys = list(session.keys()) for key in keys: session.pop(key, None) @@ -199,48 +228,67 @@ def logout(): return redirect(url_for("login")) -@app.route("/run_instance", methods=['POST']) +@app.route("/run_instance", methods=["POST"]) @login_required -def run_instance(): - """ - Allow a user to create a new instance. - """ +def run_instance() -> flask.Response: + """Allow a user to create a new instance.""" challenge_name = request.form.get("challenge_name", None) # Disable captcha on debug mode - if not current_app.config["ENABLE_RECAPTCHA"] and not recaptcha.verify(): + if current_app.config["ENABLE_RECAPTCHA"] and not recaptcha.verify(): flash("Captcha failed.", "red") - return redirect(url_for('index')) + return redirect(url_for("index")) if not challenge_name or challenge_name.strip() == "": flash("Please provide a challenge name.", "red") - return redirect(url_for('index')) + return redirect(url_for("index")) remove_old_instances() if not check_challenge_name(challenge_name): flash("The challenge name is not valid.", "red") - return redirect(url_for('index')) + return redirect(url_for("index")) - if get_challenge_count_per_team(session["team_id"]) >= MAX_CHALLENGES_PER_TEAM: - flash(f"Your team has reached the maximum number of concurrent running instances ({MAX_CHALLENGES_PER_TEAM}).", "red") - return redirect(url_for('index')) + if ( + get_challenge_count_per_team(session["team_id"]) + >= MAX_INSTANCE_PER_TEAM + ): + flash( + ( + "Your team has reached the maximum number of concurrent " + f"running instances ({MAX_INSTANCE_PER_TEAM})." + ), + "red", + ) + return redirect(url_for("index")) remove_user_running_instance(session["user_id"]) - + if get_total_instance_count() > MAX_INSTANCE_COUNT: - flash(f"The maximum number of dynamic instances has been reached (max: {MAX_INSTANCE_COUNT}).", "red") - return redirect(url_for('index')) - + flash( + ( + "The maximum number of dynamic instances has been reached " + f"(max: {MAX_INSTANCE_COUNT})." + ), + "red", + ) + return redirect(url_for("index")) + challenge_info = get_challenge_info(challenge_name) nb_container = create_instances(session, challenge_info) if nb_container > 1: - flash(f"{nb_container} containers are starting for {challenge_name}...", "green") + flash( + f"{nb_container} containers are starting for {challenge_name}...", + "green", + ) else: - flash(f"{nb_container} container is starting for {challenge_name}...", "green") - return redirect(url_for('index')) + flash( + f"{nb_container} container is starting for {challenge_name}...", + "green", + ) + return redirect(url_for("index")) if __name__ == "__main__": - app.run(host='0.0.0.0', port=5000) + app.run(host="0.0.0.0", port=5000) # noqa: S104 diff --git a/app/app.py b/app/app.py index 9003780..ce54425 100644 --- a/app/app.py +++ b/app/app.py @@ -1,24 +1,34 @@ -from flask import Flask -from app.database import db - +import contextlib import logging -from secrets import token_hex +from distutils.util import strtobool from os import getenv +from secrets import token_hex + +from flask import Flask + +from app.database import db -def create_app(): +def create_app() -> Flask: app = Flask(__name__) - app.secret_key = token_hex() - app.debug = getenv("DEBUG", False) in ["1", "True", "TRUE"] app.logger.setLevel(logging.DEBUG) + app.secret_key = token_hex() + + app.debug = False + with contextlib.suppress(ValueError): + app.debug = strtobool(getenv("DEBUG", "0")) app.config["SQLALCHEMY_DATABASE_URI"] = getenv("DATABASE_URI") app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False - app.config["ENABLE_RECAPTCHA"] = getenv("ENABLE_RECAPTCHA", False) + app.config["ENABLE_RECAPTCHA"] = False + with contextlib.suppress(ValueError): + app.config["ENABLE_RECAPTCHA"] = strtobool( + getenv("ENABLE_RECAPTCHA", "0") + ) app.config["RECAPTCHA_SITE_KEY"] = getenv("RECAPTCHA_SITE_KEY", "") - app.config["RECAPTCHA_SECRET_KEY"] = getenv("RECAPTCHA_SECRET_KEY", "") + app.config["RECAPTCHA_SECRET_KEY"] = getenv("RECAPTCHA_SECRET_KEY", "") db.init_app(app) with app.app_context(): diff --git a/app/auth.py b/app/auth.py index ed0a102..7ef584d 100644 --- a/app/auth.py +++ b/app/auth.py @@ -1,23 +1,29 @@ -from flask import session, redirect, url_for - +import typing from functools import wraps +import flask +from flask import redirect, session, url_for + + +P = typing.ParamSpec("P") +R = typing.TypeVar("R", bound=flask.Response) -def login_required(f): + +def login_required(f: typing.Callable[P, R]) -> typing.Callable[P, R]: @wraps(f) - def wrap(*args, **kwargs): + def wrap(*args: P.args, **kwargs: P.kwargs) -> R: if session and session["verified"]: return f(*args, **kwargs) - else: - return redirect(url_for("login")) + return redirect(url_for("login")) + return wrap -def admin_required(f): +def admin_required(f: typing.Callable[P, R]) -> typing.Callable[P, R]: @wraps(f) - def wrap(*args, **kwargs): + def wrap(*args: P.args, **kwargs: P.kwargs) -> R: if session and session["admin"]: return f(*args, **kwargs) - else: - return redirect(url_for("index")) + return redirect(url_for("index")) + return wrap diff --git a/app/config.py b/app/config.py index 6d11f93..f9b8885 100644 --- a/app/config.py +++ b/app/config.py @@ -1,21 +1,27 @@ #!/usr/bin/env python3 -from os import getenv +from distutils.util import strtobool from json import load +from os import getenv +from pathlib import Path from docker import DockerClient +from app.types import ConfigChallengeType, ConfigDockerHostType + -if getenv("DEBUG") and getenv("DEBUG").strip().upper() in ['1', 'TRUE']: - DEBUG = True -else: +try: + DEBUG = strtobool(getenv("DEBUG", "0")) +except ValueError: DEBUG = False -if getenv("ADMIN_ONLY") and getenv("ADMIN_ONLY").strip().upper() in ['1', 'TRUE']: - ADMIN_ONLY = True -else: + +try: + ADMIN_ONLY = strtobool(getenv("ADMIN_ONLY", "0")) +except ValueError: ADMIN_ONLY = False -with open("config.json", "r") as config_file: + +with Path("config.json").open() as config_file: config = load(config_file) WEBSITE_TITLE = config["website_title"] @@ -27,9 +33,8 @@ MIN_PORTS = config["random_ports"]["min"] MAX_PORTS = config["random_ports"]["max"] - - CHALLENGES = config["challenges"] - DOCKER_HOSTS = config["hosts"] + CHALLENGES: list[ConfigChallengeType] = config["challenges"] + DOCKER_HOSTS: list[ConfigDockerHostType] = config["hosts"] for host in DOCKER_HOSTS: host["client"] = DockerClient(base_url=host["api"]) diff --git a/app/database.py b/app/database.py index 2e1eeb6..f606adc 100644 --- a/app/database.py +++ b/app/database.py @@ -1,3 +1,4 @@ from flask_sqlalchemy import SQLAlchemy -db = SQLAlchemy() \ No newline at end of file + +db = SQLAlchemy() diff --git a/app/models.py b/app/models.py index 139afab..473c63d 100644 --- a/app/models.py +++ b/app/models.py @@ -1,19 +1,21 @@ -from app.database import db +import typing from datetime import datetime +from app.database import db + class Instances(db.Model): """ - id (int) : Primary key. - user_id (int) : CTFd User ID. - user_name (str) : CTFd User name. - team_id (int) : CTFd Team ID. - team_name (str) : CTFd Team name. - docker_image (str) : Docker image deployed by the user. - ports (str) : Port mapped for the docker instance. - instance_name (str) : Random name for the instance. - docker_client_id (int) : Challenges hosts ID. - creation_date (date) : Date of instance creation. + id (int) : Primary key. + user_id (int) : CTFd User ID. + user_name (str) : CTFd User name. + team_id (int) : CTFd Team ID. + team_name (str) : CTFd Team name. + docker_image (str) : Docker image deployed by the user. + ports (str) : Port mapped for the docker instance. + instance_name (str) : Random name for the instance. + docker_client_id (int) : Challenges hosts ID. + creation_date (date) : Date of instance creation. """ id = db.Column(db.Integer, primary_key=True) @@ -32,7 +34,12 @@ class Instances(db.Model): host_domain = db.Column(db.String(128), unique=False, nullable=False) ports = db.Column(db.String(256), unique=False, nullable=True) - creation_date = db.Column(db.DateTime, nullable=False, default=datetime.utcnow) + creation_date = db.Column( + db.DateTime, nullable=False, default=datetime.utcnow + ) - def __repr__(self): - return f"[{self.id}] {self.docker_image} on port {self.port}, created at {self.creation_date}." + def __repr__(self: typing.Self) -> str: + return ( + f"[{self.id}] {self.docker_image} on port {self.port}, " + f"created at {self.creation_date}." + ) diff --git a/app/templates/index.html b/app/templates/index.html index 7c4bf4a..2bf6ca7 100644 --- a/app/templates/index.html +++ b/app/templates/index.html @@ -1,5 +1,31 @@ {% extends 'base.html' %} +{% macro quickLaunch(host, service) %} + {% set p, credential = service.split(' @ ') %} + {% set port, proto = p.split('/') %} + {% set username, password = credential.split(':') %} + + {% if proto in ["http", "https"] %} + {{ proto }}://{{ host }}:{{ port }} + {% elif proto == "ssh" %} + + ssh -p {{ port }} {{ username }}@{{ host }} + + {% elif proto == "tcp" %} + + nc {{ host }} {{ port }} + + {% elif proto == "mysql" %} + + mysql -u {{ username }} -p {{ password }} -h {{ host }} -P {{ port }} + + {% else %} +

+ Wtf is this protocol? Just Google that bro. +

+ {% endif %} +{%- endmacro %} + {% block main %}
@@ -51,7 +77,7 @@

Deploy unique instances for your challenges

Logout
- +

Instances of your team {{ session['team_name'] }}:

@@ -69,7 +95,33 @@

Instances of your team {{ session['team_name'] }}:

Remaining time: {{ container['time_remaining'] }} {% endif %} -

Host: {{ container['host'] }}{% if container['ports'] %}, ports: {{ container['ports'] }}{% endif %} ({{ container['hostname'] }})

+ {% set services = container['ports'].split(', ') %} + {% if services |length == 1 %} + {% set port, credential = services[0].split(' @ ') %} +

+ Host: {{ container['host'] }}{% if port %}, Port: {{ port }}{% endif %}{% if credential.split(':')[0] != 'None' %}, Username: {{ credential.split(':')[0] }}{% endif %}{% if credential.split(':')[1] != 'None' %}, Password: {{ credential.split(':')[1] }}{% endif %} ({{ container['id'] }}) +

+ {% elif services |length >= 1 %} +

Host: {{ container['host'] }} ({{ container['id'] }})

+
    + {% for service in services %} + {% set port, credential = service.split(' @ ') %} +
  • + {% if port %}Port: {{ port }}{% endif %}{% if credential.split(':')[0] != 'None' %}, Username: {{ credential.split(':')[0] }}{% endif %}{% if credential.split(':')[1] != 'None' %}, Password: {{ credential.split(':')[1] }}{% endif %} +
  • + {% endfor %} +
+ {% endif %} + {% if container['ports'] %} +

Quick launch :

+
    + {% for service in services %} +
  • + {{ quickLaunch(container['host'], service) }} +
  • + {% endfor %} +
+ {% endif %} {% endfor %}
{% endfor %} @@ -80,6 +132,11 @@

{{ instances_count }} instance{% if instances_count > 1 %}s{% endif %} runni