FossilRepo

fossilrepo / accounts / views.py
Blame History Raw 389 lines
1
import logging
2
import re
3
4
import requests
5
from django.contrib import messages
6
from django.contrib.auth import login, logout
7
from django.contrib.auth.decorators import login_required
8
from django.http import HttpResponse
9
from django.shortcuts import get_object_or_404, redirect, render
10
from django.utils.http import url_has_allowed_host_and_scheme
11
from django.views.decorators.http import require_POST
12
from django_ratelimit.decorators import ratelimit
13
14
from .forms import LoginForm
15
from .models import PersonalAccessToken, UserProfile
16
17
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
18
19
# Allowed SSH key type prefixes
20
_SSH_KEY_PREFIXES = ("ssh-ed25519", "ssh-rsa", "ecdsa-sha2-", "ssh-dss")
21
22
23
def _sanitize_ssh_key(public_key: str) -> tuple[str | None, str]:
24
"""Validate and sanitize an SSH public key.
25
26
Returns (sanitized_key, error_message). On success error_message is "".
27
Rejects keys containing newlines, carriage returns, or null bytes (which
28
would allow injection of extra authorized_keys entries). Validates format:
29
known type prefix, 2-3 space-separated parts.
30
"""
31
# Strip dangerous injection characters -- newlines let an attacker add
32
# a second authorized_keys line outside the forced-command wrapper
33
if "\n" in public_key or "\r" in public_key or "\x00" in public_key:
34
return None, "SSH key must be a single line. Newlines, carriage returns, and null bytes are not allowed."
35
36
key = public_key.strip()
37
if not key:
38
return None, "SSH key cannot be empty."
39
40
# SSH keys are: <type> <base64-data> [optional comment]
41
parts = key.split()
42
if len(parts) < 2 or len(parts) > 3:
43
return None, "Invalid SSH key format. Expected: <key-type> <key-data> [comment]"
44
45
key_type = parts[0]
46
if not any(key_type.startswith(prefix) for prefix in _SSH_KEY_PREFIXES):
47
return None, f"Unsupported key type '{key_type}'. Allowed: ssh-ed25519, ssh-rsa, ecdsa-sha2-*, ssh-dss."
48
49
# Validate base64 data is plausible (only base64 chars + padding)
50
if not re.match(r"^[A-Za-z0-9+/=]+$", parts[1]):
51
return None, "Invalid SSH key data encoding."
52
53
return key, ""
54
55
56
def _verify_turnstile(token: str, remote_ip: str) -> bool:
    """Verify a Cloudflare Turnstile response token.

    Posts the token, the configured secret key and the client IP to the
    Turnstile siteverify endpoint.

    Returns True only when the endpoint answers HTTP 200 and the JSON body
    carries ``"success": true``. Returns False when no secret key is
    configured, when the request or JSON decoding fails (the failure is
    logged), or when the response does not report success.
    """
    from constance import config

    secret = config.TURNSTILE_SECRET_KEY
    if not secret:
        return False
    try:
        resp = requests.post(
            "https://challenges.cloudflare.com/turnstile/v0/siteverify",
            data={"secret": secret, "response": token, "remoteip": remote_ip},
            timeout=5,
        )
        # Compare against the literal JSON boolean so the function really
        # returns a bool and a truthy non-boolean value (e.g. the string
        # "false") is never mistaken for a successful verification.
        return resp.status_code == 200 and resp.json().get("success") is True
    except Exception:
        # Fail closed: any network, decoding or shape error counts as
        # "not verified", but is logged for operators.
        logger.exception("Turnstile verification failed")
        return False
