FossilRepo

fossilrepo / accounts / views.py
Source Blame History 388 lines
0c354ac… ragelink 1 import logging
c588255… ragelink 2 import re
c588255… ragelink 3
0c354ac… ragelink 4 import requests
c588255… ragelink 5 from django.contrib import messages
c588255… ragelink 6 from django.contrib.auth import login, logout
c588255… ragelink 7 from django.contrib.auth.decorators import login_required
c588255… ragelink 8 from django.http import HttpResponse
c588255… ragelink 9 from django.shortcuts import get_object_or_404, redirect, render
c588255… ragelink 10 from django.utils.http import url_has_allowed_host_and_scheme
c588255… ragelink 11 from django.views.decorators.http import require_POST
c588255… ragelink 12 from django_ratelimit.decorators import ratelimit
c588255… ragelink 13
c588255… ragelink 14 from .forms import LoginForm
c588255… ragelink 15 from .models import PersonalAccessToken, UserProfile
0c354ac… ragelink 16
0c354ac… ragelink 17 logger = logging.getLogger(__name__)
c588255… ragelink 18
c588255… ragelink 19 # Allowed SSH key type prefixes
c588255… ragelink 20 _SSH_KEY_PREFIXES = ("ssh-ed25519", "ssh-rsa", "ecdsa-sha2-", "ssh-dss")
c588255… ragelink 21
c588255… ragelink 22
c588255… ragelink 23 def _sanitize_ssh_key(public_key: str) -> tuple[str | None, str]:
c588255… ragelink 24 """Validate and sanitize an SSH public key.
c588255… ragelink 25
c588255… ragelink 26 Returns (sanitized_key, error_message). On success error_message is "".
c588255… ragelink 27 Rejects keys containing newlines, carriage returns, or null bytes (which
c588255… ragelink 28 would allow injection of extra authorized_keys entries). Validates format:
c588255… ragelink 29 known type prefix, 2-3 space-separated parts.
c588255… ragelink 30 """
c588255… ragelink 31 # Strip dangerous injection characters -- newlines let an attacker add
c588255… ragelink 32 # a second authorized_keys line outside the forced-command wrapper
c588255… ragelink 33 if "\n" in public_key or "\r" in public_key or "\x00" in public_key:
c588255… ragelink 34 return None, "SSH key must be a single line. Newlines, carriage returns, and null bytes are not allowed."
c588255… ragelink 35
c588255… ragelink 36 key = public_key.strip()
c588255… ragelink 37 if not key:
c588255… ragelink 38 return None, "SSH key cannot be empty."
c588255… ragelink 39
c588255… ragelink 40 # SSH keys are: <type> <base64-data> [optional comment]
c588255… ragelink 41 parts = key.split()
c588255… ragelink 42 if len(parts) < 2 or len(parts) > 3:
c588255… ragelink 43 return None, "Invalid SSH key format. Expected: <key-type> <key-data> [comment]"
c588255… ragelink 44
c588255… ragelink 45 key_type = parts[0]
c588255… ragelink 46 if not any(key_type.startswith(prefix) for prefix in _SSH_KEY_PREFIXES):
c588255… ragelink 47 return None, f"Unsupported key type '{key_type}'. Allowed: ssh-ed25519, ssh-rsa, ecdsa-sha2-*, ssh-dss."
c588255… ragelink 48
c588255… ragelink 49 # Validate base64 data is plausible (only base64 chars + padding)
c588255… ragelink 50 if not re.match(r"^[A-Za-z0-9+/=]+$", parts[1]):
c588255… ragelink 51 return None, "Invalid SSH key data encoding."
c588255… ragelink 52
c588255… ragelink 53 return key, ""
c588255… ragelink 54
c588255… ragelink 55
def _verify_turnstile(token: str, remote_ip: str) -> bool:
    """Verify a Cloudflare Turnstile response token. Returns True if valid.

    The check fails closed: an empty token, a missing secret key, a non-200
    response, a response body that is not a JSON object, or any exception
    raised while contacting the endpoint all yield False. Only a JSON
    ``"success": true`` counts as a pass.
    """
    # An empty token can never verify, so skip the config lookup and the
    # network round trip entirely.
    if not token:
        return False

    from constance import config

    secret = config.TURNSTILE_SECRET_KEY
    if not secret:
        return False

    try:
        resp = requests.post(
            "https://challenges.cloudflare.com/turnstile/v0/siteverify",
            data={"secret": secret, "response": token, "remoteip": remote_ip},
            timeout=5,
        )
        if resp.status_code != 200:
            return False
        payload = resp.json()
    except Exception:
        # Deliberately broad: whatever goes wrong here must deny the login
        # attempt rather than surface as a server error.
        logger.exception("Turnstile verification failed")
        return False

    # Require the literal boolean so the function really returns a bool and a
    # truthy non-boolean value can never be mistaken for success.
    return isinstance(payload, dict) and payload.get("success") is True
