Files
Aruba-PoE/srv/poe_manager/app.py

550 lines
20 KiB
Python

#u!/usr/bin/env python3
from flask import Flask, render_template, request, redirect, url_for, flash
from flask_login import LoginManager, login_user, login_required, logout_user, UserMixin, current_user
from flask_bcrypt import Bcrypt
from cryptography.fernet import Fernet
import sqlite3
import glob, os, re
app = Flask(__name__)
app.secret_key = "309cc4d5ce1fe7486ae25cbd232bbdfe6a72539c03f0127d372186dbdc0fc928"
bcrypt = Bcrypt(app)
login_manager = LoginManager()
login_manager.login_view = "login"
login_manager.init_app(app)
DB_PATH = "/srv/poe_manager/sqlite.db"
class User(UserMixin):
def __init__(self, id_, username, is_admin):
self.id = id_
self.username = username
self.is_admin = is_admin
def get_interval_seconds():
conn = get_db_connection()
row = conn.execute("SELECT value FROM settings WHERE key='check_interval'").fetchone()
conn.close()
return int(row['value']) if row else 300
with open("/srv/poe_manager/fernet.key", "rb") as f:
fernet = Fernet(f.read())
def encrypt_password(password: str) -> str:
return fernet.encrypt(password.encode()).decode()
def decrypt_password(token: str) -> str:
return fernet.decrypt(token.encode()).decode()
def get_db_connection():
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
def get_devices():
"""
Liefert eine Liste aller Devices aus der Datenbank als Dictionaries.
"""
conn = get_db_connection()
devices = conn.execute("SELECT mac, rpi_ip, port, name, switch_hostname, is_active FROM devices ORDER BY name ASC").fetchall()
conn.close()
return devices
@login_manager.user_loader
def load_user(user_id):
conn = get_db_connection()
user = conn.execute("SELECT * FROM users WHERE id = ?", (user_id,)).fetchone()
conn.close()
if user:
return User(user['id'], user['username'], user['is_admin'])
return None
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
conn = get_db_connection()
user = conn.execute("SELECT * FROM users WHERE username = ?", (username,)).fetchone()
conn.close()
if user and bcrypt.check_password_hash(user['password'], password):
login_user(User(user['id'], user['username'], user['is_admin']))
return redirect(url_for('index'))
else:
flash("Ungültiger Benutzername oder Passwort")
return render_template('login.html')
@app.route('/logout')
@login_required
def logout():
logout_user()
return redirect(url_for('login'))
@app.route("/")
@login_required
def index():
conn = sqlite3.connect("sqlite.db")
c = conn.cursor()
# is_active mit abfragen
c.execute("SELECT mac, name, is_active FROM devices ORDER BY name ASC")
devices = c.fetchall()
conn.close()
# Status aus letztem Log ermitteln
import glob, os
log_files = glob.glob("/var/log/rpi-*.log")
status_dict = {}
if log_files:
latest_log = max(log_files, key=os.path.getctime)
with open(latest_log, "r") as f:
for line in f:
for dev in devices:
if dev[1] in line:
status_dict[dev[0]] = "online" if "erreichbar" in line else "offline"
return render_template("index.html", devices=devices, status=status_dict)
@app.route("/settings", methods=["GET", "POST"])
@login_required
def settings():
if not current_user.is_admin:
flash("Nur Admins dürfen die Einstellungen ändern!")
return redirect(url_for("index"))
# Aktuellen Prüfintervall laden
conn = sqlite3.connect("sqlite.db")
c = conn.cursor()
c.execute("SELECT value FROM settings WHERE key='interval'")
row = c.fetchone()
interval = int(row[0]) if row else 5 # Default 5 Minuten
conn.close()
if request.method == "POST":
new_interval = int(request.form["interval"])
conn = sqlite3.connect("sqlite.db")
c = conn.cursor()
# upsert
c.execute("INSERT INTO settings (key, value) VALUES (?, ?) "
"ON CONFLICT(key) DO UPDATE SET value=excluded.value",
("interval", new_interval))
conn.commit()
conn.close()
# rpi-check.service neu starten
import subprocess
subprocess.run(["systemctl", "restart", "rpi-check.service"])
flash(f"Intervall auf {new_interval} Minuten gesetzt und Service neu gestartet!")
return redirect(url_for("settings"))
return render_template("settings.html", interval=interval)
@app.route('/devices', methods=['GET', 'POST'])
@login_required
def devices():
conn = get_db_connection()
switches = conn.execute("SELECT hostname FROM switches").fetchall()
if request.method == 'POST':
# -----------------------
# Gerät hinzufügen
# -----------------------
if 'add_device' in request.form:
if not current_user.is_admin:
flash("Zugriff verweigert!")
return redirect(url_for('devices'))
mac = request.form.get('mac')
rpi_ip = request.form.get('rpi_ip')
port = request.form.get('port')
name = request.form.get('name')
switch_hostname = request.form.get('switch_hostname')
is_active = 1 if 'is_active' in request.form else 0
if not all([mac, rpi_ip, port, name, switch_hostname]):
flash("Alle Felder müssen ausgefüllt sein!")
return redirect(url_for('devices'))
# Prüfen auf doppelte IP
ip_device = conn.execute("SELECT name FROM devices WHERE rpi_ip=?", (rpi_ip,)).fetchone()
if ip_device:
flash(f"IP-Adresse existiert bereits für Gerät '{ip_device['name']}'!")
return redirect(url_for('devices'))
# Prüfen auf doppelte MAC
mac_device = conn.execute("SELECT name FROM devices WHERE mac=?", (mac,)).fetchone()
if mac_device:
flash(f"MAC-Adresse existiert bereits für Gerät '{mac_device['name']}'!")
return redirect(url_for('devices'))
try:
conn.execute("""
INSERT INTO devices (mac, rpi_ip, port, name, switch_hostname, is_active)
VALUES (?, ?, ?, ?, ?, ?)
""", (mac, rpi_ip, port, name, switch_hostname, is_active))
conn.commit()
flash(f"Gerät {name} hinzugefügt.")
except sqlite3.IntegrityError:
flash("Fehler beim Hinzufügen des Geräts!")
# -----------------------
# Gerät bearbeiten
# -----------------------
elif 'edit_device' in request.form:
if not current_user.is_admin:
flash("Zugriff verweigert!")
return redirect(url_for('devices'))
old_mac = request.form.get('old_mac')
mac = request.form.get('mac')
rpi_ip = request.form.get('rpi_ip')
port = request.form.get('port')
name = request.form.get('name')
switch_hostname = request.form.get('switch_hostname') or None
is_active = 1 if 'is_active' in request.form else 0
# --- Prüfen, ob es sich um eine Switch-Änderung handelt ---
if mac is None and rpi_ip is None and port is None and name is None and switch_hostname:
# Nur den Switch ändern
device = conn.execute("SELECT name, switch_hostname FROM devices WHERE mac=?", (old_mac,)).fetchone()
if not device:
flash("Gerät nicht gefunden!")
return redirect(url_for('devices'))
old_switch = device['switch_hostname'] or "unbekannt"
device_name = device['name']
try:
conn.execute("""
UPDATE devices
SET switch_hostname=?
WHERE mac=?
""", (switch_hostname, old_mac))
conn.commit()
flash(f"Switch von {device_name} geändert: {old_switch}{switch_hostname}")
except sqlite3.IntegrityError:
flash("Fehler beim Ändern des Switch!")
return redirect(url_for('devices'))
if not all([old_mac, mac, rpi_ip, port, name]):
flash("Alle Felder müssen ausgefüllt sein!")
return redirect(url_for('devices'))
# Prüfen auf doppelte IP außer das aktuelle Gerät
ip_device = conn.execute("SELECT name FROM devices WHERE rpi_ip=? AND mac<>?", (rpi_ip, old_mac)).fetchone()
if ip_device:
flash(f"IP-Adresse existiert bereits für Gerät '{ip_device['name']}'!")
return redirect(url_for('devices'))
# Prüfen auf doppelte MAC außer das aktuelle Gerät
mac_device = conn.execute("SELECT name FROM devices WHERE mac=? AND mac<>?", (mac, old_mac)).fetchone()
if mac_device:
flash(f"MAC-Adresse existiert bereits für Gerät '{mac_device['name']}'!")
return redirect(url_for('devices'))
try:
conn.execute("""
UPDATE devices
SET mac=?, rpi_ip=?, port=?, name=?, is_active=?
WHERE mac=?
""", (mac, rpi_ip, port, name, is_active, old_mac))
conn.commit()
flash(f"Gerät {name} aktualisiert.")
except sqlite3.IntegrityError:
flash("Fehler beim Aktualisieren des Geräts!")
# -----------------------
# Gerät aktivieren/deaktivieren
# -----------------------
elif 'toggle_device' in request.form:
mac = request.form.get('toggle_device')
device = conn.execute("SELECT is_active, name FROM devices WHERE mac=?", (mac,)).fetchone()
if device:
new_status = 0 if device['is_active'] else 1
conn.execute("UPDATE devices SET is_active=? WHERE mac=?", (new_status, mac))
conn.commit()
status_text = "deaktiviert" if new_status == 0 else "aktiviert"
flash(f"Gerät '{device['name']}' wurde {status_text}.")
else:
flash("Gerät nicht gefunden!")
return redirect(url_for('devices'))
# -----------------------
# Gerät löschen
# -----------------------
elif 'delete_device' in request.form:
if not current_user.is_admin:
flash("Zugriff verweigert!")
return redirect(url_for('devices'))
del_mac = request.form.get('delete_device')
if del_mac:
device = conn.execute("SELECT name FROM devices WHERE mac=?", (del_mac,)).fetchone()
hostname = device['name'] if device else del_mac
conn.execute("DELETE FROM devices WHERE mac=?", (del_mac,))
conn.commit()
flash(f"Gerät {hostname} gelöscht.")
else:
flash("Keine MAC-Adresse übermittelt!")
return redirect(url_for('devices'))
# -----------------------
# Devices für Anzeige
# -----------------------
devices = conn.execute("""
SELECT devices.mac, devices.rpi_ip, devices.port, devices.name, devices.is_active,
switches.hostname AS switch_hostname
FROM devices
LEFT JOIN switches ON devices.switch_hostname = switches.hostname
ORDER BY switches.hostname ASC
""").fetchall()
conn.close()
return render_template('devices.html', devices=devices, switches=switches)
@app.route('/devices/toggle/<mac>', methods=['POST'])
@login_required
def toggle_device(mac):
if not current_user.is_admin:
flash("Zugriff verweigert!")
return redirect(url_for('devices'))
conn = get_db_connection()
device = conn.execute("SELECT is_active, name FROM devices WHERE mac=?", (mac,)).fetchone()
if not device:
flash("Gerät nicht gefunden!")
conn.close()
return redirect(url_for('devices'))
new_status = 0 if device['is_active'] else 1
conn.execute("UPDATE devices SET is_active=? WHERE mac=?", (new_status, mac))
conn.commit()
conn.close()
status_text = "deaktiviert" if new_status == 0 else "aktiviert"
flash(f"Gerät '{device['name']}' wurde {status_text}.")
return redirect(url_for('devices'))
@app.route('/switches', methods=['GET', 'POST'])
@login_required
def switches():
conn = get_db_connection()
# Inline-Add
if request.method == 'POST' and 'add_switch' in request.form:
if not current_user.is_admin:
flash("Zugriff verweigert!")
return redirect(url_for('switches'))
hostname = request.form['hostname']
ip = request.form['ip']
username = request.form['username']
password = encrypt_password(request.form['password'])
try:
conn.execute("INSERT INTO switches (hostname, ip, username, password) VALUES (?, ?, ?, ?)",
(hostname, ip, username, password))
conn.commit()
flash(f"Switch {hostname} hinzugefügt.")
except sqlite3.IntegrityError:
flash("Hostname existiert bereits oder Eingabefehler!")
# Inline-Edit
if request.method == 'POST' and 'edit_switch' in request.form:
if not current_user.is_admin:
flash("Zugriff verweigert!")
return redirect(url_for('switches'))
old_hostname = request.form['old_hostname']
hostname = request.form['hostname']
ip = request.form['ip']
username = request.form['username']
password = encrypt_password(request.form['password'])
try:
conn.execute("""
UPDATE switches
SET hostname=?, ip=?, username=?, password=?
WHERE hostname=?
""", (hostname, ip, username, password, old_hostname))
conn.commit()
flash(f"Switch {hostname} aktualisiert.")
except sqlite3.IntegrityError:
flash("Hostname existiert bereits oder Eingabefehler!")
@app.route('/switches/delete/<int:id>', methods=['POST'])
def delete_switch(id):
switch = Switch.query.get_or_404(id)
# Prüfen, ob noch Geräte diesem Switch zugeordnet sind
assigned_devices = Device.query.filter_by(switch_hostname=switch.hostname).all()
if assigned_devices:
# Liste der Gerätenamen
device_names = [d.hostname for d in assigned_devices]
flash(f"Folgende Geräte sind noch dem Switch '{switch.hostname}' zugewiesen: {', '.join(device_names)}", "danger")
return redirect(url_for('switches'))
try:
db.session.delete(switch)
db.session.commit()
flash(f"Switch '{switch.hostname}' wurde gelöscht.", "success")
except Exception as e:
db.session.rollback()
flash(f"Fehler beim Löschen des Switches: {e}", "danger")
return redirect(url_for('switches'))
@app.route("/get_log")
@login_required
def get_log():
log_files = glob.glob("/var/log/rpi-*.log")
if not log_files:
return "Keine Logfiles gefunden."
latest_log = max(log_files, key=os.path.getctime)
try:
with open(latest_log, "r") as f:
return f.read()
except Exception as e:
return f"Fehler beim Lesen des Logs: {e}"
@app.route('/logs')
@login_required
def logs():
# Intervall aus DB laden
conn = sqlite3.connect("sqlite.db")
c = conn.cursor()
c.execute("SELECT value FROM settings WHERE key='interval'")
row = c.fetchone()
interval = int(row[0]) if row else 5 # Default 5 Minuten
conn.close()
# alle Logfiles mit Muster rpi-YYYYMMDDHHMMSS.log
log_files = glob.glob("/var/log/rpi-*.log")
if not log_files:
return render_template('logs.html', log_content="Keine Logfiles gefunden.")
# das neuste Logfile auswählen
latest_log = max(log_files, key=os.path.getctime)
try:
with open(latest_log, "r") as f:
log_content = f.read()
except Exception as e:
log_content = f"Fehler beim Lesen des Logs: {e}"
return render_template('logs.html', log_content=log_content, log_name=os.path.basename(latest_log), interval=interval)
def load_device_status():
"""
Liest das aktuellste rpi-Logfile und extrahiert den letzten Status jedes Devices.
Gibt ein Dictionary zurück: {Device-Name: 'online'/'offline'}
"""
status = {}
log_files = glob.glob("/var/log/rpi-*.log")
if not log_files:
return status
latest_log = max(log_files, key=os.path.getctime)
# Jede Zeile des Logs lesen
with open(latest_log, "r") as f:
lines = f.readlines()
# Regex für Ping-Ergebnisse
online_re = re.compile(r"(\S+) ist erreichbar!")
offline_re = re.compile(r"(\S+) ist nicht erreichbar!")
for line in lines:
line = line.strip()
m_online = online_re.search(line)
m_offline = offline_re.search(line)
if m_online:
status[m_online.group(1)] = 'online'
elif m_offline:
status[m_offline.group(1)] = 'offline'
return status
@app.route("/users", methods=["GET", "POST"])
@login_required
def users():
if not current_user.is_admin:
flash("Nur Admins dürfen Benutzer verwalten!")
return redirect(url_for("index"))
conn = sqlite3.connect("sqlite.db")
conn.row_factory = sqlite3.Row
c = conn.cursor()
if request.method == "POST":
# Neuen Benutzer hinzufügen
if "add_user" in request.form:
username = request.form["username"].strip()
password = request.form["password"].strip()
is_admin = int(request.form.get("is_admin", 0))
if username and password:
pw_hash = bcrypt.generate_password_hash(password).decode("utf-8")
try:
c.execute(
"INSERT INTO users (username, password, is_admin) VALUES (?, ?, ?)",
(username, pw_hash, is_admin)
)
conn.commit()
flash(f"Benutzer '{username}' erfolgreich angelegt!")
except sqlite3.IntegrityError:
flash("Benutzername existiert bereits!")
else:
flash("Username und Passwort dürfen nicht leer sein!")
# Rolle ändern
elif "change_role" in request.form:
user_id = request.form["user_id"]
username = request.form.get("username", "").strip()
is_admin = int(request.form.get("is_admin", 0))
if username:
c.execute(
"UPDATE users SET username=?, is_admin=? WHERE id=?",
(username, is_admin, user_id)
)
conn.commit()
flash("Rolle und Username geändert!")
else:
flash("Username darf nicht leer sein!")
# Passwort ändern
elif "change_password" in request.form:
user_id = request.form["user_id"]
new_password = request.form.get("new_password", "").strip()
if new_password:
pw_hash = bcrypt.generate_password_hash(new_password).decode("utf-8")
c.execute(
"UPDATE users SET password=? WHERE id=?",
(pw_hash, user_id)
)
conn.commit()
flash("Passwort erfolgreich geändert!")
else:
flash("Passwort darf nicht leer sein!")
# Benutzer löschen
elif "delete_user" in request.form:
user_id = request.form["delete_user"]
c.execute("DELETE FROM users WHERE id=?", (user_id,))
conn.commit()
flash("Benutzer gelöscht!")
# Alle Benutzer laden (GET oder nach POST)
c.execute("SELECT id, username, is_admin FROM users")
users_list = c.fetchall()
conn.close()
# Direkt rendern, Flash-Messages werden angezeigt
return render_template("users.html", users=users_list)
if __name__ == "__main__":
app.run(host='0.0.0.0', port=5000, debug=True)