72
73
74
@ratelimit(key="ip", rate="10/m", block=True)
def login_view(request):
    """Render the login form and authenticate the user on POST.

    Already-authenticated users are sent straight to the dashboard. When
    Turnstile is enabled and a site key is configured, a POST must carry a
    valid ``cf-turnstile-response`` token before the credentials are even
    looked at; otherwise a blank form is re-rendered with ``turnstile_error``
    set. After a successful login the user is redirected to the ``next``
    query parameter if it points at this host, else to the dashboard.
    """
    if request.user.is_authenticated:
        return redirect("dashboard")

    from constance import config

    turnstile_enabled = config.TURNSTILE_ENABLED and config.TURNSTILE_SITE_KEY
    turnstile_error = False

    if request.method == "POST":
        # Verify Turnstile before processing login
        if turnstile_enabled:
            turnstile_token = request.POST.get("cf-turnstile-response", "")
            remote_ip = request.META.get("REMOTE_ADDR", "")
            turnstile_error = not turnstile_token or not _verify_turnstile(turnstile_token, remote_ip)

        if turnstile_error:
            # Do not bind the submitted credentials: the challenge failed, so
            # the login attempt is not evaluated at all.
            form = LoginForm()
        else:
            form = LoginForm(request, data=request.POST)
            if form.is_valid():
                login(request, form.get_user())
                next_url = request.GET.get("next", "")
                # require_https prevents a login made over HTTPS from being
                # bounced to a plain-http URL on the same host.
                if next_url and url_has_allowed_host_and_scheme(
                    next_url,
                    allowed_hosts={request.get_host()},
                    require_https=request.is_secure(),
                ):
                    return redirect(next_url)
                return redirect("dashboard")
    else:
        form = LoginForm()

    ctx = {"form": form}
    if turnstile_enabled:
        ctx["turnstile_enabled"] = True
        ctx["turnstile_site_key"] = config.TURNSTILE_SITE_KEY
        ctx["turnstile_error"] = turnstile_error
    return render(request, "accounts/login.html", ctx)
118
119
120
@require_POST
def logout_view(request):
    """Log the current user out and redirect to the login page (POST only)."""
    logout(request)
    to_login_page = redirect("accounts:login")
    return to_login_page
124
125
126
# ---------------------------------------------------------------------------
127
# SSH key management
128
# ---------------------------------------------------------------------------
129
130
131
def _parse_key_type(public_key):
    """Return a short algorithm name for an SSH public key string.

    The first whitespace-separated field is looked up in a table of known
    key types; an unknown type is returned unchanged, and a blank input
    yields "".
    """
    fields = public_key.split(maxsplit=1)
    if not fields:
        return ""

    algorithm = fields[0]
    short_names = {
        "ssh-ed25519": "ed25519",
        "ssh-rsa": "rsa",
        "ecdsa-sha2-nistp256": "ecdsa",
        "ecdsa-sha2-nistp384": "ecdsa",
        "ecdsa-sha2-nistp521": "ecdsa",
        "ssh-dss": "dsa",
    }
    return short_names.get(algorithm, algorithm)
146
147
148
def _compute_fingerprint(public_key):
    """Compute the SHA256 fingerprint of an SSH public key string.

    The second whitespace-separated field is base64-decoded and hashed.

    Returns "SHA256:" followed by the unpadded base64 of the digest, or ""
    when the string has fewer than two fields or the key data cannot be
    decoded.
    """
    import base64
    import binascii
    import hashlib

    parts = public_key.strip().split()
    if len(parts) < 2:
        return ""

    try:
        key_data = base64.b64decode(parts[1])
    except (binascii.Error, ValueError):
        # Undecodable key data: deliberately best-effort, report "no
        # fingerprint" rather than failing the caller.
        return ""

    digest = hashlib.sha256(key_data).digest()
    return "SHA256:" + base64.b64encode(digest).rstrip(b"=").decode()
