Files
armarium-suite/backend/finance/views.py
T
Daniel Krähenbühl 1a7ef09805 feat: Armarium v1.1.0 — dashboard, auth, 2FA, SMTP, settings, deploy
Dashboard:
- ApexCharts bar chart (income vs fixed costs vs expenses) and donut chart
- KPI cards: income, fixed costs, savings rate with configurable goal
- Greeting with time-of-day and locale-aware date/time display

Authentication & security:
- Email-based login (no username), case-insensitive lookup
- JWT access/refresh tokens with rotation and blacklist
- TOTP 2FA with QR code, backup codes (copy + PDF export)
- 2FA recovery via email code
- Cloudflare Turnstile CAPTCHA on login and register

Email flows:
- Email verification on registration (24h token)
- Password reset flow (15min token, anti-enumeration)
- Brevo SMTP integration with HTML + plaintext email templates
- Notification emails: 2FA recovery, password changed, email changed

Settings page:
- 2FA management (enable/disable, QR, backup codes)
- Active sessions list with per-device revoke
- Data export: ZIP with 6 PDFs via fpdf2
- Notification preferences (3 toggles)
- Danger zone: account deletion with mandatory export + confirmation phrase

UI & layout:
- Sidebar with collapsible/flyout mode, Angular signal-based dropdowns
- Dark mode (class-based), language switcher (DE/FR/IT/EN)
- Mobile-responsive layout with touch-friendly targets
- Roboto font via @fontsource (GDPR-compliant, no Google CDN)
- Pure Tailwind CSS v3

Infrastructure:
- Forgejo Actions CI/CD pipeline (auto-deploy on push to main)
- Gunicorn + Nginx + PostgreSQL production setup
- Rate limiting, HSTS, secure cookies, CSRF protection
2026-05-25 22:46:30 +02:00

1000 lines
40 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import base64
import hmac
import hashlib
import json
import logging
import secrets
import time
import urllib.parse
import urllib.request
import pyotp
logger = logging.getLogger('armarium')
from django.conf import settings
from django.contrib.auth import get_user_model, authenticate
from django.http import HttpResponse
from icalendar import Calendar as iCalendar, Event as iCalEvent
from rest_framework import viewsets, views, status
from rest_framework.response import Response
from rest_framework.permissions import AllowAny, IsAuthenticated
from rest_framework.throttling import AnonRateThrottle
from rest_framework_simplejwt.tokens import RefreshToken
from rest_framework_simplejwt.exceptions import TokenError
from .models import Account, Transaction, Budget, Expense, Profile, Deadline, ReadEvent, BackupCode, UserSession
from .serializers import (
AccountSerializer, TransactionSerializer, BudgetSerializer,
ExpenseSerializer, ProfileSerializer, DeadlineSerializer, RegisterSerializer,
)
def _verify_turnstile(token: str, remote_ip: str = '') -> bool:
    """Validate a Cloudflare Turnstile response token via the siteverify endpoint.

    Args:
        token: The Turnstile response value submitted by the client.
        remote_ip: Client IP sent along as the ``remoteip`` form field.

    Returns:
        True when ``settings.DEBUG`` is on (verification is skipped) or the
        endpoint reports success. False when the token or the configured
        secret key is missing, the request fails, or the response does not
        report success.
    """
    if settings.DEBUG:
        return True
    if not token or not settings.TURNSTILE_SECRET_KEY:
        return False
    data = urllib.parse.urlencode({
        'secret': settings.TURNSTILE_SECRET_KEY,
        'response': token,
        'remoteip': remote_ip,
    }).encode()
    try:
        req = urllib.request.Request(
            'https://challenges.cloudflare.com/turnstile/v0/siteverify',
            data=data,
            method='POST',
        )
        with urllib.request.urlopen(req, timeout=5) as resp:
            result = json.loads(resp.read())
    except Exception:
        # Fail closed, but keep the traceback so outages and misconfiguration
        # can be told apart in the logs.
        logger.warning('Turnstile verification request failed', exc_info=True)
        return False
    # Always hand callers a real bool, whatever JSON shape came back.
    return isinstance(result, dict) and bool(result.get('success', False))
def generate_ical_token(user_id: int) -> str:
    """Return the hex HMAC-SHA256 of ``user_id`` keyed with ``settings.SECRET_KEY``."""
    key = settings.SECRET_KEY.encode()
    message = str(user_id).encode()
    return hmac.new(key, message, hashlib.sha256).hexdigest()
# Upper bound for an uploaded avatar image; ProfileView.put rejects larger files.
MAX_AVATAR_SIZE_BYTES = 2 * 1024 * 1024 # 2 MB
class AuthThrottle(AnonRateThrottle):
    """Anonymous-request throttle used by the authentication endpoints in this module."""
    # DRF rate string: at most five requests per minute.
    rate = '5/min'
class AccountViewSet(viewsets.ModelViewSet):
    """Model viewset for accounts, limited to the requesting user's own rows."""

    serializer_class = AccountSerializer

    def get_queryset(self):
        """Return only the accounts owned by the requesting user."""
        owner = self.request.user
        return Account.objects.filter(user=owner)

    def perform_create(self, serializer):
        """Save a new account with the requesting user as its owner."""
        owner = self.request.user
        serializer.save(user=owner)
class TransactionViewSet(viewsets.ModelViewSet):
    """Model viewset for transactions whose source account belongs to the requester."""

    serializer_class = TransactionSerializer

    def get_queryset(self):
        """Return transactions whose source account is owned by the requesting user."""
        owner = self.request.user
        return Transaction.objects.filter(source_account__user=owner)

    def get_serializer_context(self):
        """Return the default serializer context with ``request`` set explicitly."""
        ctx = super().get_serializer_context()
        ctx['request'] = self.request
        return ctx
class BudgetViewSet(viewsets.ModelViewSet):
    """Model viewset for budgets attached to the requesting user's accounts."""

    serializer_class = BudgetSerializer

    def get_queryset(self):
        """Return budgets whose account is owned by the requesting user."""
        owner = self.request.user
        return Budget.objects.filter(account__user=owner)
