Files
armarium-suite/backend/finance/management/commands/import_praemien.py
T
Daniel Krähenbühl c03d2a97ab feat: insurance section — overview, documents, analysis, KVG premium comparison
- Insurance overview page (/insurance): current policies table with type,
  provider, premium, franchise, coverage, and document links
- Documents page: upload and manage insurance documents
- Analysis page: coverage gap analysis per insurance type
- Priminfo integration (/insurance/priminfo): KVG premium comparison by
  insurer, model (TAR/HMO/etc.), franchise level, and accident coverage
  via embedded Priminfo iframe (no public API available)
- Backend: Insurance, PraemienEntry, PraemienPolice models with migrations
- Sidebar: insurance nav group with flyout and dropdown
- i18n: all keys in DE/EN/FR/IT
2026-05-25 22:46:31 +02:00

310 lines
13 KiB
Python

"""
Management command: import_praemien
Imports Swiss KVG premium data from two BAG sources:
1. praemienregionen_{year}.xlsx (Priminfo)
PLZ → BFS-Nr, Gemeinde, Kanton, Prämienregion (0-3) + Ø-Monatsprämien
→ PraemienEntry model
2. Prämien_CH.csv (opendata.bagnet.ch / opendata.swiss)
Full per-insurer, per-model, per-franchise granular premiums
→ PraemienPolice model
Usage:
python manage.py import_praemien # import both, latest year
python manage.py import_praemien --year 2025
python manage.py import_praemien --skip-policen # only PLZ/region data
python manage.py import_praemien --skip-regionen # only granular policen data
"""
import csv
import io
import urllib.request
import zipfile
import xml.etree.ElementTree as ET
from decimal import Decimal, InvalidOperation
from django.core.management.base import BaseCommand, CommandError
from finance.models import PraemienEntry, PraemienPolice
# Prefix map for ElementTree path queries: 'x' is the SpreadsheetML main
# namespace that worksheet elements (<row>, <c>, <v>, <is>, <t>) live in.
NS = {'x': 'http://schemas.openxmlformats.org/spreadsheetml/2006/main'}
# URL template for the yearly region workbook; `{year}` is filled in with
# str.format() by Command.handle.
REGIONEN_URL = 'https://www.priminfo.admin.ch/downloads/praemienregionen_{year}.xlsx'
# Fixed download link for the granular premium CSV. The `path` query value is
# the URL-encoded base64 form of '/Praemien/Prämien_CH.csv'.
POLICEN_URL = 'https://opendata.bagnet.ch/?r=/download&path=L1ByYWVtaWVuL1Byw6RtaWVuX0NILmNzdg%3D%3D'
# Default for --year (region workbook).
LATEST_YEAR = 2025 # praemienregionen: update each September when BAG publishes new file
# Default for --policen-year; stored as data_year on every parsed CSV row.
POLICEN_YEAR = 2026 # Prämien_CH.csv always contains the upcoming business year
# BAG insurer ID → display name (stable, regulated by BAG).
# Keys are the integer insurer numbers (the same kind of value that
# _parse_policen_csv reads from the CSV column 'Versicherer').
# Nothing else in this module reads the table — presumably it is imported by
# other code for display purposes; verify before removing.
# NOTE(review): the ID → name pairs could not be cross-checked from this file.
# Spot-check them against the official BAG insurer register before relying on
# the names (e.g. confirm the entries for 8, 1542 and 1562) — TODO confirm.
INSURER_NAMES: dict[int, str] = {
8: 'Helsana AG',
32: 'KPT/CPT',
134: 'CSS Versicherung AG',
194: 'Concordia',
246: 'Groupe Mutuel',
290: 'Sanitas Krankenversicherung',
312: 'SWICA Krankenversicherung',
343: 'Visana AG',
360: 'Atupri Krankenkasse',
376: 'Kolping Krankenkasse',
455: 'EGK-Gesundheitskasse',
509: 'Galenos AG',
780: 'Luzerner Hinterland Krankenkasse (LHK)',
820: 'Krankenkasse Steffisburg',
881: 'sodalis gesundheitsgruppe',
923: 'Vivao Sympany AG',
941: 'Birchmeier Krankenkasse',
966: 'Krankenkasse Wädenswil',
1040: 'ÖKK',
1113: 'Agrisano Krankenkasse',
1318: 'Mutuel Assurance',
1322: 'Provita Gesundheitsversicherung AG',
1384: 'Sanagate AG',
1386: 'Aquilana Versicherungen',
1401: 'Easy Sana Assurance Maladie SA',
1479: 'Caisse-maladie Philos',
1507: 'Scheidegg Krankenkasse',
1509: 'Sana24 AG',
1535: 'rhenusana',
1542: 'Caisse-maladie de la Vallée SA',
1555: 'KLuG Krankenkasse',
1560: 'Krankenkasse Institut Ingenbohl',
1562: 'Sumiswalder Krankenkasse',
1568: 'avanto health AG',
}
# Franchise code → CHF value.
# Every code is the literal prefix 'FRA-' followed by the amount itself, so
# the table is generated from the list of amounts; this keeps each key and
# its value in sync by construction. Insertion order follows the tuple.
FRANCHISE_CHF: dict[str, int] = {
    f'FRA-{amount}': amount
    for amount in (0, 100, 200, 300, 400, 500, 600, 1000, 1500, 2000, 2500)
}
# ─────────────────────────────────────────────────────────
# XLSX helpers (for praemienregionen)
# ─────────────────────────────────────────────────────────
def _cell_value(cell):
    """Return the text content of one worksheet ``<c>`` element.

    An inline string (``<is><t>…</t></is>``) takes precedence: its text is
    stripped and embedded newlines become single spaces (an empty inline
    string yields ``''``). Otherwise the stripped text of the ``<v>`` child
    is returned. Cells with neither, or with an empty ``<v>``, give ``None``.

    NOTE(review): a shared-string cell (``t="s"``) would come back as the
    raw ``<v>`` text, i.e. the string-table index rather than the text —
    presumably the BAG workbook only uses inline strings; TODO confirm.
    """
    inline_text = cell.find('x:is/x:t', NS)
    if inline_text is None:
        raw = cell.find('x:v', NS)
        has_value = raw is not None and bool(raw.text)
        return raw.text.strip() if has_value else None
    stripped = (inline_text.text or '').strip()
    return stripped.replace('\n', ' ')
