#!/usr/bin/env python3
"""Flask web UI backed by the SQLite database at ``DB_PATH``.

Provides login/logout, a device status overview derived from the
``/var/log/rpi-*.log`` files, and admin pages for devices, switches,
the check interval and user accounts.
"""
from contextlib import closing
from datetime import datetime
import glob
import os
import re
import sqlite3
import subprocess

from cryptography.fernet import Fernet
from flask import Flask, flash, redirect, render_template, request, url_for
from flask_bcrypt import Bcrypt
from flask_login import (
    LoginManager,
    UserMixin,
    current_user,
    login_required,
    login_user,
    logout_user,
)

app = Flask(__name__)
# NOTE(review): a session-signing key committed to the source should be
# rotated. The environment variable takes precedence; the literal is kept
# only as a backward-compatible fallback.
app.secret_key = os.environ.get(
    "POE_MANAGER_SECRET_KEY",
    "309cc4d5ce1fe7486ae25cbd232bbdfe6a72539c03f0127d372186dbdc0fc928",
)
bcrypt = Bcrypt(app)
login_manager = LoginManager()
login_manager.login_view = "login"
login_manager.init_app(app)

DB_PATH = "/srv/poe_manager/sqlite.db"
FERNET_KEY_PATH = "/srv/poe_manager/fernet.key"
LOG_GLOB = "/var/log/rpi-*.log"
DEFAULT_INTERVAL_MINUTES = 5

# Log line markers written for reachable / unreachable devices.
_ONLINE_RE = re.compile(r"(\S+) ist erreichbar!")
_OFFLINE_RE = re.compile(r"(\S+) ist nicht erreichbar!")

_UPSERT_SETTING_SQL = """
    INSERT INTO settings (key, value)
    VALUES (?, ?)
    ON CONFLICT(key) DO UPDATE SET value=excluded.value
"""


class User(UserMixin):
    """Logged-in user as tracked by flask_login."""

    def __init__(self, id_, username, is_admin):
        self.id = id_
        self.username = username
        self.is_admin = is_admin


def get_db_connection():
    """Open a connection to ``DB_PATH`` whose rows are ``sqlite3.Row``."""
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn


def get_interval_seconds():
    """Return the ``check_interval`` setting as int, or 300 if unset."""
    with closing(get_db_connection()) as conn:
        row = conn.execute(
            "SELECT value FROM settings WHERE key='check_interval'"
        ).fetchone()
    return int(row['value']) if row else 300


def _get_interval_minutes():
    """Return the ``interval`` setting as int, or the default if unset."""
    with closing(get_db_connection()) as conn:
        row = conn.execute(
            "SELECT value FROM settings WHERE key='interval'"
        ).fetchone()
    return int(row["value"]) if row else DEFAULT_INTERVAL_MINUTES


with open(FERNET_KEY_PATH, "rb") as f:
    fernet = Fernet(f.read())


def encrypt_password(password: str) -> str:
    """Encrypt *password* with the module-level Fernet key."""
    return fernet.encrypt(password.encode()).decode()


def decrypt_password(token: str) -> str:
    """Decrypt a token produced by :func:`encrypt_password`."""
    return fernet.decrypt(token.encode()).decode()


def get_devices():
    """Return all devices from the database as rows, ordered by name."""
    with closing(get_db_connection()) as conn:
        return conn.execute(
            "SELECT mac, rpi_ip, port, name, switch_hostname, is_active "
            "FROM devices ORDER BY name ASC"
        ).fetchall()


def _form_flag(form, key):
    """Read an integer flag from *form*; missing means 0.

    Non-numeric but non-empty values (e.g. a checkbox sending ``"on"``)
    count as 1 instead of raising ``ValueError``.
    """
    raw = form.get(key, 0)
    try:
        return int(raw)
    except (TypeError, ValueError):
        return 1 if raw else 0


def _latest_log_file():
    """Return the ``LOG_GLOB`` match with the highest ctime, or ``None``."""
    log_files = glob.glob(LOG_GLOB)
    if not log_files:
        return None
    return max(log_files, key=os.path.getctime)


