mirror of https://github.com/SUNET/invent.git
uvicorn main:app --reload The reciver will accept a jsonfile and save it to a configurable location. Three environment variables can be set with configuration INVENT_HOST_DIR, INVENT_IMAGE_DIR and INVENT_DB_DIR It has two endpoints: host/ and image/ and you can post files like this curl -X POST -H 'accept: application/json' \ -H 'Content-Type: multipart/form-data' \ -F 'file=@test.json' \ --user test.sunet.se:a-very-secret-password \ http://127.0.0.1:8000/host/test.sunet.se The reciver uses a TOFU security model, so the user will be trusted as long as the password is the same one that was used the first time, if not the incident will be reported.main
parent
d11a18a653
commit
e63f62075b
@ -0,0 +1,109 @@
|
||||
import sys
|
||||
#!/usr/bin/env python3
|
||||
import os
|
||||
import os.path
|
||||
import sqlite3
|
||||
import uuid
|
||||
from os import makedirs
|
||||
import time
|
||||
from typing import Annotated
|
||||
|
||||
from argon2 import PasswordHasher
|
||||
from argon2.exceptions import VerifyMismatchError
|
||||
from fastapi import Depends, FastAPI, Response, UploadFile, status
|
||||
from fastapi.security import HTTPBasic, HTTPBasicCredentials
|
||||
|
||||
# Security singleton
|
||||
security = HTTPBasic()
|
||||
|
||||
class Inventory:
|
||||
|
||||
def __init__(self):
|
||||
|
||||
try:
|
||||
self.host_dir = os.environ["INVENT_HOST_DIR"]
|
||||
except KeyError:
|
||||
self.host_dir = '/var/cache/invent/hosts'
|
||||
try:
|
||||
self.image_dir = os.environ["INVENT_IMAGE_DIR"]
|
||||
except KeyError:
|
||||
self.image_dir = '/var/cache/invent/images'
|
||||
try:
|
||||
self.db_dir = os.environ["INVENT_DB_DIR"]
|
||||
except KeyError:
|
||||
self.db_dir = '/etc/invent/db'
|
||||
if not os.path.isdir(self.host_dir):
|
||||
makedirs(self.host_dir)
|
||||
if not os.path.isdir(self.image_dir):
|
||||
makedirs(self.image_dir)
|
||||
if not os.path.isdir(self.db_dir):
|
||||
makedirs(self.db_dir)
|
||||
|
||||
self.ph = PasswordHasher()
|
||||
self.db = sqlite3.connect(os.path.join(self.db_dir, "users.db"))
|
||||
self.cursor = self.db.cursor()
|
||||
self.cursor.execute("CREATE TABLE IF NOT EXISTS users(username, salt, hash)")
|
||||
self.cursor.execute("CREATE TABLE IF NOT EXISTS reports(username, timestamp, endpoint)")
|
||||
|
||||
|
||||
|
||||
def get_or_create_user(self, credentials: Annotated[HTTPBasicCredentials, Depends(security)]) -> tuple[str,str]:
|
||||
username = credentials.username
|
||||
query = self.cursor.execute(f"SELECT salt, hash FROM users WHERE username='{username}'")
|
||||
result = query.fetchone()
|
||||
# If the user is not in the database, we will trust the user and add it to the database
|
||||
# TOFU: https://developer.mozilla.org/en-US/docs/Glossary/TOFU
|
||||
if result is None:
|
||||
salt = uuid.uuid4().hex
|
||||
hash = self.ph.hash(salt + credentials.password)
|
||||
self.cursor.execute(f"INSERT INTO users (username, salt, hash) values('{username}', '{salt}', '{hash}')")
|
||||
self.db.commit()
|
||||
return (salt, hash)
|
||||
else:
|
||||
return result
|
||||
|
||||
def validate_credentials(self,credentials: Annotated[HTTPBasicCredentials, Depends(security)], salt: str, hash: str) -> bool:
|
||||
password = credentials.password
|
||||
try:
|
||||
self.ph.verify(hash, salt + password)
|
||||
except VerifyMismatchError:
|
||||
return False
|
||||
return True
|
||||
|
||||
async def upload(self, endpoint: str, file: UploadFile, name: str, credentials: Annotated[HTTPBasicCredentials, Depends(security)], response: Response):
|
||||
dir = self.host_dir
|
||||
if endpoint == 'image':
|
||||
dir = self.image_dir
|
||||
|
||||
if credentials.username != name:
|
||||
response.status_code = status.HTTP_403_FORBIDDEN
|
||||
return {"ERROR": "Username and andpoint does not match"}
|
||||
salt, hash = self.get_or_create_user(credentials)
|
||||
if self.validate_credentials(credentials, salt, hash):
|
||||
filename = os.path.join(dir, name + '.json')
|
||||
with open(filename, 'w') as fh:
|
||||
contents: bytes = await file.read()
|
||||
fh.write(contents.decode('utf-8'))
|
||||
|
||||
return {"SUCCESS": f"File: {filename} saved"}
|
||||
else:
|
||||
query = self.cursor.execute(f"SELECT * FROM reports WHERE username='{credentials.username}' and endpoint={endpoint}")
|
||||
result = query.fetchone()
|
||||
if result is None:
|
||||
timestamp = int(time.time())
|
||||
self.cursor.execute(f"INSERT INTO reports (username, timestamp) values('{credentials.username}','{endpoint}', '{timestamp}')")
|
||||
self.db.commit()
|
||||
response.status_code = status.HTTP_401_UNAUTHORIZED
|
||||
return {"ERROR": "Invalid password, this incident will be reported"}
|
||||
|
||||
|
||||
app = FastAPI()
|
||||
inventory = Inventory()
|
||||
|
||||
@app.post("/host/{hostname}", status_code=status.HTTP_201_CREATED)
|
||||
async def upload_host(file: UploadFile, hostname: str, credentials: Annotated[HTTPBasicCredentials, Depends(security)], response: Response):
|
||||
return await inventory.upload('host', file, hostname, credentials, response)
|
||||
|
||||
@app.post("/image/{imagename}", status_code=status.HTTP_201_CREATED)
|
||||
async def upload_image(file: UploadFile, imagename: str, credentials: Annotated[HTTPBasicCredentials, Depends(security)], response: Response):
|
||||
return await inventory.upload('image', file, imagename, credentials, response)
|
@ -0,0 +1,5 @@
|
||||
fastapi[all]
|
||||
uvicorn[standard]
|
||||
python-multipart
|
||||
argon2-cffi
|
||||
uuid
|
Loading…
Reference in new issue