0c354ac… ragelink 72
0c354ac… ragelink 73
@ratelimit(key="ip", rate="10/m", block=True)
def login_view(request):
    """Render the login page and process credential submissions.

    Limited to 10 requests per minute per IP by the ``ratelimit`` decorator.
    Already-authenticated users are redirected to the dashboard. When
    Turnstile is enabled and a site key is configured, a POST must carry a
    token that verifies before the credentials are even looked at; otherwise
    the page is re-rendered with a blank form and ``turnstile_error`` set.

    After a successful login the user is sent to the ``next`` query
    parameter if it points at this host (and does not downgrade an HTTPS
    request to HTTP), else to the dashboard.
    """
    if request.user.is_authenticated:
        return redirect("dashboard")

    from constance import config

    turnstile_enabled = config.TURNSTILE_ENABLED and config.TURNSTILE_SITE_KEY
    turnstile_error = False

    if request.method == "POST":
        # Verify Turnstile before processing login
        challenge_passed = True
        if turnstile_enabled:
            turnstile_token = request.POST.get("cf-turnstile-response", "")
            challenge_passed = bool(turnstile_token) and _verify_turnstile(
                turnstile_token, request.META.get("REMOTE_ADDR", "")
            )

        if not challenge_passed:
            # Fall through to the shared render below with an unbound form so
            # the submitted credentials are neither checked nor echoed back.
            turnstile_error = True
            form = LoginForm()
        else:
            form = LoginForm(request, data=request.POST)
            if form.is_valid():
                login(request, form.get_user())
                next_url = request.GET.get("next", "")
                if next_url and url_has_allowed_host_and_scheme(
                    next_url,
                    allowed_hosts={request.get_host()},
                    # Never bounce a secure session onto a plain-http URL.
                    require_https=request.is_secure(),
                ):
                    return redirect(next_url)
                return redirect("dashboard")
    else:
        form = LoginForm()

    ctx = {"form": form}
    if turnstile_enabled:
        ctx["turnstile_enabled"] = True
        ctx["turnstile_site_key"] = config.TURNSTILE_SITE_KEY
        ctx["turnstile_error"] = turnstile_error
    return render(request, "accounts/login.html", ctx)
c588255… ragelink 118
c588255… ragelink 119
@require_POST
def logout_view(request):
    """Log the current user out (POST only) and redirect to the login page."""
    logout(request)
    login_page = redirect("accounts:login")
    return login_page
c588255… ragelink 124
c588255… ragelink 125
c588255… ragelink 126 # ---------------------------------------------------------------------------
c588255… ragelink 127 # SSH key management
c588255… ragelink 128 # ---------------------------------------------------------------------------
c588255… ragelink 129
c588255… ragelink 130
def _parse_key_type(public_key):
    """Return a short key-type label for an SSH public key string.

    The first whitespace-separated token is looked up in a fixed table
    (for example ``ssh-ed25519`` -> ``ed25519``). A token that is not in the
    table is returned unchanged, and blank input yields "".
    """
    tokens = public_key.strip().split()
    if not tokens:
        return ""

    algorithm = tokens[0]
    short_names = {
        "ssh-ed25519": "ed25519",
        "ssh-rsa": "rsa",
        "ssh-dss": "dsa",
        # All three NIST curves share one label.
        **dict.fromkeys(
            ("ecdsa-sha2-nistp256", "ecdsa-sha2-nistp384", "ecdsa-sha2-nistp521"),
            "ecdsa",
        ),
    }
    try:
        return short_names[algorithm]
    except KeyError:
        return algorithm