@login_manager.user_loader
def load_user(user_id):
    """Load the user with primary key *user_id*, or ``None`` if unknown."""
    with closing(get_db_connection()) as conn:
        user = conn.execute(
            "SELECT * FROM users WHERE id = ?", (user_id,)
        ).fetchone()
    if user:
        return User(user['id'], user['username'], user['is_admin'])
    return None


@app.route('/login', methods=['GET', 'POST'])
def login():
    """Show the login form; on POST verify the bcrypt hash and log in."""
    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']
        with closing(get_db_connection()) as conn:
            user = conn.execute(
                "SELECT * FROM users WHERE username = ?", (username,)
            ).fetchone()
        if user and bcrypt.check_password_hash(user['password'], password):
            login_user(User(user['id'], user['username'], user['is_admin']))
            return redirect(url_for('index'))
        flash("Ungültiger Benutzername oder Passwort")
    return render_template('login.html')


@app.route('/logout')
@login_required
def logout():
    """Log the current user out and go back to the login page."""
    logout_user()
    return redirect(url_for('login'))


def get_last_seen(dev_name: str):
    """Return a display string for the last time *dev_name* was reachable.

    Scans every log file for lines containing
    ``"<dev_name> ist erreichbar!"`` and parses the first two
    space-separated tokens as ``YYYY-MM-DD HH:MM:SS``. Returns ``None``
    when there are no logs or no parseable matching line.
    """
    log_files = glob.glob(LOG_GLOB)
    if not log_files:
        return None

    marker = f"{dev_name} ist erreichbar!"
    latest_time = None
    for logfile in sorted(log_files):
        with open(logfile, "r") as f:
            for line in f:
                line = line.strip()
                if marker not in line:
                    continue
                parts = line.split(" ")
                try:
                    ts = datetime.strptime(
                        f"{parts[0]} {parts[1]}", "%Y-%m-%d %H:%M:%S"
                    )
                except (ValueError, IndexError):
                    # Line does not start with a timestamp; ignore it.
                    continue
                if latest_time is None or ts > latest_time:
                    latest_time = ts

    if latest_time:
        date_str = latest_time.strftime("Zuletzt Online am %d.%m.%Y")
        time_str = latest_time.strftime("um %H:%M Uhr")
        return f"{date_str}\n{time_str}"
    return None


def _last_logged_status(dev_name, lines):
    """Return "online"/"offline" from the newest matching line, else None."""
    online_marker = f"{dev_name} ist erreichbar!"
    offline_marker = f"{dev_name} ist nicht erreichbar!"
    for line in reversed(lines):
        if online_marker in line:
            return "online"
        if offline_marker in line:
            return "offline"
    return None


@app.route("/")
@login_required
def index():
    """Render the overview with each device's status from the latest log."""
    with closing(get_db_connection()) as conn:
        rows = conn.execute(
            "SELECT mac, name, is_active FROM devices ORDER BY name ASC"
        ).fetchall()
    # The template has always received plain tuples; keep that shape.
    devices = [tuple(row) for row in rows]
    interval = _get_interval_minutes()

    status_dict = {}
    last_seen_dict = {}
    latest_log = _latest_log_file()
    if latest_log:
        with open(latest_log, "r") as f:
            lines = f.readlines()
        for mac, name, _is_active in devices:
            last_status = _last_logged_status(name, lines)
            if last_status:
                status_dict[mac] = last_status
                if last_status == "offline":
                    last_seen_dict[mac] = get_last_seen(name)
            else:
                status_dict[mac] = "unbekannt"

    return render_template(
        "index.html",
        devices=devices,
        status=status_dict,
        last_seen=last_seen_dict,
        interval=interval,
    )