class ExpenseViewSet(viewsets.ModelViewSet):
    """Model viewset for expenses attached to the requesting user's accounts."""

    serializer_class = ExpenseSerializer

    def get_queryset(self):
        """Return expenses whose account is owned by the requesting user."""
        owner = self.request.user
        return Expense.objects.filter(account__user=owner)
class DeadlineViewSet(viewsets.ModelViewSet):
    """Model viewset for the requesting user's deadlines, ordered by date."""

    serializer_class = DeadlineSerializer

    def get_queryset(self):
        """Return the requesting user's deadlines in ascending date order."""
        own_deadlines = Deadline.objects.filter(user=self.request.user)
        return own_deadlines.order_by('date')

    def perform_create(self, serializer):
        """Save a new deadline with the requesting user as its owner."""
        owner = self.request.user
        serializer.save(user=owner)
class ProfileView(views.APIView):
    """Read, update and delete the authenticated user's profile/account."""

    def get(self, request):
        """Return the serialized profile, creating an empty one on first access."""
        profile, _ = Profile.objects.get_or_create(user=request.user)
        return Response(ProfileSerializer(profile).data)

    def put(self, request):
        """Partially update the profile from the request payload.

        Rejects avatars larger than ``MAX_AVATAR_SIZE_BYTES`` and a recovery
        email equal to the login email. If the login email differs after the
        save, a notification is sent to the previous address.
        """
        from .email import send_email

        avatar = request.FILES.get('avatar_image')
        if avatar and avatar.size > MAX_AVATAR_SIZE_BYTES:
            return Response({'detail': 'Image must be smaller than 2 MB.'}, status=400)

        # The key may be present with a JSON null (e.g. a client clearing the
        # field); treat that like "not provided" instead of crashing on
        # None.strip().
        recovery_email = str(request.data.get('recovery_email') or '').strip().lower()
        if recovery_email and recovery_email == request.user.email.lower():
            return Response(
                {'recovery_email': 'Recovery email must differ from your login email.'},
                status=400,
            )

        old_email = request.user.email
        profile, _ = Profile.objects.get_or_create(user=request.user)
        serializer = ProfileSerializer(profile, data=request.data, partial=True)
        if serializer.is_valid():
            serializer.save()
            new_email = request.user.email
            if new_email != old_email:
                # Notify the OLD address so the previous owner learns of the change.
                send_email(
                    'email_changed',
                    {'new_email': new_email},
                    'Armarium Deine E-Mail-Adresse wurde geändert',
                    old_email,
                )
            return Response(serializer.data)
        return Response(serializer.errors, status=400)

    def delete(self, request):
        """Delete the user account after re-checking the supplied password."""
        password = request.data.get('password', '')
        if not password or not request.user.check_password(password):
            return Response({'detail': 'Passwort ungültig.'}, status=status.HTTP_403_FORBIDDEN)
        request.user.delete()
        return Response(status=status.HTTP_204_NO_CONTENT)
class RegisterView(views.APIView):
    """Create a new account and email a verification link."""

    permission_classes = [AllowAny]
    throttle_classes = [AuthThrottle]

    def post(self, request):
        """Register a user after the captcha check and send the confirmation mail.

        Only the SHA-256 hash of the verification token is stored on the
        profile, with a 24-hour expiry; the plain token travels in the link.
        """
        from .email import send_email

        captcha_ok = _verify_turnstile(
            request.data.get('cf_turnstile_response', ''),
            request.META.get('REMOTE_ADDR', ''),
        )
        if not captcha_ok:
            return Response({'detail': 'Captcha verification failed.'}, status=status.HTTP_400_BAD_REQUEST)

        serializer = RegisterSerializer(data=request.data)
        if not serializer.is_valid():
            return Response(serializer.errors, status=status.HTTP_400_BAD_REQUEST)

        user = serializer.save()
        from django.utils import timezone
        from datetime import timedelta

        plain_token = secrets.token_urlsafe(32)
        profile, _ = Profile.objects.get_or_create(user=user)
        profile.email_verify_token = hashlib.sha256(plain_token.encode()).hexdigest()
        profile.email_verify_token_expires = timezone.now() + timedelta(hours=24)
        profile.save(update_fields=['email_verify_token', 'email_verify_token_expires'])

        link = f"{settings.FRONTEND_URL}/verify-email?token={plain_token}"
        send_email('registration_confirm', {'link': link}, 'Armarium E-Mail-Adresse bestätigen', user.email)
        return Response({'detail': 'Account created.'}, status=status.HTTP_201_CREATED)
class LogoutView(views.APIView):
    """Blacklist a refresh token and remove the matching session records."""

    permission_classes = [AllowAny]

    def post(self, request):
        """Invalidate the supplied refresh token; an already-invalid token counts as success."""
        raw_refresh = request.data.get('refresh')
        if not raw_refresh:
            return Response({'detail': 'Refresh token required.'}, status=400)

        try:
            refresh = RefreshToken(raw_refresh)
            token_id = refresh.payload.get('jti', '')
            refresh.blacklist()
            if token_id:
                UserSession.objects.filter(refresh_jti=token_id).delete()
        except TokenError:
            # Already invalid, treat as success.
            pass

        # Independently of the token, drop the session named by the header.
        header_key = request.headers.get('X-Session-Key', '')
        if header_key:
            UserSession.objects.filter(session_key=header_key).delete()
        return Response(status=status.HTTP_204_NO_CONTENT)
class ChangePasswordView(views.APIView):
    """Set a new password for the authenticated user."""

    def post(self, request):
        """Store the new password, revoke every other session and send a notice mail.

        The session named by the ``X-Session-Key`` header is kept.
        """
        from .email import send_email

        new_password = request.data.get('password', '')
        if len(new_password) < 8:
            return Response({'detail': 'Password must be at least 8 characters.'}, status=400)

        account = request.user
        account.set_password(new_password)
        account.save()

        keep_key = request.headers.get('X-Session-Key', '')
        for stale in UserSession.objects.filter(user=account).exclude(session_key=keep_key):
            _blacklist_session(stale)

        send_email('password_changed', {}, 'Armarium Dein Passwort wurde geändert', account.email)
        return Response({'detail': 'Password updated.'})
