From f6e55abd608c672babf606159050a842ba3fdba8 Mon Sep 17 00:00:00 2001 From: "Romain J." Date: Sat, 10 Aug 2024 16:17:33 +0200 Subject: [PATCH 01/13] update: make readme todo dynamic --- README.md | 20 ++++++++++---------- 1 file changed, 10 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 6fa9a1c..adcb5a7 100644 --- a/README.md +++ b/README.md @@ -101,16 +101,16 @@ All the slaves must build all docker images present in the `config.json` file (i ## Todo -- pylint -- add more docs about `config.json` format -- Extend instance feature -- Display connection string (ex: ssh -p ..., http://host:port, nc host port, ...) -- Better admin panel - - Add challenge host to HTML table - - Monitoring on each hosts - - Search/Select actions filter on HTML table -- Show internal ip: boolean by challenges -- Migrate to FastAPI + React +- [ ] pylint +- [ ] add more docs about `config.json` format +- [ ] Extend instance feature +- [ ] Display connection string (ex: ssh -p ..., http://host:port, nc host port, ...) +- [ ] Better admin panel + - [ ] Add challenge host to HTML table + - [ ] Monitoring on each hosts + - [ ] Search/Select actions filter on HTML table +- [ ] Show internal ip: boolean by challenges +- [ ] Migrate to FastAPI + React ## Made with From 84d1a43346bb9514154b407849f644dd0a0a526a Mon Sep 17 00:00:00 2001 From: "Romain J." Date: Sat, 10 Aug 2024 16:43:27 +0200 Subject: [PATCH 02/13] feat: add linter configuration for ruff --- pyproject.toml | 123 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 123 insertions(+) create mode 100644 pyproject.toml diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..adb3da5 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,123 @@ +# ==== PyLint ==== +[tool.pylint.FORMAT] +max-line-length = 79 + +[tool.pylint."MESSAGES CONTROL"] +disable = [ + "missing-docstring", + "invalid-name", +] + +[tool.pylint.DESIGN] +max-parents = 13 + +[tool.pylint.TYPECHECK] +generated-members = [ + "REQUEST", + "acl_users", + "aq_parent", + "[a-zA-Z]+_set{1,2}", + "save", + "delete", +] + + +# ==== Ruff ==== +[tool.ruff] +target-version = "py311" +line-length = 79 +extend-include = ["*.pyi"] +extend-exclude = [ + "docs/*", + "nginx/*", + "data/*" +] +output-format = "grouped" + +[tool.ruff.lint] +ignore = [ + "D100", # Missing docstring in public module + "D101", # Missing docstring in public class + "D102", # Missing docstring in public method + "D103", # Missing docstring in public function + "D104", # Missing docstring in public package + "D105", # Missing docstring in magic method + "D106", # Missing docstring in public nested class + "D107", # Missing docstring in `__init__` + "D203", # one-blank-line-before-class + "D205", # 1 blank line required between summary line and description + "D212", # multi-line-summary-second-line + "D401", # First line of docstring should be in imperative mood: "X" + "PLR2004", # Magic value used in comparison, consider replacing X with a constant variable + "ANN401", # Dynamically typed expressions (typing.Any) are disallowed in `X` + "RUF012", # Mutable class attributes should be annotated with `typing.ClassVar` + "FIX002", # Line contains TODO, consider resolving the issue + "TD002", # Missing author in TODO + "TD003", # Missing issue link on the line following this TODO +] +select = [ + "F", # Pyflakes + "E", # pycodestyle + "C90", # mccabe + "I", # isort + "N", # pep8-naming + "D", # pydocstyle + "UP", # pyupgrade + "ANN", # flake8-annotations + "S", # flake8-bandit + "BLE", # flake8-blind-except + "FBT", # flake8-boolean-trap + "B", # flake8-bugbear + "A", # flake8-builtins + "C4", # flake8-comprehensions + "EM", # flake8-errmsg + "ISC", # flake8-implicit-str-concat + "ICN", # flake8-import-conventions + "G", # flake8-logging-format + "INP", # flake8-no-pep420 + "PIE", # flake8-pie + "T20", # flake8-print + "PYI", # flake8-pyi + "Q", # flake8-quotes + "RSE", # flake8-raise + "RET", # flake8-return + "SLOT", # flake8-slots + "SIM", # flake8-simplify + "TID", # flake8-tidy-imports + "TCH", # flake8-type-checking + "ARG", # flake8-unused-arguments + "PTH", # flake8-use-pathlib + "TD", # flake8-todos + "FIX", # flake8-fixme + "ERA", # eradicate + "PGH", # pygrep-hooks + "PL", # Pylint + "TRY", # tryceratops + "FLY", # flynt + "PERF", # Perflint + "RUF", # Ruff-specific rules +] + +[tool.ruff.lint.isort] +lines-after-imports = 2 + + +[tool.black] +line-length = 79 +target-version = ['py311'] +include = '\.pyi?$' + + +[tool.pyright] +include = ["app"] +exclude = [ + "docs", + "nginx" +] +venvPath = "./" +venv = "venv" +reportMissingImports = false +reportMissingTypeStubs = false +useLibraryCodeForTypes = true +pythonVersion = "3.11" +pythonPlatform = "Linux" From f1f7fa22ebac7dc0a59526bb7fedc21ff33aa5bd Mon Sep 17 00:00:00 2001 From: "Romain J." Date: Sat, 10 Aug 2024 16:50:11 +0200 Subject: [PATCH 03/13] update: make app/app.py compliant to linter --- app/app.py | 28 +++++++++++++++++++--------- 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/app/app.py b/app/app.py index 9003780..ce54425 100644 --- a/app/app.py +++ b/app/app.py @@ -1,24 +1,34 @@ -from flask import Flask -from app.database import db - +import contextlib import logging -from secrets import token_hex +from distutils.util import strtobool from os import getenv +from secrets import token_hex + +from flask import Flask + +from app.database import db -def create_app(): +def create_app() -> Flask: app = Flask(__name__) - app.secret_key = token_hex() - app.debug = getenv("DEBUG", False) in ["1", "True", "TRUE"] app.logger.setLevel(logging.DEBUG) + app.secret_key = token_hex() + + app.debug = False + with contextlib.suppress(ValueError): + app.debug = strtobool(getenv("DEBUG", "0")) app.config["SQLALCHEMY_DATABASE_URI"] = getenv("DATABASE_URI") app.config["SQLALCHEMY_TRACK_MODIFICATIONS"] = False - app.config["ENABLE_RECAPTCHA"] = getenv("ENABLE_RECAPTCHA", False) + app.config["ENABLE_RECAPTCHA"] = False + with contextlib.suppress(ValueError): + app.config["ENABLE_RECAPTCHA"] = strtobool( + getenv("ENABLE_RECAPTCHA", "0") + ) app.config["RECAPTCHA_SITE_KEY"] = getenv("RECAPTCHA_SITE_KEY", "") - app.config["RECAPTCHA_SECRET_KEY"] = getenv("RECAPTCHA_SECRET_KEY", "") + app.config["RECAPTCHA_SECRET_KEY"] = getenv("RECAPTCHA_SECRET_KEY", "") db.init_app(app) with app.app_context(): From 5fe5659add678039e0d50d0fd6c483ee33ff4462 Mon Sep 17 00:00:00 2001 From: "Romain J." Date: Sat, 10 Aug 2024 17:02:34 +0200 Subject: [PATCH 04/13] update: make app/auth.py compliant to linter --- app/auth.py | 26 ++++++++++++++++---------- 1 file changed, 16 insertions(+), 10 deletions(-) diff --git a/app/auth.py b/app/auth.py index ed0a102..7ef584d 100644 --- a/app/auth.py +++ b/app/auth.py @@ -1,23 +1,29 @@ -from flask import session, redirect, url_for - +import typing from functools import wraps +import flask +from flask import redirect, session, url_for + + +P = typing.ParamSpec("P") +R = typing.TypeVar("R", bound=flask.Response) -def login_required(f): + +def login_required(f: typing.Callable[P, R]) -> typing.Callable[P, R]: @wraps(f) - def wrap(*args, **kwargs): + def wrap(*args: P.args, **kwargs: P.kwargs) -> R: if session and session["verified"]: return f(*args, **kwargs) - else: - return redirect(url_for("login")) + return redirect(url_for("login")) + return wrap -def admin_required(f): +def admin_required(f: typing.Callable[P, R]) -> typing.Callable[P, R]: @wraps(f) - def wrap(*args, **kwargs): + def wrap(*args: P.args, **kwargs: P.kwargs) -> R: if session and session["admin"]: return f(*args, **kwargs) - else: - return redirect(url_for("index")) + return redirect(url_for("index")) + return wrap From a4e65b37f25f863a4b39bfcec7ea651a79190891 Mon Sep 17 00:00:00 2001 From: "Romain J." Date: Sat, 10 Aug 2024 17:09:26 +0200 Subject: [PATCH 05/13] update: make app/config.py compliant to linter --- app/config.py | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/app/config.py b/app/config.py index 6d11f93..955dc37 100644 --- a/app/config.py +++ b/app/config.py @@ -1,21 +1,25 @@ #!/usr/bin/env python3 -from os import getenv +from distutils.util import strtobool from json import load +from os import getenv +from pathlib import Path from docker import DockerClient -if getenv("DEBUG") and getenv("DEBUG").strip().upper() in ['1', 'TRUE']: - DEBUG = True -else: +try: + DEBUG = strtobool(getenv("DEBUG", "0")) +except ValueError: DEBUG = False -if getenv("ADMIN_ONLY") and getenv("ADMIN_ONLY").strip().upper() in ['1', 'TRUE']: - ADMIN_ONLY = True -else: + +try: + ADMIN_ONLY = strtobool(getenv("ADMIN_ONLY", "0")) +except ValueError: ADMIN_ONLY = False -with open("config.json", "r") as config_file: + +with Path("config.json").open() as config_file: config = load(config_file) WEBSITE_TITLE = config["website_title"] @@ -27,7 +31,6 @@ MIN_PORTS = config["random_ports"]["min"] MAX_PORTS = config["random_ports"]["max"] - CHALLENGES = config["challenges"] DOCKER_HOSTS = config["hosts"] From 26fed7354977ccb95589701e9d1725494041bf21 Mon Sep 17 00:00:00 2001 From: "Romain J." Date: Sat, 10 Aug 2024 17:10:13 +0200 Subject: [PATCH 06/13] update: make app/database.py compliant to linter --- app/database.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/app/database.py b/app/database.py index 2e1eeb6..f606adc 100644 --- a/app/database.py +++ b/app/database.py @@ -1,3 +1,4 @@ from flask_sqlalchemy import SQLAlchemy -db = SQLAlchemy() \ No newline at end of file + +db = SQLAlchemy() From c474f545bc0b5922f156830acaaafcafb187802e Mon Sep 17 00:00:00 2001 From: "Romain J." Date: Sat, 10 Aug 2024 17:13:07 +0200 Subject: [PATCH 07/13] update: make app/models.py compliant to linter --- app/models.py | 35 +++++++++++++++++++++-------------- 1 file changed, 21 insertions(+), 14 deletions(-) diff --git a/app/models.py b/app/models.py index 139afab..473c63d 100644 --- a/app/models.py +++ b/app/models.py @@ -1,19 +1,21 @@ -from app.database import db +import typing from datetime import datetime +from app.database import db + class Instances(db.Model): """ - id (int) : Primary key. - user_id (int) : CTFd User ID. - user_name (str) : CTFd User name. - team_id (int) : CTFd Team ID. - team_name (str) : CTFd Team name. - docker_image (str) : Docker image deployed by the user. - ports (str) : Port mapped for the docker instance. - instance_name (str) : Random name for the instance. - docker_client_id (int) : Challenges hosts ID. - creation_date (date) : Date of instance creation. + id (int) : Primary key. + user_id (int) : CTFd User ID. + user_name (str) : CTFd User name. + team_id (int) : CTFd Team ID. + team_name (str) : CTFd Team name. + docker_image (str) : Docker image deployed by the user. + ports (str) : Port mapped for the docker instance. + instance_name (str) : Random name for the instance. + docker_client_id (int) : Challenges hosts ID. + creation_date (date) : Date of instance creation. """ id = db.Column(db.Integer, primary_key=True) @@ -32,7 +34,12 @@ class Instances(db.Model): host_domain = db.Column(db.String(128), unique=False, nullable=False) ports = db.Column(db.String(256), unique=False, nullable=True) - creation_date = db.Column(db.DateTime, nullable=False, default=datetime.utcnow) + creation_date = db.Column( + db.DateTime, nullable=False, default=datetime.utcnow + ) - def __repr__(self): - return f"[{self.id}] {self.docker_image} on port {self.port}, created at {self.creation_date}." + def __repr__(self: typing.Self) -> str: + return ( + f"[{self.id}] {self.docker_image} on port {self.port}, " + f"created at {self.creation_date}." + ) From 773976683adec97b668fb69441f9f6e17f04a5d7 Mon Sep 17 00:00:00 2001 From: "Romain J." Date: Sat, 10 Aug 2024 18:34:23 +0200 Subject: [PATCH 08/13] update: make app.py compliant to linter --- app.py | 292 ++++++++++++++++++++++++++++--------------------- app/types.py | 11 ++ pyproject.toml | 2 + 3 files changed, 182 insertions(+), 123 deletions(-) create mode 100644 app/types.py diff --git a/app.py b/app.py index c6d586f..2fd33c3 100644 --- a/app.py +++ b/app.py @@ -1,91 +1,102 @@ #!/usr/bin/env python3 -from os import getenv -from datetime import datetime, timedelta +import datetime +import typing +import flask from flask import ( current_app, + flash, + jsonify, + redirect, render_template, - session, request, - redirect, + session, url_for, - jsonify, - flash ) from flask_recaptcha import ReCaptcha +from app.app import create_app +from app.auth import admin_required, login_required from app.config import ( - WEBSITE_TITLE, - CTFD_URL, + ADMIN_ONLY, CHALLENGES, + CTFD_URL, MAX_INSTANCE_COUNT, MAX_INSTANCE_DURATION, MAX_INSTANCE_PER_TEAM, - ADMIN_ONLY + WEBSITE_TITLE, ) +from app.models import Instances from app.utils import ( - get_total_instance_count, - get_challenge_count_per_team, - check_challenge_name, check_access_key, + check_challenge_name, + create_instances, + get_challenge_count_per_team, get_challenge_info, + get_total_instance_count, remove_all_instances, remove_container_by_id, - create_instances, + remove_old_instances, remove_user_running_instance, - remove_old_instances ) -from app.auth import login_required, admin_required -from app.models import Instances -from app.app import create_app app = create_app() recaptcha = ReCaptcha(app) -recaptcha.theme = 'dark' +recaptcha.theme = "dark" -def render(template, **kwargs): - """ - Shortcut for the render_template flask function. - """ - return render_template(template, title=WEBSITE_TITLE, ctfd_url=CTFD_URL, - max_instance_duration=MAX_INSTANCE_DURATION, challenges_option=CHALLENGES, - instances_count=get_total_instance_count(), **kwargs) +def render(template: str, **kwargs: typing.Any) -> str: + """Shortcut for the render_template flask function.""" + return render_template( + template, + title=WEBSITE_TITLE, + ctfd_url=CTFD_URL, + max_instance_duration=MAX_INSTANCE_DURATION, + challenges_option=CHALLENGES, + instances_count=get_total_instance_count(), + **kwargs, + ) -@app.route("/admin", methods=['GET']) +@app.route("/admin", methods=["GET"]) @admin_required -def admin(): - """ - Admin dashboard with all instances. - """ +def admin() -> str: + """Admin dashboard with all instances.""" return render("admin.html") -@app.route("/login", methods=['GET', 'POST']) -def login(): - """ - Handle login process and form. - """ +@app.route("/login", methods=["GET", "POST"]) +def login() -> str | flask.Response: # noqa: PLR0911 + """Handle login process and form.""" + template_name = "login.html" + if session and session["verified"]: return redirect(url_for("index")) if request.method == "GET": - return render("login.html") + return render(template_name) if request.method == "POST": - access_key = request.form['access_key'] if 'access_key' in request.form else None + access_key = request.form.get("access_key") if not access_key: - return render('login.html', error=True, message="Please provide an access key.") + return render( + template_name, + error=True, + message="Please provide an access key.", + ) success, message, user = check_access_key(access_key) if not success: - return render('login.html', error=True, message=message) + return render(template_name, error=True, message=message) if ADMIN_ONLY and not user["is_admin"]: - return render('login.html', error=True, message="You need to be an administrator to login.") + return render( + template_name, + error=True, + message="You need to be an administrator to login.", + ) session["verified"] = True session["user_id"] = user["user_id"] @@ -98,14 +109,14 @@ def login(): return redirect(url_for("login")) -@app.route("/", methods=['GET']) +@app.route("/", methods=["GET"]) @login_required -def index(): +def index() -> str: """ - Display running instances of your team and allows you to submit new instances. + Display running instances of your team and allows you to submit new + instances. """ instances = Instances.query.filter_by(team_id=session["team_id"]).all() - if instances: challenges_info = {} @@ -113,85 +124,101 @@ def index(): for instance in instances: if instance.network_name not in challenges_info: challenges_info[instance.network_name] = [] - - remaining = timedelta(minutes=MAX_INSTANCE_DURATION) - (datetime.utcnow() - instance.creation_date) - if remaining > timedelta(seconds=0): - remaining = f"{remaining.seconds // 60:02d}m{remaining.seconds % 60:02d}s" + + remaining = datetime.timedelta(minutes=MAX_INSTANCE_DURATION) - ( + datetime.datetime.now(datetime.UTC) - instance.creation_date + ) + if remaining > datetime.timedelta(seconds=0): + remaining = ( + f"{remaining.seconds // 60:02d}m" + f"{remaining.seconds % 60:02d}s" + ) else: remaining = "This instance will be deleted shortly..." - challenges_info[instance.network_name].append({ - "name": instance.challenge_name, - "host": instance.host_domain, - "hostname": instance.hostname, - "ip_address": instance.ip_address, - "ports": instance.ports, - "user_name": instance.user_name, - "time_remaining": remaining - }) - - return render('index.html', challenges=CHALLENGES, captcha=recaptcha, challenges_info=challenges_info) - return render('index.html', challenges=CHALLENGES, captcha=recaptcha) - - -@app.route("/container/all", methods=['GET']) + challenges_info[instance.network_name].append( + { + "name": instance.challenge_name, + "host": instance.host_domain, + "hostname": instance.hostname, + "ip_address": instance.ip_address, + "ports": instance.ports, + "user_name": instance.user_name, + "time_remaining": remaining, + } + ) + + return render( + "index.html", + challenges=CHALLENGES, + captcha=recaptcha, + challenges_info=challenges_info, + ) + return render("index.html", challenges=CHALLENGES, captcha=recaptcha) + + +@app.route("/container/all", methods=["GET"]) @admin_required -def get_all_containers(): - """ - Admin restricted function to retrieve all containers. - """ - return jsonify({'success': True, 'data': - [{ - "id": instance.id, - "team": instance.team_name, - "username": instance.user_name, - "image": instance.docker_image, - "ports": instance.ports, - "instance_name": instance.instance_name, - "date": instance.creation_date - } for instance in Instances.query.all()] - }) - - -@app.route("/container/all", methods=['DELETE']) +def get_all_containers() -> flask.Response: + """Admin restricted function to retrieve all containers.""" + return jsonify( + { + "success": True, + "data": [ + { + "id": instance.id, + "team": instance.team_name, + "username": instance.user_name, + "image": instance.docker_image, + "ports": instance.ports, + "instance_name": instance.instance_name, + "date": instance.creation_date, + } + for instance in Instances.query.all() + ], + } + ) + + +@app.route("/container/all", methods=["DELETE"]) @admin_required -def remove_containers(): - """ - Admin restricted function to remove all containers. - """ +def remove_containers() -> flask.Response: + """Admin restricted function to remove all containers.""" remove_all_instances() - return jsonify({'success': True, 'message': 'Instances removed successfully.'}) + return jsonify( + {"success": True, "message": "Instances removed successfully."} + ) -@app.route("/container/", methods=['DELETE']) +@app.route("/container/", methods=["DELETE"]) @admin_required -def remove_container(container_id=None): - """ - Admin restricted function to remove a container with its ID. - """ +def remove_container(container_id: int) -> flask.Response: + """Admin restricted function to remove a container with its ID.""" remove_container_by_id(container_id) - return jsonify({'success': True, 'message': 'Instances removed successfully.'}) + return jsonify( + {"success": True, "message": "Instances removed successfully."} + ) -@app.route("/remove/me", methods=['GET']) +@app.route("/remove/me", methods=["GET"]) @login_required -def remove_me(): - """ - Allow a user to remove their current instance. - """ +def remove_me() -> flask.Response: + """Allow a user to remove their current instance.""" if remove_user_running_instance(session["user_id"]): - return jsonify({'success': True, 'message': 'Instance removed successfully.'}) + return jsonify( + {"success": True, "message": "Instance removed successfully."} + ) - return jsonify({'success': False, 'message': 'Unable to find an instance to remove.'}) + return jsonify( + {"success": False, "message": "Unable to find an instance to remove."} + ) -@app.route("/logout", methods=['GET']) -def logout(): - """ - Logout the user. - """ +@app.route("/logout", methods=["GET"]) +def logout() -> flask.Response: + """Logout the user.""" keys = list(session.keys()) for key in keys: session.pop(key, None) @@ -199,48 +226,67 @@ def logout(): return redirect(url_for("login")) -@app.route("/run_instance", methods=['POST']) +@app.route("/run_instance", methods=["POST"]) @login_required -def run_instance(): - """ - Allow a user to create a new instance. - """ +def run_instance() -> flask.Response: + """Allow a user to create a new instance.""" challenge_name = request.form.get("challenge_name", None) # Disable captcha on debug mode if not current_app.config["ENABLE_RECAPTCHA"] and not recaptcha.verify(): flash("Captcha failed.", "red") - return redirect(url_for('index')) + return redirect(url_for("index")) if not challenge_name or challenge_name.strip() == "": flash("Please provide a challenge name.", "red") - return redirect(url_for('index')) + return redirect(url_for("index")) remove_old_instances() if not check_challenge_name(challenge_name): flash("The challenge name is not valid.", "red") - return redirect(url_for('index')) + return redirect(url_for("index")) - if get_challenge_count_per_team(session["team_id"]) >= MAX_INSTANCE_PER_TEAM: - flash(f"Your team has reached the maximum number of concurrent running instances ({MAX_INSTANCE_PER_TEAM}).", "red") - return redirect(url_for('index')) + if ( + get_challenge_count_per_team(session["team_id"]) + >= MAX_INSTANCE_PER_TEAM + ): + flash( + ( + "Your team has reached the maximum number of concurrent " + f"running instances ({MAX_INSTANCE_PER_TEAM})." + ), + "red", + ) + return redirect(url_for("index")) remove_user_running_instance(session["user_id"]) - + if get_total_instance_count() > MAX_INSTANCE_COUNT: - flash(f"The maximum number of dynamic instances has been reached (max: {MAX_INSTANCE_COUNT}).", "red") - return redirect(url_for('index')) - + flash( + ( + "The maximum number of dynamic instances has been reached " + f"(max: {MAX_INSTANCE_COUNT})." + ), + "red", + ) + return redirect(url_for("index")) + challenge_info = get_challenge_info(challenge_name) nb_container = create_instances(session, challenge_info) if nb_container > 1: - flash(f"{nb_container} containers are starting for {challenge_name}...", "green") + flash( + f"{nb_container} containers are starting for {challenge_name}...", + "green", + ) else: - flash(f"{nb_container} container is starting for {challenge_name}...", "green") - return redirect(url_for('index')) + flash( + f"{nb_container} container is starting for {challenge_name}...", + "green", + ) + return redirect(url_for("index")) if __name__ == "__main__": - app.run(host='0.0.0.0', port=5000) + app.run(host="0.0.0.0", port=5000) # noqa: S104 diff --git a/app/types.py b/app/types.py new file mode 100644 index 0000000..24bdecc --- /dev/null +++ b/app/types.py @@ -0,0 +1,11 @@ +import typing + + +class UserType(typing.TypedDict): + user_id: int | None + username: str | None + + team_id: int | None + team_name: str | None + + is_admin: bool diff --git a/pyproject.toml b/pyproject.toml index adb3da5..2d5b0d2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -48,12 +48,14 @@ ignore = [ "D205", # 1 blank line required between summary line and description "D212", # multi-line-summary-second-line "D401", # First line of docstring should be in imperative mood: "X" + "S311", # Standard pseudo-random generators are not suitable for cryptographic purposes "PLR2004", # Magic value used in comparison, consider replacing X with a constant variable "ANN401", # Dynamically typed expressions (typing.Any) are disallowed in `X` "RUF012", # Mutable class attributes should be annotated with `typing.ClassVar` "FIX002", # Line contains TODO, consider resolving the issue "TD002", # Missing author in TODO "TD003", # Missing issue link on the line following this TODO + "TRY401", # Redundant exception object included in `logging.exception` call ] select = [ "F", # Pyflakes From 5ced277eb8f03e9c9aa1229f91a0efae64a4cf88 Mon Sep 17 00:00:00 2001 From: "Romain J." Date: Sat, 10 Aug 2024 19:00:10 +0200 Subject: [PATCH 09/13] update: make app/utils.py compliant to linter --- .gitignore | 1 + app/config.py | 6 +- app/types.py | 35 ++++++ app/utils.py | 317 ++++++++++++++++++++++++++++++------------------- pyproject.toml | 5 +- 5 files changed, 238 insertions(+), 126 deletions(-) diff --git a/.gitignore b/.gitignore index 19d4128..8058323 100644 --- a/.gitignore +++ b/.gitignore @@ -105,3 +105,4 @@ venv.bak/ # mypy .mypy_cache/ +.idea diff --git a/app/config.py b/app/config.py index 955dc37..f9b8885 100644 --- a/app/config.py +++ b/app/config.py @@ -6,6 +6,8 @@ from docker import DockerClient +from app.types import ConfigChallengeType, ConfigDockerHostType + try: DEBUG = strtobool(getenv("DEBUG", "0")) @@ -31,8 +33,8 @@ MIN_PORTS = config["random_ports"]["min"] MAX_PORTS = config["random_ports"]["max"] - CHALLENGES = config["challenges"] - DOCKER_HOSTS = config["hosts"] + CHALLENGES: list[ConfigChallengeType] = config["challenges"] + DOCKER_HOSTS: list[ConfigDockerHostType] = config["hosts"] for host in DOCKER_HOSTS: host["client"] = DockerClient(base_url=host["api"]) diff --git a/app/types.py b/app/types.py index 24bdecc..616c92c 100644 --- a/app/types.py +++ b/app/types.py @@ -1,5 +1,7 @@ import typing +from docker import DockerClient + class UserType(typing.TypedDict): user_id: int | None @@ -9,3 +11,36 @@ class UserType(typing.TypedDict): team_name: str | None is_admin: bool + + +class ConfigDockerHostType(typing.TypedDict): + domain: str + api: str + client: DockerClient + + +class ContainerPortType(typing.TypedDict): + port: str + protocol: typing.Literal["http", "ssh"] + + +class ChallengeContainerType(typing.TypedDict): + docker_image: str + + hostname: str | None + ports: list[ContainerPortType] + + command: str | None + environment: dict[str, typing.Any] | None + privileged: bool | None + + mem_limit: str + read_only: bool + + cpu_period: int | None + cpu_quota: int | None + + +class ConfigChallengeType(typing.TypedDict): + name: str + containers: list[ChallengeContainerType] diff --git a/app/utils.py b/app/utils.py index 959eda1..17618c5 100644 --- a/app/utils.py +++ b/app/utils.py @@ -1,100 +1,130 @@ #!/usr/bin/env python3 -from flask import current_app -import requests -from docker.errors import ImageNotFound, NotFound, APIError - +import datetime +import random import re import secrets -import random -from datetime import datetime, timedelta +import typing + +import flask +import requests +from docker.errors import APIError, ImageNotFound, NotFound +from docker.models.containers import Container +from flask import current_app from app.config import ( - DOCKER_HOSTS, CHALLENGES, CTFD_URL, + DOCKER_HOSTS, + MAX_INSTANCE_DURATION, MAX_PORTS, MIN_PORTS, - MAX_INSTANCE_DURATION ) from app.database import db from app.models import Instances +from app.types import ConfigChallengeType, ConfigDockerHostType, UserType -def remove_old_instances(): - """ - Remove old instances (creation_date > X minutes). - """ - instances = db.session.query(Instances).filter( - Instances.creation_date < datetime.utcnow() - timedelta(minutes=MAX_INSTANCE_DURATION)).all() +if typing.TYPE_CHECKING: + import docker + + +REQUESTS_TIMEOUT = 60 + + +def remove_old_instances() -> None: + """Remove old instances (creation_date > X minutes).""" + instances = ( + db.session.query(Instances) + .filter( + Instances.creation_date + < ( + datetime.datetime.now(datetime.UTC) + - datetime.timedelta(minutes=MAX_INSTANCE_DURATION) + ) + ) + .all() + ) # TODO: Remove all instances of a challenge. for instance in instances: remove_container_by_id(instance.id) -def remove_user_running_instance(user_id): - """ - Remove instance if the user has already run an instance. - """ +def remove_user_running_instance(user_id: int) -> int: + """Remove instance if the user has already run an instance.""" instances = Instances.query.filter_by(user_id=user_id).all() - + for instance in instances: - current_app.logger.debug("User n°%d is removing '%s'...", user_id, instance.instance_name) + current_app.logger.debug( + "User n°%d is removing '%s'...", user_id, instance.instance_name + ) remove_container_by_id(instance.id) - + return len(instances) > 0 -def find_ip_address(container): - """ - Find IP Adress of a running container. - """ +def find_ip_address(container: Container) -> str: + """Find IP Address of a running container.""" ret = container.exec_run("hostname -i") if ret.exit_code == 0: return ret.output.decode().strip() - ret = container.exec_run('cat /etc/hosts') + ret = container.exec_run("cat /etc/hosts") if ret.exit_code == 0: return ret.output.split()[-2].decode() return "UNKNOWN" -def create_instances(session, challenge_info): - """ - Create new instances. - """ +def create_instances( + session: flask.session, + challenge_info: ConfigChallengeType +) -> int: + """Create new instances.""" # Generate deploy environment deploy_config = { "network_name": secrets.token_hex(16), "host": random.choice(DOCKER_HOSTS), - "containers": [] + "containers": [], } - worker = deploy_config["host"]["client"] + worker: docker.DockerClient = deploy_config["host"]["client"] worker.networks.create(deploy_config["network_name"], driver="bridge") - current_app.logger.debug("Starting deployment '%s' for challenge '%s'.", deploy_config["network_name"], challenge_info["name"]) + current_app.logger.debug( + "Starting deployment '%s' for challenge '%s'.", + deploy_config["network_name"], + challenge_info["name"], + ) - # Generate containers environment + # Generate containers environment for container in challenge_info["containers"]: instance_name = secrets.token_hex(16) - deploy_config["containers"].append({ - "docker_image": container["docker_image"], - "command": container.get("command", None), - "hostname": container.get("hostname", instance_name), - "instance_name": instance_name, - "ports": { - pinfo["port"]: find_unused_port(deploy_config["host"]) for pinfo in container["ports"] - }, - "protocols": [pinfo["protocol"] for pinfo in container["ports"]], - "environment": container.get("environment", {}), - "mem_limit": container.get("mem_limit", "512m"), - "privileged": container.get("privileged", False), - "read_only": container.get("read_only", False), - "cpu_period": container.get("cpu_period", None), - "cpu_quota": container.get("cpu_quota", None) - }) + deploy_config["containers"].append( + { + "docker_image": container["docker_image"], + "command": container.get("command", None), + "hostname": container.get("hostname", instance_name), + "instance_name": instance_name, + "ports": { + pinfo["port"]: find_unused_port(deploy_config["host"]) + for pinfo in container["ports"] + }, + "protocols": [ + pinfo["protocol"] for pinfo in container["ports"] + ], + "environment": container.get("environment", {}), + "mem_limit": container.get("mem_limit", "512m"), + "privileged": container.get("privileged", False), + "read_only": container.get("read_only", False), + "cpu_period": container.get("cpu_period", None), + "cpu_quota": container.get("cpu_quota", None), + } + ) - current_app.logger.debug("Environment for deployment '%s': %s", deploy_config["network_name"], deploy_config) + current_app.logger.debug( + "Environment for deployment '%s': %s", + deploy_config["network_name"], + deploy_config, + ) # Save instances in DB and run containers for container in deploy_config["containers"]: @@ -107,13 +137,20 @@ def create_instances(session, challenge_info): challenge_name=challenge_info["name"], network_name=deploy_config["network_name"], hostname=container["hostname"], - ports=", ".join(f"{port}/{proto}" for port, proto in zip(container["ports"].values(), container["protocols"])), + ports=", ".join( + f"{port}/{proto}" + for port, proto in zip( + container["ports"].values(), + container["protocols"], + strict=False, + ) + ), host_domain=deploy_config["host"]["domain"], - instance_name=container["instance_name"] + instance_name=container["instance_name"], ) try: - container = worker.containers.run( + created_container = worker.containers.run( image=container["docker_image"], command=container["command"], hostname=container["hostname"], @@ -126,91 +163,97 @@ def create_instances(session, challenge_info): mem_limit=container["mem_limit"], privileged=container["privileged"], read_only=container["read_only"], - cpu_period=container["cpu_period"], - cpu_quota=container["cpu_quota"] + cpu_period=container["cpu_period"], + cpu_quota=container["cpu_quota"], ) - instance.ip_address = find_ip_address(container) + instance.ip_address = find_ip_address(created_container) except ImageNotFound as err: - current_app.logger.error("ImageNotFound: Unable to find %s, %s", docker_image, err) - return "ERROR", [] + current_app.logger.exception( + "ImageNotFound: Unable to find %s, %s", + container["docker_image"], + err, + ) + return -1 db.session.add(instance) db.session.commit() return len(challenge_info["containers"]) -def get_challenge_count_per_team(team_id): - """ - Returns the number of challenges running for a specific team. - """ - return Instances.query.filter_by(team_id=team_id).distinct(Instances.network_name).count() -def find_unused_port(docker_host): +def get_challenge_count_per_team(team_id: int) -> int: + """Returns the number of challenges running for a specific team.""" + return ( + Instances.query.filter_by(team_id=team_id) + .distinct(Instances.network_name) + .count() + ) + + +def find_unused_port(docker_host: ConfigDockerHostType) -> int: """ - Find a port that is not used by any instances (on a specific challenge host). + Find a port that is not used by any instances + (on a specific challenge host). """ containers = docker_host["client"].containers.list(all=True) found = False + rand_port = random.randint(MIN_PORTS, MAX_PORTS + 1) + while not found: found = True - rand_port = random.randint(MIN_PORTS, MAX_PORTS + 1) for container in containers: if rand_port in container.ports.values(): found = False - if found: - return rand_port + rand_port = random.randint(MIN_PORTS, MAX_PORTS + 1) + return rand_port -def get_total_instance_count(): - """ - Returns the number of challenges instance running. - """ + +def get_total_instance_count() -> int: + """Returns the number of challenges instance running.""" return Instances.query.count() -def get_challenge_info(challenge_name): - """ - Returns challenge information with a challenge_name as parameter. - """ +def get_challenge_info(challenge_name: str) -> ConfigChallengeType | None: + """Returns challenge information with a challenge_name as parameter.""" for challenge in CHALLENGES: - if challenge['name'] == challenge_name: + if challenge["name"] == challenge_name: return challenge - return False + return None -def check_challenge_name(challenge_name): - """ - Returns True if the challenge_name is valid, else False. - """ - for challenge in CHALLENGES: - if challenge['name'] == challenge_name: - return True - return False +def check_challenge_name(challenge_name: str) -> bool: + """Returns True if the challenge_name is valid, else False.""" + return any(challenge["name"] == challenge_name for challenge in CHALLENGES) -def check_access_key(key): - """ - Returns the user_id, user_name, team_id, team_name and is_admin. - """ - user = { +def check_access_key(key: str) -> tuple[bool, str, UserType]: + """Returns the user_id, user_name, team_id, team_name and is_admin.""" + user: UserType = { "user_id": None, "username": None, "team_id": None, "team_name": None, - "is_admin": False + "is_admin": False, } - - pattern = r'^ctfd_[a-zA-Z0-9]+$' + + pattern = r"^ctfd_[a-zA-Z0-9]+$" if not re.match(pattern, key): return False, "Invalid access key, wrong format!", user - base_url = CTFD_URL.strip('/') + base_url = CTFD_URL.strip("/") try: - resp_json = requests.get(f"{base_url}/api/v1/users/me", - headers={"Authorization":f"Token {key}", "Content-Type":"application/json"}).json() + resp_json = requests.get( + f"{base_url}/api/v1/users/me", + headers={ + "Authorization": f"Token {key}", + "Content-Type": "application/json", + }, + timeout=REQUESTS_TIMEOUT, + ).json() success = resp_json.get("success", False) user["user_id"] = resp_json.get("data", {}).get("id", "") @@ -221,31 +264,43 @@ def check_access_key(key): if not success or not user["team_id"]: return False, "User not in a team or invalid token.", user - resp_json = requests.get(f"{base_url}/api/v1/teams/{user['team_id']}", - headers={"Authorization":f"Token {key}", "Content-Type":"application/json"}).json() + resp_json = requests.get( + f"{base_url}/api/v1/teams/{user['team_id']}", + headers={ + "Authorization": f"Token {key}", + "Content-Type": "application/json", + }, + timeout=REQUESTS_TIMEOUT, + ).json() user["team_name"] = resp_json.get("data", {}).get("name", "") - resp_json = requests.get(f"{base_url}/api/v1/configs", headers={"Authorization":f"Token {key}", "Content-Type":"application/json"}).json() + resp_json = requests.get( + f"{base_url}/api/v1/configs", + headers={ + "Authorization": f"Token {key}", + "Content-Type": "application/json", + }, + timeout=REQUESTS_TIMEOUT, + ).json() user["is_admin"] = resp_json.get("success", False) - return True, "", user + return True, "", user # noqa: TRY300 except Exception as err: - current_app.logger.error("Unable to reach CTFd with access key: %s", key) - current_app.logger.error("Error: %s", str(err)) + current_app.logger.exception( + "Unable to reach CTFd with access key: %s", key + ) + current_app.logger.exception("Error: %s", str(err)) - return False, "An error has occured.", user + return False, "An error has occurred.", user -def remove_all_instances(): - """ - Remove all running containers. - """ + +def remove_all_instances() -> None: + """Remove all running containers.""" for instance in Instances.query.all(): remove_container_by_id(instance.id) -def remove_container_by_name(host_domain, name): - """ - Remove running container using its random name. - """ +def remove_container_by_name(host_domain: str, name: str) -> None: + """Remove running container using its random name.""" for docker_host in DOCKER_HOSTS: if host_domain in docker_host["domain"]: client = docker_host["client"] @@ -256,22 +311,40 @@ def remove_container_by_name(host_domain, name): try: containers[0].remove(force=True) - network_name = list(containers[0].attrs['NetworkSettings']['Networks'].keys())[0] - networks = client.networks.list(filters={"name": network_name}) + network_name = next( + containers[0] + .attrs["NetworkSettings"]["Networks"] + .keys() + ) + networks = client.networks.list( + filters={"name": network_name} + ) networks[0].remove() except NotFound as err: - current_app.logger.warning("Unable to find the container to remove (name: '%s'): %s", name, err) + current_app.logger.warning( + "Unable to find the container to remove (name: '%s'): " + "%s", + name, + err, + ) except KeyError as err: - current_app.logger.warning("Unable to find the network to remove (name: '%s'): %s", network_name, err) + current_app.logger.warning( + "Unable to find the network to remove (name: '%s'): " + "%s", + network_name, + err, + ) except APIError as err: - current_app.logger.warning("Unable to remove the network (name: '%s'): %s", network_name, err) + current_app.logger.warning( + "Unable to remove the network (name: '%s'): %s", + network_name, + err, + ) return -def remove_container_by_id(instance_id): - """ - Remove running container using its instance ID. - """ +def remove_container_by_id(instance_id: int) -> None: + """Remove running container using its instance ID.""" instance = Instances.query.filter_by(id=instance_id).first() if instance: remove_container_by_name(instance.host_domain, instance.instance_name) diff --git a/pyproject.toml b/pyproject.toml index 2d5b0d2..0ae9398 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -111,10 +111,11 @@ include = '\.pyi?$' [tool.pyright] -include = ["app"] +include = ["app", "app.py"] exclude = [ "docs", - "nginx" + "nginx", + "venv" ] venvPath = "./" venv = "venv" From ea56a3c6edcfcf156d2a78f927c169cfcce304f5 Mon Sep 17 00:00:00 2001 From: "Romain J." Date: Sat, 10 Aug 2024 21:26:32 +0200 Subject: [PATCH 10/13] fix: revert breaking change on 5ced277eb8f03e9c9aa1229f91a0efae64a4cf88 --- app.py | 7 +++++-- app/utils.py | 6 +++--- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/app.py b/app.py index 2fd33c3..c7adde6 100644 --- a/app.py +++ b/app.py @@ -126,7 +126,10 @@ def index() -> str: challenges_info[instance.network_name] = [] remaining = datetime.timedelta(minutes=MAX_INSTANCE_DURATION) - ( - datetime.datetime.now(datetime.UTC) - instance.creation_date + + datetime.datetime.now(datetime.UTC) + - instance.creation_date.replace(tzinfo=datetime.UTC) + ) if remaining > datetime.timedelta(seconds=0): remaining = ( @@ -233,7 +236,7 @@ def run_instance() -> flask.Response: challenge_name = request.form.get("challenge_name", None) # Disable captcha on debug mode - if not current_app.config["ENABLE_RECAPTCHA"] and not recaptcha.verify(): + if current_app.config["ENABLE_RECAPTCHA"] and not recaptcha.verify(): flash("Captcha failed.", "red") return redirect(url_for("index")) diff --git a/app/utils.py b/app/utils.py index 17618c5..b81ce1f 100644 --- a/app/utils.py +++ b/app/utils.py @@ -312,9 +312,9 @@ def remove_container_by_name(host_domain: str, name: str) -> None: containers[0].remove(force=True) network_name = next( - containers[0] - .attrs["NetworkSettings"]["Networks"] - .keys() + iter( + containers[0].attrs["NetworkSettings"]["Networks"] + ) ) networks = client.networks.list( filters={"name": network_name} From 1bb6825df9d2dc84e083f725bd7600228bfbc166 Mon Sep 17 00:00:00 2001 From: "Romain J." Date: Sat, 10 Aug 2024 22:14:36 +0200 Subject: [PATCH 11/13] feat: add connection string shortcut for challs links --- README.md | 4 ++-- app/templates/index.html | 35 ++++++++++++++++++++++++++++++++++- 2 files changed, 36 insertions(+), 3 deletions(-) diff --git a/README.md b/README.md index adcb5a7..7699332 100644 --- a/README.md +++ b/README.md @@ -101,10 +101,10 @@ All the slaves must build all docker images present in the `config.json` file (i ## Todo -- [ ] pylint +- [x] pylint - [ ] add more docs about `config.json` format - [ ] Extend instance feature -- [ ] Display connection string (ex: ssh -p ..., http://host:port, nc host port, ...) +- [x] Display connection string (ex: ssh -p ..., http://host:port, nc host port, ...) - [ ] Better admin panel - [ ] Add challenge host to HTML table - [ ] Monitoring on each hosts diff --git a/app/templates/index.html b/app/templates/index.html index 7c4bf4a..e533973 100644 --- a/app/templates/index.html +++ b/app/templates/index.html @@ -1,5 +1,21 @@ {% extends 'base.html' %} +{% macro quickLaunch(host, service) %} + {% set port, proto = service.split('/') %} + + {% if proto in ["http", "https"] %} + {{ proto }}://{{ host }}:{{ port }} + {% elif proto == "ssh" or True %} + + ssh -p {{ port }} user@{{ host }} + + {% else %} + + nc {{ host }} {{ port }} + + {% endif %} +{%- endmacro %} + {% block main %}
@@ -51,7 +67,7 @@

Deploy unique instances for your challenges

Logout
- +

Instances of your team {{ session['team_name'] }}:

@@ -70,6 +86,18 @@

Instances of your team {{ session['team_name'] }}:

Remaining time: {{ container['time_remaining'] }} {% endif %}

Host: {{ container['host'] }}{% if container['ports'] %}, ports: {{ container['ports'] }}{% endif %} ({{ container['hostname'] }})

+ + {% if container['ports'] %} +

Quick launch :

+
    + {% set services = container['ports'].split(', ') %} + {% for service in services %} +
  • + {{ quickLaunch(container['host'], service) }} +
  • + {% endfor %} +
+ {% endif %} {% endfor %}
{% endfor %} @@ -80,6 +108,11 @@

{{ instances_count }} instance{% if instances_count > 1 %}s{% endif %} runni