@app.route("/settings", methods=["GET", "POST"])
@login_required
def settings():
    """Show or change the check interval (admins only).

    On POST the interval is stored in minutes (``interval``) and in
    seconds (``check_interval``), then ``rpi-check.service`` is restarted.
    """
    if not current_user.is_admin:
        flash("Nur Admins dürfen die Einstellungen ändern!")
        return redirect(url_for("index"))

    if request.method == "POST":
        try:
            interval_minutes = int(request.form.get("interval", ""))
        except ValueError:
            interval_minutes = 0
        if interval_minutes < 1:
            # Reject missing, non-numeric, zero and negative values.
            flash("Ungültiges Intervall! Bitte eine ganze Zahl ab 1 angeben.")
            return redirect(url_for("settings"))

        check_interval_seconds = interval_minutes * 60
        with closing(get_db_connection()) as conn:
            conn.execute(_UPSERT_SETTING_SQL, ("interval", interval_minutes))
            conn.execute(
                _UPSERT_SETTING_SQL, ("check_interval", check_interval_seconds)
            )
            conn.commit()

        # Restart the checker so it picks up the new interval.
        try:
            result = subprocess.run(
                ["systemctl", "restart", "rpi-check.service"], check=False
            )
            restarted = result.returncode == 0
        except OSError:
            restarted = False

        if restarted:
            flash(
                f"Intervall auf {interval_minutes} Minuten gesetzt "
                "und Service neu gestartet!"
            )
        else:
            flash(
                f"Intervall auf {interval_minutes} Minuten gesetzt, "
                "aber der Service konnte nicht neu gestartet werden!"
            )
        return redirect(url_for("settings"))

    return render_template("settings.html", interval=_get_interval_minutes())


def _duplicate_device_message(conn, mac, rpi_ip, exclude_mac=None):
    """Return a flash message if IP or MAC is already taken, else None.

    With *exclude_mac* set, the device currently stored under that MAC is
    ignored (used when editing).
    """
    if exclude_mac is None:
        checks = (
            ("IP-Adresse", "SELECT name FROM devices WHERE rpi_ip=?",
             (rpi_ip,)),
            ("MAC-Adresse", "SELECT name FROM devices WHERE mac=?",
             (mac,)),
        )
    else:
        checks = (
            ("IP-Adresse",
             "SELECT name FROM devices WHERE rpi_ip=? AND mac<>?",
             (rpi_ip, exclude_mac)),
            ("MAC-Adresse",
             "SELECT name FROM devices WHERE mac=? AND mac<>?",
             (mac, exclude_mac)),
        )
    for label, sql, params in checks:
        row = conn.execute(sql, params).fetchone()
        if row:
            return f"{label} existiert bereits für Gerät '{row['name']}'!"
    return None


def _add_device(conn, form):
    """Insert a device from *form* and flash the outcome."""
    mac = form.get('mac')
    rpi_ip = form.get('rpi_ip')
    port = form.get('port')
    name = form.get('name')
    switch_hostname = form.get('switch_hostname')
    is_active = 1 if 'is_active' in form else 0

    if not all([mac, rpi_ip, name]):
        flash("Alle Felder müssen ausgefüllt sein!")
        return

    duplicate = _duplicate_device_message(conn, mac, rpi_ip)
    if duplicate:
        flash(duplicate)
        return

    try:
        conn.execute(
            """
            INSERT INTO devices
                (mac, rpi_ip, port, name, switch_hostname, is_active)
            VALUES (?, ?, ?, ?, ?, ?)
            """,
            (mac, rpi_ip, port, name, switch_hostname, is_active),
        )
        conn.commit()
        flash(f"Gerät {name} hinzugefügt.")
    except sqlite3.IntegrityError:
        flash("Fehler beim Hinzufügen des Geräts!")


def _change_device_switch(conn, form, old_mac):
    """Reassign only the switch of the device stored under *old_mac*."""
    device = conn.execute(
        "SELECT name, switch_hostname FROM devices WHERE mac=?", (old_mac,)
    ).fetchone()
    if not device:
        flash("Gerät nicht gefunden!")
        return

    old_switch = device['switch_hostname'] or "unbekannt"
    device_name = device['name']
    switch_hostname = form.get('switch_hostname') or ""
    try:
        conn.execute(
            "UPDATE devices SET switch_hostname=? WHERE mac=?",
            (switch_hostname, old_mac),
        )
        conn.commit()
        flash(
            f"Switch von {device_name} geändert: "
            f"{old_switch} → {switch_hostname or 'Kein Switch'}"
        )
    except sqlite3.IntegrityError:
        flash("Fehler beim Ändern des Switch!")