162
163
164
def _regenerate_authorized_keys():
    """Regenerate the authorized_keys file from all active user SSH keys.

    Writes ``<FOSSIL_DATA_DIR>/../ssh/authorized_keys`` (creating the
    directory if needed) with one line per non-deleted key. Every line pins
    the key to a forced ``fossil-shell <username>`` command and disables
    port, X11 and agent forwarding and pty allocation. The file is left
    empty when there are no keys, and its mode is set to 0600.
    """
    from pathlib import Path

    from constance import config

    from fossil.user_keys import UserSSHKey

    ssh_dir = Path(config.FOSSIL_DATA_DIR).parent / "ssh"
    ssh_dir.mkdir(parents=True, exist_ok=True)
    authorized_keys_path = ssh_dir / "authorized_keys"

    keys = UserSSHKey.objects.filter(deleted_at__isnull=True).select_related("user")

    restrictions = "no-port-forwarding,no-X11-forwarding,no-agent-forwarding,no-pty"

    lines = []
    for key in keys:
        # Defense in depth: strip newlines/CR/null from stored keys so a
        # compromised DB value cannot inject extra authorized_keys entries.
        clean_key = key.public_key.strip().replace("\n", "").replace("\r", "").replace("\x00", "")
        if not clean_key:
            continue

        # The username lands inside the quoted command="..." option, so it
        # needs the same defense in depth as the key text: a quote, newline
        # or other metacharacter there could break out of the forced-command
        # wrapper. Only the characters of Django's stock username validators
        # (word characters plus . @ + -) are let through.
        username = key.user.username
        if not re.fullmatch(r"[\w.@+-]+", username):
            logger.warning("Skipping SSH key %s: username is not safe to embed in authorized_keys", key.pk)
            continue

        # Each key gets a forced command that identifies the user
        lines.append(f'command="/usr/local/bin/fossil-shell {username}",{restrictions} {clean_key}')

    # One entry per line, each newline-terminated; an empty file when there
    # are no usable keys.
    authorized_keys_path.write_text("".join(f"{line}\n" for line in lines))
    authorized_keys_path.chmod(0o600)
193
194
195
@login_required
def ssh_keys(request):
    """List the current user's SSH keys and handle the add-key form.

    On POST the submitted title and public key are required; the key is
    validated with ``_sanitize_ssh_key`` and, when accepted, stored together
    with its parsed type and fingerprint, after which authorized_keys is
    regenerated and the user is redirected back to this page. Any validation
    problem is reported through the messages framework and the list page is
    re-rendered.
    """
    from fossil.user_keys import UserSSHKey

    keys = UserSSHKey.objects.filter(user=request.user)

    if request.method == "POST":
        title = request.POST.get("title", "").strip()
        public_key = request.POST.get("public_key", "").strip()

        # Tell the user why nothing was saved instead of silently
        # re-rendering the page.
        if not title or not public_key:
            messages.error(request, "Both a title and a public key are required.")
            return render(request, "accounts/ssh_keys.html", {"keys": keys})

        sanitized_key, error = _sanitize_ssh_key(public_key)
        if error:
            messages.error(request, error)
            return render(request, "accounts/ssh_keys.html", {"keys": keys})

        UserSSHKey.objects.create(
            user=request.user,
            title=title,
            public_key=sanitized_key,
            key_type=_parse_key_type(sanitized_key),
            fingerprint=_compute_fingerprint(sanitized_key),
            created_by=request.user,
        )

        _regenerate_authorized_keys()

        messages.success(request, f'SSH key "{title}" added.')
        return redirect("accounts:ssh_keys")

    return render(request, "accounts/ssh_keys.html", {"keys": keys})