c588255… ragelink 146
c588255… ragelink 147
def _compute_fingerprint(public_key):
    """Compute SSH key fingerprint (SHA256).

    Takes the second whitespace-separated field of ``public_key`` as base64
    key data and returns ``"SHA256:"`` followed by the unpadded base64 of the
    SHA-256 digest of the decoded bytes. Returns "" when there is no second
    field or when the data cannot be base64-decoded.
    """
    import base64
    import hashlib

    parts = public_key.strip().split()
    if len(parts) < 2:
        return ""

    try:
        key_data = base64.b64decode(parts[1])
    except ValueError:
        # Covers binascii.Error (bad length/padding) and non-ASCII text.
        # Only the decode is guarded so unrelated bugs are not swallowed.
        return ""

    digest = hashlib.sha256(key_data).digest()
    return "SHA256:" + base64.b64encode(digest).rstrip(b"=").decode()
c588255… ragelink 162
c588255… ragelink 163
def _regenerate_authorized_keys():
    """Regenerate the authorized_keys file from all active user SSH keys.

    The file is ``<parent of FOSSIL_DATA_DIR>/ssh/authorized_keys``. Every key
    whose ``deleted_at`` is null becomes one line that pins the connection to
    ``/usr/local/bin/fossil-shell <username>`` and disables port, X11 and
    agent forwarding as well as PTY allocation.

    The new content is written to a temporary file in the same directory and
    then renamed over the target, so a reader never sees a half-written file
    and the content never exists on disk with permissions wider than 0600.
    """
    import os
    import tempfile
    from pathlib import Path

    from constance import config

    from fossil.user_keys import UserSSHKey

    ssh_dir = Path(config.FOSSIL_DATA_DIR).parent / "ssh"
    ssh_dir.mkdir(parents=True, exist_ok=True)
    authorized_keys_path = ssh_dir / "authorized_keys"

    keys = UserSSHKey.objects.filter(deleted_at__isnull=True).select_related("user")

    lines = []
    for key in keys:
        # Defense in depth: strip newlines/CR/null from stored keys so a
        # compromised DB value cannot inject extra authorized_keys entries.
        clean_key = key.public_key.strip().replace("\n", "").replace("\r", "").replace("\x00", "")
        if not clean_key:
            continue

        # Same defense for the username, which is interpolated inside the
        # quoted command="..." option: a quote, space or newline there would
        # change the forced command, so such a key is left out instead.
        username = key.user.username
        if not re.fullmatch(r"[\w.@+-]+", username):
            logger.warning("Skipping SSH key %s: username %r is unsafe in a forced command", key.pk, username)
            continue

        # Each key gets a forced command that identifies the user
        forced_cmd = (
            f'command="/usr/local/bin/fossil-shell {username}",no-port-forwarding,no-X11-forwarding,no-agent-forwarding,no-pty'
        )
        lines.append(f"{forced_cmd} {clean_key}")

    content = "\n".join(lines) + "\n" if lines else ""

    # mkstemp creates the file readable and writable only by the owner.
    fd, tmp_name = tempfile.mkstemp(dir=ssh_dir, prefix=".authorized_keys.", suffix=".tmp")
    try:
        with os.fdopen(fd, "w") as tmp_file:
            tmp_file.write(content)
        os.chmod(tmp_name, 0o600)
        os.replace(tmp_name, authorized_keys_path)
    except BaseException:
        # Do not leave a stray temp file behind if writing or renaming failed.
        try:
            os.unlink(tmp_name)
        except FileNotFoundError:
            pass
        raise
c588255… ragelink 193
c588255… ragelink 194
@login_required
def ssh_keys(request):
    """List and add SSH keys.

    GET renders the requesting user's keys. POST expects ``title`` and
    ``public_key``; the key is validated with ``_sanitize_ssh_key`` and must
    yield a fingerprint before it is stored, after which the authorized_keys
    file is regenerated. Every rejected submission re-renders the page with
    an error message.
    """
    from fossil.user_keys import UserSSHKey

    keys = UserSSHKey.objects.filter(user=request.user)

    if request.method != "POST":
        return render(request, "accounts/ssh_keys.html", {"keys": keys})

    title = request.POST.get("title", "").strip()
    public_key = request.POST.get("public_key", "").strip()

    sanitized_key = None
    fingerprint = ""
    if not title or not public_key:
        # Previously this case re-rendered the page with no explanation.
        error = "Both a title and a public key are required."
    else:
        sanitized_key, error = _sanitize_ssh_key(public_key)
        if not error:
            fingerprint = _compute_fingerprint(sanitized_key)
            if not fingerprint:
                # Key data that cannot be decoded must not be stored or
                # written to authorized_keys.
                error = "Invalid SSH key data encoding."

    if error:
        messages.error(request, error)
        return render(request, "accounts/ssh_keys.html", {"keys": keys})

    UserSSHKey.objects.create(
        user=request.user,
        title=title,
        public_key=sanitized_key,
        key_type=_parse_key_type(sanitized_key),
        fingerprint=fingerprint,
        created_by=request.user,
    )

    _regenerate_authorized_keys()

    messages.success(request, f'SSH key "{title}" added.')
    return redirect("accounts:ssh_keys")
