feat: Armarium v1.1.0 — dashboard, auth, 2FA, SMTP, settings, deploy

Dashboard:
- ApexCharts bar chart (income vs fixed costs vs expenses) and donut chart
- KPI cards: income, fixed costs, savings rate with configurable goal
- Greeting with time-of-day and locale-aware date/time display

Authentication & security:
- Email-based login (no username), case-insensitive lookup
- JWT access/refresh tokens with rotation and blacklist
- TOTP 2FA with QR code, backup codes (copy + PDF export)
- 2FA recovery via email code
- Cloudflare Turnstile CAPTCHA on login and register

Email flows:
- Email verification on registration (24h token)
- Password reset flow (15min token, anti-enumeration)
- Brevo SMTP integration with HTML + plaintext email templates
- Notification emails: 2FA recovery, password changed, email changed

Settings page:
- 2FA management (enable/disable, QR, backup codes)
- Active sessions list with per-device revoke
- Data export: ZIP with 6 PDFs via fpdf2
- Notification preferences (3 toggles)
- Danger zone: account deletion with mandatory export + confirmation phrase

UI & layout:
- Sidebar with collapsible/flyout mode, Angular signal-based dropdowns
- Dark mode (class-based), language switcher (DE/FR/IT/EN)
- Mobile-responsive layout with touch-friendly targets
- Roboto font via @fontsource (GDPR-compliant, no Google CDN)
- Pure Tailwind CSS v3