def _parse_rows(ws):
    """Yield, for every ``<row>`` below *ws*, the list of its cell values.

    Values come from ``_cell_value`` and appear in document order of the
    row's direct ``<c>`` children.

    NOTE(review): list positions count the cells that are present; the
    cell reference attribute (``r="C7"``) is not consulted. If a workbook
    omits empty cells, later values shift left — verify against the real
    file before relying on fixed column indices.
    """
    for row_el in ws.iterfind('.//x:row', NS):
        yield list(map(_cell_value, row_el.iterfind('x:c', NS)))
def _parse_regionen_xlsx(data: bytes, year: int) -> list[dict]:
    """Parse the praemienregionen workbook into ``PraemienEntry`` keyword dicts.

    Two worksheets are read by name:

    * ``D_PRIM`` – column 0 is the BFS number, column 3 the premium region,
      columns 4/5/6 the average premiums (child / young adult / adult).
    * ``B_NPA`` – column 1 is the PLZ, 2 the locality, 3 the canton, 5 the
      BFS number, 6 the municipality and 7 the district (6 and 7 optional).

    Rows before the header row of each sheet are ignored. ``B_NPA`` rows
    whose BFS number has no ``D_PRIM`` entry are dropped, as are rows whose
    numeric cells cannot be converted.

    Args:
        data: Raw bytes of the ``.xlsx`` file.
        year: Stored as ``data_year`` on every returned dict.

    Returns:
        One dict per usable ``B_NPA`` row with the keys ``plz``, ``ort``,
        ``kanton``, ``region``, ``bfs_nr``, ``gemeinde``, ``bezirk``,
        ``avg_adult``, ``avg_young_adult``, ``avg_child`` and ``data_year``.

    Raises:
        KeyError: A required worksheet (or a workbook part) is missing.
        zipfile.BadZipFile: *data* is not a ZIP archive.
        xml.etree.ElementTree.ParseError: A workbook part is not valid XML.
    """
    import posixpath  # only needed here, to resolve worksheet part names

    main_ns = {'x': 'http://schemas.openxmlformats.org/spreadsheetml/2006/main'}
    rel_id_attr = '{http://schemas.openxmlformats.org/officeDocument/2006/relationships}id'

    def _clean(value) -> str:
        """Stringify a cell, dropping apostrophes and surrounding blanks."""
        return str(value or '').replace("'", '').strip()

    def _amount(value) -> Decimal:
        """Convert a premium cell to Decimal; an empty cell counts as zero."""
        return Decimal(str(value).replace("'", '')) if value else Decimal('0')

    # The context manager closes the archive; both sheets are decoded while
    # it is still open.
    with zipfile.ZipFile(io.BytesIO(data)) as zf:
        rels_xml = ET.fromstring(zf.read('xl/_rels/workbook.xml.rels'))
        wb_xml = ET.fromstring(zf.read('xl/workbook.xml'))
        # Relationship targets are relative to the workbook's folder ('xl/')
        # unless they start with '/'. join() keeps an absolute target as-is,
        # normpath() folds any '..', and ZIP member names carry no leading '/'.
        rid_to_path = {
            rel.get('Id'): posixpath.normpath(
                posixpath.join('xl', rel.get('Target', ''))
            ).lstrip('/')
            for rel in rels_xml if 'worksheet' in rel.get('Type', '')
        }
        rid_to_name = {
            sheet.get(rel_id_attr): sheet.get('name')
            for sheet in wb_xml.findall('.//x:sheet', main_ns)
        }
        name_to_path = {
            rid_to_name[rid]: path
            for rid, path in rid_to_path.items() if rid in rid_to_name
        }
        sheets = {}
        for sheet_name in ('D_PRIM', 'B_NPA'):
            if sheet_name not in name_to_path:
                raise KeyError(
                    f'Worksheet {sheet_name!r} not found in workbook '
                    f'(available: {list(name_to_path)})'
                )
            sheets[sheet_name] = ET.fromstring(zf.read(name_to_path[sheet_name]))

    # D_PRIM: BFS-Nr → (region, avg child, avg young adult, avg adult)
    premiums = {}
    header_found = False
    for row_vals in _parse_rows(sheets['D_PRIM']):
        if not header_found:
            flat = ' '.join(str(v) for v in row_vals if v)
            if 'BFS-Nr' in flat or 'No OFS' in flat:
                header_found = True
            continue
        if not row_vals or not row_vals[0]:
            continue
        try:
            bfs_nr = int(row_vals[0])
            region = int(row_vals[3]) if row_vals[3] is not None else 0
            premiums[bfs_nr] = (
                region,
                _amount(row_vals[4]),
                _amount(row_vals[5]),
                _amount(row_vals[6]),
            )
        except (ValueError, InvalidOperation, IndexError):
            continue  # non-numeric or short row: not a data row

    # B_NPA: PLZ → BFS-Nr (PLZ at index 1, flag column at index 0)
    entries = []
    header_found = False
    for row_vals in _parse_rows(sheets['B_NPA']):
        if not header_found:
            flat = ' '.join(str(v) for v in row_vals if v)
            if 'PLZ' in flat and 'BFS' in flat:
                header_found = True
            continue
        if len(row_vals) < 6:
            continue
        plz_raw = _clean(row_vals[1])
        if not plz_raw.isdigit():
            continue
        if row_vals[5] is None:
            continue
        try:
            bfs_nr = int(_clean(row_vals[5]))
        except ValueError:
            continue
        if bfs_nr not in premiums:
            continue
        region, avg_child, avg_young, avg_adult = premiums[bfs_nr]
        entries.append({
            'plz': plz_raw.zfill(4),
            'ort': _clean(row_vals[2]),
            'kanton': _clean(row_vals[3]),
            'region': region,
            'bfs_nr': bfs_nr,
            'gemeinde': _clean(row_vals[6]) if len(row_vals) > 6 else '',
            'bezirk': _clean(row_vals[7]) if len(row_vals) > 7 else '',
            'avg_adult': avg_adult,
            'avg_young_adult': avg_young,
            'avg_child': avg_child,
            'data_year': year,
        })
    return entries