class SearchView(views.APIView):
    """Global search across all user resources."""

    def get(self, request):
        """Return up to five matches per resource type for the ``q`` query parameter.

        A query shorter than two characters (after stripping) yields an empty
        object. Resource types without any match are left out of the result.
        """
        term = request.query_params.get('q', '').strip()
        if len(term) < 2:
            return Response({})

        owner = request.user
        found = {}

        account_hits = [
            {'id': a.id, 'title': a.name, 'subtitle': a.get_account_type_display()}
            for a in Account.objects.filter(user=owner, name__icontains=term)[:5]
        ]
        if account_hits:
            found['accounts'] = account_hits

        budget_hits = [
            {'id': b.id, 'title': b.name, 'subtitle': f'CHF {b.amount}'}
            for b in Budget.objects.filter(account__user=owner, name__icontains=term)[:5]
        ]
        if budget_hits:
            found['budgets'] = budget_hits

        expense_hits = [
            {'id': e.id, 'title': e.name, 'subtitle': f'{e.date} · CHF {e.amount}'}
            for e in Expense.objects.filter(account__user=owner, name__icontains=term)[:5]
        ]
        if expense_hits:
            found['expenses'] = expense_hits

        transaction_hits = [
            {'id': t.id, 'title': t.description, 'subtitle': f'{t.date} · CHF {t.amount}'}
            for t in Transaction.objects.filter(
                source_account__user=owner, description__icontains=term
            )[:5]
        ]
        if transaction_hits:
            found['transactions'] = transaction_hits

        deadline_hits = [
            {'id': d.id, 'title': d.title, 'subtitle': str(d.date), 'date': str(d.date)}
            for d in Deadline.objects.filter(user=owner, title__icontains=term)[:5]
        ]
        if deadline_hits:
            found['deadlines'] = deadline_hits

        return Response(found)
class NotificationsView(views.APIView):
    """Returns all unread active events (date <= today) for the authenticated user."""

    def get(self, request):
        """List deadlines and expenses due today or earlier that are not marked read.

        The result is a list of ``{event_type, event_id, title, date}`` dicts
        sorted by the string form of the date.
        """
        from datetime import date

        today = date.today()
        read_deadlines = set(
            ReadEvent.objects.filter(user=request.user, event_type='deadline')
            .values_list('event_id', flat=True)
        )
        read_expenses = set(
            ReadEvent.objects.filter(user=request.user, event_type='expense')
            .values_list('event_id', flat=True)
        )
        notifications = []
        for d in Deadline.objects.filter(user=request.user, date__lte=today):
            if d.id not in read_deadlines:
                notifications.append({
                    'event_type': 'deadline',
                    'event_id': d.id,
                    'title': d.title,
                    'date': str(d.date),
                })
        for e in Expense.objects.filter(account__user=request.user, due_date__lte=today):
            if e.id not in read_expenses:
                notifications.append({
                    'event_type': 'expense',
                    'event_id': e.id,
                    'title': e.name,
                    'date': str(e.due_date),
                })
        notifications.sort(key=lambda x: x['date'])
        return Response(notifications)

    def post(self, request):
        """Mark a single event as read.

        Responds 400 when ``event_type`` is not ``deadline``/``expense`` or
        ``event_id`` is missing, zero or not an integer; 204 otherwise.
        """
        event_type = request.data.get('event_type')
        # Validate up front: the id is compared against integer primary keys
        # in get(), so a non-numeric value is a client error (400) and should
        # not reach the ORM where it would surface as an unhandled exception.
        try:
            event_id = int(request.data.get('event_id'))
        except (TypeError, ValueError):
            return Response({'detail': 'Invalid payload.'}, status=400)
        if event_type not in ('deadline', 'expense') or not event_id:
            return Response({'detail': 'Invalid payload.'}, status=400)
        ReadEvent.objects.get_or_create(
            user=request.user, event_type=event_type, event_id=event_id
        )
        return Response(status=status.HTTP_204_NO_CONTENT)
class ICalUrlView(views.APIView):
    """Returns the personal iCal feed URL for the authenticated user."""

    def get(self, request):
        """Build the absolute feed URL containing the user's id and HMAC token."""
        user_id = request.user.id
        feed_token = generate_ical_token(user_id)
        root = request.build_absolute_uri('/')
        return Response({'url': f"{root}api/calendar/ical/{user_id}/{feed_token}/"})
class ICalFeedView(views.APIView):
    """Serves the iCal feed. Token acts as authentication — no JWT required."""

    permission_classes = [AllowAny]

    def get(self, request, user_id, token):
        """Return the user's deadlines and expense due dates as an ``.ics`` download.

        Responds 404 both for a wrong token and for an unknown user, so the
        two cases look the same to the caller.
        """
        expected = generate_ical_token(user_id)
        # Compare as bytes: hmac.compare_digest() raises TypeError for str
        # arguments containing non-ASCII characters, which a crafted URL
        # token could otherwise turn into a 500 instead of a 404.
        if not hmac.compare_digest(expected.encode(), str(token).encode()):
            return HttpResponse(status=404)

        User = get_user_model()
        try:
            user = User.objects.get(pk=user_id)
        except User.DoesNotExist:
            return HttpResponse(status=404)

        cal = iCalendar()
        cal.add('prodid', '-//Budget App//EN')
        cal.add('version', '2.0')
        cal.add('x-wr-calname', 'Budget App')
        cal.add('x-wr-timezone', 'Europe/Zurich')

        # Deadlines
        for deadline in Deadline.objects.filter(user=user):
            event = iCalEvent()
            event.add('summary', f'[{deadline.get_type_display()}] {deadline.title}')
            event.add('dtstart', deadline.date)
            event.add('dtend', deadline.date)
            event.add('uid', f'deadline-{deadline.id}@budget-app')
            if deadline.notes:
                event.add('description', deadline.notes)
            cal.add_component(event)

        # Expense due dates
        for expense in Expense.objects.filter(account__user=user, due_date__isnull=False):
            event = iCalEvent()
            event.add('summary', f'[Invoice] {expense.name} CHF {expense.amount}')
            event.add('dtstart', expense.due_date)
            event.add('dtend', expense.due_date)
            event.add('uid', f'expense-{expense.id}@budget-app')
            if expense.notes:
                event.add('description', expense.notes)
            cal.add_component(event)

        response = HttpResponse(cal.to_ical(), content_type='text/calendar; charset=utf-8')
        response['Content-Disposition'] = 'attachment; filename="budget-app.ics"'
        return response