c588255… ragelink 230
c588255… ragelink 231
@login_required
@require_POST
def ssh_key_delete(request, pk):
    """Remove one of the requesting user's SSH keys (POST only).

    Looks the key up by ``pk`` restricted to the current user (404 if there
    is no match), calls its ``soft_delete`` and regenerates authorized_keys.
    Requests carrying an ``HX-Request`` header get a 200 response with an
    ``HX-Redirect`` header; all others get a normal redirect to the key list.
    """
    from fossil.user_keys import UserSSHKey

    owner = request.user
    ssh_key = get_object_or_404(UserSSHKey, pk=pk, user=owner)
    ssh_key.soft_delete(user=owner)
    _regenerate_authorized_keys()

    messages.success(request, f'SSH key "{ssh_key.title}" removed.')

    if not request.headers.get("HX-Request"):
        return redirect("accounts:ssh_keys")
    return HttpResponse(status=200, headers={"HX-Redirect": "/auth/ssh-keys/"})
c588255… ragelink 248
c588255… ragelink 249
@login_required
def notification_preferences(request):
    """Show and update the requesting user's notification preferences.

    The preference row is fetched or created for the user. On POST the
    delivery mode (default ``"immediate"``) and the five ``notify_*`` flags
    are stored; a flag is on exactly when its name is present in the POST
    data. Requests carrying an ``HX-Request`` header are answered with an
    ``HX-Redirect`` header, others with a normal redirect.
    """
    from fossil.notifications import NotificationPreference

    prefs, _ = NotificationPreference.objects.get_or_create(user=request.user)

    if request.method != "POST":
        return render(request, "accounts/notification_prefs.html", {"prefs": prefs})

    submitted = request.POST
    prefs.delivery_mode = submitted.get("delivery_mode", "immediate")
    for flag in (
        "notify_checkins",
        "notify_tickets",
        "notify_wiki",
        "notify_releases",
        "notify_forum",
    ):
        setattr(prefs, flag, flag in submitted)
    prefs.save()

    messages.success(request, "Notification preferences updated.")

    if not request.headers.get("HX-Request"):
        return redirect("accounts:notification_prefs")
    return HttpResponse(status=200, headers={"HX-Redirect": "/auth/notifications/"})
c588255… ragelink 274
c588255… ragelink 275
c588255… ragelink 276 # ---------------------------------------------------------------------------
c588255… ragelink 277 # Unified profile
c588255… ragelink 278 # ---------------------------------------------------------------------------
c588255… ragelink 279
# Scope names that may be attached to a personal access token.
VALID_SCOPES = {
    "read",
    "write",
    "admin",
}
c588255… ragelink 281
c588255… ragelink 282
@login_required
def profile(request):
    """Render the unified profile page for the requesting user.

    Fetches (creating if missing) the user's profile and notification
    preferences, and passes them to the template together with the user's
    SSH keys and the personal access tokens that have not been revoked.
    """
    from fossil.notifications import NotificationPreference
    from fossil.user_keys import UserSSHKey

    owner = request.user
    user_profile, _ = UserProfile.objects.get_or_create(user=owner)
    notif_prefs, _ = NotificationPreference.objects.get_or_create(user=owner)

    context = {
        "user_profile": user_profile,
        "notif_prefs": notif_prefs,
        "ssh_keys": UserSSHKey.objects.filter(user=owner),
        "tokens": PersonalAccessToken.objects.filter(user=owner, revoked_at__isnull=True),
    }
    return render(request, "accounts/profile.html", context)
