#u!/usr/bin/env python3 from flask import Flask, render_template, request, redirect, url_for, flash from flask_login import LoginManager, login_user, login_required, logout_user, UserMixin, current_user from flask_bcrypt import Bcrypt from cryptography.fernet import Fernet import sqlite3 import glob, os, re app = Flask(__name__) app.secret_key = "309cc4d5ce1fe7486ae25cbd232bbdfe6a72539c03f0127d372186dbdc0fc928" bcrypt = Bcrypt(app) login_manager = LoginManager() login_manager.login_view = "login" login_manager.init_app(app) DB_PATH = "/srv/poe_manager/sqlite.db" class User(UserMixin): def __init__(self, id_, username, is_admin): self.id = id_ self.username = username self.is_admin = is_admin def get_interval_seconds(): conn = get_db_connection() row = conn.execute("SELECT value FROM settings WHERE key='check_interval'").fetchone() conn.close() return int(row['value']) if row else 300 with open("/srv/poe_manager/fernet.key", "rb") as f: fernet = Fernet(f.read()) def encrypt_password(password: str) -> str: return fernet.encrypt(password.encode()).decode() def decrypt_password(token: str) -> str: return fernet.decrypt(token.encode()).decode() def get_db_connection(): conn = sqlite3.connect(DB_PATH) conn.row_factory = sqlite3.Row return conn def get_devices(): """ Liefert eine Liste aller Devices aus der Datenbank als Dictionaries. """ conn = get_db_connection() devices = conn.execute("SELECT mac, rpi_ip, port, name, switch_hostname, is_active FROM devices ORDER BY name ASC").fetchall() conn.close() return devices @login_manager.user_loader def load_user(user_id): conn = get_db_connection() user = conn.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone() conn.close() if user: return User(user['id'], user['username'], user['is_admin']) return None @app.route('/login', methods=['GET', 'POST']) def login(): if request.method == 'POST': username = request.form['username'] password = request.form['password'] conn = get_db_connection() user = conn.execute("SELECT * FROM users WHERE username = ?", (username,)).fetchone() conn.close() if user and bcrypt.check_password_hash(user['password'], password): login_user(User(user['id'], user['username'], user['is_admin'])) return redirect(url_for('index')) else: flash("Ungültiger Benutzername oder Passwort") return render_template('login.html') @app.route('/logout') @login_required def logout(): logout_user() return redirect(url_for('login')) @app.route("/") @login_required def index(): # Geräte aus DB laden conn = sqlite3.connect("sqlite.db") c = conn.cursor() c.execute("SELECT mac, name, is_active FROM devices ORDER BY name ASC") devices = c.fetchall() # Intervall aus DB (Minuten) laden c.execute("SELECT value FROM settings WHERE key='interval'") row = c.fetchone() interval = int(row[0]) if row else 5 # Default 5 Minuten conn.close() # Status aus letztem Log ermitteln import glob, os log_files = glob.glob("/var/log/rpi-*.log") status_dict = {} if log_files: latest_log = max(log_files, key=os.path.getctime) with open(latest_log, "r") as f: for line in f: for dev in devices: if dev[1] in line: status_dict[dev[0]] = "online" if "erreichbar" in line else "offline" # Template rendern mit Devices, Status und Intervall return render_template("index.html", devices=devices, status=status_dict, interval=interval) @app.route("/settings", methods=["GET", "POST"]) @login_required def settings(): if not current_user.is_admin: flash("Nur Admins dürfen die Einstellungen ändern!") return redirect(url_for("index")) # Aktuellen Prüfintervall laden conn = sqlite3.connect("sqlite.db") c = conn.cursor() c.execute("SELECT value FROM settings WHERE key='interval'") row = c.fetchone() interval = int(row[0]) if row else 5 # Default 5 Minuten conn.close() if request.method == "POST": new_interval = int(request.form["interval"]) # Minuten und Sekunden berechnen interval_minutes = new_interval check_interval_seconds = new_interval * 60 conn = sqlite3.connect("sqlite.db") c = conn.cursor() # interval (Minuten) c.execute(""" INSERT INTO settings (key, value) VALUES (?, ?) ON CONFLICT(key) DO UPDATE SET value=excluded.value """, ("interval", interval_minutes)) # check_interval (Sekunden) c.execute(""" INSERT INTO settings (key, value) VALUES (?, ?) ON CONFLICT(key) DO UPDATE SET value=excluded.value """, ("check_interval", check_interval_seconds)) conn.commit() conn.close() # rpi-check.service neu starten import subprocess subprocess.run(["systemctl", "restart", "rpi-check.service"]) flash(f"Intervall auf {interval_minutes} Minuten gesetzt und Service neu gestartet!") return redirect(url_for("settings")) return render_template("settings.html", interval=interval) @app.route('/devices', methods=['GET', 'POST']) @login_required def devices(): conn = get_db_connection() switches = conn.execute("SELECT hostname FROM switches").fetchall() if request.method == 'POST': # ----------------------- # Gerät hinzufügen # ----------------------- if 'add_device' in request.form: if not current_user.is_admin: flash("Zugriff verweigert!") return redirect(url_for('devices')) mac = request.form.get('mac') rpi_ip = request.form.get('rpi_ip') port = request.form.get('port') name = request.form.get('name') switch_hostname = request.form.get('switch_hostname') is_active = 1 if 'is_active' in request.form else 0 if not all([mac, rpi_ip, name]): flash("Alle Felder müssen ausgefüllt sein!") return redirect(url_for('devices')) # Prüfen auf doppelte IP ip_device = conn.execute("SELECT name FROM devices WHERE rpi_ip=?", (rpi_ip,)).fetchone() if ip_device: flash(f"IP-Adresse existiert bereits für Gerät '{ip_device['name']}'!") return redirect(url_for('devices')) # Prüfen auf doppelte MAC mac_device = conn.execute("SELECT name FROM devices WHERE mac=?", (mac,)).fetchone() if mac_device: flash(f"MAC-Adresse existiert bereits für Gerät '{mac_device['name']}'!") return redirect(url_for('devices')) try: conn.execute(""" INSERT INTO devices (mac, rpi_ip, port, name, switch_hostname, is_active) VALUES (?, ?, ?, ?, ?, ?) """, (mac, rpi_ip, port, name, switch_hostname, is_active)) conn.commit() flash(f"Gerät {name} hinzugefügt.") except sqlite3.IntegrityError: flash("Fehler beim Hinzufügen des Geräts!") # ----------------------- # Gerät bearbeiten # ----------------------- elif 'edit_device' in request.form: if not current_user.is_admin: flash("Zugriff verweigert!") return redirect(url_for('devices')) old_mac = request.form.get('old_mac') mac = request.form.get('mac') rpi_ip = request.form.get('rpi_ip') port = request.form.get('port') or None name = request.form.get('name') switch_hostname = request.form.get('switch_hostname') or None is_active = 1 if 'is_active' in request.form else 0 # --- Nur Switch ändern --- # Prüfen, ob nur das Switch-Feld gesendet wurde und die anderen Felder leer sind if 'switch_hostname' in request.form and all(not f for f in [mac, rpi_ip, name]): device = conn.execute("SELECT name, switch_hostname FROM devices WHERE mac=?", (old_mac,)).fetchone() if not device: flash("Gerät nicht gefunden!") return redirect(url_for('devices')) old_switch = device['switch_hostname'] or "unbekannt" device_name = device['name'] try: conn.execute(""" UPDATE devices SET switch_hostname=? WHERE mac=? """, (switch_hostname, old_mac)) conn.commit() flash(f"Switch von {device_name} geändert: {old_switch} → {switch_hostname} or 'kein Switch'") except sqlite3.IntegrityError: flash("Fehler beim Ändern des Switch!") return redirect(url_for('devices')) # --- Normales Gerät bearbeiten --- # Pflichtfelder prüfen if not all([old_mac, mac, rpi_ip, name]): flash("Felder 'MAC', 'IP' und 'Name' müssen ausgefüllt sein!") return redirect(url_for('devices')) # Prüfen auf doppelte IP außer das aktuelle Gerät ip_device = conn.execute( "SELECT name FROM devices WHERE rpi_ip=? AND mac<>?", (rpi_ip, old_mac) ).fetchone() if ip_device: flash(f"IP-Adresse existiert bereits für Gerät '{ip_device['name']}'!") return redirect(url_for('devices')) # Prüfen auf doppelte MAC außer das aktuelle Gerät mac_device = conn.execute( "SELECT name FROM devices WHERE mac=? AND mac<>?", (mac, old_mac) ).fetchone() if mac_device: flash(f"MAC-Adresse existiert bereits für Gerät '{mac_device['name']}'!") return redirect(url_for('devices')) # Update durchführen try: conn.execute(""" UPDATE devices SET mac=?, rpi_ip=?, port=?, name=?, is_active=? WHERE mac=? """, (mac, rpi_ip, port, name, is_active, old_mac)) conn.commit() flash(f"Gerät {name} aktualisiert.") except sqlite3.IntegrityError: flash("Fehler beim Aktualisieren des Geräts!") # ----------------------- # Gerät löschen # ----------------------- elif 'delete_device' in request.form: if not current_user.is_admin: flash("Zugriff verweigert!") return redirect(url_for('devices')) del_mac = request.form.get('delete_device') if del_mac: device = conn.execute("SELECT name FROM devices WHERE mac=?", (del_mac,)).fetchone() hostname = device['name'] if device else del_mac conn.execute("DELETE FROM devices WHERE mac=?", (del_mac,)) conn.commit() flash(f"Gerät {hostname} gelöscht.") else: flash("Keine MAC-Adresse übermittelt!") return redirect(url_for('devices')) # ----------------------- # Devices für Anzeige # ----------------------- devices = conn.execute(""" SELECT devices.mac, devices.rpi_ip, devices.port, devices.name, devices.is_active, switches.hostname AS switch_hostname FROM devices LEFT JOIN switches ON devices.switch_hostname = switches.hostname ORDER BY switches.hostname ASC """).fetchall() conn.close() return render_template('devices.html', devices=devices, switches=switches) @app.route('/devices/toggle/', methods=['POST']) @login_required def toggle_device(mac): if not current_user.is_admin: return {"success": False, "msg": "Zugriff verweigert!"}, 403 conn = get_db_connection() device = conn.execute("SELECT is_active, name FROM devices WHERE mac=?", (mac,)).fetchone() if not device: conn.close() return {"success": False, "msg": "Gerät nicht gefunden!"}, 404 new_status = 0 if device['is_active'] else 1 conn.execute("UPDATE devices SET is_active=? WHERE mac=?", (new_status, mac)) conn.commit() conn.close() status_text = "deaktiviert" if new_status == 0 else "aktiviert" return {"success": True, "msg": f"Gerät {device['name']} wurde {status_text}.", "new_status": new_status} @app.route('/switches', methods=['GET', 'POST']) @login_required def switches(): conn = get_db_connection() # Inline-Add if request.method == 'POST' and 'add_switch' in request.form: if not current_user.is_admin: flash("Zugriff verweigert!") return redirect(url_for('switches')) hostname = request.form['hostname'] ip = request.form['ip'] username = request.form['username'] password = encrypt_password(request.form['password']) try: conn.execute("INSERT INTO switches (hostname, ip, username, password) VALUES (?, ?, ?, ?)", (hostname, ip, username, password)) conn.commit() flash(f"Switch {hostname} hinzugefügt.") except sqlite3.IntegrityError: flash("Hostname existiert bereits oder Eingabefehler!") # Inline-Edit if request.method == 'POST' and 'edit_switch' in request.form: if not current_user.is_admin: flash("Zugriff verweigert!") return redirect(url_for('switches')) old_hostname = request.form['old_hostname'] hostname = request.form['hostname'] ip = request.form['ip'] username = request.form['username'] password = encrypt_password(request.form['password']) try: conn.execute(""" UPDATE switches SET hostname=?, ip=?, username=?, password=? WHERE hostname=? """, (hostname, ip, username, password, old_hostname)) conn.commit() flash(f"Switch {hostname} aktualisiert.") except sqlite3.IntegrityError: flash("Hostname existiert bereits oder Eingabefehler!") switches = conn.execute("SELECT hostname, ip, username FROM switches").fetchall() conn.close() return render_template('switche.html', switches=switches) @app.route('/switches/delete/', methods=['POST']) @login_required def delete_switch(hostname): conn = get_db_connection() devices = conn.execute("SELECT name FROM devices WHERE switch_hostname=?", (hostname,)).fetchall() if devices: device_names = [d['name'] for d in devices] flash(f"Folgende Geräte sind noch dem Switch {hostname} zugewiesen: {', '.join(device_names)}", "danger") conn.close() return redirect(url_for('switches')) conn.execute("DELETE FROM switches WHERE hostname=?", (hostname,)) conn.commit() conn.close() flash(f"Switch '{hostname}' gelöscht.", "success") return redirect(url_for('switches')) @app.route("/get_log") @login_required def get_log(): log_files = glob.glob("/var/log/rpi-*.log") if not log_files: return "Keine Logfiles gefunden." latest_log = max(log_files, key=os.path.getctime) try: with open(latest_log, "r") as f: return f.read() except Exception as e: return f"Fehler beim Lesen des Logs: {e}" @app.route('/logs') @login_required def logs(): # Intervall aus DB laden conn = sqlite3.connect("sqlite.db") c = conn.cursor() c.execute("SELECT value FROM settings WHERE key='interval'") row = c.fetchone() interval = int(row[0]) if row else 5 # Default 5 Minuten conn.close() # alle Logfiles mit Muster rpi-YYYYMMDDHHMMSS.log log_files = glob.glob("/var/log/rpi-*.log") if not log_files: return render_template('logs.html', log_content="Keine Logfiles gefunden.") # das neuste Logfile auswählen latest_log = max(log_files, key=os.path.getctime) try: with open(latest_log, "r") as f: log_content = f.read() except Exception as e: log_content = f"Fehler beim Lesen des Logs: {e}" return render_template('logs.html', log_content=log_content, log_name=os.path.basename(latest_log), interval=interval) def load_device_status(): """ Liest das aktuellste rpi-Logfile und extrahiert den letzten Status jedes Devices. Gibt ein Dictionary zurück: {Device-Name: 'online'/'offline'} """ status = {} log_files = glob.glob("/var/log/rpi-*.log") if not log_files: return status latest_log = max(log_files, key=os.path.getctime) # Jede Zeile des Logs lesen with open(latest_log, "r") as f: lines = f.readlines() # Regex für Ping-Ergebnisse online_re = re.compile(r"(\S+) ist erreichbar!") offline_re = re.compile(r"(\S+) ist nicht erreichbar!") for line in lines: line = line.strip() m_online = online_re.search(line) m_offline = offline_re.search(line) if m_online: status[m_online.group(1)] = 'online' elif m_offline: status[m_offline.group(1)] = 'offline' return status @app.route("/users", methods=["GET", "POST"]) @login_required def users(): if not current_user.is_admin: flash("Nur Admins dürfen Benutzer verwalten!") return redirect(url_for("index")) conn = sqlite3.connect("sqlite.db") conn.row_factory = sqlite3.Row c = conn.cursor() if request.method == "POST": # Neuen Benutzer hinzufügen if "add_user" in request.form: username = request.form["username"].strip() password = request.form["password"].strip() is_admin = int(request.form.get("is_admin", 0)) if username and password: pw_hash = bcrypt.generate_password_hash(password).decode("utf-8") try: c.execute( "INSERT INTO users (username, password, is_admin) VALUES (?, ?, ?)", (username, pw_hash, is_admin) ) conn.commit() flash(f"Benutzer '{username}' erfolgreich angelegt!") except sqlite3.IntegrityError: flash("Benutzername existiert bereits!") else: flash("Username und Passwort dürfen nicht leer sein!") # Rolle ändern elif "change_role" in request.form: user_id = request.form["user_id"] username = request.form.get("username", "").strip() is_admin = int(request.form.get("is_admin", 0)) if username: c.execute( "UPDATE users SET username=?, is_admin=? WHERE id=?", (username, is_admin, user_id) ) conn.commit() flash("Rolle und Username geändert!") else: flash("Username darf nicht leer sein!") # Passwort ändern elif "change_password" in request.form: user_id = request.form["user_id"] new_password = request.form.get("new_password", "").strip() if new_password: pw_hash = bcrypt.generate_password_hash(new_password).decode("utf-8") c.execute( "UPDATE users SET password=? WHERE id=?", (pw_hash, user_id) ) conn.commit() flash("Passwort erfolgreich geändert!") else: flash("Passwort darf nicht leer sein!") # Benutzer löschen elif "delete_user" in request.form: user_id = request.form["delete_user"] c.execute("DELETE FROM users WHERE id=?", (user_id,)) conn.commit() flash("Benutzer gelöscht!") # Alle Benutzer laden (GET oder nach POST) c.execute("SELECT id, username, is_admin FROM users") users_list = c.fetchall() conn.close() # Direkt rendern, Flash-Messages werden angezeigt return render_template("users.html", users=users_list) if __name__ == "__main__": app.run(host='0.0.0.0', port=5000, debug=True)