# ── 2FA helpers ──────────────────────────────────────────────────────────────
def _make_2fa_token(user_id: int) -> str:
    """Build the signed ``user_id:timestamp:signature`` token that links login step 1 to step 2.

    The signature is an HMAC-SHA256 over ``user_id:timestamp`` keyed with
    ``settings.SECRET_KEY``; the whole string is URL-safe base64 encoded.
    """
    issued_at = int(time.time())
    payload = f"{user_id}:{issued_at}"
    key = settings.SECRET_KEY.encode()
    signature = hmac.new(key, payload.encode(), hashlib.sha256).hexdigest()
    signed = f"{payload}:{signature}"
    return base64.urlsafe_b64encode(signed.encode()).decode()
def _verify_2fa_token(token: str, max_age: int = 300) -> int | None:
"""Return user_id if token is valid and not expired, else None."""
try:
decoded = base64.urlsafe_b64decode(token.encode()).decode()
*payload_parts, sig = decoded.split(':')
payload = ':'.join(payload_parts)
user_id_str, ts_str = payload_parts
expected = hmac.new(settings.SECRET_KEY.encode(), payload.encode(), hashlib.sha256).hexdigest()
if not hmac.compare_digest(expected, sig):
return None
if int(time.time()) - int(ts_str) > max_age:
return None
return int(user_id_str)
except Exception:
return None
def _generate_backup_codes(user, count: int = 8) -> list[str]:
    """Replace the user's backup codes with ``count`` new ones and return them in plain text.

    Only the SHA-256 hash of each code is stored; the returned list is the
    sole copy of the plain ``XXXXXXXX-XXXXXXXX`` codes.
    """
    BackupCode.objects.filter(user=user).delete()
    fresh_codes = []
    while len(fresh_codes) < count:
        first_half = secrets.token_hex(4).upper()
        second_half = secrets.token_hex(4).upper()
        code = f"{first_half}-{second_half}"
        digest = hashlib.sha256(code.encode()).hexdigest()
        BackupCode.objects.create(user=user, code_hash=digest)
        fresh_codes.append(code)
    return fresh_codes
def _verify_totp_with_replay_check(profile, code: str) -> bool:
    """Check a TOTP code against the profile's secret, refusing the last accepted code.

    On success the code is remembered in ``profile.totp_last_used_code`` so
    that submitting it again is rejected. Verification uses ``valid_window=1``.
    """
    is_replay = profile.totp_last_used_code == code
    if is_replay:
        return False
    verifier = pyotp.TOTP(profile.totp_secret)
    if verifier.verify(code, valid_window=1):
        profile.totp_last_used_code = code
        profile.save(update_fields=['totp_last_used_code'])
        return True
    return False
# ── 2FA views ─────────────────────────────────────────────────────────────────
class LoginView(views.APIView):
    """Replaces TokenObtainPairView. Returns a short-lived temp_token when 2FA is required."""

    permission_classes = [AllowAny]
    throttle_classes = [AuthThrottle]

    def post(self, request):
        """Authenticate with email/password after the captcha check.

        Returns JWT tokens plus a session key, or ``2fa_required`` with a
        temp token when the profile has TOTP enabled.
        """
        captcha_ok = _verify_turnstile(
            request.data.get('cf_turnstile_response', ''),
            request.META.get('REMOTE_ADDR', ''),
        )
        if not captcha_ok:
            return Response({'detail': 'Captcha verification failed.'}, status=status.HTTP_400_BAD_REQUEST)

        # The client submits the email address in the ``username`` field.
        user = authenticate(
            request,
            username=request.data.get('username', ''),
            password=request.data.get('password', ''),
        )
        if user is None:
            return Response({'detail': 'No active account found with the given credentials.'}, status=401)

        profile, _ = Profile.objects.get_or_create(user=user)
        if profile.totp_enabled:
            return Response({'2fa_required': True, 'temp_token': _make_2fa_token(user.id)}, status=200)

        refresh = RefreshToken.for_user(user)
        session_key = _create_session(user, request, refresh)
        tokens = {
            'access': str(refresh.access_token),
            'refresh': str(refresh),
            'session_key': session_key,
        }
        return Response(tokens)
class TwoFactorLoginView(views.APIView):
    """Step 2 of login — accepts TOTP code or backup code, returns JWT tokens."""

    permission_classes = [AllowAny]
    throttle_classes = [AuthThrottle]

    def post(self, request):
        """Exchange a valid temp token plus second factor for JWT tokens.

        A six-digit numeric ``code`` is treated as TOTP; anything else is
        treated as a backup code. Responds 401 for an invalid/expired temp
        token or a user without active 2FA, 400 for a wrong code.
        """
        temp_token = request.data.get('temp_token', '')
        code = str(request.data.get('code', '')).strip()
        user_id = _verify_2fa_token(temp_token)
        if user_id is None:
            return Response({'detail': 'Session expired. Please log in again.'}, status=401)

        User = get_user_model()
        try:
            user = User.objects.get(pk=user_id)
        except User.DoesNotExist:
            return Response({'detail': 'Invalid credentials.'}, status=401)

        profile, _ = Profile.objects.get_or_create(user=user)
        if not profile.totp_enabled or not profile.totp_secret:
            return Response({'detail': 'Invalid credentials.'}, status=401)

        if code.isdigit() and len(code) == 6:
            if not _verify_totp_with_replay_check(profile, code):
                return Response({'detail': 'Invalid or already used code.'}, status=400)
        else:
            # Backup codes are generated in upper case; normalise the input
            # the same way the recovery-code flow does so a lower-case entry
            # still matches.
            code_hash = hashlib.sha256(code.upper().encode()).hexdigest()
            # Consume the code with a single conditional UPDATE so that two
            # concurrent requests cannot both succeed with the same code.
            consumed = BackupCode.objects.filter(
                user=user, code_hash=code_hash, used=False
            ).update(used=True)
            if not consumed:
                return Response({'detail': 'Invalid backup code.'}, status=400)

        refresh = RefreshToken.for_user(user)
        session_key = _create_session(user, request, refresh)
        return Response({'access': str(refresh.access_token), 'refresh': str(refresh), 'session_key': session_key})