230
231
232
@login_required
@require_POST
def ssh_key_delete(request, pk):
    """Soft-delete one of the current user's SSH keys.

    Responds 404 when no key with this ``pk`` belongs to the requesting
    user. After the soft delete, authorized_keys is regenerated. HTMX
    requests (``HX-Request`` header present) get a 200 response with an
    ``HX-Redirect`` header; other requests get a regular redirect. Both
    target the SSH key list.
    """
    from django.urls import reverse

    from fossil.user_keys import UserSSHKey

    key = get_object_or_404(UserSSHKey, pk=pk, user=request.user)
    key.soft_delete(user=request.user)
    _regenerate_authorized_keys()

    messages.success(request, f'SSH key "{key.title}" removed.')

    if request.headers.get("HX-Request"):
        # Resolve the same named route the non-HTMX branch uses rather than
        # a hard-coded path, so both branches follow the URLconf.
        # NOTE(review): presumably resolves to /auth/ssh-keys/ -- confirm.
        return HttpResponse(status=200, headers={"HX-Redirect": reverse("accounts:ssh_keys")})

    return redirect("accounts:ssh_keys")
248
249
250
@login_required
def notification_preferences(request):
    """Show and update the current user's notification preferences.

    A preferences row is created on first visit. On POST the delivery mode
    and the per-event flags are stored (a flag is enabled when its name is
    present in the POST data) and the user is sent back to this page: via an
    ``HX-Redirect`` header for HTMX requests, via a regular redirect
    otherwise. GET renders the form.
    """
    from django.urls import reverse

    from fossil.notifications import NotificationPreference

    prefs, _ = NotificationPreference.objects.get_or_create(user=request.user)

    if request.method == "POST":
        # NOTE(review): delivery_mode is stored exactly as submitted; if the
        # model field declares choices they are not enforced here -- confirm
        # whether validation is needed.
        prefs.delivery_mode = request.POST.get("delivery_mode", "immediate")
        prefs.notify_checkins = "notify_checkins" in request.POST
        prefs.notify_tickets = "notify_tickets" in request.POST
        prefs.notify_wiki = "notify_wiki" in request.POST
        prefs.notify_releases = "notify_releases" in request.POST
        prefs.notify_forum = "notify_forum" in request.POST
        prefs.save()

        messages.success(request, "Notification preferences updated.")

        if request.headers.get("HX-Request"):
            # Resolve the same named route the non-HTMX branch uses rather
            # than a hard-coded path.
            # NOTE(review): presumably resolves to /auth/notifications/ -- confirm.
            return HttpResponse(status=200, headers={"HX-Redirect": reverse("accounts:notification_prefs")})

        return redirect("accounts:notification_prefs")

    return render(request, "accounts/notification_prefs.html", {"prefs": prefs})
274
275
276
# ---------------------------------------------------------------------------
277
# Unified profile
278
# ---------------------------------------------------------------------------
279
280
# Scope names accepted for personal access tokens; token creation discards
# any submitted scope that is not in this set.
VALID_SCOPES = {"read", "write", "admin"}
281
282
283
@login_required
def profile(request):
    """Render the unified profile page for the logged-in user.

    Gathers the user's profile and notification preferences (creating either
    row if missing), their SSH keys, and their non-revoked personal access
    tokens into one template context.
    """
    from fossil.notifications import NotificationPreference
    from fossil.user_keys import UserSSHKey

    current_user = request.user
    user_profile, _created = UserProfile.objects.get_or_create(user=current_user)
    notif_prefs, _created = NotificationPreference.objects.get_or_create(user=current_user)

    context = {
        "user_profile": user_profile,
        "notif_prefs": notif_prefs,
        "ssh_keys": UserSSHKey.objects.filter(user=current_user),
        "tokens": PersonalAccessToken.objects.filter(user=current_user, revoked_at__isnull=True),
    }
    return render(request, "accounts/profile.html", context)
