mirror of https://github.com/SUNET/invent.git
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
110 lines
4.5 KiB
110 lines
4.5 KiB
import sys
|
|
#!/usr/bin/env python3
|
|
import os
|
|
import os.path
|
|
import sqlite3
|
|
import uuid
|
|
from os import makedirs
|
|
import time
|
|
from typing import Annotated
|
|
|
|
from argon2 import PasswordHasher
|
|
from argon2.exceptions import VerifyMismatchError
|
|
from fastapi import Depends, FastAPI, Response, UploadFile, status
|
|
from fastapi.security import HTTPBasic, HTTPBasicCredentials
|
|
|
|
# Security singleton
|
|
security = HTTPBasic()
|
|
|
|
class Inventory:
|
|
|
|
def __init__(self):
|
|
|
|
try:
|
|
self.host_dir = os.environ["INVENT_HOST_DIR"]
|
|
except KeyError:
|
|
self.host_dir = '/var/cache/invent/hosts'
|
|
try:
|
|
self.image_dir = os.environ["INVENT_IMAGE_DIR"]
|
|
except KeyError:
|
|
self.image_dir = '/var/cache/invent/images'
|
|
try:
|
|
self.db_dir = os.environ["INVENT_DB_DIR"]
|
|
except KeyError:
|
|
self.db_dir = '/etc/invent/db'
|
|
if not os.path.isdir(self.host_dir):
|
|
makedirs(self.host_dir)
|
|
if not os.path.isdir(self.image_dir):
|
|
makedirs(self.image_dir)
|
|
if not os.path.isdir(self.db_dir):
|
|
makedirs(self.db_dir)
|
|
|
|
self.ph = PasswordHasher()
|
|
self.db = sqlite3.connect(os.path.join(self.db_dir, "users.db"))
|
|
self.cursor = self.db.cursor()
|
|
self.cursor.execute("CREATE TABLE IF NOT EXISTS users(username, salt, hash)")
|
|
self.cursor.execute("CREATE TABLE IF NOT EXISTS reports(username, timestamp, endpoint)")
|
|
|
|
|
|
|
|
def get_or_create_user(self, credentials: Annotated[HTTPBasicCredentials, Depends(security)]) -> tuple[str,str]:
|
|
username = credentials.username
|
|
query = self.cursor.execute(f"SELECT salt, hash FROM users WHERE username='{username}'")
|
|
result = query.fetchone()
|
|
# If the user is not in the database, we will trust the user and add it to the database
|
|
# TOFU: https://developer.mozilla.org/en-US/docs/Glossary/TOFU
|
|
if result is None:
|
|
salt = uuid.uuid4().hex
|
|
hash = self.ph.hash(salt + credentials.password)
|
|
self.cursor.execute(f"INSERT INTO users (username, salt, hash) values('{username}', '{salt}', '{hash}')")
|
|
self.db.commit()
|
|
return (salt, hash)
|
|
else:
|
|
return result
|
|
|
|
def validate_credentials(self,credentials: Annotated[HTTPBasicCredentials, Depends(security)], salt: str, hash: str) -> bool:
|
|
password = credentials.password
|
|
try:
|
|
self.ph.verify(hash, salt + password)
|
|
except VerifyMismatchError:
|
|
return False
|
|
return True
|
|
|
|
async def upload(self, endpoint: str, file: UploadFile, name: str, credentials: Annotated[HTTPBasicCredentials, Depends(security)], response: Response):
|
|
dir = self.host_dir
|
|
if endpoint == 'image':
|
|
dir = self.image_dir
|
|
|
|
if credentials.username != name:
|
|
response.status_code = status.HTTP_403_FORBIDDEN
|
|
return {"ERROR": "Username and andpoint does not match"}
|
|
salt, hash = self.get_or_create_user(credentials)
|
|
if self.validate_credentials(credentials, salt, hash):
|
|
filename = os.path.join(dir, name + '.json')
|
|
with open(filename, 'w') as fh:
|
|
contents: bytes = await file.read()
|
|
fh.write(contents.decode('utf-8'))
|
|
|
|
return {"SUCCESS": f"File: {filename} saved"}
|
|
else:
|
|
query = self.cursor.execute(f"SELECT * FROM reports WHERE username='{credentials.username}' and endpoint={endpoint}")
|
|
result = query.fetchone()
|
|
if result is None:
|
|
timestamp = int(time.time())
|
|
self.cursor.execute(f"INSERT INTO reports (username, timestamp) values('{credentials.username}','{endpoint}', '{timestamp}')")
|
|
self.db.commit()
|
|
response.status_code = status.HTTP_401_UNAUTHORIZED
|
|
return {"ERROR": "Invalid password, this incident will be reported"}
|
|
|
|
|
|
app = FastAPI()
|
|
inventory = Inventory()
|
|
|
|
@app.post("/host/{hostname}", status_code=status.HTTP_201_CREATED)
|
|
async def upload_host(file: UploadFile, hostname: str, credentials: Annotated[HTTPBasicCredentials, Depends(security)], response: Response):
|
|
return await inventory.upload('host', file, hostname, credentials, response)
|
|
|
|
@app.post("/image/{imagename}", status_code=status.HTTP_201_CREATED)
|
|
async def upload_image(file: UploadFile, imagename: str, credentials: Annotated[HTTPBasicCredentials, Depends(security)], response: Response):
|
|
return await inventory.upload('image', file, imagename, credentials, response)
|