class TwoFactorSetupView(views.APIView):
    """Generates a fresh TOTP secret and returns the otpauth:// URI for QR display."""

    def get(self, request):
        """Store a new, not-yet-enabled TOTP secret and return its provisioning URI.

        Responds 400 when 2FA is already active. Without this guard a plain
        GET would overwrite the secret and switch 2FA off without any proof
        of possession, bypassing the code check in TwoFactorDisableView.
        """
        profile, _ = Profile.objects.get_or_create(user=request.user)
        if profile.totp_enabled:
            return Response({'detail': '2FA is already enabled. Disable it first.'}, status=400)
        secret = pyotp.random_base32()
        profile.totp_secret = secret
        profile.totp_enabled = False
        profile.save(update_fields=['totp_secret', 'totp_enabled'])
        email = request.user.email or request.user.username
        uri = pyotp.TOTP(secret).provisioning_uri(name=email, issuer_name='Armarium')
        return Response({'uri': uri})
class TwoFactorEnableView(views.APIView):
    """Verifies the first TOTP code, activates 2FA and returns one-time backup codes."""

    def post(self, request):
        """Activate 2FA once the submitted code matches the secret stored by setup."""
        submitted = str(request.data.get('code', '')).strip()
        profile, _ = Profile.objects.get_or_create(user=request.user)

        if not profile.totp_secret:
            return Response({'detail': 'Run setup first.'}, status=400)
        code_ok = _verify_totp_with_replay_check(profile, submitted)
        if not code_ok:
            return Response({'detail': 'Invalid code.'}, status=400)

        profile.totp_enabled = True
        profile.save(update_fields=['totp_enabled'])
        return Response({
            'detail': '2FA enabled.',
            'backup_codes': _generate_backup_codes(request.user),
        })
class TwoFactorDisableView(views.APIView):
    """Disables 2FA — accepts TOTP code or a backup code as proof."""

    def post(self, request):
        """Turn 2FA off after verifying a TOTP or backup code.

        A six-digit numeric ``code`` is treated as TOTP; anything else is
        treated as a backup code. On success the secret, the replay marker
        and all backup codes are cleared.
        """
        code = str(request.data.get('code', '')).strip()
        profile, _ = Profile.objects.get_or_create(user=request.user)
        if not profile.totp_enabled:
            return Response({'detail': '2FA is not enabled.'}, status=400)

        if code.isdigit() and len(code) == 6:
            authenticated = _verify_totp_with_replay_check(profile, code)
        else:
            # Backup codes are generated in upper case; normalise the input
            # so a lower-case entry still matches.
            code_hash = hashlib.sha256(code.upper().encode()).hexdigest()
            # Single conditional UPDATE: the code is checked and consumed in
            # one statement, so it cannot be used twice concurrently.
            consumed = BackupCode.objects.filter(
                user=request.user, code_hash=code_hash, used=False
            ).update(used=True)
            authenticated = consumed > 0

        if not authenticated:
            return Response({'detail': 'Invalid code.'}, status=400)

        profile.totp_enabled = False
        profile.totp_secret = ''
        profile.totp_last_used_code = ''
        profile.save(update_fields=['totp_enabled', 'totp_secret', 'totp_last_used_code'])
        BackupCode.objects.filter(user=request.user).delete()
        return Response({'detail': '2FA disabled.'})
# ── Recovery email helpers ────────────────────────────────────────────────────
def _mask_email(email: str) -> str:
    """Return ``email`` with all but the first local-part character starred out.

    At most 18 asterisks are emitted; the domain is left untouched. An
    address without ``@`` or with an empty local part yields ``'***'``
    (the empty-local case previously raised IndexError).
    """
    local, at_sign, domain = email.partition('@')
    if not at_sign or not local:
        return '***'
    hidden = '*' * min(len(local) - 1, 18)
    return f"{local[0]}{hidden}@{domain}"
def _generate_recovery_code() -> str:
    """Return a random recovery code of the form ``XXXX-XXXX``.

    Characters come from upper-case letters and digits with O, I, 0 and 1
    left out to avoid look-alike confusion.
    """
    alphabet = 'ABCDEFGHJKLMNPQRSTUVWXYZ23456789'
    symbols = [secrets.choice(alphabet) for _ in range(8)]
    head = ''.join(symbols[:4])
    tail = ''.join(symbols[4:])
    return f"{head}-{tail}"
class TwoFactorRecoverRequestView(views.APIView):
    """Generate a recovery code, store its hash in Profile and email the plain code."""

    permission_classes = [AllowAny]
    throttle_classes = [AuthThrottle]

    def post(self, request):
        """Email a 15-minute recovery code to the profile's recovery address.

        An invalid temp token, unknown user or missing recovery email all
        produce the same neutral ``ok`` response. A failed send returns 500.
        """
        from datetime import timedelta
        from django.utils import timezone
        from .email import send_email

        neutral = {'detail': 'ok'}

        user_id = _verify_2fa_token(request.data.get('temp_token', ''))
        if user_id is None:
            return Response(neutral)

        user = get_user_model().objects.filter(pk=user_id).first()
        if not user:
            return Response(neutral)

        profile = Profile.objects.filter(user=user).first()
        if not (profile and profile.recovery_email):
            return Response(neutral)

        plain_code = _generate_recovery_code()
        profile.recovery_code_hash = hashlib.sha256(plain_code.encode()).hexdigest()
        profile.recovery_code_expires = timezone.now() + timedelta(minutes=15)
        profile.save(update_fields=['recovery_code_hash', 'recovery_code_expires'])

        delivered = send_email(
            template_name='2fa_recovery',
            context={'code': plain_code},
            subject='Armarium 2FA-Wiederherstellung',
            to=profile.recovery_email,
        )
        if delivered:
            return Response({'detail': 'ok', 'masked_email': _mask_email(profile.recovery_email)})
        return Response({'detail': 'Failed to send recovery email.'}, status=500)
