Skip to content
Merged
2 changes: 2 additions & 0 deletions backend/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@

SECRET_KEY = os.getenv("SECRET_KEY", binascii.hexlify(os.urandom(30)).decode())

HCAPTCHA_API_SECRET = os.getenv("HCAPTCHA_API_SECRET")

QUESTION_TYPES = [
"radio",
"checkbox",
Expand Down
5 changes: 4 additions & 1 deletion backend/models/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
from .antispam import AntiSpam
from .discord_user import DiscordUser
from .form import Form
from .form_response import FormResponse
from .question import Question

__all__ = ["Form", "Question"]
__all__ = ["AntiSpam", "DiscordUser", "Form", "FormResponse", "Question"]
10 changes: 10 additions & 0 deletions backend/models/antispam.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from pydantic import BaseModel


class AntiSpam(BaseModel):
"""Schema model for form response antispam field."""

ip_hash: str
user_agent_hash: str
captcha_pass: bool
dns_blacklisted: bool
24 changes: 24 additions & 0 deletions backend/models/discord_user.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
import typing as t

from pydantic import BaseModel


class DiscordUser(BaseModel):
"""Schema model of Discord user for form response."""

# Discord default fields
id: int # This is actually snowflake, but we simplify it here
username: str
discriminator: str
avatar: t.Optional[str]
bot: t.Optional[bool]
system: t.Optional[bool]
locale: t.Optional[str]
verified: t.Optional[bool]
email: t.Optional[str]
flags: t.Optional[int]
premium_type: t.Optional[int]
public_flags: t.Optional[int]

# Custom fields
admin: bool
19 changes: 19 additions & 0 deletions backend/models/form_response.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
import typing as t

from pydantic import BaseModel, Field

from .antispam import AntiSpam
from .discord_user import DiscordUser


class FormResponse(BaseModel):
"""Schema model for form response."""

id: str = Field(alias="_id")
user: t.Optional[DiscordUser]
antispam: t.Optional[AntiSpam]
response: t.Dict[str, t.Any]
form_id: str

class Config:
allow_population_by_field_name = True
5 changes: 5 additions & 0 deletions backend/routes/forms/new.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,10 @@ async def post(self, request: Request) -> JSONResponse:
except ValidationError as e:
return JSONResponse(e.errors())

if await request.state.db.forms.find_one({"_id": form.id}):
return JSONResponse({
"error": "Form with same ID already exists."
}, status_code=400)

await request.state.db.forms.insert_one(form.dict(by_alias=True))
return JSONResponse(form.dict())
95 changes: 71 additions & 24 deletions backend/routes/forms/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,24 @@

import binascii
import hashlib
import uuid

import jwt
import httpx
import pydnsbl
from pydantic import ValidationError
from starlette.requests import Request

from starlette.responses import JSONResponse

from backend.constants import SECRET_KEY
from backend.constants import HCAPTCHA_API_SECRET, FormFeatures
from backend.models import Form, FormResponse
from backend.route import Route

HCAPTCHA_VERIFY_URL = "https://hcaptcha.com/siteverify"
HCAPTCHA_HEADERS = {
"Content-Type": "application/x-www-form-urlencoded"
}


class SubmitForm(Route):
"""
Expand All @@ -28,40 +37,78 @@ async def post(self, request: Request) -> JSONResponse:
if form := await request.state.db.forms.find_one(
{"_id": request.path_params["form_id"], "features": "OPEN"}
):
response_obj = {}
form = Form(**form)
response = data.copy()
response["id"] = str(uuid.uuid4())
response["form_id"] = form.id

if "DISABLE_ANTISPAM" not in form["features"]:
if FormFeatures.DISABLE_ANTISPAM.value not in form.features:
ip_hash_ctx = hashlib.md5()
ip_hash_ctx.update(request.client.host.encode())
ip_hash = binascii.hexlify(ip_hash_ctx.digest())
user_agent_hash_ctx = hashlib.md5()
user_agent_hash_ctx.update(request.headers["User-Agent"].encode())
user_agent_hash = binascii.hexlify(user_agent_hash_ctx.digest())

response_obj["antispam"] = {
"ip": ip_hash.decode()
}
dsn_checker = pydnsbl.DNSBLIpChecker()
dsn_blacklist = await dsn_checker.check_async(request.client.host)

if "REQUIRES_LOGIN" in form["features"]:
if token := data.get("token"):
data = jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
response_obj["user"] = {
"user": f"{data['username']}#{data['discriminator']}",
"id": data["id"]
async with httpx.AsyncClient() as client:
query_params = {
"secret": HCAPTCHA_API_SECRET,
"response": data.get("captcha")
}
r = await client.post(
HCAPTCHA_VERIFY_URL,
params=query_params,
headers=HCAPTCHA_HEADERS
)
r.raise_for_status()
captcha_data = r.json()

response["antispam"] = {
"ip_hash": ip_hash.decode(),
"user_agent_hash": user_agent_hash.decode(),
"captcha_pass": captcha_data["success"],
"dns_blacklisted": dsn_blacklist.blacklisted,
}

if FormFeatures.REQUIRES_LOGIN.value in form.features:
if request.user.is_authenticated:
response["user"] = request.user.payload

if "COLLECT_EMAIL" in form["features"]:
if data.get("email"):
response_obj["user"]["email"] = data["email"]
else:
return JSONResponse({
"error": "User data did not include email information"
})
if FormFeatures.COLLECT_EMAIL.value in form.features and "email" not in response["user"]: # noqa
return JSONResponse({
"error": "email_required"
}, status_code=400)
else:
return JSONResponse({
"error": "Missing Discord user data"
})
"error": "missing_discord_data"
}, status_code=400)

missing_fields = []
for question in form.questions:
if question.id not in response["response"]:
missing_fields.append(question.id)

if missing_fields:
return JSONResponse({
"error": "missing_fields",
"fields": missing_fields
}, status_code=400)

try:
response_obj = FormResponse(**response)
except ValidationError as e:
return JSONResponse(e.errors())

await request.state.db.responses.insert_one(
response_obj.dict(by_alias=True)
)

return JSONResponse({
"form": form,
"response": response_obj
"form": form.dict(),
"response": response_obj.dict()
})
else:
return JSONResponse({
Expand Down
Loading