|
0c354ac…
|
ragelink
|
1 |
import logging |
|
c588255…
|
ragelink
|
2 |
import re |
|
c588255…
|
ragelink
|
3 |
|
|
0c354ac…
|
ragelink
|
4 |
import requests |
|
c588255…
|
ragelink
|
5 |
from django.contrib import messages |
|
c588255…
|
ragelink
|
6 |
from django.contrib.auth import login, logout |
|
c588255…
|
ragelink
|
7 |
from django.contrib.auth.decorators import login_required |
|
c588255…
|
ragelink
|
8 |
from django.http import HttpResponse |
|
c588255…
|
ragelink
|
9 |
from django.shortcuts import get_object_or_404, redirect, render |
|
c588255…
|
ragelink
|
10 |
from django.utils.http import url_has_allowed_host_and_scheme |
|
c588255…
|
ragelink
|
11 |
from django.views.decorators.http import require_POST |
|
c588255…
|
ragelink
|
12 |
from django_ratelimit.decorators import ratelimit |
|
c588255…
|
ragelink
|
13 |
|
|
c588255…
|
ragelink
|
14 |
from .forms import LoginForm |
|
c588255…
|
ragelink
|
15 |
from .models import PersonalAccessToken, UserProfile |
|
0c354ac…
|
ragelink
|
16 |
|
|
0c354ac…
|
ragelink
|
17 |
logger = logging.getLogger(__name__) |
|
c588255…
|
ragelink
|
18 |
|
|
c588255…
|
ragelink
|
19 |
# Allowed SSH key type prefixes |
|
c588255…
|
ragelink
|
20 |
_SSH_KEY_PREFIXES = ("ssh-ed25519", "ssh-rsa", "ecdsa-sha2-", "ssh-dss") |
|
c588255…
|
ragelink
|
21 |
|
|
c588255…
|
ragelink
|
22 |
|
|
c588255…
|
ragelink
|
23 |
def _sanitize_ssh_key(public_key: str) -> tuple[str | None, str]: |
|
c588255…
|
ragelink
|
24 |
"""Validate and sanitize an SSH public key. |
|
c588255…
|
ragelink
|
25 |
|
|
c588255…
|
ragelink
|
26 |
Returns (sanitized_key, error_message). On success error_message is "". |
|
c588255…
|
ragelink
|
27 |
Rejects keys containing newlines, carriage returns, or null bytes (which |
|
c588255…
|
ragelink
|
28 |
would allow injection of extra authorized_keys entries). Validates format: |
|
c588255…
|
ragelink
|
29 |
known type prefix, 2-3 space-separated parts. |
|
c588255…
|
ragelink
|
30 |
""" |
|
c588255…
|
ragelink
|
31 |
# Strip dangerous injection characters -- newlines let an attacker add |
|
c588255…
|
ragelink
|
32 |
# a second authorized_keys line outside the forced-command wrapper |
|
c588255…
|
ragelink
|
33 |
if "\n" in public_key or "\r" in public_key or "\x00" in public_key: |
|
c588255…
|
ragelink
|
34 |
return None, "SSH key must be a single line. Newlines, carriage returns, and null bytes are not allowed." |
|
c588255…
|
ragelink
|
35 |
|
|
c588255…
|
ragelink
|
36 |
key = public_key.strip() |
|
c588255…
|
ragelink
|
37 |
if not key: |
|
c588255…
|
ragelink
|
38 |
return None, "SSH key cannot be empty." |
|
c588255…
|
ragelink
|
39 |
|
|
c588255…
|
ragelink
|
40 |
# SSH keys are: <type> <base64-data> [optional comment] |
|
c588255…
|
ragelink
|
41 |
parts = key.split() |
|
c588255…
|
ragelink
|
42 |
if len(parts) < 2 or len(parts) > 3: |
|
c588255…
|
ragelink
|
43 |
return None, "Invalid SSH key format. Expected: <key-type> <key-data> [comment]" |
|
c588255…
|
ragelink
|
44 |
|
|
c588255…
|
ragelink
|
45 |
key_type = parts[0] |
|
c588255…
|
ragelink
|
46 |
if not any(key_type.startswith(prefix) for prefix in _SSH_KEY_PREFIXES): |
|
c588255…
|
ragelink
|
47 |
return None, f"Unsupported key type '{key_type}'. Allowed: ssh-ed25519, ssh-rsa, ecdsa-sha2-*, ssh-dss." |
|
c588255…
|
ragelink
|
48 |
|
|
c588255…
|
ragelink
|
49 |
# Validate base64 data is plausible (only base64 chars + padding) |
|
c588255…
|
ragelink
|
50 |
if not re.match(r"^[A-Za-z0-9+/=]+$", parts[1]): |
|
c588255…
|
ragelink
|
51 |
return None, "Invalid SSH key data encoding." |
|
c588255…
|
ragelink
|
52 |
|
|
c588255…
|
ragelink
|
53 |
return key, "" |
|
c588255…
|
ragelink
|
54 |
|
|
c588255…
|
ragelink
|
55 |
|
|
0c354ac…
|
ragelink
|
56 |
def _verify_turnstile(token: str, remote_ip: str) -> bool: |
|
0c354ac…
|
ragelink
|
57 |
"""Verify a Cloudflare Turnstile response token. Returns True if valid.""" |
|
0c354ac…
|
ragelink
|
58 |
from constance import config |
|
0c354ac…
|
ragelink
|
59 |
|
|
0c354ac…
|
ragelink
|
60 |
if not config.TURNSTILE_SECRET_KEY: |
|
0c354ac…
|
ragelink
|
61 |
return False |
|
0c354ac…
|
ragelink
|
62 |
try: |
|
0c354ac…
|
ragelink
|
63 |
resp = requests.post( |
|
0c354ac…
|
ragelink
|
64 |
"https://challenges.cloudflare.com/turnstile/v0/siteverify", |
|
0c354ac…
|
ragelink
|
65 |
data={"secret": config.TURNSTILE_SECRET_KEY, "response": token, "remoteip": remote_ip}, |
|
0c354ac…
|
ragelink
|
66 |
timeout=5, |
|
0c354ac…
|
ragelink
|
67 |
) |
|
0c354ac…
|
ragelink
|
68 |
return resp.status_code == 200 and resp.json().get("success", False) |
|
0c354ac…
|
ragelink
|
69 |
except Exception: |
|
0c354ac…
|
ragelink
|
70 |
logger.exception("Turnstile verification failed") |
|
0c354ac…
|
ragelink
|
71 |
return False |
|
0c354ac…
|
ragelink
|
72 |
|
|
0c354ac…
|
ragelink
|
73 |
|
|
c588255…
|
ragelink
|
74 |
@ratelimit(key="ip", rate="10/m", block=True) |
|
c588255…
|
ragelink
|
75 |
def login_view(request): |
|
c588255…
|
ragelink
|
76 |
if request.user.is_authenticated: |
|
c588255…
|
ragelink
|
77 |
return redirect("dashboard") |
|
c588255…
|
ragelink
|
78 |
|
|
0c354ac…
|
ragelink
|
79 |
from constance import config |
|
0c354ac…
|
ragelink
|
80 |
|
|
0c354ac…
|
ragelink
|
81 |
turnstile_enabled = config.TURNSTILE_ENABLED and config.TURNSTILE_SITE_KEY |
|
0c354ac…
|
ragelink
|
82 |
turnstile_error = False |
|
0c354ac…
|
ragelink
|
83 |
|
|
c588255…
|
ragelink
|
84 |
if request.method == "POST": |
|
0c354ac…
|
ragelink
|
85 |
# Verify Turnstile before processing login |
|
0c354ac…
|
ragelink
|
86 |
if turnstile_enabled: |
|
0c354ac…
|
ragelink
|
87 |
turnstile_token = request.POST.get("cf-turnstile-response", "") |
|
0c354ac…
|
ragelink
|
88 |
if not turnstile_token or not _verify_turnstile(turnstile_token, request.META.get("REMOTE_ADDR", "")): |
|
0c354ac…
|
ragelink
|
89 |
turnstile_error = True |
|
0c354ac…
|
ragelink
|
90 |
form = LoginForm() |
|
0c354ac…
|
ragelink
|
91 |
return render( |
|
0c354ac…
|
ragelink
|
92 |
request, |
|
0c354ac…
|
ragelink
|
93 |
"accounts/login.html", |
|
0c354ac…
|
ragelink
|
94 |
{ |
|
0c354ac…
|
ragelink
|
95 |
"form": form, |
|
0c354ac…
|
ragelink
|
96 |
"turnstile_enabled": True, |
|
0c354ac…
|
ragelink
|
97 |
"turnstile_site_key": config.TURNSTILE_SITE_KEY, |
|
0c354ac…
|
ragelink
|
98 |
"turnstile_error": True, |
|
0c354ac…
|
ragelink
|
99 |
}, |
|
0c354ac…
|
ragelink
|
100 |
) |
|
0c354ac…
|
ragelink
|
101 |
|
|
c588255…
|
ragelink
|
102 |
form = LoginForm(request, data=request.POST) |
|
c588255…
|
ragelink
|
103 |
if form.is_valid(): |
|
c588255…
|
ragelink
|
104 |
login(request, form.get_user()) |
|
c588255…
|
ragelink
|
105 |
next_url = request.GET.get("next", "") |
|
c588255…
|
ragelink
|
106 |
if next_url and url_has_allowed_host_and_scheme(next_url, allowed_hosts={request.get_host()}): |
|
c588255…
|
ragelink
|
107 |
return redirect(next_url) |
|
c588255…
|
ragelink
|
108 |
return redirect("dashboard") |
|
c588255…
|
ragelink
|
109 |
else: |
|
c588255…
|
ragelink
|
110 |
form = LoginForm() |
|
c588255…
|
ragelink
|
111 |
|
|
0c354ac…
|
ragelink
|
112 |
ctx = {"form": form} |
|
0c354ac…
|
ragelink
|
113 |
if turnstile_enabled: |
|
0c354ac…
|
ragelink
|
114 |
ctx["turnstile_enabled"] = True |
|
0c354ac…
|
ragelink
|
115 |
ctx["turnstile_site_key"] = config.TURNSTILE_SITE_KEY |
|
0c354ac…
|
ragelink
|
116 |
ctx["turnstile_error"] = turnstile_error |
|
0c354ac…
|
ragelink
|
117 |
return render(request, "accounts/login.html", ctx) |
|
c588255…
|
ragelink
|
118 |
|
|
c588255…
|
ragelink
|
119 |
|
|
c588255…
|
ragelink
|
120 |
@require_POST |
|
c588255…
|
ragelink
|
121 |
def logout_view(request): |
|
c588255…
|
ragelink
|
122 |
logout(request) |
|
c588255…
|
ragelink
|
123 |
return redirect("accounts:login") |
|
c588255…
|
ragelink
|
124 |
|
|
c588255…
|
ragelink
|
125 |
|
|
c588255…
|
ragelink
|
126 |
# --------------------------------------------------------------------------- |
|
c588255…
|
ragelink
|
127 |
# SSH key management |
|
c588255…
|
ragelink
|
128 |
# --------------------------------------------------------------------------- |
|
c588255…
|
ragelink
|
129 |
|
|
c588255…
|
ragelink
|
130 |
|
|
c588255…
|
ragelink
|
131 |
def _parse_key_type(public_key): |
|
c588255…
|
ragelink
|
132 |
"""Extract key type from public key string.""" |
|
c588255…
|
ragelink
|
133 |
parts = public_key.strip().split() |
|
c588255…
|
ragelink
|
134 |
if parts: |
|
c588255…
|
ragelink
|
135 |
key_prefix = parts[0] |
|
c588255…
|
ragelink
|
136 |
type_map = { |
|
c588255…
|
ragelink
|
137 |
"ssh-ed25519": "ed25519", |
|
c588255…
|
ragelink
|
138 |
"ssh-rsa": "rsa", |
|
c588255…
|
ragelink
|
139 |
"ecdsa-sha2-nistp256": "ecdsa", |
|
c588255…
|
ragelink
|
140 |
"ecdsa-sha2-nistp384": "ecdsa", |
|
c588255…
|
ragelink
|
141 |
"ecdsa-sha2-nistp521": "ecdsa", |
|
c588255…
|
ragelink
|
142 |
"ssh-dss": "dsa", |
|
c588255…
|
ragelink
|
143 |
} |
|
c588255…
|
ragelink
|
144 |
return type_map.get(key_prefix, key_prefix) |
|
c588255…
|
ragelink
|
145 |
return "" |
|
c588255…
|
ragelink
|
146 |
|
|
c588255…
|
ragelink
|
147 |
|
|
c588255…
|
ragelink
|
148 |
def _compute_fingerprint(public_key): |
|
c588255…
|
ragelink
|
149 |
"""Compute SSH key fingerprint (SHA256).""" |
|
c588255…
|
ragelink
|
150 |
import base64 |
|
c588255…
|
ragelink
|
151 |
import hashlib |
|
c588255…
|
ragelink
|
152 |
|
|
c588255…
|
ragelink
|
153 |
parts = public_key.strip().split() |
|
c588255…
|
ragelink
|
154 |
if len(parts) >= 2: |
|
c588255…
|
ragelink
|
155 |
try: |
|
c588255…
|
ragelink
|
156 |
key_data = base64.b64decode(parts[1]) |
|
c588255…
|
ragelink
|
157 |
digest = hashlib.sha256(key_data).digest() |
|
c588255…
|
ragelink
|
158 |
return "SHA256:" + base64.b64encode(digest).rstrip(b"=").decode() |
|
c588255…
|
ragelink
|
159 |
except Exception: |
|
c588255…
|
ragelink
|
160 |
pass |
|
c588255…
|
ragelink
|
161 |
return "" |
|
c588255…
|
ragelink
|
162 |
|
|
c588255…
|
ragelink
|
163 |
|
|
c588255…
|
ragelink
|
164 |
def _regenerate_authorized_keys(): |
|
c588255…
|
ragelink
|
165 |
"""Regenerate the authorized_keys file from all active user SSH keys.""" |
|
c588255…
|
ragelink
|
166 |
from pathlib import Path |
|
c588255…
|
ragelink
|
167 |
|
|
c588255…
|
ragelink
|
168 |
from constance import config |
|
c588255…
|
ragelink
|
169 |
|
|
c588255…
|
ragelink
|
170 |
from fossil.user_keys import UserSSHKey |
|
c588255…
|
ragelink
|
171 |
|
|
c588255…
|
ragelink
|
172 |
ssh_dir = Path(config.FOSSIL_DATA_DIR).parent / "ssh" |
|
c588255…
|
ragelink
|
173 |
ssh_dir.mkdir(parents=True, exist_ok=True) |
|
c588255…
|
ragelink
|
174 |
authorized_keys_path = ssh_dir / "authorized_keys" |
|
c588255…
|
ragelink
|
175 |
|
|
c588255…
|
ragelink
|
176 |
keys = UserSSHKey.objects.filter(deleted_at__isnull=True).select_related("user") |
|
c588255…
|
ragelink
|
177 |
|
|
c588255…
|
ragelink
|
178 |
lines = [] |
|
c588255…
|
ragelink
|
179 |
for key in keys: |
|
c588255…
|
ragelink
|
180 |
# Defense in depth: strip newlines/CR/null from stored keys so a |
|
c588255…
|
ragelink
|
181 |
# compromised DB value cannot inject extra authorized_keys entries. |
|
c588255…
|
ragelink
|
182 |
clean_key = key.public_key.strip().replace("\n", "").replace("\r", "").replace("\x00", "") |
|
c588255…
|
ragelink
|
183 |
if not clean_key: |
|
c588255…
|
ragelink
|
184 |
continue |
|
c588255…
|
ragelink
|
185 |
# Each key gets a forced command that identifies the user |
|
c588255…
|
ragelink
|
186 |
forced_cmd = ( |
|
c588255…
|
ragelink
|
187 |
f'command="/usr/local/bin/fossil-shell {key.user.username}",no-port-forwarding,no-X11-forwarding,no-agent-forwarding,no-pty' |
|
c588255…
|
ragelink
|
188 |
) |
|
c588255…
|
ragelink
|
189 |
lines.append(f"{forced_cmd} {clean_key}") |
|
c588255…
|
ragelink
|
190 |
|
|
c588255…
|
ragelink
|
191 |
authorized_keys_path.write_text("\n".join(lines) + "\n" if lines else "") |
|
c588255…
|
ragelink
|
192 |
authorized_keys_path.chmod(0o600) |
|
c588255…
|
ragelink
|
193 |
|
|
c588255…
|
ragelink
|
194 |
|
|
c588255…
|
ragelink
|
195 |
@login_required |
|
c588255…
|
ragelink
|
196 |
def ssh_keys(request): |
|
c588255…
|
ragelink
|
197 |
"""List and add SSH keys.""" |
|
c588255…
|
ragelink
|
198 |
from fossil.user_keys import UserSSHKey |
|
c588255…
|
ragelink
|
199 |
|
|
c588255…
|
ragelink
|
200 |
keys = UserSSHKey.objects.filter(user=request.user) |
|
c588255…
|
ragelink
|
201 |
|
|
c588255…
|
ragelink
|
202 |
if request.method == "POST": |
|
c588255…
|
ragelink
|
203 |
title = request.POST.get("title", "").strip() |
|
c588255…
|
ragelink
|
204 |
public_key = request.POST.get("public_key", "").strip() |
|
c588255…
|
ragelink
|
205 |
|
|
c588255…
|
ragelink
|
206 |
if title and public_key: |
|
c588255…
|
ragelink
|
207 |
sanitized_key, error = _sanitize_ssh_key(public_key) |
|
c588255…
|
ragelink
|
208 |
if error: |
|
c588255…
|
ragelink
|
209 |
messages.error(request, error) |
|
c588255…
|
ragelink
|
210 |
return render(request, "accounts/ssh_keys.html", {"keys": keys}) |
|
c588255…
|
ragelink
|
211 |
|
|
c588255…
|
ragelink
|
212 |
key_type = _parse_key_type(sanitized_key) |
|
c588255…
|
ragelink
|
213 |
fingerprint = _compute_fingerprint(sanitized_key) |
|
c588255…
|
ragelink
|
214 |
|
|
c588255…
|
ragelink
|
215 |
UserSSHKey.objects.create( |
|
c588255…
|
ragelink
|
216 |
user=request.user, |
|
c588255…
|
ragelink
|
217 |
title=title, |
|
c588255…
|
ragelink
|
218 |
public_key=sanitized_key, |
|
c588255…
|
ragelink
|
219 |
key_type=key_type, |
|
c588255…
|
ragelink
|
220 |
fingerprint=fingerprint, |
|
c588255…
|
ragelink
|
221 |
created_by=request.user, |
|
c588255…
|
ragelink
|
222 |
) |
|
c588255…
|
ragelink
|
223 |
|
|
c588255…
|
ragelink
|
224 |
_regenerate_authorized_keys() |
|
c588255…
|
ragelink
|
225 |
|
|
c588255…
|
ragelink
|
226 |
messages.success(request, f'SSH key "{title}" added.') |
|
c588255…
|
ragelink
|
227 |
return redirect("accounts:ssh_keys") |
|
c588255…
|
ragelink
|
228 |
|
|
c588255…
|
ragelink
|
229 |
return render(request, "accounts/ssh_keys.html", {"keys": keys}) |
|
c588255…
|
ragelink
|
230 |
|
|
c588255…
|
ragelink
|
231 |
|
|
c588255…
|
ragelink
|
232 |
@login_required |
|
c588255…
|
ragelink
|
233 |
@require_POST |
|
c588255…
|
ragelink
|
234 |
def ssh_key_delete(request, pk): |
|
c588255…
|
ragelink
|
235 |
"""Delete an SSH key.""" |
|
c588255…
|
ragelink
|
236 |
from fossil.user_keys import UserSSHKey |
|
c588255…
|
ragelink
|
237 |
|
|
c588255…
|
ragelink
|
238 |
key = get_object_or_404(UserSSHKey, pk=pk, user=request.user) |
|
c588255…
|
ragelink
|
239 |
key.soft_delete(user=request.user) |
|
c588255…
|
ragelink
|
240 |
_regenerate_authorized_keys() |
|
c588255…
|
ragelink
|
241 |
|
|
c588255…
|
ragelink
|
242 |
messages.success(request, f'SSH key "{key.title}" removed.') |
|
c588255…
|
ragelink
|
243 |
|
|
c588255…
|
ragelink
|
244 |
if request.headers.get("HX-Request"): |
|
c588255…
|
ragelink
|
245 |
return HttpResponse(status=200, headers={"HX-Redirect": "/auth/ssh-keys/"}) |
|
c588255…
|
ragelink
|
246 |
|
|
c588255…
|
ragelink
|
247 |
return redirect("accounts:ssh_keys") |
|
c588255…
|
ragelink
|
248 |
|
|
c588255…
|
ragelink
|
249 |
|
|
c588255…
|
ragelink
|
250 |
@login_required |
|
c588255…
|
ragelink
|
251 |
def notification_preferences(request): |
|
c588255…
|
ragelink
|
252 |
"""User notification preferences page.""" |
|
c588255…
|
ragelink
|
253 |
from fossil.notifications import NotificationPreference |
|
c588255…
|
ragelink
|
254 |
|
|
c588255…
|
ragelink
|
255 |
prefs, _ = NotificationPreference.objects.get_or_create(user=request.user) |
|
c588255…
|
ragelink
|
256 |
|
|
c588255…
|
ragelink
|
257 |
if request.method == "POST": |
|
c588255…
|
ragelink
|
258 |
prefs.delivery_mode = request.POST.get("delivery_mode", "immediate") |
|
c588255…
|
ragelink
|
259 |
prefs.notify_checkins = "notify_checkins" in request.POST |
|
c588255…
|
ragelink
|
260 |
prefs.notify_tickets = "notify_tickets" in request.POST |
|
c588255…
|
ragelink
|
261 |
prefs.notify_wiki = "notify_wiki" in request.POST |
|
c588255…
|
ragelink
|
262 |
prefs.notify_releases = "notify_releases" in request.POST |
|
c588255…
|
ragelink
|
263 |
prefs.notify_forum = "notify_forum" in request.POST |
|
c588255…
|
ragelink
|
264 |
prefs.save() |
|
c588255…
|
ragelink
|
265 |
|
|
c588255…
|
ragelink
|
266 |
messages.success(request, "Notification preferences updated.") |
|
c588255…
|
ragelink
|
267 |
|
|
c588255…
|
ragelink
|
268 |
if request.headers.get("HX-Request"): |
|
c588255…
|
ragelink
|
269 |
return HttpResponse(status=200, headers={"HX-Redirect": "/auth/notifications/"}) |
|
c588255…
|
ragelink
|
270 |
|
|
c588255…
|
ragelink
|
271 |
return redirect("accounts:notification_prefs") |
|
c588255…
|
ragelink
|
272 |
|
|
c588255…
|
ragelink
|
273 |
return render(request, "accounts/notification_prefs.html", {"prefs": prefs}) |
|
c588255…
|
ragelink
|
274 |
|
|
c588255…
|
ragelink
|
275 |
|
|
c588255…
|
ragelink
|
276 |
# --------------------------------------------------------------------------- |
|
c588255…
|
ragelink
|
277 |
# Unified profile |
|
c588255…
|
ragelink
|
278 |
# --------------------------------------------------------------------------- |
|
c588255…
|
ragelink
|
279 |
|
|
c588255…
|
ragelink
|
280 |
VALID_SCOPES = {"read", "write", "admin"} |
|
c588255…
|
ragelink
|
281 |
|
|
c588255…
|
ragelink
|
282 |
|
|
c588255…
|
ragelink
|
283 |
@login_required |
|
c588255…
|
ragelink
|
284 |
def profile(request): |
|
c588255…
|
ragelink
|
285 |
"""Unified user profile page consolidating all personal settings.""" |
|
c588255…
|
ragelink
|
286 |
from fossil.notifications import NotificationPreference |
|
c588255…
|
ragelink
|
287 |
from fossil.user_keys import UserSSHKey |
|
c588255…
|
ragelink
|
288 |
|
|
c588255…
|
ragelink
|
289 |
user_profile, _ = UserProfile.objects.get_or_create(user=request.user) |
|
c588255…
|
ragelink
|
290 |
notif_prefs, _ = NotificationPreference.objects.get_or_create(user=request.user) |
|
c588255…
|
ragelink
|
291 |
ssh_keys = UserSSHKey.objects.filter(user=request.user) |
|
c588255…
|
ragelink
|
292 |
tokens = PersonalAccessToken.objects.filter(user=request.user, revoked_at__isnull=True) |
|
c588255…
|
ragelink
|
293 |
|
|
c588255…
|
ragelink
|
294 |
return render( |
|
c588255…
|
ragelink
|
295 |
request, |
|
c588255…
|
ragelink
|
296 |
"accounts/profile.html", |
|
c588255…
|
ragelink
|
297 |
{ |
|
c588255…
|
ragelink
|
298 |
"user_profile": user_profile, |
|
c588255…
|
ragelink
|
299 |
"notif_prefs": notif_prefs, |
|
c588255…
|
ragelink
|
300 |
"ssh_keys": ssh_keys, |
|
c588255…
|
ragelink
|
301 |
"tokens": tokens, |
|
c588255…
|
ragelink
|
302 |
}, |
|
c588255…
|
ragelink
|
303 |
) |
|
c588255…
|
ragelink
|
304 |
|
|
c588255…
|
ragelink
|
305 |
|
|
c588255…
|
ragelink
|
306 |
@login_required |
|
c588255…
|
ragelink
|
307 |
def profile_edit(request): |
|
c588255…
|
ragelink
|
308 |
"""Edit profile info: name, email, handle, bio, location, website.""" |
|
c588255…
|
ragelink
|
309 |
user_profile, _ = UserProfile.objects.get_or_create(user=request.user) |
|
c588255…
|
ragelink
|
310 |
|
|
c588255…
|
ragelink
|
311 |
if request.method == "POST": |
|
c588255…
|
ragelink
|
312 |
user = request.user |
|
c588255…
|
ragelink
|
313 |
user.first_name = request.POST.get("first_name", "").strip()[:30] |
|
c588255…
|
ragelink
|
314 |
user.last_name = request.POST.get("last_name", "").strip()[:150] |
|
c588255…
|
ragelink
|
315 |
user.email = request.POST.get("email", "").strip()[:254] |
|
c588255…
|
ragelink
|
316 |
user.save(update_fields=["first_name", "last_name", "email"]) |
|
c588255…
|
ragelink
|
317 |
|
|
c588255…
|
ragelink
|
318 |
raw_handle = request.POST.get("handle", "").strip() |
|
c588255…
|
ragelink
|
319 |
handle = UserProfile.sanitize_handle(raw_handle) |
|
c588255…
|
ragelink
|
320 |
if handle: |
|
c588255…
|
ragelink
|
321 |
# Check uniqueness (excluding self) |
|
c588255…
|
ragelink
|
322 |
conflict = UserProfile.objects.filter(handle=handle).exclude(pk=user_profile.pk).exists() |
|
c588255…
|
ragelink
|
323 |
if conflict: |
|
c588255…
|
ragelink
|
324 |
messages.error(request, f"Handle @{handle} is already taken.") |
|
c588255…
|
ragelink
|
325 |
return render(request, "accounts/profile_edit.html", {"user_profile": user_profile}) |
|
c588255…
|
ragelink
|
326 |
user_profile.handle = handle |
|
c588255…
|
ragelink
|
327 |
else: |
|
c588255…
|
ragelink
|
328 |
user_profile.handle = None |
|
c588255…
|
ragelink
|
329 |
|
|
c588255…
|
ragelink
|
330 |
user_profile.bio = request.POST.get("bio", "").strip()[:500] |
|
c588255…
|
ragelink
|
331 |
user_profile.location = request.POST.get("location", "").strip()[:100] |
|
c588255…
|
ragelink
|
332 |
user_profile.website = request.POST.get("website", "").strip()[:200] |
|
c588255…
|
ragelink
|
333 |
user_profile.save() |
|
c588255…
|
ragelink
|
334 |
|
|
c588255…
|
ragelink
|
335 |
messages.success(request, "Profile updated.") |
|
c588255…
|
ragelink
|
336 |
return redirect("accounts:profile") |
|
c588255…
|
ragelink
|
337 |
|
|
c588255…
|
ragelink
|
338 |
return render(request, "accounts/profile_edit.html", {"user_profile": user_profile}) |
|
c588255…
|
ragelink
|
339 |
|
|
c588255…
|
ragelink
|
340 |
|
|
c588255…
|
ragelink
|
341 |
@login_required |
|
c588255…
|
ragelink
|
342 |
def profile_token_create(request): |
|
c588255…
|
ragelink
|
343 |
"""Generate a personal access token. Shows the raw token once.""" |
|
c588255…
|
ragelink
|
344 |
if request.method == "POST": |
|
c588255…
|
ragelink
|
345 |
name = request.POST.get("name", "").strip() |
|
c588255…
|
ragelink
|
346 |
if not name: |
|
c588255…
|
ragelink
|
347 |
messages.error(request, "Token name is required.") |
|
c588255…
|
ragelink
|
348 |
return render(request, "accounts/profile_token_create.html", {}) |
|
c588255…
|
ragelink
|
349 |
|
|
c588255…
|
ragelink
|
350 |
raw_scopes = request.POST.get("scopes", "read").strip() |
|
c588255…
|
ragelink
|
351 |
scopes = ",".join(s.strip() for s in raw_scopes.split(",") if s.strip() in VALID_SCOPES) |
|
c588255…
|
ragelink
|
352 |
if not scopes: |
|
c588255…
|
ragelink
|
353 |
scopes = "read" |
|
c588255…
|
ragelink
|
354 |
|
|
c588255…
|
ragelink
|
355 |
raw_token, token_hash, prefix = PersonalAccessToken.generate() |
|
c588255…
|
ragelink
|
356 |
PersonalAccessToken.objects.create( |
|
c588255…
|
ragelink
|
357 |
user=request.user, |
|
c588255…
|
ragelink
|
358 |
name=name, |
|
c588255…
|
ragelink
|
359 |
token_hash=token_hash, |
|
c588255…
|
ragelink
|
360 |
token_prefix=prefix, |
|
c588255…
|
ragelink
|
361 |
scopes=scopes, |
|
c588255…
|
ragelink
|
362 |
) |
|
c588255…
|
ragelink
|
363 |
|
|
c588255…
|
ragelink
|
364 |
return render( |
|
c588255…
|
ragelink
|
365 |
request, |
|
c588255…
|
ragelink
|
366 |
"accounts/profile_token_created.html", |
|
c588255…
|
ragelink
|
367 |
{"raw_token": raw_token, "token_name": name}, |
|
c588255…
|
ragelink
|
368 |
) |
|
c588255…
|
ragelink
|
369 |
|
|
c588255…
|
ragelink
|
370 |
return render(request, "accounts/profile_token_create.html", {}) |
|
c588255…
|
ragelink
|
371 |
|
|
c588255…
|
ragelink
|
372 |
|
|
c588255…
|
ragelink
|
373 |
@login_required |
|
c588255…
|
ragelink
|
374 |
@require_POST |
|
c588255…
|
ragelink
|
375 |
def profile_token_revoke(request, guid): |
|
c588255…
|
ragelink
|
376 |
"""Revoke a personal access token by GUID (token_prefix used as public id).""" |
|
c588255…
|
ragelink
|
377 |
from django.utils import timezone |
|
c588255…
|
ragelink
|
378 |
|
|
c588255…
|
ragelink
|
379 |
token = get_object_or_404(PersonalAccessToken, token_prefix=guid, user=request.user, revoked_at__isnull=True) |
|
c588255…
|
ragelink
|
380 |
token.revoked_at = timezone.now() |
|
c588255…
|
ragelink
|
381 |
token.save(update_fields=["revoked_at"]) |
|
c588255…
|
ragelink
|
382 |
|
|
c588255…
|
ragelink
|
383 |
messages.success(request, f'Token "{token.name}" revoked.') |
|
c588255…
|
ragelink
|
384 |
|
|
c588255…
|
ragelink
|
385 |
if request.headers.get("HX-Request"): |
|
c588255…
|
ragelink
|
386 |
return HttpResponse(status=200, headers={"HX-Redirect": "/auth/profile/"}) |
|
c588255…
|
ragelink
|
387 |
|
|
c588255…
|
ragelink
|
388 |
return redirect("accounts:profile") |