class TwoFactorRecoverConfirmView(views.APIView):
    """Verify the recovery code, disable 2FA and return JWT tokens."""

    permission_classes = [AllowAny]
    throttle_classes = [AuthThrottle]

    def post(self, request):
        """Redeem an emailed recovery code: clear all 2FA state and log the user in."""
        from django.utils import timezone

        user_id = _verify_2fa_token(request.data.get('temp_token', ''))
        if user_id is None:
            return Response({'detail': 'Session expired. Please log in again.'}, status=401)

        submitted = str(request.data.get('recovery_code', '')).strip().upper()
        if not submitted:
            return Response({'detail': 'Code required.'}, status=400)

        # Match on user, code hash and an unexpired timestamp in one query.
        matches = Profile.objects.filter(
            user_id=user_id,
            recovery_code_hash=hashlib.sha256(submitted.encode()).hexdigest(),
            recovery_code_expires__gt=timezone.now(),
        )
        profile = matches.first()
        if not profile:
            return Response({'detail': 'Invalid or expired recovery code.'}, status=400)

        cleared = {
            'totp_enabled': False,
            'totp_secret': '',
            'totp_last_used_code': '',
            'recovery_code_hash': '',
            'recovery_code_expires': None,
        }
        for field_name, value in cleared.items():
            setattr(profile, field_name, value)
        profile.save(update_fields=list(cleared))

        account = profile.user
        BackupCode.objects.filter(user=account).delete()
        refresh = RefreshToken.for_user(account)
        session_key = _create_session(account, request, refresh)
        return Response({'access': str(refresh.access_token), 'refresh': str(refresh), 'session_key': session_key})
# ── Session helpers ───────────────────────────────────────────────────────────
def _parse_device(ua: str) -> str:
    """Map a User-Agent string to a coarse device label.

    Matching is case-insensitive and ordered: iPhone and iPad are tested
    before the Mac markers, Android before Linux. An unrecognised agent
    yields ``'Unbekanntes Gerät'``.
    """
    agent = ua.lower()

    for marker, label in (('iphone', 'iPhone'), ('ipad', 'iPad')):
        if marker in agent:
            return label

    if 'android' in agent:
        return 'Android (Phone)' if 'mobile' in agent else 'Android (Tablet)'

    desktop_markers = (
        ('macintosh', 'Mac'),
        ('mac os x', 'Mac'),
        ('windows nt', 'Windows'),
        ('linux', 'Linux'),
    )
    for marker, label in desktop_markers:
        if marker in agent:
            return label

    return 'Unbekanntes Gerät'
def _get_client_ip(request) -> str | None:
forwarded = request.META.get('HTTP_X_FORWARDED_FOR', '')
if forwarded:
return forwarded.split(',')[0].strip()
return request.META.get('REMOTE_ADDR') or None
def _create_session(user, request, refresh_token: RefreshToken) -> str:
    """Record a ``UserSession`` for this login and return its random session key.

    The row stores the refresh token's ``jti``, a device label derived from
    the User-Agent header and the client IP.
    """
    new_key = secrets.token_urlsafe(32)
    user_agent = request.META.get('HTTP_USER_AGENT', '')
    token_id = str(refresh_token.payload.get('jti', ''))
    UserSession.objects.create(
        user=user,
        session_key=new_key,
        refresh_jti=token_id,
        device_name=_parse_device(user_agent),
        ip_address=_get_client_ip(request),
    )
    return new_key
# ── Session views ─────────────────────────────────────────────────────────────
class SessionListView(views.APIView):
    """List the authenticated user's sessions."""

    def get(self, request):
        """Return every session of the user, flagging the one named by ``X-Session-Key``."""
        active_key = request.headers.get('X-Session-Key', '')
        payload = []
        for entry in UserSession.objects.filter(user=request.user):
            payload.append({
                'session_key': entry.session_key,
                'device_name': entry.device_name,
                'ip_address': entry.ip_address,
                'created_at': entry.created_at,
                'last_active_at': entry.last_active_at,
                'is_current': entry.session_key == active_key,
            })
        return Response(payload)
class SessionRevokeView(views.APIView):
    """Revoke one of the authenticated user's sessions."""

    def delete(self, request, session_key):
        """Blacklist and delete the given session; 404 if it is not the user's."""
        own_sessions = UserSession.objects.filter(user=request.user, session_key=session_key)
        target = own_sessions.first()
        if target:
            _blacklist_session(target)
            return Response(status=204)
        return Response({'detail': 'Not found.'}, status=404)
class SessionRevokeAllView(views.APIView):
    """Revoke all of the user's sessions except the current one."""

    def delete(self, request):
        """Blacklist and delete every session but the one named by ``X-Session-Key``."""
        keep_key = request.headers.get('X-Session-Key', '')
        others = UserSession.objects.filter(user=request.user).exclude(session_key=keep_key)
        for stale in others:
            _blacklist_session(stale)
        return Response(status=204)
def _blacklist_session(session: UserSession) -> None:
    """Blacklist the session's refresh token (best effort) and delete the session row.

    The session is deleted even if blacklisting fails, so revocation never
    aborts half-way; failures are logged instead of being silently dropped.
    """
    if session.refresh_jti:
        try:
            from rest_framework_simplejwt.token_blacklist.models import OutstandingToken, BlacklistedToken
            # A missing outstanding token is an expected case, not an error.
            token = OutstandingToken.objects.filter(jti=session.refresh_jti).first()
            if token is not None:
                BlacklistedToken.objects.get_or_create(token=token)
        except Exception:
            # Still best effort, but leave a trace: a token that could not
            # be blacklisted stays usable until it expires.
            logger.warning(
                'Could not blacklist refresh token for session %s',
                session.pk,
                exc_info=True,
            )
    session.delete()
