diff --git a/CHANGES/727.feature b/CHANGES/727.feature new file mode 100644 index 00000000..dad3fa6b --- /dev/null +++ b/CHANGES/727.feature @@ -0,0 +1,6 @@ +Added PackagePermissionGuard content guard for fine-grained package-level access control. + +PackagePermissionGuard allows controlling download and upload permissions for individual PyPI packages +within a single distribution/index. Policies map package names to lists of user/group PRNs that are +allowed to access those packages. + diff --git a/docs/admin/guides/rbac.md b/docs/admin/guides/rbac.md index fe48a5e9..e9cbcfc1 100644 --- a/docs/admin/guides/rbac.md +++ b/docs/admin/guides/rbac.md @@ -73,6 +73,88 @@ new RBACContentGuard: !!! warning The PyPI access policies do not support `creation_hooks` or `queryset_scoping`. +## PackagePermissionGuard + +The PackagePermissionGuard provides fine-grained, package-level access control within a single distribution/index. +Unlike RBACContentGuard which applies to all packages in a distribution, PackagePermissionGuard allows you to +control download and upload permissions for individual PyPI packages. + +### Creating a PackagePermissionGuard + +```bash +pulp content-guard package-permission create --name my-package-guard +``` + +### Managing Package Permissions + +PackagePermissionGuard uses two separate policies: +- `download_policy`: Controls who can download specific packages +- `upload_policy`: Controls who can upload specific packages + +Both policies map package names (normalized) to lists of user/group PRNs. + +#### Adding Permissions + +Use the `add` action to grant users/groups access to packages: + +```bash +# Add download permissions for multiple packages +pulp content-guard package-permission add \ + --name my-package-guard \ + --packages shelf-reader django \ + --users-groups prn:core.user:alice prn:core.group:developers \ + --policy-type download + +# Add upload permissions +pulp content-guard package-permission add \ + --name my-package-guard \ + --packages shelf-reader \ + --users-groups prn:core.user:bob \ + --policy-type upload +``` + +#### Removing Permissions + +Use the `remove` action to revoke access: + +```bash +# Remove specific users/groups from a package +pulp content-guard package-permission remove \ + --name my-package-guard \ + --packages shelf-reader \ + --users-groups prn:core.user:alice \ + --policy-type download + +# Remove all permissions for a package (use '*' in users-groups) +pulp content-guard package-permission remove \ + --name my-package-guard \ + --packages django \ + --users-groups '*' \ + --policy-type download + +# Remove all packages from a policy (use '*' in packages) +pulp content-guard package-permission remove \ + --name my-package-guard \ + --packages '*' \ + --users-groups '' \ + --policy-type download +``` + +### How PackagePermissionGuard Works + +- **Downloads**: During downloads the guard's `permit()` method checks the `download_policy` to see + if there is a policy for the package. If no policy the download is permited. If there is one it + then checks that the user of the request is in the policy list for that package. + +- **Uploads**: For uploads, the endpoint's access policy checks the `upload_policy` to see if the + user/group has permission for the package being uploaded. If the package or user is not in the + policy the upload is denied. + +!!! note + For the content guard to properly protect uploads the `legacy` and `simple` AccessPolies must + use the `package_permission_check` condition for their `create` action. This is the current + default with a fallback to `index_has_repo_perm` when there is no content guard on the distro. + ## Index Specific Access Conditions Pulp Python comes with two specific access condition methods that can be used in the PyPI access policies. @@ -89,5 +171,15 @@ the modify python repository permission. This access condition checks if the user has the supplied permission on the index (distribution) itself. If no permission is specified for the method then it will use `python.view_pythondistribution` as its default. +### `package_permission_check` + +This access condition checks PackagePermissionGuard for package-level permissions. It extracts the package name +from the request (from URL path for downloads,from filename for uploads) and checks if the user/group has +permission for that specific package in the guard's policy. This condition is used by default in PyPI upload +endpoints to enable package-level upload control when a PackagePermissionGuard is attached to the distribution. + +- For downloads: Checks `download_policy` and denies only if package in policy and user/group is not. +- For uploads: Checks `upload_policy` and denies access if package not in policy or user/group not allowed + !!! note - Both access condition methods are compatible with the Pulp Domains feature. \ No newline at end of file + All access condition methods are compatible with the Pulp Domains feature. \ No newline at end of file diff --git a/pulp_python/app/global_access_conditions.py b/pulp_python/app/global_access_conditions.py index a2a5ee61..8053ee2e 100644 --- a/pulp_python/app/global_access_conditions.py +++ b/pulp_python/app/global_access_conditions.py @@ -1,4 +1,10 @@ from django.conf import settings +from django.contrib.auth.models import AnonymousUser +from packaging.utils import canonicalize_name +from pathlib import PurePath +from pulp_python.app.models import PackagePermissionGuard +from pypi_simple import parse_filename, UnparsableFilenameError +from pulpcore.plugin.util import get_prn # Access Condition methods that can be used with PyPI access policies @@ -28,3 +34,71 @@ def index_has_repo_perm(request, view, action, perm="python.view_pythonrepositor if repo := view.distribution.repository: return request.user.has_perm(perm, obj=repo.cast()) return True + + +def package_permission_check(request, view, action, policy_type="upload"): + """ + Access Policy condition that checks PackagePermissionGuard for package-level permissions. + + Checks if the user/group has permission for the specific package being accessed. + + Args: + policy_type: "download" or "upload" - which policy to check + """ + if hasattr(view, "content_guard"): + content_guard = view.content_guard + else: + content_guard = view.distribution.content_guard + + # If no guard attached, deny access + if not content_guard or not isinstance(content_guard.cast(), PackagePermissionGuard): + return False + + guard = content_guard.cast() + policy = guard.download_policy if policy_type == "download" else guard.upload_policy + + # Extract package name from request + package_name = None + + if policy_type == "upload": + # For uploads, extract from filename in request.FILES + # The file is uploaded as multipart/form-data with field name 'content' + if hasattr(request, 'FILES') and 'content' in request.FILES: + file_obj = request.FILES['content'] + package_name = canonicalize_name(parse_filename(file_obj.name)[0]) + else: + # For downloads, extract from URL path + if hasattr(view, "kwargs"): + # Check URL kwargs for package name + if 'package' in view.kwargs: + package_name = canonicalize_name(view.kwargs['package']) + elif 'meta' in view.kwargs: + # Metadata endpoint: pypi/{package}/json/ or pypi/{package}/{version}/json/ + meta_path = PurePath(view.kwargs['meta']) + if meta_path.match("*/json") or meta_path.match("*/*/json"): + package_name = canonicalize_name(meta_path.parts[0]) + else: + path = PurePath(request.path_info) + try: + package_name = parse_filename(path.name)[0] + except UnparsableFilenameError: + print(f"No package name found in path: {request.path_info}") + + # Downloads are permissive, only deny if package in policy but user is not listed + # Uploads are strict, only allow if package in policy and user is listed + if package_name not in policy: + return policy_type == "download" + + allowed_prns = policy[package_name] + if not request.user or isinstance(request.user, AnonymousUser): + return False + user_prn = get_prn(request.user) + group_prns = [get_prn(group) for group in request.user.groups.all()] + + if user_prn and user_prn in allowed_prns: + return True + + if any(group_prn in allowed_prns for group_prn in group_prns): + return True + + return False diff --git a/pulp_python/app/migrations/0016_packagepermissionguard.py b/pulp_python/app/migrations/0016_packagepermissionguard.py new file mode 100644 index 00000000..fcd61bdf --- /dev/null +++ b/pulp_python/app/migrations/0016_packagepermissionguard.py @@ -0,0 +1,56 @@ +# Generated by Django 4.2.25 on 2025-10-31 12:50 + +from django.db import migrations, models +import django.db.models.deletion +import pulpcore.app.models.access_policy + + +class Migration(migrations.Migration): + + dependencies = [ + ("core", "0145_domainize_import_export"), + ("python", "0015_alter_pythonpackagecontent_options"), + ] + + operations = [ + migrations.CreateModel( + name="PackagePermissionGuard", + fields=[ + ( + "contentguard_ptr", + models.OneToOneField( + auto_created=True, + on_delete=django.db.models.deletion.CASCADE, + parent_link=True, + primary_key=True, + serialize=False, + to="core.contentguard", + ), + ), + ( + "download_policy", + models.JSONField( + default=dict, + help_text="Dictionary mapping package names to lists of user/group PRNsthat are allowed to download those packages.", + ), + ), + ( + "upload_policy", + models.JSONField( + default=dict, + help_text="Dictionary mapping package names to lists of user/group PRNsthat are allowed to upload those packages.", + ), + ), + ], + options={ + "permissions": [ + ( + "manage_roles_packagepermissionguard", + "Can manage role assignments on package permission guard", + ) + ], + "default_related_name": "%(app_label)s_%(model_name)s", + }, + bases=("core.contentguard", pulpcore.app.models.access_policy.AutoAddObjPermsMixin), + ), + ] diff --git a/pulp_python/app/models.py b/pulp_python/app/models.py index 3bd9d605..618f8dcc 100644 --- a/pulp_python/app/models.py +++ b/pulp_python/app/models.py @@ -1,13 +1,16 @@ from logging import getLogger +from gettext import gettext as _ from aiohttp.web import json_response from django.contrib.postgres.fields import ArrayField from django.core.exceptions import ObjectDoesNotExist from django.db import models from django.conf import settings +from rest_framework.views import APIView from pulpcore.plugin.models import ( AutoAddObjPermsMixin, Content, + ContentGuard, Publication, Distribution, Remote, @@ -20,6 +23,7 @@ artifact_to_python_content_data, canonicalize_name, python_content_to_json, + DIST_EXTENSIONS, PYPI_LAST_SERIAL, PYPI_SERIAL_CONSTANT, ) @@ -328,3 +332,51 @@ def finalize_new_version(self, new_version): """ remove_duplicates(new_version) validate_repo_version(new_version) + + +class PackagePermissionGuard(ContentGuard, AutoAddObjPermsMixin): + """ + A content guard that protects individual PyPI packages within a distribution/index. + + This guard allows fine-grained control over which users/groups can download or upload + specific packages. Policies are stored as JSON dictionaries mapping package names to + lists of user/group PRNs. + """ + + TYPE = "package_permission" + + download_policy = models.JSONField( + default=dict, + help_text=_( + "Dictionary mapping package names to lists of user/group PRNs" + "that are allowed to download those packages." + ), + ) + upload_policy = models.JSONField( + default=dict, + help_text=_( + "Dictionary mapping package names to lists of user/group PRNs" + "that are allowed to upload those packages." + ), + ) + + def permit(self, request): + """ + Check if the request can download the package. Do nothing if requesting metadata. + """ + from .global_access_conditions import package_permission_check + + if drequest := request.get("drf_request", None): + if not any(drequest.path.endswith(ext) for ext in DIST_EXTENSIONS.keys()): + return + view = APIView() + setattr(view, "content_guard", self) + if package_permission_check(drequest, view, "GET", "download"): + return + raise PermissionError("Access to the requested resource is not authorized.") + + class Meta: + default_related_name = "%(app_label)s_%(model_name)s" + permissions = [ + ("manage_roles_packagepermissionguard", "Can manage role assignments on package permission guard"), + ] diff --git a/pulp_python/app/pypi/views.py b/pulp_python/app/pypi/views.py index bd8bc2af..29819910 100644 --- a/pulp_python/app/pypi/views.py +++ b/pulp_python/app/pypi/views.py @@ -230,7 +230,7 @@ class SimpleView(PackageUploadMixin, ViewSet): "action": ["create"], "principal": "authenticated", "effect": "allow", - "condition": "index_has_repo_perm:python.modify_pythonrepository", + "condition_expression": "package_permission_check:upload or index_has_repo_perm:python.modify_pythonrepository", }, ], } @@ -403,7 +403,7 @@ class UploadView(PackageUploadMixin, ViewSet): "action": ["create"], "principal": "authenticated", "effect": "allow", - "condition": "index_has_repo_perm:python.modify_pythonrepository", + "condition_expression": "package_permission_check:upload or index_has_repo_perm:python.modify_pythonrepository", }, ], } diff --git a/pulp_python/app/serializers.py b/pulp_python/app/serializers.py index d8387adf..a7feb009 100644 --- a/pulp_python/app/serializers.py +++ b/pulp_python/app/serializers.py @@ -8,12 +8,13 @@ from pulpcore.plugin import models as core_models from pulpcore.plugin import serializers as core_serializers -from pulpcore.plugin.util import get_domain +from pulpcore.plugin.util import get_domain, get_prn from pulp_python.app import models as python_models from pulp_python.app.utils import ( DIST_EXTENSIONS, artifact_to_python_content_data, + canonicalize_name, get_project_metadata_from_file, parse_project_metadata, ) @@ -596,3 +597,78 @@ class PythonPublicationSerializer(core_serializers.PublicationSerializer): class Meta: fields = core_serializers.PublicationSerializer.Meta.fields + ("distributions",) model = python_models.PythonPublication + + +class PackagePermissionGuardSerializer(core_serializers.ContentGuardSerializer): + """ + A Serializer for PackagePermissionGuard. + """ + + download_policy = serializers.JSONField( + help_text=_( + "Dictionary mapping package names to lists of user/group PRNs or hrefs " + "that are allowed to download those packages." + ), + default=dict, + ) + upload_policy = serializers.JSONField( + help_text=_( + "Dictionary mapping package names to lists of user/group PRNs or hrefs " + "that are allowed to upload those packages." + ), + default=dict, + ) + + def validate_policy(self, value): + policy = {} + for package_name, prns in value.items(): + policy[canonicalize_name(package_name)] = [get_prn(uri=prn) for prn in prns] + return policy + + def validate_download_policy(self, value): + return self.validate_policy(value) + + def validate_upload_policy(self, value): + return self.validate_policy(value) + + class Meta(core_serializers.ContentGuardSerializer.Meta): + model = python_models.PackagePermissionGuard + fields = core_serializers.ContentGuardSerializer.Meta.fields + ("download_policy", "upload_policy") + + +class PackagePermissionGuardAddRemoveSerializer(serializers.Serializer): + """ + Serializer for add/remove operations on PackagePermissionGuard. + """ + + packages = serializers.ListField( + child=serializers.CharField(), + help_text=_("List of package names to add/remove permissions for."), + required=True, + ) + users_groups = serializers.ListField( + child=serializers.CharField(), + help_text=_("List of user/group PRNs or hrefs. Use ['*'] to remove all entries."), + required=True, + ) + policy_type = serializers.ChoiceField( + choices=["download", "upload"], + default="download", + help_text=_("Which policy to modify: 'download' or 'upload'."), + required=False, + ) + + def validate(self, data): + data = super().validate(data) + adding = self.context["action"] == "add" + if "*" in data["users_groups"]: + if (len(data["users_groups"]) > 1 or adding): + raise serializers.ValidationError(_("'*' can only be used when removing and should be the only item present")) + else: + data["users_groups"] = [get_prn(uri=user_group) for user_group in data["users_groups"]] + if "*" in data["packages"]: + if (len(data["packages"]) > 1 or adding): + raise serializers.ValidationError(_("'*' can only be used when removing and should be the only item present")) + else: + data["packages"] = [canonicalize_name(package) for package in data["packages"]] + return data \ No newline at end of file diff --git a/pulp_python/app/viewsets.py b/pulp_python/app/viewsets.py index dfe03bfb..ec3d8fb0 100644 --- a/pulp_python/app/viewsets.py +++ b/pulp_python/app/viewsets.py @@ -13,10 +13,12 @@ RepositorySyncURLSerializer, ) from pulpcore.plugin.tasking import dispatch +from pulpcore.plugin.util import get_prn from pulp_python.app import models as python_models from pulp_python.app import serializers as python_serializers from pulp_python.app import tasks +from pulp_python.app.utils import canonicalize_name class PythonRepositoryViewSet( @@ -623,3 +625,148 @@ def create(self, request): kwargs={"repository_version_pk": str(repository_version.pk)}, ) return core_viewsets.OperationPostponedResponse(result, request) + + +class PackagePermissionGuardViewSet(core_viewsets.ContentGuardViewSet, core_viewsets.RolesMixin): + """ + Viewset for creating content guards that protect individual PyPI packages. + Has add and remove actions for managing permissions for users and groups on specific packages. + """ + + endpoint_name = "package_permission" + serializer_class = python_serializers.PackagePermissionGuardSerializer + queryset = python_models.PackagePermissionGuard.objects.all() + queryset_filtering_required_permission = "python.view_packagepermissionguard" + + DEFAULT_ACCESS_POLICY = { + "statements": [ + { + "action": ["list"], + "principal": "authenticated", + "effect": "allow", + }, + { + "action": ["create"], + "principal": "authenticated", + "effect": "allow", + "condition": "has_model_or_domain_perms:python.add_packagepermissionguard", + }, + { + "action": ["retrieve", "my_permissions"], + "principal": "authenticated", + "effect": "allow", + "condition": "has_model_or_domain_or_obj_perms:python.view_packagepermissionguard", + }, + { + "action": ["update", "partial_update"], + "principal": "authenticated", + "effect": "allow", + "condition": "has_model_or_domain_or_obj_perms:python.change_packagepermissionguard", + }, + { + "action": ["destroy"], + "principal": "authenticated", + "effect": "allow", + "condition": "has_model_or_domain_or_obj_perms:python.delete_packagepermissionguard", + }, + { + "action": ["add", "remove"], + "principal": "authenticated", + "effect": "allow", + "condition": "has_model_or_domain_or_obj_perms:python.change_packagepermissionguard", + }, + { + "action": ["list_roles", "add_role", "remove_role"], + "principal": "authenticated", + "effect": "allow", + "condition": [ + "has_model_or_domain_or_obj_perms:python.manage_roles_packagepermissionguard" + ], + }, + ], + "creation_hooks": [ + { + "function": "add_roles_for_object_creator", + "parameters": {"roles": "python.packagepermissionguard_owner"}, + }, + ], + "queryset_scoping": {"function": "scope_queryset"}, + } + LOCKED_ROLES = { + "python.packagepermissionguard_creator": ["python.add_packagepermissionguard"], + "python.packagepermissionguard_owner": [ + "python.view_packagepermissionguard", + "python.change_packagepermissionguard", + "python.delete_packagepermissionguard", + "python.manage_roles_packagepermissionguard", + ], + "python.packagepermissionguard_viewer": ["python.view_packagepermissionguard"], + } + + @extend_schema( + summary="Add package permissions", + request=python_serializers.PackagePermissionGuardAddRemoveSerializer, + responses={200: python_serializers.PackagePermissionGuardSerializer}, + ) + @action(detail=True, methods=["post"], serializer_class=python_serializers.PackagePermissionGuardAddRemoveSerializer) + def add(self, request, pk): + """ + Add users/groups to packages in download_policy or upload_policy. + """ + guard = self.get_object() + serializer = self.get_serializer(data=request.data, context={"action": "add"}) + serializer.is_valid(raise_exception=True) + + packages = serializer.validated_data["packages"] + users_groups = serializer.validated_data["users_groups"] + policy_type = serializer.validated_data.get("policy_type", "download") # 'download' or 'upload' + + policy = getattr(guard, f"{policy_type}_policy") + + for package in packages: + if package not in policy: + policy[package] = [] + policy[package] = list(set(policy[package] + users_groups)) + + setattr(guard, f"{policy_type}_policy", policy) + guard.save() + + serializer = python_serializers.PackagePermissionGuardSerializer(guard, context={"request": request}) + return Response(serializer.data) + + @extend_schema( + summary="Remove package permissions", + request=python_serializers.PackagePermissionGuardAddRemoveSerializer, + responses={200: python_serializers.PackagePermissionGuardSerializer}, + ) + @action(detail=True, methods=["post"], serializer_class=python_serializers.PackagePermissionGuardAddRemoveSerializer) + def remove(self, request, pk): + """ + Remove users/groups from packages in download_policy or upload_policy. + If packages contains ['*'], remove all entries for all packages. + If users_groups contains ['*'], remove all entries for the specified packages. + """ + guard = self.get_object() + serializer = self.get_serializer(data=request.data, context={"action": "remove"}) + serializer.is_valid(raise_exception=True) + + packages = serializer.validated_data["packages"] + users_groups = serializer.validated_data["users_groups"] + policy_type = serializer.validated_data.get("policy_type", "download") # 'download' or 'upload' + + policy = getattr(guard, f"{policy_type}_policy") + + if "*" in packages: + policy = {} + else: + for package in packages: + if "*" in users_groups: + del policy[package] + else: + policy[package] = list(set(policy[package]) - set(users_groups)) + + setattr(guard, f"{policy_type}_policy", policy) + guard.save() + + serializer = python_serializers.PackagePermissionGuardSerializer(guard, context={"request": request}) + return Response(serializer.data) diff --git a/pulp_python/tests/functional/api/test_package_permission_guard.py b/pulp_python/tests/functional/api/test_package_permission_guard.py new file mode 100644 index 00000000..4be10848 --- /dev/null +++ b/pulp_python/tests/functional/api/test_package_permission_guard.py @@ -0,0 +1,187 @@ +import pytest +import requests +import uuid + +from pulp_python.tests.functional.constants import ( + PYTHON_EGG_FILENAME, + PYTHON_EGG_SHA256, +) + + +@pytest.fixture +def package_permission_guard(python_bindings, gen_object_with_cleanup): + """Fixture to create a PackagePermissionGuard.""" + def _create_guard(name=None, download_policy=None, upload_policy=None): + body = {"name": name or str(uuid.uuid4())} + if download_policy is not None: + body["download_policy"] = download_policy + if upload_policy is not None: + body["upload_policy"] = upload_policy + return gen_object_with_cleanup( + python_bindings.ContentguardsPackagePermissionApi, body + ) + return _create_guard + + +@pytest.mark.parallel +def test_package_permission_guard_add_remove( + python_bindings, + gen_user, + package_permission_guard, + pulpcore_bindings, +): + """Test add and remove actions on PackagePermissionGuard.""" + user = gen_user(model_roles=["python.packagepermissionguard_creator"]) + test_user = gen_user() + test_group = pulpcore_bindings.GroupsApi.create({"name": str(uuid.uuid4())}) + + with user: + guard = package_permission_guard() + + # Add download permissions + add_body = { + "packages": ["shelf-reader", "Django"], + "users_groups": [user.user.pulp_href, test_user.user.prn, test_group.prn], + "policy_type": "download" + } + python_bindings.ContentguardsPackagePermissionApi.add(guard.pulp_href, add_body) + + updated_guard = python_bindings.ContentguardsPackagePermissionApi.read(guard.pulp_href) + assert "shelf-reader" in updated_guard.download_policy + assert "django" in updated_guard.download_policy + assert user.user.prn in updated_guard.download_policy["shelf-reader"] + assert test_user.user.prn in updated_guard.download_policy["shelf-reader"] + assert test_group.prn in updated_guard.download_policy["shelf-reader"] + + # Remove specific user/group from a package + remove_body = { + "packages": ["shelf-reader"], + "users_groups": [test_user.user.prn], + "policy_type": "download" + } + python_bindings.ContentguardsPackagePermissionApi.remove(guard.pulp_href, remove_body) + + updated_guard = python_bindings.ContentguardsPackagePermissionApi.read(guard.pulp_href) + assert test_user.user.prn not in updated_guard.download_policy["shelf-reader"] + assert user.user.prn in updated_guard.download_policy["shelf-reader"] + assert test_group.prn in updated_guard.download_policy["shelf-reader"] + + # Remove all entries for a package using '*' + remove_all_body = { + "packages": ["django"], + "users_groups": ["*"], + "policy_type": "download" + } + python_bindings.ContentguardsPackagePermissionApi.remove(guard.pulp_href, remove_all_body) + + updated_guard = python_bindings.ContentguardsPackagePermissionApi.read(guard.pulp_href) + assert "django" not in updated_guard.download_policy + + # Test removing all packages using '*' in packages + remove_all_packages_body = { + "packages": ["*"], + "users_groups": [], + "policy_type": "download" + } + python_bindings.ContentguardsPackagePermissionApi.remove(guard.pulp_href, remove_all_packages_body) + + updated_guard = python_bindings.ContentguardsPackagePermissionApi.read(guard.pulp_href) + assert updated_guard.download_policy == {} + + +@pytest.mark.parallel +def test_package_permission_guard_download( + python_bindings, + gen_user, + package_permission_guard, + python_repo_factory, + python_distribution_factory, + python_publication_factory, + python_file, + pulp_content_url, + monitor_task, +): + """Test that PackagePermissionGuard controls package downloads.""" + allowed_user = gen_user() + denied_user = gen_user() + + # Setup distribution with content guard + repo = python_repo_factory() + body = {"relative_path": PYTHON_EGG_FILENAME, "file": python_file, "repository": repo.pulp_href} + response = python_bindings.ContentPackagesApi.create(**body) + monitor_task(response.task) + pub = python_publication_factory(repository=repo) + guard = package_permission_guard( + download_policy={"shelf-reader": [allowed_user.user.prn]} + ) + distro = python_distribution_factory( + publication=pub.pulp_href, + content_guard=guard.pulp_href + ) + + # Test that allowed user can download + download_url = f"{pulp_content_url}{distro.base_path}/{PYTHON_EGG_FILENAME}" + response = requests.get(download_url, auth=(allowed_user.username, allowed_user.password)) + assert response.status_code == 200 + + # Test that denied user cannot download + response = requests.get(download_url, auth=(denied_user.username, denied_user.password)) + assert response.status_code == 403 + + # Test that anonymous user cannot download + response = requests.get(download_url) + assert response.status_code == 403 + + # Remove policy and test that anonymous and denied users can download + remove_all_body = { + "packages": ["shelf-reader"], + "users_groups": ["*"], + "policy_type": "download" + } + python_bindings.ContentguardsPackagePermissionApi.remove(guard.pulp_href, remove_all_body) + response = requests.get(download_url, auth=(denied_user.username, denied_user.password)) + assert response.status_code == 200 + response = requests.get(download_url) + assert response.status_code == 200 + + +@pytest.mark.parallel +def test_package_permission_guard_upload( + gen_user, + package_permission_guard, + python_repo_factory, + python_distribution_factory, + python_file, +): + """Test that PackagePermissionGuard controls package uploads.""" + allowed_user = gen_user() + denied_user = gen_user() + + # Setup repository and distribution + repo = python_repo_factory() + guard = package_permission_guard( + upload_policy={"shelf-reader": [allowed_user.user.prn]} + ) + distro = python_distribution_factory( + repository=repo.pulp_href, + content_guard=guard.pulp_href + ) + + url = f"{distro.base_url}simple/" + # Test that denied user cannot upload + response = requests.post( + url, + data={"sha256_digest": PYTHON_EGG_SHA256}, + files={"content": open(python_file, "rb")}, + auth=(denied_user.username, denied_user.password), + ) + assert response.status_code == 403 + + # Test that allowed user can upload + response = requests.post( + url, + data={"sha256_digest": PYTHON_EGG_SHA256}, + files={"content": open(python_file, "rb")}, + auth=(allowed_user.username, allowed_user.password), + ) + assert response.status_code == 202