Infrastructure:
- Forgejo Actions CI/CD pipeline (auto-deploy on push to main)
- Gunicorn + Nginx + PostgreSQL production setup
- Rate limiting, HSTS, secure cookies, CSRF protection
This commit is contained in:
Daniel Krähenbühl
2026-05-25 22:45:18 +02:00
parent 807ebc41a5
commit 1a7ef09805
150 changed files with 22862 additions and 3 deletions
+999
View File
@@ -0,0 +1,999 @@
import base64
import hmac
import hashlib
import json
import logging
import secrets
import time
import urllib.parse
import urllib.request
import pyotp
logger = logging.getLogger('armarium')
from django.conf import settings
from django.contrib.auth import get_user_model, authenticate
from django.http import HttpResponse
from icalendar import Calendar as iCalendar, Event as iCalEvent
from rest_framework import viewsets, views, status
from rest_framework.response import Response
from rest_framework.permissions import AllowAny, IsAuthenticated
from rest_framework.throttling import AnonRateThrottle
from rest_framework_simplejwt.tokens import RefreshToken
from rest_framework_simplejwt.exceptions import TokenError
from .models import Account, Transaction, Budget, Expense, Profile, Deadline, ReadEvent, BackupCode, UserSession
from .serializers import (
AccountSerializer, TransactionSerializer, BudgetSerializer,
ExpenseSerializer, ProfileSerializer, DeadlineSerializer, RegisterSerializer,
)
def _verify_turnstile(token: str, remote_ip: str = '') -> bool:
if settings.DEBUG:
return True
if not token or not settings.TURNSTILE_SECRET_KEY:
return False
data = urllib.parse.urlencode({
'secret': settings.TURNSTILE_SECRET_KEY,
'response': token,
'remoteip': remote_ip,
}).encode()
try:
req = urllib.request.Request(
'https://challenges.cloudflare.com/turnstile/v0/siteverify',
data=data,
method='POST',
)
with urllib.request.urlopen(req, timeout=5) as resp:
return json.loads(resp.read()).get('success', False)
except Exception:
logger.warning('Turnstile verification request failed')
return False
def generate_ical_token(user_id: int) -> str:
return hmac.new(
settings.SECRET_KEY.encode(),
str(user_id).encode(),
hashlib.sha256
).hexdigest()
MAX_AVATAR_SIZE_BYTES = 2 * 1024 * 1024 # 2 MB
class AuthThrottle(AnonRateThrottle):
rate = '5/min'
class AccountViewSet(viewsets.ModelViewSet):
serializer_class = AccountSerializer
def get_queryset(self):
return Account.objects.filter(user=self.request.user)
def perform_create(self, serializer):
serializer.save(user=self.request.user)
class TransactionViewSet(viewsets.ModelViewSet):
serializer_class = TransactionSerializer
def get_queryset(self):
return Transaction.objects.filter(source_account__user=self.request.user)
def get_serializer_context(self):
context = super().get_serializer_context()
context['request'] = self.request
return context
class BudgetViewSet(viewsets.ModelViewSet):
serializer_class = BudgetSerializer
def get_queryset(self):
return Budget.objects.filter(account__user=self.request.user)
class ExpenseViewSet(viewsets.ModelViewSet):
serializer_class = ExpenseSerializer
def get_queryset(self):
return Expense.objects.filter(account__user=self.request.user)
class DeadlineViewSet(viewsets.ModelViewSet):
serializer_class = DeadlineSerializer
def get_queryset(self):
return Deadline.objects.filter(user=self.request.user).order_by('date')
def perform_create(self, serializer):
serializer.save(user=self.request.user)
class ProfileView(views.APIView):
def get(self, request):
profile, _ = Profile.objects.get_or_create(user=request.user)
return Response(ProfileSerializer(profile).data)
def put(self, request):
from .email import send_email
avatar = request.FILES.get('avatar_image')
if avatar and avatar.size > MAX_AVATAR_SIZE_BYTES:
return Response({'detail': 'Image must be smaller than 2 MB.'}, status=400)
recovery_email = request.data.get('recovery_email', '').strip().lower()
if recovery_email and recovery_email == request.user.email.lower():
return Response(
{'recovery_email': 'Recovery email must differ from your login email.'},
status=400,
)
old_email = request.user.email
profile, _ = Profile.objects.get_or_create(user=request.user)
serializer = ProfileSerializer(profile, data=request.data, partial=True)
if serializer.is_valid():
serializer.save()
new_email = request.user.email
if new_email != old_email:
send_email(
'email_changed',
{'new_email': new_email},
'Armarium Deine E-Mail-Adresse wurde geändert',
old_email,
)
return Response(serializer.data)
return Response(serializer.errors, status=400)
def delete(self, request):
password = request.data.get('password', '')
if not password or not request.user.check_password(password):
return Response({'detail': 'Passwort ungültig.'}, status=status.HTTP_403_FORBIDDEN)
request.user.delete()
return Response(status=status.HTTP_204_NO_CONTENT)
class RegisterView(views.APIView):
permission_classes = [AllowAny]
throttle_classes = [AuthThrottle]
def post(self, request):
from .email import send_email
if not _verify_turnstile(
request.data.get('cf_turnstile_response', ''),
request.META.get('REMOTE_ADDR', ''),
):
return Response({'detail': 'Captcha verification failed.'}, status=status.HTTP_400_BAD_REQUEST)
serializer = RegisterSerializer(data=request.data)
if serializer.is_valid():
user = serializer.save()
from django.utils import timezone
from datetime import timedelta
token = secrets.token_urlsafe(32)
token_hash = hashlib.sha256(token.encode()).hexdigest()
profile, _ = Profile.objects.get_or_create(user=user)
profile.email_verify_token = token_hash
profile.email_verify_token_expires = timezone.now() + timedelta(hours=24)
profile.save(update_fields=['email_verify_token', 'email_verify_token_expires'])
link = f"{settings.FRONTEND_URL}/verify-email?token={token}"
send_email('registration_confirm', {'link': link}, 'Armarium E-Mail-Adresse bestätigen', user.email)
return Response({'detail': 'Account created.'}, status=status.HTTP_201_CREATED)
return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)
class LogoutView(views.APIView):
permission_classes = [AllowAny]
def post(self, request):
refresh_token = request.data.get('refresh')
if not refresh_token:
return Response({'detail': 'Refresh token required.'}, status=400)
try:
token = RefreshToken(refresh_token)
jti = token.payload.get('jti', '')
token.blacklist()
if jti:
UserSession.objects.filter(refresh_jti=jti).delete()
except TokenError:
pass # already invalid, treat as success
session_key = request.headers.get('X-Session-Key', '')
if session_key:
UserSession.objects.filter(session_key=session_key).delete()
return Response(status=status.HTTP_204_NO_CONTENT)
class ChangePasswordView(views.APIView):
def post(self, request):
from .email import send_email
password = request.data.get('password', '')
if len(password) < 8:
return Response({'detail': 'Password must be at least 8 characters.'}, status=400)
request.user.set_password(password)
request.user.save()
current_key = request.headers.get('X-Session-Key', '')
other_sessions = UserSession.objects.filter(user=request.user).exclude(session_key=current_key)
for session in other_sessions:
_blacklist_session(session)
send_email('password_changed', {}, 'Armarium Dein Passwort wurde geändert', request.user.email)
return Response({'detail': 'Password updated.'})
class SearchView(views.APIView):
"""Global search across all user resources."""
def get(self, request):
q = request.query_params.get('q', '').strip()
if len(q) < 2:
return Response({})
user = request.user
results = {}
accounts = Account.objects.filter(user=user, name__icontains=q)[:5]
if accounts:
results['accounts'] = [
{'id': a.id, 'title': a.name, 'subtitle': a.get_account_type_display()}
for a in accounts
]
budgets = Budget.objects.filter(account__user=user, name__icontains=q)[:5]
if budgets:
results['budgets'] = [
{'id': b.id, 'title': b.name, 'subtitle': f'CHF {b.amount}'}
for b in budgets
]
expenses = Expense.objects.filter(account__user=user, name__icontains=q)[:5]
if expenses:
results['expenses'] = [
{'id': e.id, 'title': e.name, 'subtitle': f'{e.date} · CHF {e.amount}'}
for e in expenses
]
transactions = Transaction.objects.filter(
source_account__user=user, description__icontains=q
)[:5]
if transactions:
results['transactions'] = [
{'id': t.id, 'title': t.description, 'subtitle': f'{t.date} · CHF {t.amount}'}
for t in transactions
]
deadlines = Deadline.objects.filter(user=user, title__icontains=q)[:5]
if deadlines:
results['deadlines'] = [
{'id': d.id, 'title': d.title, 'subtitle': str(d.date), 'date': str(d.date)}
for d in deadlines
]
return Response(results)
class NotificationsView(views.APIView):
"""Returns all unread active events (date <= today) for the authenticated user."""
def get(self, request):
from datetime import date
today = date.today()
read_deadlines = set(
ReadEvent.objects.filter(user=request.user, event_type='deadline')
.values_list('event_id', flat=True)
)
read_expenses = set(
ReadEvent.objects.filter(user=request.user, event_type='expense')
.values_list('event_id', flat=True)
)
notifications = []
for d in Deadline.objects.filter(user=request.user, date__lte=today):
if d.id not in read_deadlines:
notifications.append({
'event_type': 'deadline',
'event_id': d.id,
'title': d.title,
'date': str(d.date),
})
for e in Expense.objects.filter(account__user=request.user, due_date__lte=today):
if e.id not in read_expenses:
notifications.append({
'event_type': 'expense',
'event_id': e.id,
'title': e.name,
'date': str(e.due_date),
})
notifications.sort(key=lambda x: x['date'])
return Response(notifications)
def post(self, request):
"""Mark a single event as read."""
event_type = request.data.get('event_type')
event_id = request.data.get('event_id')
if event_type not in ('deadline', 'expense') or not event_id:
return Response({'detail': 'Invalid payload.'}, status=400)
ReadEvent.objects.get_or_create(
user=request.user, event_type=event_type, event_id=event_id
)
return Response(status=status.HTTP_204_NO_CONTENT)
class ICalUrlView(views.APIView):
"""Returns the personal iCal feed URL for the authenticated user."""
def get(self, request):
token = generate_ical_token(request.user.id)
base_url = request.build_absolute_uri('/')
url = f"{base_url}api/calendar/ical/{request.user.id}/{token}/"
return Response({'url': url})
class ICalFeedView(views.APIView):
"""Serves the iCal feed. Token acts as authentication — no JWT required."""
permission_classes = [AllowAny]
def get(self, request, user_id, token):
expected = generate_ical_token(user_id)
if not hmac.compare_digest(expected, token):
return HttpResponse(status=404)
User = get_user_model()
try:
user = User.objects.get(pk=user_id)
except User.DoesNotExist:
return HttpResponse(status=404)
cal = iCalendar()
cal.add('prodid', '-//Budget App//EN')
cal.add('version', '2.0')
cal.add('x-wr-calname', 'Budget App')
cal.add('x-wr-timezone', 'Europe/Zurich')
# Deadlines
for deadline in Deadline.objects.filter(user=user):
event = iCalEvent()
event.add('summary', f'[{deadline.get_type_display()}] {deadline.title}')
event.add('dtstart', deadline.date)
event.add('dtend', deadline.date)
event.add('uid', f'deadline-{deadline.id}@budget-app')
if deadline.notes:
event.add('description', deadline.notes)
cal.add_component(event)
# Expense due dates
for expense in Expense.objects.filter(account__user=user, due_date__isnull=False):
event = iCalEvent()
event.add('summary', f'[Invoice] {expense.name} CHF {expense.amount}')
event.add('dtstart', expense.due_date)
event.add('dtend', expense.due_date)
event.add('uid', f'expense-{expense.id}@budget-app')
if expense.notes:
event.add('description', expense.notes)
cal.add_component(event)
response = HttpResponse(cal.to_ical(), content_type='text/calendar; charset=utf-8')
response['Content-Disposition'] = 'attachment; filename="budget-app.ics"'
return response
# ── 2FA helpers ──────────────────────────────────────────────────────────────
def _make_2fa_token(user_id: int) -> str:
"""Create a short-lived signed token binding step-1 to step-2 of login."""
payload = f"{user_id}:{int(time.time())}"
sig = hmac.new(settings.SECRET_KEY.encode(), payload.encode(), hashlib.sha256).hexdigest()
return base64.urlsafe_b64encode(f"{payload}:{sig}".encode()).decode()
def _verify_2fa_token(token: str, max_age: int = 300) -> int | None:
"""Return user_id if token is valid and not expired, else None."""
try:
decoded = base64.urlsafe_b64decode(token.encode()).decode()
*payload_parts, sig = decoded.split(':')
payload = ':'.join(payload_parts)
user_id_str, ts_str = payload_parts
expected = hmac.new(settings.SECRET_KEY.encode(), payload.encode(), hashlib.sha256).hexdigest()
if not hmac.compare_digest(expected, sig):
return None
if int(time.time()) - int(ts_str) > max_age:
return None
return int(user_id_str)
except Exception:
return None
def _generate_backup_codes(user, count: int = 8) -> list[str]:
"""Invalidate all old backup codes and return a fresh set of plain-text codes."""
BackupCode.objects.filter(user=user).delete()
plain = []
for _ in range(count):
code = f"{secrets.token_hex(4).upper()}-{secrets.token_hex(4).upper()}"
BackupCode.objects.create(
user=user,
code_hash=hashlib.sha256(code.encode()).hexdigest(),
)
plain.append(code)
return plain
def _verify_totp_with_replay_check(profile, code: str) -> bool:
"""Verify TOTP code and reject replay within the same 30-second window."""
if profile.totp_last_used_code == code:
return False
totp = pyotp.TOTP(profile.totp_secret)
if not totp.verify(code, valid_window=1):
return False
profile.totp_last_used_code = code
profile.save(update_fields=['totp_last_used_code'])
return True
# ── 2FA views ─────────────────────────────────────────────────────────────────
class LoginView(views.APIView):
"""Replaces TokenObtainPairView. Returns a short-lived temp_token when 2FA is required."""
permission_classes = [AllowAny]
throttle_classes = [AuthThrottle]
def post(self, request):
if not _verify_turnstile(
request.data.get('cf_turnstile_response', ''),
request.META.get('REMOTE_ADDR', ''),
):
return Response({'detail': 'Captcha verification failed.'}, status=status.HTTP_400_BAD_REQUEST)
email = request.data.get('username', '')
password = request.data.get('password', '')
user = authenticate(request, username=email, password=password)
if user is None:
return Response({'detail': 'No active account found with the given credentials.'}, status=401)
profile, _ = Profile.objects.get_or_create(user=user)
if profile.totp_enabled:
return Response({'2fa_required': True, 'temp_token': _make_2fa_token(user.id)}, status=200)
refresh = RefreshToken.for_user(user)
session_key = _create_session(user, request, refresh)
return Response({'access': str(refresh.access_token), 'refresh': str(refresh), 'session_key': session_key})
class TwoFactorLoginView(views.APIView):
"""Step 2 of login — accepts TOTP code or backup code, returns JWT tokens."""
permission_classes = [AllowAny]
throttle_classes = [AuthThrottle]
def post(self, request):
temp_token = request.data.get('temp_token', '')
code = str(request.data.get('code', '')).strip()
user_id = _verify_2fa_token(temp_token)
if user_id is None:
return Response({'detail': 'Session expired. Please log in again.'}, status=401)
User = get_user_model()
try:
user = User.objects.get(pk=user_id)
except User.DoesNotExist:
return Response({'detail': 'Invalid credentials.'}, status=401)
profile, _ = Profile.objects.get_or_create(user=user)
if not profile.totp_enabled or not profile.totp_secret:
return Response({'detail': 'Invalid credentials.'}, status=401)
if code.isdigit() and len(code) == 6:
if not _verify_totp_with_replay_check(profile, code):
return Response({'detail': 'Invalid or already used code.'}, status=400)
else:
code_hash = hashlib.sha256(code.encode()).hexdigest()
backup = BackupCode.objects.filter(user=user, code_hash=code_hash, used=False).first()
if backup is None:
return Response({'detail': 'Invalid backup code.'}, status=400)
backup.used = True
backup.save(update_fields=['used'])
refresh = RefreshToken.for_user(user)
session_key = _create_session(user, request, refresh)
return Response({'access': str(refresh.access_token), 'refresh': str(refresh), 'session_key': session_key})
class TwoFactorSetupView(views.APIView):
"""Generates a fresh TOTP secret and returns the otpauth:// URI for QR display."""
def get(self, request):
profile, _ = Profile.objects.get_or_create(user=request.user)
secret = pyotp.random_base32()
profile.totp_secret = secret
profile.totp_enabled = False
profile.save(update_fields=['totp_secret', 'totp_enabled'])
email = request.user.email or request.user.username
uri = pyotp.TOTP(secret).provisioning_uri(name=email, issuer_name='Armarium')
return Response({'uri': uri})
class TwoFactorEnableView(views.APIView):
"""Verifies the first TOTP code, activates 2FA and returns one-time backup codes."""
def post(self, request):
code = str(request.data.get('code', '')).strip()
profile, _ = Profile.objects.get_or_create(user=request.user)
if not profile.totp_secret:
return Response({'detail': 'Run setup first.'}, status=400)
if not _verify_totp_with_replay_check(profile, code):
return Response({'detail': 'Invalid code.'}, status=400)
profile.totp_enabled = True
profile.save(update_fields=['totp_enabled'])
backup_codes = _generate_backup_codes(request.user)
return Response({'detail': '2FA enabled.', 'backup_codes': backup_codes})
class TwoFactorDisableView(views.APIView):
"""Disables 2FA — accepts TOTP code or a backup code as proof."""
def post(self, request):
code = str(request.data.get('code', '')).strip()
profile, _ = Profile.objects.get_or_create(user=request.user)
if not profile.totp_enabled:
return Response({'detail': '2FA is not enabled.'}, status=400)
authenticated = False
if code.isdigit() and len(code) == 6:
authenticated = _verify_totp_with_replay_check(profile, code)
else:
code_hash = hashlib.sha256(code.encode()).hexdigest()
backup = BackupCode.objects.filter(user=request.user, code_hash=code_hash, used=False).first()
if backup:
backup.used = True
backup.save(update_fields=['used'])
authenticated = True
if not authenticated:
return Response({'detail': 'Invalid code.'}, status=400)
profile.totp_enabled = False
profile.totp_secret = ''
profile.totp_last_used_code = ''
profile.save(update_fields=['totp_enabled', 'totp_secret', 'totp_last_used_code'])
BackupCode.objects.filter(user=request.user).delete()
return Response({'detail': '2FA disabled.'})
# ── Recovery email helpers ────────────────────────────────────────────────────
def _mask_email(email: str) -> str:
if '@' not in email:
return '***'
local, domain = email.split('@', 1)
return f"{local[0]}{'*' * min(len(local) - 1, 18)}@{domain}"
def _generate_recovery_code() -> str:
"""Generate a human-readable 8-character code in XXXX-XXXX format."""
alphabet = 'ABCDEFGHJKLMNPQRSTUVWXYZ23456789' # no O/0, I/1 confusion
part = lambda: ''.join(secrets.choice(alphabet) for _ in range(4))
return f"{part()}-{part()}"
class TwoFactorRecoverRequestView(views.APIView):
"""Generate a recovery code, store its hash in Profile and email the plain code."""
permission_classes = [AllowAny]
throttle_classes = [AuthThrottle]
def post(self, request):
from django.utils import timezone
from datetime import timedelta
from .email import send_email
temp_token = request.data.get('temp_token', '')
user_id = _verify_2fa_token(temp_token)
if user_id is None:
return Response({'detail': 'ok'})
User = get_user_model()
user = User.objects.filter(pk=user_id).first()
if not user:
return Response({'detail': 'ok'})
profile = Profile.objects.filter(user=user).first()
if not profile or not profile.recovery_email:
return Response({'detail': 'ok'})
plain_code = _generate_recovery_code()
profile.recovery_code_hash = hashlib.sha256(plain_code.encode()).hexdigest()
profile.recovery_code_expires = timezone.now() + timedelta(minutes=15)
profile.save(update_fields=['recovery_code_hash', 'recovery_code_expires'])
sent = send_email(
template_name='2fa_recovery',
context={'code': plain_code},
subject='Armarium 2FA-Wiederherstellung',
to=profile.recovery_email,
)
if not sent:
return Response({'detail': 'Failed to send recovery email.'}, status=500)
return Response({'detail': 'ok', 'masked_email': _mask_email(profile.recovery_email)})
class TwoFactorRecoverConfirmView(views.APIView):
"""Verify the recovery code, disable 2FA and return JWT tokens."""
permission_classes = [AllowAny]
throttle_classes = [AuthThrottle]
def post(self, request):
from django.utils import timezone
temp_token = request.data.get('temp_token', '')
user_id = _verify_2fa_token(temp_token)
if user_id is None:
return Response({'detail': 'Session expired. Please log in again.'}, status=401)
recovery_code = str(request.data.get('recovery_code', '')).strip().upper()
if not recovery_code:
return Response({'detail': 'Code required.'}, status=400)
code_hash = hashlib.sha256(recovery_code.encode()).hexdigest()
profile = Profile.objects.filter(
user_id=user_id,
recovery_code_hash=code_hash,
recovery_code_expires__gt=timezone.now(),
).first()
if not profile:
return Response({'detail': 'Invalid or expired recovery code.'}, status=400)
profile.totp_enabled = False
profile.totp_secret = ''
profile.totp_last_used_code = ''
profile.recovery_code_hash = ''
profile.recovery_code_expires = None
profile.save(update_fields=[
'totp_enabled', 'totp_secret', 'totp_last_used_code',
'recovery_code_hash', 'recovery_code_expires',
])
BackupCode.objects.filter(user=profile.user).delete()
refresh = RefreshToken.for_user(profile.user)
session_key = _create_session(profile.user, request, refresh)
return Response({'access': str(refresh.access_token), 'refresh': str(refresh), 'session_key': session_key})
# ── Session helpers ───────────────────────────────────────────────────────────
def _parse_device(ua: str) -> str:
ua = ua.lower()
if 'iphone' in ua: return 'iPhone'
if 'ipad' in ua: return 'iPad'
if 'android' in ua and 'mobile' in ua: return 'Android (Phone)'
if 'android' in ua: return 'Android (Tablet)'
if 'macintosh' in ua or 'mac os x' in ua: return 'Mac'
if 'windows nt' in ua: return 'Windows'
if 'linux' in ua: return 'Linux'
return 'Unbekanntes Gerät'
def _get_client_ip(request) -> str | None:
forwarded = request.META.get('HTTP_X_FORWARDED_FOR', '')
if forwarded:
return forwarded.split(',')[0].strip()
return request.META.get('REMOTE_ADDR') or None
def _create_session(user, request, refresh_token: RefreshToken) -> str:
session_key = secrets.token_urlsafe(32)
UserSession.objects.create(
user=user,
session_key=session_key,
refresh_jti=str(refresh_token.payload.get('jti', '')),
device_name=_parse_device(request.META.get('HTTP_USER_AGENT', '')),
ip_address=_get_client_ip(request),
)
return session_key
# ── Session views ─────────────────────────────────────────────────────────────
class SessionListView(views.APIView):
def get(self, request):
current_key = request.headers.get('X-Session-Key', '')
sessions = UserSession.objects.filter(user=request.user)
data = [
{
'session_key': s.session_key,
'device_name': s.device_name,
'ip_address': s.ip_address,
'created_at': s.created_at,
'last_active_at': s.last_active_at,
'is_current': s.session_key == current_key,
}
for s in sessions
]
return Response(data)
class SessionRevokeView(views.APIView):
def delete(self, request, session_key):
session = UserSession.objects.filter(user=request.user, session_key=session_key).first()
if not session:
return Response({'detail': 'Not found.'}, status=404)
_blacklist_session(session)
return Response(status=204)
class SessionRevokeAllView(views.APIView):
def delete(self, request):
current_key = request.headers.get('X-Session-Key', '')
sessions = UserSession.objects.filter(user=request.user).exclude(session_key=current_key)
for session in sessions:
_blacklist_session(session)
return Response(status=204)
def _blacklist_session(session: UserSession) -> None:
if session.refresh_jti:
try:
from rest_framework_simplejwt.token_blacklist.models import OutstandingToken, BlacklistedToken
token = OutstandingToken.objects.get(jti=session.refresh_jti)
BlacklistedToken.objects.get_or_create(token=token)
except Exception:
pass
session.delete()
# ── Data export ───────────────────────────────────────────────────────────────
class DataExportView(views.APIView):
def get(self, request):
import io
import zipfile
from datetime import date
from fpdf import FPDF
user = request.user
profile = Profile.objects.filter(user=user).first()
today = date.today().strftime('%d.%m.%Y')
export_date = date.today().strftime('%Y-%m-%d')
VIOLET = (124, 58, 237)
HEADER_BG = (243, 240, 255)
ALT_ROW = (249, 249, 252)
TEXT_DARK = (30, 30, 40)
TEXT_GRAY = (120, 120, 135)
def safe(text: str) -> str:
return str(text).encode('latin-1', errors='replace').decode('latin-1')
class ArmPDF(FPDF):
def __init__(self, section_title):
super().__init__()
self.section_title = section_title
self.set_auto_page_break(auto=True, margin=18)
def header(self):
self.set_fill_color(*VIOLET)
self.rect(0, 0, 210, 10, 'F')
self.set_xy(14, 13)
self.set_font('Helvetica', 'B', 15)
self.set_text_color(*TEXT_DARK)
self.cell(0, 7, safe(f'Armarium - {self.section_title}'), ln=True)
self.set_font('Helvetica', '', 8)
self.set_text_color(*TEXT_GRAY)
self.set_x(14)
self.cell(0, 5, safe(f'Export vom {today}'), ln=True)
self.ln(3)
def footer(self):
self.set_y(-12)
self.set_font('Helvetica', '', 7)
self.set_text_color(*TEXT_GRAY)
self.cell(0, 5, safe(f'Armarium - {today} - Seite {self.page_no()}'), align='C')
def table_header(self, cols):
self.set_fill_color(*HEADER_BG)
self.set_font('Helvetica', 'B', 8)
self.set_text_color(*VIOLET)
for label, width in cols:
self.cell(width, 7, safe(label), border=0, fill=True, align='L')
self.ln()
self.set_draw_color(*VIOLET)
self.set_line_width(0.4)
x = self.get_x()
y = self.get_y()
self.line(14, y, 196, y)
def table_row(self, values, cols, fill=False):
if fill:
self.set_fill_color(*ALT_ROW)
self.set_font('Helvetica', '', 8)
self.set_text_color(*TEXT_DARK)
for (label, width), val in zip(cols, values):
self.cell(width, 6, safe(val), border=0, fill=fill)
self.ln()
def make_pdf(title, build_fn):
pdf = ArmPDF(title)
pdf.add_page()
build_fn(pdf)
return pdf.output()
# ── Profil ────────────────────────────────────────────────────────────
def build_profile(pdf):
name = f"{profile.first_name} {profile.last_name}".strip() if profile else ''
rows = [
('Name', name or '-'),
('E-Mail', user.email or '-'),
('Kanton', profile.canton if profile else '-'),
('Sprache', profile.language if profile else '-'),
('2FA', 'Aktiviert' if (profile and profile.totp_enabled) else 'Deaktiviert'),
]
cols = [('Feld', 60), ('Wert', 120)]
pdf.table_header(cols)
for i, (field, val) in enumerate(rows):
pdf.table_row([field, val], cols, fill=i % 2 == 1)
# ── Konten ────────────────────────────────────────────────────────────
def build_accounts(pdf):
cols = [('Name', 90), ('Typ', 60), ('Saldo (CHF)', 42)]
pdf.table_header(cols)
for i, acc in enumerate(Account.objects.filter(user=user)):
pdf.table_row([acc.name, acc.account_type, f'{acc.balance:,.2f}'], cols, fill=i % 2 == 1)
# ── Budgets ───────────────────────────────────────────────────────────
def build_budgets(pdf):
cols = [('Name', 80), ('Kategorie', 60), ('Betrag (CHF)', 42), ('Aktiv', 10)]
pdf.table_header(cols)
for i, b in enumerate(Budget.objects.filter(account__user=user).order_by('main_category', 'name')):
pdf.table_row([b.name, b.main_category, f'{b.amount:,.2f}', 'Ja' if b.active else 'Nein'], cols, fill=i % 2 == 1)
# ── Ausgaben ──────────────────────────────────────────────────────────
def build_expenses(pdf):
cols = [('Datum', 26), ('Name', 70), ('Kategorie', 46), ('Konto', 30), ('CHF', 20)]
pdf.table_header(cols)
for i, e in enumerate(Expense.objects.filter(account__user=user).order_by('-date')):
pdf.table_row([
e.date.strftime('%d.%m.%Y'), e.name, e.category,
e.account.name, f'{e.amount:,.2f}'
], cols, fill=i % 2 == 1)
# ── Transaktionen ─────────────────────────────────────────────────────
def build_transactions(pdf):
cols = [('Datum', 26), ('Beschreibung', 70), ('Von', 38), ('Nach', 38), ('CHF', 20)]
pdf.table_header(cols)
qs = Transaction.objects.filter(source_account__user=user).order_by('-date').select_related('source_account', 'destination_account')
for i, t in enumerate(qs):
pdf.table_row([
t.date.strftime('%d.%m.%Y'), t.description,
t.source_account.name, t.destination_account.name, f'{t.amount:,.2f}'
], cols, fill=i % 2 == 1)
# ── Termine ───────────────────────────────────────────────────────────
def build_deadlines(pdf):
cols = [('Datum', 30), ('Titel', 100), ('Typ', 42), ('Notizen', 20)]
pdf.table_header(cols)
for i, d in enumerate(Deadline.objects.filter(user=user).order_by('date')):
pdf.table_row([d.date.strftime('%d.%m.%Y'), d.title, d.type, d.notes[:20]], cols, fill=i % 2 == 1)
pdfs = [
('profil.pdf', 'Profil', build_profile),
('konten.pdf', 'Konten', build_accounts),
('budgets.pdf', 'Budgets', build_budgets),
('ausgaben.pdf', 'Ausgaben', build_expenses),
('transaktionen.pdf', 'Transaktionen', build_transactions),
('termine.pdf', 'Termine', build_deadlines),
]
zip_buffer = io.BytesIO()
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
for filename, title, build_fn in pdfs:
zf.writestr(filename, bytes(make_pdf(title, build_fn)))
zip_buffer.seek(0)
response = HttpResponse(zip_buffer.read(), content_type='application/zip')
response['Content-Disposition'] = f'attachment; filename="armarium-export-{export_date}.zip"'
return response
# ── Notification preferences ──────────────────────────────────────────────────
class NotificationPrefsView(views.APIView):
def patch(self, request):
profile, _ = Profile.objects.get_or_create(user=request.user)
fields = ['notif_deadlines', 'notif_budget_alerts', 'notif_monthly_summary']
changed = []
for field in fields:
if field in request.data:
setattr(profile, field, bool(request.data[field]))
changed.append(field)
if changed:
profile.save(update_fields=changed)
return Response({
'notif_deadlines': profile.notif_deadlines,
'notif_budget_alerts': profile.notif_budget_alerts,
'notif_monthly_summary': profile.notif_monthly_summary,
})
class VerifyEmailView(views.APIView):
permission_classes = [AllowAny]
throttle_classes = [AuthThrottle]
def post(self, request):
from django.utils import timezone
token = request.data.get('token', '').strip()
if not token:
return Response({'detail': 'Token required.'}, status=400)
token_hash = hashlib.sha256(token.encode()).hexdigest()
profile = Profile.objects.filter(
email_verify_token=token_hash,
email_verify_token_expires__gt=timezone.now(),
).first()
if not profile:
return Response({'detail': 'Invalid or expired token.'}, status=400)
profile.email_verified = True
profile.email_verify_token = ''
profile.email_verify_token_expires = None
profile.save(update_fields=['email_verified', 'email_verify_token', 'email_verify_token_expires'])
return Response({'detail': 'Email verified.'})
class PasswordResetRequestView(views.APIView):
permission_classes = [AllowAny]
throttle_classes = [AuthThrottle]
def post(self, request):
from django.utils import timezone
from datetime import timedelta
from .email import send_email
email = request.data.get('email', '').strip().lower()
User = get_user_model()
user = User.objects.filter(email=email).first()
if user:
token = secrets.token_urlsafe(32)
token_hash = hashlib.sha256(token.encode()).hexdigest()
profile, _ = Profile.objects.get_or_create(user=user)
profile.password_reset_token_hash = token_hash
profile.password_reset_token_expires = timezone.now() + timedelta(minutes=15)
profile.save(update_fields=['password_reset_token_hash', 'password_reset_token_expires'])
link = f"{settings.FRONTEND_URL}/reset-password?token={token}"
send_email('password_reset', {'link': link}, 'Armarium Passwort zurücksetzen', user.email)
return Response({'detail': 'Wenn ein Konto mit dieser E-Mail existiert, wurde eine E-Mail gesendet.'})
class PasswordResetConfirmView(views.APIView):
permission_classes = [AllowAny]
throttle_classes = [AuthThrottle]
def post(self, request):
from django.utils import timezone
token = request.data.get('token', '').strip()
password = request.data.get('password', '')
if not token:
return Response({'detail': 'Token required.'}, status=400)
if len(password) < 8:
return Response({'detail': 'Password must be at least 8 characters.'}, status=400)
token_hash = hashlib.sha256(token.encode()).hexdigest()
profile = Profile.objects.filter(
password_reset_token_hash=token_hash,
password_reset_token_expires__gt=timezone.now(),
).first()
if not profile:
return Response({'detail': 'Invalid or expired token.'}, status=400)
user = profile.user
user.set_password(password)
user.save()
profile.password_reset_token_hash = ''
profile.password_reset_token_expires = None
profile.save(update_fields=['password_reset_token_hash', 'password_reset_token_expires'])
for session in UserSession.objects.filter(user=user):
_blacklist_session(session)
return Response({'detail': 'Password updated.'})