# ── Data export ───────────────────────────────────────────────────────────────
class DataExportView(views.APIView):
    """Export the authenticated user's data as a ZIP archive of six PDF files."""

    def get(self, request):
        """Render one PDF per data section and return them zipped as a download.

        Sections: profile, accounts, budgets, expenses, transactions and
        deadlines. All text is coerced to latin-1 before being written,
        with unsupported characters replaced.
        """
        import io
        import zipfile
        from datetime import date
        from fpdf import FPDF

        user = request.user
        profile = Profile.objects.filter(user=user).first()
        today = date.today().strftime('%d.%m.%Y')
        export_date = date.today().strftime('%Y-%m-%d')

        # RGB colours used throughout the layout.
        VIOLET = (124, 58, 237)
        HEADER_BG = (243, 240, 255)
        ALT_ROW = (249, 249, 252)
        TEXT_DARK = (30, 30, 40)
        TEXT_GRAY = (120, 120, 135)

        def safe(text: str) -> str:
            """Stringify ``text`` and replace anything not representable in latin-1."""
            return str(text).encode('latin-1', errors='replace').decode('latin-1')

        class ArmPDF(FPDF):
            """PDF document with the shared page header/footer and table helpers."""

            def __init__(self, section_title):
                super().__init__()
                self.section_title = section_title
                self.set_auto_page_break(auto=True, margin=18)

            def header(self):
                """Draw the coloured top bar, the section title and the export date."""
                self.set_fill_color(*VIOLET)
                self.rect(0, 0, 210, 10, 'F')
                self.set_xy(14, 13)
                self.set_font('Helvetica', 'B', 15)
                self.set_text_color(*TEXT_DARK)
                self.cell(0, 7, safe(f'Armarium - {self.section_title}'), ln=True)
                self.set_font('Helvetica', '', 8)
                self.set_text_color(*TEXT_GRAY)
                self.set_x(14)
                self.cell(0, 5, safe(f'Export vom {today}'), ln=True)
                self.ln(3)

            def footer(self):
                """Draw the centred footer line with date and page number."""
                self.set_y(-12)
                self.set_font('Helvetica', '', 7)
                self.set_text_color(*TEXT_GRAY)
                self.cell(0, 5, safe(f'Armarium - {today} - Seite {self.page_no()}'), align='C')

            def table_header(self, cols):
                """Draw the header row for ``cols`` (``(label, width)`` pairs) and a rule below it."""
                self.set_fill_color(*HEADER_BG)
                self.set_font('Helvetica', 'B', 8)
                self.set_text_color(*VIOLET)
                for label, width in cols:
                    self.cell(width, 7, safe(label), border=0, fill=True, align='L')
                self.ln()
                self.set_draw_color(*VIOLET)
                self.set_line_width(0.4)
                y = self.get_y()
                self.line(14, y, 196, y)

            def table_row(self, values, cols, fill=False):
                """Draw one data row; ``fill`` shades it for alternating rows."""
                if fill:
                    self.set_fill_color(*ALT_ROW)
                self.set_font('Helvetica', '', 8)
                self.set_text_color(*TEXT_DARK)
                for (label, width), val in zip(cols, values):
                    self.cell(width, 6, safe(val), border=0, fill=fill)
                self.ln()

        def make_pdf(title, build_fn):
            """Create a one-section PDF, let ``build_fn`` fill it, and return its output."""
            pdf = ArmPDF(title)
            pdf.add_page()
            build_fn(pdf)
            return pdf.output()

        # ── Profile ───────────────────────────────────────────────────────────
        def build_profile(pdf):
            name = f"{profile.first_name} {profile.last_name}".strip() if profile else ''
            rows = [
                ('Name', name or '-'),
                ('E-Mail', user.email or '-'),
                ('Kanton', profile.canton if profile else '-'),
                ('Sprache', profile.language if profile else '-'),
                ('2FA', 'Aktiviert' if (profile and profile.totp_enabled) else 'Deaktiviert'),
            ]
            cols = [('Feld', 60), ('Wert', 120)]
            pdf.table_header(cols)
            for i, (field, val) in enumerate(rows):
                pdf.table_row([field, val], cols, fill=i % 2 == 1)

        # ── Accounts ──────────────────────────────────────────────────────────
        def build_accounts(pdf):
            cols = [('Name', 90), ('Typ', 60), ('Saldo (CHF)', 42)]
            pdf.table_header(cols)
            for i, acc in enumerate(Account.objects.filter(user=user)):
                pdf.table_row([acc.name, acc.account_type, f'{acc.balance:,.2f}'], cols, fill=i % 2 == 1)

        # ── Budgets ───────────────────────────────────────────────────────────
        def build_budgets(pdf):
            cols = [('Name', 80), ('Kategorie', 60), ('Betrag (CHF)', 42), ('Aktiv', 10)]
            pdf.table_header(cols)
            for i, b in enumerate(Budget.objects.filter(account__user=user).order_by('main_category', 'name')):
                pdf.table_row([b.name, b.main_category, f'{b.amount:,.2f}', 'Ja' if b.active else 'Nein'], cols, fill=i % 2 == 1)

        # ── Expenses ──────────────────────────────────────────────────────────
        def build_expenses(pdf):
            cols = [('Datum', 26), ('Name', 70), ('Kategorie', 46), ('Konto', 30), ('CHF', 20)]
            pdf.table_header(cols)
            for i, e in enumerate(Expense.objects.filter(account__user=user).order_by('-date')):
                pdf.table_row([
                    e.date.strftime('%d.%m.%Y'), e.name, e.category,
                    e.account.name, f'{e.amount:,.2f}'
                ], cols, fill=i % 2 == 1)

        # ── Transactions ──────────────────────────────────────────────────────
        def build_transactions(pdf):
            cols = [('Datum', 26), ('Beschreibung', 70), ('Von', 38), ('Nach', 38), ('CHF', 20)]
            pdf.table_header(cols)
            qs = Transaction.objects.filter(source_account__user=user).order_by('-date').select_related('source_account', 'destination_account')
            for i, t in enumerate(qs):
                # NOTE(review): guards against a transaction without a
                # destination account — confirm whether the field is nullable.
                destination = t.destination_account.name if t.destination_account else '-'
                pdf.table_row([
                    t.date.strftime('%d.%m.%Y'), t.description,
                    t.source_account.name, destination, f'{t.amount:,.2f}'
                ], cols, fill=i % 2 == 1)

        # ── Deadlines ─────────────────────────────────────────────────────────
        def build_deadlines(pdf):
            cols = [('Datum', 30), ('Titel', 100), ('Typ', 42), ('Notizen', 20)]
            pdf.table_header(cols)
            for i, d in enumerate(Deadline.objects.filter(user=user).order_by('date')):
                # Notes may be unset; slicing None would abort the whole export.
                pdf.table_row([d.date.strftime('%d.%m.%Y'), d.title, d.type, (d.notes or '')[:20]], cols, fill=i % 2 == 1)

        pdfs = [
            ('profil.pdf', 'Profil', build_profile),
            ('konten.pdf', 'Konten', build_accounts),
            ('budgets.pdf', 'Budgets', build_budgets),
            ('ausgaben.pdf', 'Ausgaben', build_expenses),
            ('transaktionen.pdf', 'Transaktionen', build_transactions),
            ('termine.pdf', 'Termine', build_deadlines),
        ]
        zip_buffer = io.BytesIO()
        with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zf:
            for filename, title, build_fn in pdfs:
                zf.writestr(filename, bytes(make_pdf(title, build_fn)))
        zip_buffer.seek(0)
        response = HttpResponse(zip_buffer.read(), content_type='application/zip')
        response['Content-Disposition'] = f'attachment; filename="armarium-export-{export_date}.zip"'
        return response
