|
2 | 2 | from collections import defaultdict |
3 | 3 |
|
4 | 4 | from django.conf import settings |
| 5 | +from django.contrib.auth import get_user_model |
5 | 6 | from django.contrib.auth.backends import ModelBackend, RemoteUserBackend as _RemoteUserBackend |
6 | | -from django.contrib.auth.models import Group |
| 7 | +from django.contrib.auth.models import Group, AnonymousUser |
7 | 8 | from django.core.exceptions import ImproperlyConfigured |
8 | 9 | from django.db.models import Q |
9 | 10 |
|
10 | 11 | from users.models import ObjectPermission |
11 | 12 | from utilities.permissions import permission_is_exempt, resolve_permission, resolve_permission_ct |
12 | 13 |
|
| 14 | +UserModel = get_user_model() |
| 15 | + |
13 | 16 |
|
14 | 17 | class ObjectPermissionMixin(): |
15 | 18 |
|
@@ -101,38 +104,145 @@ class RemoteUserBackend(_RemoteUserBackend): |
101 | 104 | def create_unknown_user(self): |
102 | 105 | return settings.REMOTE_AUTH_AUTO_CREATE_USER |
103 | 106 |
|
104 | | - def configure_user(self, request, user): |
| 107 | + def configure_groups(self, user, remote_groups): |
105 | 108 | logger = logging.getLogger('netbox.authentication.RemoteUserBackend') |
106 | 109 |
|
107 | 110 | # Assign default groups to the user |
108 | 111 | group_list = [] |
109 | | - for name in settings.REMOTE_AUTH_DEFAULT_GROUPS: |
| 112 | + for name in remote_groups: |
110 | 113 | try: |
111 | 114 | group_list.append(Group.objects.get(name=name)) |
112 | 115 | except Group.DoesNotExist: |
113 | | - logging.error(f"Could not assign group {name} to remotely-authenticated user {user}: Group not found") |
| 116 | + logging.error( |
| 117 | + f"Could not assign group {name} to remotely-authenticated user {user}: Group not found") |
114 | 118 | if group_list: |
115 | | - user.groups.add(*group_list) |
116 | | - logger.debug(f"Assigned groups to remotely-authenticated user {user}: {group_list}") |
| 119 | + user.groups.set(group_list) |
| 120 | + logger.debug( |
| 121 | + f"Assigned groups to remotely-authenticated user {user}: {group_list}") |
| 122 | + else: |
| 123 | + user.groups.clear() |
| 124 | + logger.debug(f"Stripping user {user} from Groups") |
| 125 | + user.is_superuser = self._is_superuser(user) |
| 126 | + logger.debug(f"User {user} is Superuser: {user.is_superuser}") |
| 127 | + logger.debug( |
| 128 | + f"User {user} should be Superuser: {self._is_superuser(user)}") |
| 129 | + |
| 130 | + user.is_staff = self._is_staff(user) |
| 131 | + logger.debug(f"User {user} is Staff: {user.is_staff}") |
| 132 | + logger.debug(f"User {user} should be Staff: {self._is_staff(user)}") |
| 133 | + user.save() |
| 134 | + return user |
117 | 135 |
|
118 | | - # Assign default object permissions to the user |
119 | | - permissions_list = [] |
120 | | - for permission_name, constraints in settings.REMOTE_AUTH_DEFAULT_PERMISSIONS.items(): |
| 136 | + def authenticate(self, request, remote_user, remote_groups=None): |
| 137 | + """ |
| 138 | + The username passed as ``remote_user`` is considered trusted. Return |
| 139 | + the ``User`` object with the given username. Create a new ``User`` |
| 140 | + object if ``create_unknown_user`` is ``True``. |
| 141 | + Return None if ``create_unknown_user`` is ``False`` and a ``User`` |
| 142 | + object with the given username is not found in the database. |
| 143 | + """ |
| 144 | + logger = logging.getLogger('netbox.authentication.RemoteUserBackend') |
| 145 | + logger.debug( |
| 146 | + f"trying to authenticate {remote_user} with groups {remote_groups}") |
| 147 | + if not remote_user: |
| 148 | + return |
| 149 | + user = None |
| 150 | + username = self.clean_username(remote_user) |
| 151 | + |
| 152 | + # Note that this could be accomplished in one try-except clause, but |
| 153 | + # instead we use get_or_create when creating unknown users since it has |
| 154 | + # built-in safeguards for multiple threads. |
| 155 | + if self.create_unknown_user: |
| 156 | + user, created = UserModel._default_manager.get_or_create(**{ |
| 157 | + UserModel.USERNAME_FIELD: username |
| 158 | + }) |
| 159 | + if created: |
| 160 | + user = self.configure_user(request, user) |
| 161 | + else: |
121 | 162 | try: |
122 | | - object_type, action = resolve_permission_ct(permission_name) |
123 | | - # TODO: Merge multiple actions into a single ObjectPermission per content type |
124 | | - obj_perm = ObjectPermission(actions=[action], constraints=constraints) |
125 | | - obj_perm.save() |
126 | | - obj_perm.users.add(user) |
127 | | - obj_perm.object_types.add(object_type) |
128 | | - permissions_list.append(permission_name) |
129 | | - except ValueError: |
130 | | - logging.error( |
131 | | - f"Invalid permission name: '{permission_name}'. Permissions must be in the form " |
132 | | - "<app>.<action>_<model>. (Example: dcim.add_site)" |
133 | | - ) |
134 | | - if permissions_list: |
135 | | - logger.debug(f"Assigned permissions to remotely-authenticated user {user}: {permissions_list}") |
| 163 | + user = UserModel._default_manager.get_by_natural_key(username) |
| 164 | + except UserModel.DoesNotExist: |
| 165 | + pass |
| 166 | + if self.user_can_authenticate(user): |
| 167 | + if settings.REMOTE_AUTH_GROUP_SYNC_ENABLED: |
| 168 | + if user is not None and not isinstance(user, AnonymousUser): |
| 169 | + return self.configure_groups(user, remote_groups) |
| 170 | + else: |
| 171 | + return user |
| 172 | + else: |
| 173 | + return None |
| 174 | + |
| 175 | + def _is_superuser(self, user): |
| 176 | + logger = logging.getLogger('netbox.authentication.RemoteUserBackend') |
| 177 | + superuser_groups = settings.REMOTE_AUTH_SUPERUSER_GROUPS |
| 178 | + logger.debug(f"Superuser Groups: {superuser_groups}") |
| 179 | + superusers = settings.REMOTE_AUTH_SUPERUSERS |
| 180 | + logger.debug(f"Superuser Users: {superusers}") |
| 181 | + user_groups = set() |
| 182 | + for g in user.groups.all(): |
| 183 | + user_groups.add(g.name) |
| 184 | + logger.debug(f"User {user.username} is in Groups:{user_groups}") |
| 185 | + |
| 186 | + result = user.username in superusers or ( |
| 187 | + set(user_groups) & set(superuser_groups)) |
| 188 | + logger.debug(f"User {user.username} in Superuser Users :{result}") |
| 189 | + return bool(result) |
| 190 | + |
| 191 | + def _is_staff(self, user): |
| 192 | + logger = logging.getLogger('netbox.authentication.RemoteUserBackend') |
| 193 | + staff_groups = settings.REMOTE_AUTH_STAFF_GROUPS |
| 194 | + logger.debug(f"Superuser Groups: {staff_groups}") |
| 195 | + staff_users = settings.REMOTE_AUTH_STAFF_USERS |
| 196 | + logger.debug(f"Staff Users :{staff_users}") |
| 197 | + user_groups = set() |
| 198 | + for g in user.groups.all(): |
| 199 | + user_groups.add(g.name) |
| 200 | + logger.debug(f"User {user.username} is in Groups:{user_groups}") |
| 201 | + result = user.username in staff_users or ( |
| 202 | + set(user_groups) & set(staff_groups)) |
| 203 | + logger.debug(f"User {user.username} in Staff Users :{result}") |
| 204 | + return bool(result) |
| 205 | + |
| 206 | + def configure_user(self, request, user): |
| 207 | + logger = logging.getLogger('netbox.authentication.RemoteUserBackend') |
| 208 | + if not settings.REMOTE_AUTH_GROUP_SYNC_ENABLED: |
| 209 | + # Assign default groups to the user |
| 210 | + group_list = [] |
| 211 | + for name in settings.REMOTE_AUTH_DEFAULT_GROUPS: |
| 212 | + try: |
| 213 | + group_list.append(Group.objects.get(name=name)) |
| 214 | + except Group.DoesNotExist: |
| 215 | + logging.error( |
| 216 | + f"Could not assign group {name} to remotely-authenticated user {user}: Group not found") |
| 217 | + if group_list: |
| 218 | + user.groups.add(*group_list) |
| 219 | + logger.debug( |
| 220 | + f"Assigned groups to remotely-authenticated user {user}: {group_list}") |
| 221 | + |
| 222 | + # Assign default object permissions to the user |
| 223 | + permissions_list = [] |
| 224 | + for permission_name, constraints in settings.REMOTE_AUTH_DEFAULT_PERMISSIONS.items(): |
| 225 | + try: |
| 226 | + object_type, action = resolve_permission_ct( |
| 227 | + permission_name) |
| 228 | + # TODO: Merge multiple actions into a single ObjectPermission per content type |
| 229 | + obj_perm = ObjectPermission( |
| 230 | + actions=[action], constraints=constraints) |
| 231 | + obj_perm.save() |
| 232 | + obj_perm.users.add(user) |
| 233 | + obj_perm.object_types.add(object_type) |
| 234 | + permissions_list.append(permission_name) |
| 235 | + except ValueError: |
| 236 | + logging.error( |
| 237 | + f"Invalid permission name: '{permission_name}'. Permissions must be in the form " |
| 238 | + "<app>.<action>_<model>. (Example: dcim.add_site)" |
| 239 | + ) |
| 240 | + if permissions_list: |
| 241 | + logger.debug( |
| 242 | + f"Assigned permissions to remotely-authenticated user {user}: {permissions_list}") |
| 243 | + else: |
| 244 | + logger.debug( |
| 245 | + f"Skipped initial assignment of permissions and groups to remotely-authenticated user {user} as Group sync is enabled") |
136 | 246 |
|
137 | 247 | return user |
138 | 248 |
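Review bot: `configure_groups` iterates `remote_groups` with no guard (`for name in remote_groups:`), yet `authenticate` declares `remote_groups=None`, so with `REMOTE_AUTH_GROUP_SYNC_ENABLED` set and no groups passed in, a login raises `TypeError` out of `authenticate` rather than ending in a defined state; `for name in remote_groups or []:` routes the absent-groups case into the existing `user.groups.clear()` branch, which is the fail-closed result the sync design already implies (the `RemoteUserBackend` class itself begins before this hunk, but each method here is complete). The same method then calls `self._is_superuser(user)` and `self._is_staff(user)` a second time purely to log them, repeating the `user.groups.all()` query and the debug lines; collapse the block to `user.is_superuser = self._is_superuser(user)`, `user.is_staff = self._is_staff(user)`, `logger.debug(f"User {user} is Superuser: {user.is_superuser}, Staff: {user.is_staff}")`. In `_is_staff` the first debug line still reads "Superuser Groups"; it should be `logger.debug(f"Staff Groups: {staff_groups}")`. Because `user.groups.set(...)`/`clear()` plus the recomputed `is_superuser`/`is_staff` overwrite anything granted locally on every login, `configure_groups` needs a docstring stating that group membership and the two flags are derived solely from the remote group list and the `REMOTE_AUTH_*` settings, and that local assignments are replaced at each authentication. Finally, `user.username in superusers` (and the matching test in `_is_staff`) degrades to substring matching if either setting is ever a plain string instead of a list, so it is worth confirming that settings validation outside this file enforces a list/tuple type for `REMOTE_AUTH_SUPERUSERS`, `REMOTE_AUTH_STAFF_USERS`, `REMOTE_AUTH_SUPERUSER_GROUPS` and `REMOTE_AUTH_STAFF_GROUPS`.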
|
|