def _edit_device(conn, form):
    """Update a device from *form* and flash the outcome.

    A form that carries ``switch_hostname`` but no MAC, IP or name is
    treated as a switch-only change.
    """
    old_mac = form.get('old_mac')
    mac = form.get('mac')
    rpi_ip = form.get('rpi_ip')
    port = form.get('port') or None
    name = form.get('name')
    is_active = 1 if 'is_active' in form else 0

    if 'switch_hostname' in form and not any([mac, rpi_ip, name]):
        _change_device_switch(conn, form, old_mac)
        return

    if not all([old_mac, mac, rpi_ip, name]):
        flash("Felder 'MAC', 'IP' und 'Name' müssen ausgefüllt sein!")
        return

    duplicate = _duplicate_device_message(
        conn, mac, rpi_ip, exclude_mac=old_mac
    )
    if duplicate:
        flash(duplicate)
        return

    try:
        # The switch assignment is deliberately left untouched here; it is
        # changed through the switch-only form handled above.
        conn.execute(
            """
            UPDATE devices
            SET mac=?, rpi_ip=?, port=?, name=?, is_active=?
            WHERE mac=?
            """,
            (mac, rpi_ip, port, name, is_active, old_mac),
        )
        conn.commit()
        flash(f"Gerät {name} aktualisiert.")
    except sqlite3.IntegrityError:
        flash("Fehler beim Aktualisieren des Geräts!")


def _delete_device(conn, form):
    """Delete the device whose MAC is in ``delete_device`` and flash."""
    del_mac = form.get('delete_device')
    if not del_mac:
        flash("Keine MAC-Adresse übermittelt!")
        return

    device = conn.execute(
        "SELECT name FROM devices WHERE mac=?", (del_mac,)
    ).fetchone()
    hostname = device['name'] if device else del_mac
    conn.execute("DELETE FROM devices WHERE mac=?", (del_mac,))
    conn.commit()
    flash(f"Gerät {hostname} gelöscht.")


@app.route('/devices', methods=['GET', 'POST'])
@login_required
def devices():
    """List devices; on POST add, edit or delete one (admins only).

    Every POST ends in a redirect back to this page so that a browser
    reload does not resubmit the form.
    """
    with closing(get_db_connection()) as conn:
        if request.method == 'POST':
            form = request.form
            if 'add_device' in form:
                handler = _add_device
            elif 'edit_device' in form:
                handler = _edit_device
            elif 'delete_device' in form:
                handler = _delete_device
            else:
                handler = None

            if handler is not None:
                if current_user.is_admin:
                    handler(conn, form)
                else:
                    flash("Zugriff verweigert!")
            return redirect(url_for('devices'))

        switch_rows = conn.execute("SELECT hostname FROM switches").fetchall()
        device_rows = conn.execute(
            """
            SELECT devices.mac, devices.rpi_ip, devices.port, devices.name,
                   devices.is_active,
                   switches.hostname AS switch_hostname
            FROM devices
            LEFT JOIN switches ON devices.switch_hostname = switches.hostname
            ORDER BY switches.hostname ASC, devices.name ASC
            """
        ).fetchall()

    return render_template(
        'devices.html', devices=device_rows, switches=switch_rows
    )


# The MAC must be part of the URL; without the <mac> variable Flask calls
# the view with no argument and it fails with a TypeError.
@app.route('/devices/toggle/<mac>', methods=['POST'])
@login_required
def toggle_device(mac):
    """Flip ``is_active`` of the device *mac*; returns a JSON-able dict."""
    if not current_user.is_admin:
        return {"success": False, "msg": "Zugriff verweigert!"}, 403

    with closing(get_db_connection()) as conn:
        device = conn.execute(
            "SELECT is_active, name FROM devices WHERE mac=?", (mac,)
        ).fetchone()
        if not device:
            return {"success": False, "msg": "Gerät nicht gefunden!"}, 404

        new_status = 0 if device['is_active'] else 1
        conn.execute(
            "UPDATE devices SET is_active=? WHERE mac=?", (new_status, mac)
        )
        conn.commit()

    status_text = "deaktiviert" if new_status == 0 else "aktiviert"
    return {
        "success": True,
        "msg": f"Gerät {device['name']} wurde {status_text}.",
        "new_status": new_status,
    }