# ── Notification preferences ──────────────────────────────────────────────────
def _coerce_bool(value) -> bool:
    """Interpret a request value as a boolean.

    Strings such as ``"false"``, ``"0"``, ``"off"``, ``"no"`` and the empty
    string count as False (case-insensitive); every other value falls back
    to Python truthiness, so real JSON booleans behave as before.
    """
    if isinstance(value, str):
        return value.strip().lower() not in ('', '0', 'false', 'off', 'no')
    return bool(value)


class NotificationPrefsView(views.APIView):
    """Update the user's notification toggles."""

    def patch(self, request):
        """Apply any of the three toggles present in the payload and return all three."""
        profile, _ = Profile.objects.get_or_create(user=request.user)
        fields = ['notif_deadlines', 'notif_budget_alerts', 'notif_monthly_summary']
        changed = []
        for field in fields:
            if field in request.data:
                # Plain bool() would turn the string "false" (form-encoded
                # or stringified payloads) into True.
                setattr(profile, field, _coerce_bool(request.data[field]))
                changed.append(field)
        if changed:
            profile.save(update_fields=changed)
        return Response({
            'notif_deadlines': profile.notif_deadlines,
            'notif_budget_alerts': profile.notif_budget_alerts,
            'notif_monthly_summary': profile.notif_monthly_summary,
        })
class VerifyEmailView(views.APIView):
    """Confirm an email address using the token from the registration mail."""

    permission_classes = [AllowAny]
    throttle_classes = [AuthThrottle]

    def post(self, request):
        """Mark the profile as verified when the token hash matches and has not expired."""
        from django.utils import timezone

        plain_token = request.data.get('token', '').strip()
        if not plain_token:
            return Response({'detail': 'Token required.'}, status=400)

        candidates = Profile.objects.filter(
            email_verify_token=hashlib.sha256(plain_token.encode()).hexdigest(),
            email_verify_token_expires__gt=timezone.now(),
        )
        profile = candidates.first()
        if not profile:
            return Response({'detail': 'Invalid or expired token.'}, status=400)

        # Verified: flag the profile and clear the token so it cannot be reused.
        profile.email_verified = True
        profile.email_verify_token = ''
        profile.email_verify_token_expires = None
        profile.save(update_fields=['email_verified', 'email_verify_token', 'email_verify_token_expires'])
        return Response({'detail': 'Email verified.'})
class PasswordResetRequestView(views.APIView):
    """Start the password-reset flow for an email address."""

    permission_classes = [AllowAny]
    throttle_classes = [AuthThrottle]

    def post(self, request):
        """Email a 15-minute reset link if an account with this address exists.

        The response is identical whether or not an account was found, so the
        endpoint does not reveal which addresses are registered. Only the
        SHA-256 hash of the token is stored.
        """
        from django.utils import timezone
        from datetime import timedelta
        from .email import send_email

        # Tolerate a JSON null instead of crashing on None.strip().
        email = str(request.data.get('email') or '').strip().lower()
        User = get_user_model()
        # The input is lower-cased, so an exact match would miss addresses
        # stored with capitals; match case-insensitively. An empty address is
        # skipped so it can never match an account with a blank email.
        user = User.objects.filter(email__iexact=email).first() if email else None
        if user:
            token = secrets.token_urlsafe(32)
            token_hash = hashlib.sha256(token.encode()).hexdigest()
            profile, _ = Profile.objects.get_or_create(user=user)
            profile.password_reset_token_hash = token_hash
            profile.password_reset_token_expires = timezone.now() + timedelta(minutes=15)
            profile.save(update_fields=['password_reset_token_hash', 'password_reset_token_expires'])
            link = f"{settings.FRONTEND_URL}/reset-password?token={token}"
            send_email('password_reset', {'link': link}, 'Armarium Passwort zurücksetzen', user.email)
        return Response({'detail': 'Wenn ein Konto mit dieser E-Mail existiert, wurde eine E-Mail gesendet.'})
class PasswordResetConfirmView(views.APIView):
    """Finish the password-reset flow with the emailed token."""

    permission_classes = [AllowAny]
    throttle_classes = [AuthThrottle]

    def post(self, request):
        """Set the new password, clear the reset token and revoke all sessions."""
        from django.utils import timezone

        plain_token = request.data.get('token', '').strip()
        new_password = request.data.get('password', '')
        if not plain_token:
            return Response({'detail': 'Token required.'}, status=400)
        if len(new_password) < 8:
            return Response({'detail': 'Password must be at least 8 characters.'}, status=400)

        candidates = Profile.objects.filter(
            password_reset_token_hash=hashlib.sha256(plain_token.encode()).hexdigest(),
            password_reset_token_expires__gt=timezone.now(),
        )
        profile = candidates.first()
        if not profile:
            return Response({'detail': 'Invalid or expired token.'}, status=400)

        account = profile.user
        account.set_password(new_password)
        account.save()

        # Clear the token so the link cannot be used a second time.
        profile.password_reset_token_hash = ''
        profile.password_reset_token_expires = None
        profile.save(update_fields=['password_reset_token_hash', 'password_reset_token_expires'])

        # A reset logs the user out everywhere.
        for stale in UserSession.objects.filter(user=account):
            _blacklist_session(stale)
        return Response({'detail': 'Password updated.'})