Add admin interface for managing reports

main
Micke Nordin 2 years ago
parent 3a0d02af86
commit 6774b94caf
Signed by: micke
GPG Key ID: 0DA0A7A5708FE257

@ -6,6 +6,7 @@ RUN useradd --system --create-home --home-dir /app --shell /bin/bash invent
USER invent
WORKDIR /app
COPY ./main.py .
COPY ./templates ./templates/
COPY ./requirements.txt .
RUN pip install --no-cache-dir --requirement requirements.txt
EXPOSE 8000/tcp

@ -1,25 +1,40 @@
import sys
#!/usr/bin/env python3
import datetime
import logging
import os
import os.path
import sqlite3
import time
import uuid
from os import makedirs
import time
from typing import Annotated
from argon2 import PasswordHasher
from argon2.exceptions import VerifyMismatchError
from fastapi import Depends, FastAPI, Response, UploadFile, status
from fastapi import Depends, FastAPI, Request, Response, UploadFile, status
from fastapi.logger import logger
from fastapi.responses import HTMLResponse
from fastapi.security import HTTPBasic, HTTPBasicCredentials
from fastapi.templating import Jinja2Templates
# Security singleton
security = HTTPBasic()
uvicorn_logger = logging.getLogger('uvicorn.error')
logger.handlers = uvicorn_logger.handlers
class Inventory:
def __init__(self):
def __init__(self) -> None:
try:
self.admin_password = os.environ["INVENT_ADMIN_PASSWORD"]
except KeyError:
self.admin_password = uuid.uuid4().hex
logger.error(f'INFO:\tINVENT_ADMIN_PASSWORD not set, admin password set to: `{self.admin_password}` for this session.')
try:
self.disable_tofu = os.environ["INVENT_DISABLE_TOFU"].lower() in [ 'true', 'yes', '1', 'y']
except KeyError:
self.disable_tofu = False
try:
self.host_dir = os.environ["INVENT_HOST_DIR"]
except KeyError:
@ -39,26 +54,32 @@ class Inventory:
if not os.path.isdir(self.db_dir):
makedirs(self.db_dir)
self.admin_salt = uuid.uuid4().hex
self.ph = PasswordHasher()
self.db = sqlite3.connect(os.path.join(self.db_dir, "users.db"))
self.cursor = self.db.cursor()
self.cursor.execute("CREATE TABLE IF NOT EXISTS users(username, salt, hash)")
self.cursor.execute("CREATE TABLE IF NOT EXISTS users(username, salt, hash, endpoint)")
self.cursor.execute("CREATE TABLE IF NOT EXISTS reports(username, timestamp, endpoint)")
def get_or_create_user(self, credentials: Annotated[HTTPBasicCredentials, Depends(security)]) -> tuple[str,str]:
def get_or_create_user(self, credentials: Annotated[HTTPBasicCredentials, Depends(security)], endpoint: str) -> tuple[str,str]:
username = credentials.username
query = self.cursor.execute(f"SELECT salt, hash FROM users WHERE username='{username}'")
query = self.cursor.execute(f"SELECT salt, hash FROM users WHERE username='{username}' and endpoint='{endpoint}'")
result = query.fetchone()
# If the user is not in the database, we will trust the user and add it to the database
# TOFU: https://developer.mozilla.org/en-US/docs/Glossary/TOFU
if result is None:
if result is None and not self.disable_tofu:
salt = uuid.uuid4().hex
hash = self.ph.hash(salt + credentials.password)
self.cursor.execute(f"INSERT INTO users (username, salt, hash) values('{username}', '{salt}', '{hash}')")
self.cursor.execute(f"INSERT INTO users (username, salt, hash, endpoint) values('{username}', '{salt}', '{hash}', '{endpoint}')")
self.db.commit()
return (salt, hash)
# FIXME: How can we best communicate that the user was not in the db?
if result is None and self.disable_tofu:
salt = uuid.uuid4().hex
hash = self.ph.hash(salt + uuid.uuid4().hex)
return (salt, hash)
else:
return result
@ -77,8 +98,8 @@ class Inventory:
if credentials.username != name:
response.status_code = status.HTTP_403_FORBIDDEN
return {"ERROR": "Username and andpoint does not match"}
salt, hash = self.get_or_create_user(credentials)
return {"ERROR": "Username and endpoint does not match"}
salt, hash = self.get_or_create_user(credentials, endpoint)
if self.validate_credentials(credentials, salt, hash):
filename = os.path.join(dir, name + '.json')
with open(filename, 'w') as fh:
@ -87,11 +108,14 @@ class Inventory:
return {"SUCCESS": f"File: {filename} saved"}
else:
try:
query = self.cursor.execute(f"SELECT * FROM reports WHERE username='{credentials.username}' and endpoint={endpoint}")
result = query.fetchone()
except sqlite3.OperationalError:
result = None
if result is None:
timestamp = int(time.time())
self.cursor.execute(f"INSERT INTO reports (username, timestamp) values('{credentials.username}','{endpoint}', '{timestamp}')")
self.cursor.execute(f"INSERT INTO reports (username, timestamp, endpoint) values('{credentials.username}','{timestamp}','{endpoint}')")
self.db.commit()
response.status_code = status.HTTP_401_UNAUTHORIZED
return {"ERROR": "Invalid password, this incident will be reported"}
@ -99,6 +123,7 @@ class Inventory:
app = FastAPI()
inventory = Inventory()
templates = Jinja2Templates(directory="templates")
@app.post("/host/{hostname}", status_code=status.HTTP_201_CREATED)
async def upload_host(file: UploadFile, hostname: str, credentials: Annotated[HTTPBasicCredentials, Depends(security)], response: Response):
@ -107,3 +132,26 @@ async def upload_host(file: UploadFile, hostname: str, credentials: Annotated[HT
@app.post("/image/{imagename}", status_code=status.HTTP_201_CREATED)
async def upload_image(file: UploadFile, imagename: str, credentials: Annotated[HTTPBasicCredentials, Depends(security)], response: Response):
return await inventory.upload('image', file, imagename, credentials, response)
@app.get("/admin", response_class=HTMLResponse)
async def show_admin_interface(credentials: Annotated[HTTPBasicCredentials, Depends(security)], request: Request):
hash = inventory.ph.hash(inventory.admin_salt + inventory.admin_password)
if not inventory.validate_credentials(credentials, inventory.admin_salt, hash) and credentials.username != 'admin':
return templates.TemplateResponse('unauthorized.html', {"request": request})
query = inventory.cursor.execute(f"SELECT * FROM reports")
result = query.fetchall()
reports = list()
for res in result:
reports.append({"username": res[0],"timestamp": datetime.datetime.fromtimestamp(int(res[1])), "endpoint": res[2]})
return templates.TemplateResponse("admin.html", {"request": request, "reports": reports})
@app.post("/admin/{endpoint}/{name}", response_class=HTMLResponse)
async def delete_report_and_reset_user(endpoint: str, name: str, credentials: Annotated[HTTPBasicCredentials, Depends(security)], request: Request):
hash = inventory.ph.hash(inventory.admin_salt + inventory.admin_password)
if not inventory.validate_credentials(credentials, inventory.admin_salt, hash) and credentials.username != 'admin':
return templates.TemplateResponse('unauthorized.html', {"request": request})
inventory.cursor.execute(f"DELETE FROM reports where username='{name}' and endpoint='{endpoint}'")
inventory.db.commit()
inventory.cursor.execute(f"DELETE FROM users where username='{name}' and endpoint='{endpoint}'")
inventory.db.commit()
return templates.TemplateResponse('delete_report_and_reset_user.html', {"request": request, "username": name, "endpoint": endpoint})

@ -0,0 +1,25 @@
<!DOCTYPE html>
<html>
<head>
<title>Inventory administration</title>
</head>
<body>
<h1>Reports</h1>
{% if reports|length > 0 %}
<form>
<ul>
{% for report in reports %}
<li>User: {{ report.username }}, endpoint: {{ report.endpoint }}, timestamp: {{ report.timestamp }} <button
type="submit" formmethod="post" formaction="/admin/{{ report.endpoint }}/{{ report.username }}" id="{{ report.endpoint }}.{{ report.username }}">Reset</button>
</li>
{% endfor %}
<ul />
</form>
{% else %}
<p>You have no reports at this time</p>
{% endif %}
</body>
</html>

@ -0,0 +1,15 @@
<!DOCTYPE html>
<html>
<head>
<title>Deleted</title>
<meta http-equiv="refresh" content="3;url=/admin" />
</head>
<body>
<h1>Success</h1>
<p>{{endpoint}}/{{username}} deleted</p>
<p>Redirecting back to /admin</p>
</body>
</html>

@ -0,0 +1,12 @@
<!DOCTYPE html>
<html>
<head>
<title>Unauthorized</title>
</head>
<body>
<h1>Unauthorized</h1>
</body>
</html>
Loading…
Cancel
Save