@app.route('/switches', methods=['GET', 'POST'])
@login_required
def switches():
    """List switches; on POST add or edit one inline (admins only)."""
    with closing(get_db_connection()) as conn:
        if request.method == 'POST':
            form = request.form
            wants_change = 'add_switch' in form or 'edit_switch' in form
            if wants_change and not current_user.is_admin:
                flash("Zugriff verweigert!")
                return redirect(url_for('switches'))

            if 'add_switch' in form:
                hostname = form['hostname']
                ip = form['ip']
                username = form['username']
                password = encrypt_password(form['password'])
                try:
                    conn.execute(
                        "INSERT INTO switches "
                        "(hostname, ip, username, password) "
                        "VALUES (?, ?, ?, ?)",
                        (hostname, ip, username, password),
                    )
                    conn.commit()
                    flash(f"Switch {hostname} hinzugefügt.")
                except sqlite3.IntegrityError:
                    flash("Hostname existiert bereits oder Eingabefehler!")

            if 'edit_switch' in form:
                old_hostname = form['old_hostname']
                hostname = form['hostname']
                ip = form['ip']
                username = form['username']
                password = form['password']
                try:
                    if password:
                        conn.execute(
                            """
                            UPDATE switches
                            SET hostname=?, ip=?, username=?, password=?
                            WHERE hostname=?
                            """,
                            (hostname, ip, username,
                             encrypt_password(password), old_hostname),
                        )
                    else:
                        # The list below never sends the stored password
                        # back to the form, so a blank field means "keep
                        # it" rather than "replace it with an empty one".
                        conn.execute(
                            """
                            UPDATE switches
                            SET hostname=?, ip=?, username=?
                            WHERE hostname=?
                            """,
                            (hostname, ip, username, old_hostname),
                        )
                    if hostname != old_hostname:
                        # Devices reference their switch by hostname; keep
                        # them attached when the switch is renamed.
                        conn.execute(
                            "UPDATE devices SET switch_hostname=? "
                            "WHERE switch_hostname=?",
                            (hostname, old_hostname),
                        )
                    conn.commit()
                    flash(f"Switch {hostname} aktualisiert.")
                except sqlite3.IntegrityError:
                    conn.rollback()
                    flash("Hostname existiert bereits oder Eingabefehler!")

        switch_rows = conn.execute(
            "SELECT hostname, ip, username FROM switches"
        ).fetchall()

    return render_template('switche.html', switches=switch_rows)


# The hostname must be part of the URL; without the <hostname> variable
# Flask calls the view with no argument and it fails with a TypeError.
@app.route('/switches/delete/<hostname>', methods=['POST'])
@login_required
def delete_switch(hostname):
    """Delete switch *hostname* unless devices are still assigned to it."""
    if not current_user.is_admin:
        # Same rule as every other mutating route in this file.
        flash("Zugriff verweigert!")
        return redirect(url_for('switches'))

    with closing(get_db_connection()) as conn:
        assigned = conn.execute(
            "SELECT name FROM devices WHERE switch_hostname=?", (hostname,)
        ).fetchall()
        if assigned:
            device_names = [d['name'] for d in assigned]
            flash(
                f"Folgende Geräte sind noch dem Switch {hostname} "
                f"zugewiesen: {', '.join(device_names)}",
                "danger",
            )
            return redirect(url_for('switches'))

        conn.execute("DELETE FROM switches WHERE hostname=?", (hostname,))
        conn.commit()

    flash(f"Switch '{hostname}' gelöscht.", "success")
    return redirect(url_for('switches'))


@app.route("/get_log")
@login_required
def get_log():
    """Return the raw text of the newest log file."""
    latest_log = _latest_log_file()
    if latest_log is None:
        return "Keine Logfiles gefunden."

    try:
        with open(latest_log, "r") as f:
            return f.read()
    except (OSError, UnicodeError) as e:
        return f"Fehler beim Lesen des Logs: {e}"


@app.route('/logs')
@login_required
def logs():
    """Render the log page with the content of the newest log file."""
    interval = _get_interval_minutes()

    latest_log = _latest_log_file()
    if latest_log is None:
        # Pass the interval here too so the template sees it on every path.
        return render_template(
            'logs.html',
            log_content="Keine Logfiles gefunden.",
            interval=interval,
        )

    try:
        with open(latest_log, "r") as f:
            log_content = f.read()
    except (OSError, UnicodeError) as e:
        log_content = f"Fehler beim Lesen des Logs: {e}"

    return render_template(
        'logs.html',
        log_content=log_content,
        log_name=os.path.basename(latest_log),
        interval=interval,
    )