304
305
306
@login_required
def profile_edit(request):
    """Edit profile info: name, email, handle, bio, location, website.

    GET renders the edit form. On POST the requested handle is sanitized and
    checked for uniqueness first; if another profile already owns it, an
    error is shown and nothing is saved. Otherwise the user's name and email
    and the profile fields are stored (each truncated to a fixed length) and
    the user is redirected to the profile page. A blank handle clears it.
    """
    user_profile, _ = UserProfile.objects.get_or_create(user=request.user)

    if request.method == "POST":
        # Validate before touching the database: saving the user first would
        # leave a half-applied update when the handle turns out to be taken.
        raw_handle = request.POST.get("handle", "").strip()
        handle = UserProfile.sanitize_handle(raw_handle)
        if handle:
            # Check uniqueness (excluding self)
            conflict = UserProfile.objects.filter(handle=handle).exclude(pk=user_profile.pk).exists()
            if conflict:
                messages.error(request, f"Handle @{handle} is already taken.")
                return render(request, "accounts/profile_edit.html", {"user_profile": user_profile})

        user = request.user
        user.first_name = request.POST.get("first_name", "").strip()[:30]
        user.last_name = request.POST.get("last_name", "").strip()[:150]
        user.email = request.POST.get("email", "").strip()[:254]
        user.save(update_fields=["first_name", "last_name", "email"])

        # An empty handle is stored as None rather than "".
        user_profile.handle = handle or None
        user_profile.bio = request.POST.get("bio", "").strip()[:500]
        user_profile.location = request.POST.get("location", "").strip()[:100]
        user_profile.website = request.POST.get("website", "").strip()[:200]
        user_profile.save()

        messages.success(request, "Profile updated.")
        return redirect("accounts:profile")

    return render(request, "accounts/profile_edit.html", {"user_profile": user_profile})
339
340
341
@login_required
def profile_token_create(request):
    """Create a personal access token and display its raw value once.

    GET renders the creation form. POST requires a non-empty name; the
    comma-separated ``scopes`` value is filtered against VALID_SCOPES and
    falls back to "read" when nothing valid remains. The token's hash and
    prefix are stored and the raw token is passed to the "created" template.
    """
    form_template = "accounts/profile_token_create.html"

    if request.method != "POST":
        return render(request, form_template, {})

    name = request.POST.get("name", "").strip()
    if not name:
        messages.error(request, "Token name is required.")
        return render(request, form_template, {})

    requested = request.POST.get("scopes", "read").strip().split(",")
    granted = [candidate.strip() for candidate in requested if candidate.strip() in VALID_SCOPES]
    scopes = ",".join(granted) if granted else "read"

    raw_token, token_hash, prefix = PersonalAccessToken.generate()
    PersonalAccessToken.objects.create(
        user=request.user,
        name=name,
        token_hash=token_hash,
        token_prefix=prefix,
        scopes=scopes,
    )

    created_context = {"raw_token": raw_token, "token_name": name}
    return render(request, "accounts/profile_token_created.html", created_context)
371
372
373
@login_required
@require_POST
def profile_token_revoke(request, guid):
    """Revoke a personal access token by GUID (token_prefix used as public id).

    Responds 404 unless a not-yet-revoked token with this prefix belongs to
    the requesting user. Revocation stamps ``revoked_at`` with the current
    time. HTMX requests (``HX-Request`` header present) get a 200 response
    with an ``HX-Redirect`` header; other requests get a regular redirect.
    Both target the profile page.
    """
    from django.urls import reverse
    from django.utils import timezone

    token = get_object_or_404(PersonalAccessToken, token_prefix=guid, user=request.user, revoked_at__isnull=True)
    token.revoked_at = timezone.now()
    token.save(update_fields=["revoked_at"])

    messages.success(request, f'Token "{token.name}" revoked.')

    if request.headers.get("HX-Request"):
        # Resolve the same named route the non-HTMX branch uses rather than
        # a hard-coded path.
        # NOTE(review): presumably resolves to /auth/profile/ -- confirm.
        return HttpResponse(status=200, headers={"HX-Redirect": reverse("accounts:profile")})

    return redirect("accounts:profile")
389

Keyboard Shortcuts

Open search /
Next entry (timeline) j
Previous entry (timeline) k
Open focused entry Enter
Show this help ?
Toggle theme Top nav button