# ─────────────────────────────────────────────────────────
# CSV helpers (for Prämien_CH.csv)
# ─────────────────────────────────────────────────────────
def _parse_policen_csv(data: bytes, data_year: int) -> list[dict]:
    """Parse Prämien_CH.csv into a list of ``PraemienPolice`` keyword dicts.

    The bytes are decoded as UTF-8 (a leading BOM is tolerated) and read
    with ``csv.DictReader``. A row is skipped, never fatal, when

    * a required column is absent or the row is too short (``DictReader``
      fills missing trailing fields with ``None``),
    * ``Versicherer`` is not an integer or ``Prämie`` is not a decimal,
    * ``Kanton`` is not one of the 26 Swiss cantons (EU/EFTA rows), or
    * no region number follows ``CH`` in the ``Region`` code.

    Args:
        data: Raw bytes of the CSV file.
        data_year: Stored as ``data_year`` on every returned dict.

    Returns:
        One dict per accepted row with the keys ``versicherer_id``,
        ``kanton``, ``region``, ``altersklasse``, ``unfalleinschluss``,
        ``tariftyp``, ``tarifbezeichnung``, ``franchisestufe``,
        ``franchise_chf``, ``praemie`` and ``data_year``.
    """
    swiss_cantons = frozenset((
        'AG', 'AI', 'AR', 'BE', 'BL', 'BS', 'FR', 'GE', 'GL', 'GR',
        'JU', 'LU', 'NE', 'NW', 'OW', 'SG', 'SH', 'SO', 'SZ', 'TG',
        'TI', 'UR', 'VD', 'VS', 'ZG', 'ZH',
    ))
    text_columns = (
        'Kanton', 'Region', 'Altersklasse', 'Unfalleinschluss', 'Tariftyp',
        'Tarifbezeichnung', 'Franchisestufe', 'Franchise', 'Prämie',
    )
    reader = csv.DictReader(io.StringIO(data.decode('utf-8-sig')))
    entries = []
    for row in reader:
        try:
            versicherer_id = int(row['Versicherer'])
            cells = {name: row[name].strip() for name in text_columns}
        except (KeyError, ValueError, TypeError, AttributeError):
            # Missing column, non-numeric insurer id, or a short row whose
            # absent fields DictReader delivered as None.
            continue
        if cells['Kanton'] not in swiss_cantons:
            continue  # skip EU/EFTA rows
        try:
            # Region code looks like 'PR-REG CH0' … 'PR-REG CH3'.
            region = int(cells['Region'].split('CH')[1])
        except (IndexError, ValueError):
            continue
        try:
            praemie = Decimal(cells['Prämie'])
        except InvalidOperation:
            continue
        franchise_code = cells['Franchise']
        franchise_chf = FRANCHISE_CHF.get(franchise_code)
        if franchise_chf is None:
            # Code missing from the table: read the amount out of
            # 'FRA-<amount>' instead of recording a wrong CHF 0. Only a
            # code that does not follow that shape keeps the old 0 default.
            amount = franchise_code.removeprefix('FRA-')
            is_number = amount.isascii() and amount.isdigit()
            franchise_chf = int(amount) if is_number else 0
        entries.append({
            'versicherer_id': versicherer_id,
            'kanton': cells['Kanton'],
            'region': region,
            'altersklasse': cells['Altersklasse'],
            'unfalleinschluss': cells['Unfalleinschluss'],
            'tariftyp': cells['Tariftyp'],
            'tarifbezeichnung': cells['Tarifbezeichnung'],
            'franchisestufe': cells['Franchisestufe'],
            'franchise_chf': franchise_chf,
            'praemie': praemie,
            'data_year': data_year,
        })
    return entries