def load_device_status():
    """Map device MAC to 'online'/'offline' from the newest log file.

    Log lines name devices; names are translated to MACs through the
    ``devices`` table. Later lines override earlier ones. Devices that do
    not appear in the log are absent from the result.
    """
    status = {}

    with closing(get_db_connection()) as conn:
        rows = conn.execute("SELECT mac, name FROM devices").fetchall()
    name_to_mac = {row['name']: row['mac'] for row in rows}

    latest_log = _latest_log_file()
    if latest_log is None:
        return status

    with open(latest_log, "r") as f:
        for line in f:
            line = line.strip()
            m_online = _ONLINE_RE.search(line)
            m_offline = _OFFLINE_RE.search(line)
            if m_online:
                match, state = m_online, 'online'
            elif m_offline:
                match, state = m_offline, 'offline'
            else:
                continue
            mac = name_to_mac.get(match.group(1))
            if mac:
                status[mac] = state

    return status


@app.route("/users", methods=["GET", "POST"])
@login_required
def users():
    """List users; on POST add, rename/re-role, re-password or delete one.

    Admins only. The page is rendered directly after a POST, so flashed
    messages show up in the same response.
    """
    if not current_user.is_admin:
        flash("Nur Admins dürfen Benutzer verwalten!")
        return redirect(url_for("index"))

    with closing(get_db_connection()) as conn:
        if request.method == "POST":
            form = request.form

            if "add_user" in form:
                username = form["username"].strip()
                password = form["password"].strip()
                is_admin = _form_flag(form, "is_admin")
                if username and password:
                    pw_hash = bcrypt.generate_password_hash(
                        password
                    ).decode("utf-8")
                    try:
                        conn.execute(
                            "INSERT INTO users "
                            "(username, password, is_admin) "
                            "VALUES (?, ?, ?)",
                            (username, pw_hash, is_admin),
                        )
                        conn.commit()
                        flash(f"Benutzer '{username}' erfolgreich angelegt!")
                    except sqlite3.IntegrityError:
                        flash("Benutzername existiert bereits!")
                else:
                    flash("Username und Passwort dürfen nicht leer sein!")

            elif "change_role" in form:
                user_id = form["user_id"]
                username = form.get("username", "").strip()
                is_admin = _form_flag(form, "is_admin")
                if username:
                    try:
                        conn.execute(
                            "UPDATE users SET username=?, is_admin=? "
                            "WHERE id=?",
                            (username, is_admin, user_id),
                        )
                        conn.commit()
                        flash("Rolle und Username geändert!")
                    except sqlite3.IntegrityError:
                        # Renaming onto an existing username violates the
                        # same constraint the add branch reports.
                        conn.rollback()
                        flash("Benutzername existiert bereits!")
                else:
                    flash("Username darf nicht leer sein!")

            elif "change_password" in form:
                user_id = form["user_id"]
                new_password = form.get("new_password", "").strip()
                if new_password:
                    pw_hash = bcrypt.generate_password_hash(
                        new_password
                    ).decode("utf-8")
                    conn.execute(
                        "UPDATE users SET password=? WHERE id=?",
                        (pw_hash, user_id),
                    )
                    conn.commit()
                    flash("Passwort erfolgreich geändert!")
                else:
                    flash("Passwort darf nicht leer sein!")

            elif "delete_user" in form:
                user_id = form["delete_user"]
                conn.execute("DELETE FROM users WHERE id=?", (user_id,))
                conn.commit()
                flash("Benutzer gelöscht!")

        users_list = conn.execute(
            "SELECT id, username, is_admin FROM users"
        ).fetchall()

    return render_template("users.html", users=users_list)


if __name__ == "__main__":
    # The Werkzeug debugger allows arbitrary code execution, so it must not
    # be on by default while listening on all interfaces. Opt in with
    # FLASK_DEBUG=1 for local development.
    app.run(
        host='0.0.0.0',
        port=5000,
        debug=os.environ.get("FLASK_DEBUG") == "1",
    )