c588255… ragelink 304
c588255… ragelink 305
@login_required
def profile_edit(request):
    """Edit profile info: name, email, handle, bio, location, website.

    GET renders the edit form. POST validates the requested handle first and
    only then writes anything, so a submission rejected because the handle is
    taken leaves both the user record and the profile untouched. Text fields
    are stripped and truncated to fixed maximum lengths.
    """
    user_profile, _ = UserProfile.objects.get_or_create(user=request.user)

    if request.method == "POST":
        raw_handle = request.POST.get("handle", "").strip()
        handle = UserProfile.sanitize_handle(raw_handle)
        if handle:
            # Check uniqueness (excluding self) before saving anything.
            conflict = UserProfile.objects.filter(handle=handle).exclude(pk=user_profile.pk).exists()
            if conflict:
                messages.error(request, f"Handle @{handle} is already taken.")
                return render(request, "accounts/profile_edit.html", {"user_profile": user_profile})

        user = request.user
        user.first_name = request.POST.get("first_name", "").strip()[:30]
        user.last_name = request.POST.get("last_name", "").strip()[:150]
        user.email = request.POST.get("email", "").strip()[:254]
        user.save(update_fields=["first_name", "last_name", "email"])

        # An empty handle is stored as None rather than as "".
        user_profile.handle = handle or None
        user_profile.bio = request.POST.get("bio", "").strip()[:500]
        user_profile.location = request.POST.get("location", "").strip()[:100]
        user_profile.website = request.POST.get("website", "").strip()[:200]
        user_profile.save()

        messages.success(request, "Profile updated.")
        return redirect("accounts:profile")

    return render(request, "accounts/profile_edit.html", {"user_profile": user_profile})
c588255… ragelink 339
c588255… ragelink 340
@login_required
def profile_token_create(request):
    """Generate a personal access token. Shows the raw token once.

    GET renders the creation form. POST requires a non-empty ``name``. The
    ``scopes`` field is a comma-separated list: entries not in
    ``VALID_SCOPES`` are dropped, repeated entries are kept once in their
    first-seen order, and ``"read"`` is used when nothing valid remains.
    The token record is created from the hash and prefix returned by
    ``PersonalAccessToken.generate()``; the raw token is only handed to the
    confirmation template.
    """
    if request.method != "POST":
        return render(request, "accounts/profile_token_create.html", {})

    name = request.POST.get("name", "").strip()
    if not name:
        messages.error(request, "Token name is required.")
        return render(request, "accounts/profile_token_create.html", {})

    raw_scopes = request.POST.get("scopes", "read").strip()
    requested = (part.strip() for part in raw_scopes.split(","))
    # dict.fromkeys de-duplicates while preserving order, so "read,read"
    # is stored as "read" instead of verbatim.
    scopes = ",".join(dict.fromkeys(s for s in requested if s in VALID_SCOPES))
    if not scopes:
        scopes = "read"

    raw_token, token_hash, prefix = PersonalAccessToken.generate()
    PersonalAccessToken.objects.create(
        user=request.user,
        name=name,
        token_hash=token_hash,
        token_prefix=prefix,
        scopes=scopes,
    )

    return render(
        request,
        "accounts/profile_token_created.html",
        {"raw_token": raw_token, "token_name": name},
    )
c588255… ragelink 371
c588255… ragelink 372
@login_required
@require_POST
def profile_token_revoke(request, guid):
    """Revoke one of the requesting user's personal access tokens (POST only).

    ``guid`` is matched against ``token_prefix``. Only a token owned by the
    current user and not yet revoked is found (404 otherwise). Its
    ``revoked_at`` is set to the current time. Requests carrying an
    ``HX-Request`` header get an ``HX-Redirect`` header back, others a normal
    redirect to the profile page.
    """
    from django.utils import timezone

    lookup = {
        "token_prefix": guid,
        "user": request.user,
        "revoked_at__isnull": True,
    }
    token = get_object_or_404(PersonalAccessToken, **lookup)

    token.revoked_at = timezone.now()
    token.save(update_fields=["revoked_at"])

    messages.success(request, f'Token "{token.name}" revoked.')

    if not request.headers.get("HX-Request"):
        return redirect("accounts:profile")
    return HttpResponse(status=200, headers={"HX-Redirect": "/auth/profile/"})

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button