class Command(BaseCommand):
    """Download the BAG/Priminfo premium files and load them into the database.

    Step 1 fills ``PraemienEntry`` from the praemienregionen workbook,
    step 2 fills ``PraemienPolice`` from Prämien_CH.csv. Each step replaces
    the rows of its ``data_year`` and can be switched off with a flag.
    """

    help = 'Import Swiss KVG premium data from BAG/Priminfo (PLZ regions + granular policen)'

    def add_arguments(self, parser):
        """Register the two year selectors and the two skip switches."""
        parser.add_argument('--year', type=int, default=LATEST_YEAR,
                            help='Year for praemienregionen XLSX (default: %(default)s)')
        parser.add_argument('--policen-year', type=int, default=POLICEN_YEAR,
                            help='Business year in Prämien_CH.csv (default: %(default)s)')
        parser.add_argument('--skip-regionen', action='store_true',
                            help='Skip PLZ/region import (praemienregionen XLSX)')
        parser.add_argument('--skip-policen', action='store_true',
                            help='Skip granular policen import (Prämien_CH.csv)')

    def handle(self, *args, **options):
        """Run the selected import steps.

        Raises:
            CommandError: A download failed, a file could not be read, or a
                file produced no rows.
        """
        year = options['year']
        policen_year = options['policen_year']

        # ── 1. PLZ / Prämienregionen ──────────────────────────────────────
        if not options['skip_regionen']:
            url = REGIONEN_URL.format(year=year)
            self.stdout.write(f'[1/2] Downloading praemienregionen {year}: {url}')
            data = self._download(url, timeout=30)
            self.stdout.write(f' Parsing XLSX ({len(data):,} bytes)…')
            try:
                entries = _parse_regionen_xlsx(data, year)
            except (zipfile.BadZipFile, ET.ParseError, KeyError) as exc:
                # Report an unreadable workbook as a command failure rather
                # than a raw traceback.
                raise CommandError(f'Could not read XLSX: {exc}') from exc
            self.stdout.write(f' Parsed {len(entries):,} PLZ entries.')
            if not entries:
                raise CommandError('No PLZ data parsed — check XLSX structure.')
            stored = self._replace_rows(PraemienEntry, year, entries, batch_size=1000)
            self.stdout.write(self.style.SUCCESS(f'{stored:,} PLZ entries imported.'))

        # ── 2. Granular Prämien_CH.csv ────────────────────────────────────
        if not options['skip_policen']:
            self.stdout.write(f'[2/2] Downloading Prämien_CH.csv (business year {policen_year})…')
            data = self._download(POLICEN_URL, timeout=120)
            self.stdout.write(f' Parsing CSV ({len(data):,} bytes)…')
            try:
                entries = _parse_policen_csv(data, policen_year)
            except (UnicodeDecodeError, csv.Error) as exc:
                raise CommandError(f'Could not read CSV: {exc}') from exc
            self.stdout.write(f' Parsed {len(entries):,} policen rows.')
            if not entries:
                raise CommandError('No policen data parsed — check CSV structure.')
            stored = self._replace_rows(
                PraemienPolice, policen_year, entries, batch_size=2000, progress=True,
            )
            self.stdout.write(self.style.SUCCESS(f'{stored:,} policen rows imported.'))

        self.stdout.write(self.style.SUCCESS('Done.'))

    def _download(self, url, timeout):
        """Fetch *url* and return the response body as bytes."""
        try:
            with urllib.request.urlopen(url, timeout=timeout) as resp:
                return resp.read()
        except Exception as exc:
            # Command boundary: any transport failure becomes a CommandError,
            # with the original exception kept as its cause.
            raise CommandError(f'Download failed: {exc}') from exc

    def _replace_rows(self, model, data_year, entries, *, batch_size, progress=False):
        """Swap all rows of *model* for *data_year* with *entries*.

        Deleting and inserting happen inside one transaction, so a failed
        insert rolls the delete back and the previous data stays in place.

        Returns:
            The number of rows stored for *data_year* afterwards. This can
            be lower than ``len(entries)`` because ``ignore_conflicts=True``
            lets the database drop conflicting rows silently.
        """
        from django.db import transaction

        objs = [model(**entry) for entry in entries]
        total = len(objs)
        with transaction.atomic():
            deleted, _ = model.objects.filter(data_year=data_year).delete()
            self.stdout.write(f' Cleared {deleted} old entries.')
            for start in range(0, total, batch_size):
                model.objects.bulk_create(
                    objs[start:start + batch_size], ignore_conflicts=True,
                )
                if progress:
                    done = min(start + batch_size, total)
                    self.stdout.write(f' {done:,} / {total:,}', ending='\r')
        if progress:
            self.stdout.write('')
        stored = model.objects.filter(data_year=data_year).count()
        if stored < total:
            self.stdout.write(self.style.WARNING(
                f' {total - stored:,} conflicting rows were skipped.'
            ))
        return stored