Compare commits

..

No commits in common. 'b36d58289f6074c8e95a25e6392bfd748581cada' and '03bebb9848358d54c4c84f6a2a7a25dedac8734a' have entirely different histories.

@ -19,93 +19,60 @@ from fastapi.templating import Jinja2Templates
# Security singleton # Security singleton
security = HTTPBasic() security = HTTPBasic()
uvicorn_logger = logging.getLogger("uvicorn.error") uvicorn_logger = logging.getLogger('uvicorn.error')
logger.handlers = uvicorn_logger.handlers logger.handlers = uvicorn_logger.handlers
class Inventory: class Inventory:
def __init__(self) -> None: def __init__(self) -> None:
try: try:
self.admin_password = os.environ["INVENT_ADMIN_PASSWORD"] self.admin_password = os.environ["INVENT_ADMIN_PASSWORD"]
except KeyError: except KeyError:
self.admin_password = uuid.uuid4().hex self.admin_password = uuid.uuid4().hex
logger.error( logger.error(f'INFO:\tINVENT_ADMIN_PASSWORD not set, admin password set to: `{self.admin_password}` for this session.')
f"INFO:\tINVENT_ADMIN_PASSWORD not set, admin password set to: `{self.admin_password}` for this session."
)
try: try:
self.disable_tofu = os.environ["INVENT_DISABLE_TOFU"].lower() in [ self.disable_tofu = os.environ["INVENT_DISABLE_TOFU"].lower() in [ 'true', 'yes', '1', 'y']
"true",
"yes",
"1",
"y",
]
except KeyError: except KeyError:
self.disable_tofu = False self.disable_tofu = False
try:
dirs = [ self.host_dir = os.environ["INVENT_HOST_DIR"]
{ except KeyError:
"name": "db", self.host_dir = '/var/cache/invent/hosts'
"key": "db_dir", try:
"default_path": "/etc/invent/db", self.image_dir = os.environ["INVENT_IMAGE_DIR"]
"envar": "INVENT_DB_DIR", except KeyError:
}, self.image_dir = '/var/cache/invent/images'
{ try:
"name": "hosts", self.db_dir = os.environ["INVENT_DB_DIR"]
"key": "host_dir", except KeyError:
"default_path": "/var/cache/invent/hosts", self.db_dir = '/etc/invent/db'
"envar": "INVENT_HOST_DIR", if not os.path.isdir(self.host_dir):
}, makedirs(self.host_dir)
{ if not os.path.isdir(self.image_dir):
"name": "images", makedirs(self.image_dir)
"key": "image_dir", if not os.path.isdir(self.db_dir):
"default_path": "/var/cache/invent/images", makedirs(self.db_dir)
"envar": "INVENT_IMAGE_DIR",
},
{
"name": "network",
"key": "network_dir",
"default_path": "/var/cache/invent/network",
"envar": "INVENT_NETWORK_DIR",
},
]
self.dirs = {}
for dir in dirs:
try:
self.dirs[dir["key"]] = os.environ[dir["envar"]]
except KeyError:
self.dirs[dir["key"]] = dir["default_path"]
if not os.path.isdir(self.dirs[dir["key"]]):
makedirs(self.dirs[dir["key"]])
self.admin_salt = uuid.uuid4().hex self.admin_salt = uuid.uuid4().hex
self.ph = PasswordHasher() self.ph = PasswordHasher()
self.db = sqlite3.connect(os.path.join(self.dirs["db_dir"], "users.db")) self.db = sqlite3.connect(os.path.join(self.db_dir, "users.db"))
self.cursor = self.db.cursor() self.cursor = self.db.cursor()
self.cursor.execute( self.cursor.execute("CREATE TABLE IF NOT EXISTS users(username, salt, hash, endpoint)")
"CREATE TABLE IF NOT EXISTS users(username, salt, hash, endpoint)" self.cursor.execute("CREATE TABLE IF NOT EXISTS reports(username, timestamp, endpoint)")
)
self.cursor.execute(
"CREATE TABLE IF NOT EXISTS reports(username, timestamp, endpoint)"
)
def get_or_create_user( def get_or_create_user(self, credentials: Annotated[HTTPBasicCredentials, Depends(security)], endpoint: str) -> tuple[str,str]:
self,
credentials: Annotated[HTTPBasicCredentials, Depends(security)],
endpoint: str,
) -> tuple[str, str]:
username = credentials.username username = credentials.username
query = self.cursor.execute( query = self.cursor.execute(f"SELECT salt, hash FROM users WHERE username='{username}' and endpoint='{endpoint}'")
f"SELECT salt, hash FROM users WHERE username='{username}' and endpoint='{endpoint}'"
)
result = query.fetchone() result = query.fetchone()
# If the user is not in the database, we will trust the user and add it to the database # If the user is not in the database, we will trust the user and add it to the database
# TOFU: https://developer.mozilla.org/en-US/docs/Glossary/TOFU # TOFU: https://developer.mozilla.org/en-US/docs/Glossary/TOFU
if result is None and not self.disable_tofu: if result is None and not self.disable_tofu:
salt = uuid.uuid4().hex salt = uuid.uuid4().hex
hash = self.ph.hash(salt + credentials.password) hash = self.ph.hash(salt + credentials.password)
self.cursor.execute( self.cursor.execute(f"INSERT INTO users (username, salt, hash, endpoint) values('{username}', '{salt}', '{hash}', '{endpoint}')")
f"INSERT INTO users (username, salt, hash, endpoint) values('{username}', '{salt}', '{hash}', '{endpoint}')"
)
self.db.commit() self.db.commit()
return (salt, hash) return (salt, hash)
# FIXME: How can we best communicate that the user was not in the db? # FIXME: How can we best communicate that the user was not in the db?
@ -116,12 +83,7 @@ class Inventory:
else: else:
return result return result
def validate_credentials( def validate_credentials(self, credentials: Annotated[HTTPBasicCredentials, Depends(security)], salt: str, hash: str) -> bool:
self,
credentials: Annotated[HTTPBasicCredentials, Depends(security)],
salt: str,
hash: str,
) -> bool:
password = credentials.password password = credentials.password
try: try:
self.ph.verify(hash, salt + password) self.ph.verify(hash, salt + password)
@ -129,44 +91,31 @@ class Inventory:
return False return False
return True return True
async def upload( async def upload(self, endpoint: str, file: UploadFile, name: str, credentials: Annotated[HTTPBasicCredentials, Depends(security)], response: Response):
self, dir = self.host_dir
endpoint: str, if endpoint == 'image':
file: UploadFile, dir = self.image_dir
name: str,
credentials: Annotated[HTTPBasicCredentials, Depends(security)],
response: Response,
):
dir = self.dirs["host_dir"]
if endpoint == "image":
dir = self.dirs["image_dir"]
elif endpoint == "network":
dir = self.dirs["network_dir"]
if credentials.username != name: if credentials.username != name:
response.status_code = status.HTTP_403_FORBIDDEN response.status_code = status.HTTP_403_FORBIDDEN
return {"ERROR": "Username and endpoint does not match"} return {"ERROR": "Username and endpoint does not match"}
salt, hash = self.get_or_create_user(credentials, endpoint) salt, hash = self.get_or_create_user(credentials, endpoint)
if self.validate_credentials(credentials, salt, hash): if self.validate_credentials(credentials, salt, hash):
filename = os.path.join(dir, name + ".json") filename = os.path.join(dir, name + '.json')
with open(filename, "w") as fh: with open(filename, 'w') as fh:
contents: bytes = await file.read() contents: bytes = await file.read()
fh.write(contents.decode("utf-8")) fh.write(contents.decode('utf-8'))
return {"SUCCESS": f"File: {filename} saved"} return {"SUCCESS": f"File: {filename} saved"}
else: else:
try: try:
query = self.cursor.execute( query = self.cursor.execute(f"SELECT * FROM reports WHERE username='{credentials.username}' and endpoint={endpoint}")
f"SELECT * FROM reports WHERE username='{credentials.username}' and endpoint={endpoint}"
)
result = query.fetchone() result = query.fetchone()
except sqlite3.OperationalError: except sqlite3.OperationalError:
result = None result = None
if result is None: if result is None:
timestamp = int(time.time()) timestamp = int(time.time())
self.cursor.execute( self.cursor.execute(f"INSERT INTO reports (username, timestamp, endpoint) values('{credentials.username}','{timestamp}','{endpoint}')")
f"INSERT INTO reports (username, timestamp, endpoint) values('{credentials.username}','{timestamp}','{endpoint}')"
)
self.db.commit() self.db.commit()
response.status_code = status.HTTP_401_UNAUTHORIZED response.status_code = status.HTTP_401_UNAUTHORIZED
return {"ERROR": "Invalid password, this incident will be reported"} return {"ERROR": "Invalid password, this incident will be reported"}
@ -176,84 +125,33 @@ app = FastAPI()
inventory = Inventory() inventory = Inventory()
templates = Jinja2Templates(directory="templates") templates = Jinja2Templates(directory="templates")
@app.post("/host/{hostname}", status_code=status.HTTP_201_CREATED)
async def upload_host(file: UploadFile, hostname: str, credentials: Annotated[HTTPBasicCredentials, Depends(security)], response: Response):
return await inventory.upload('host', file, hostname, credentials, response)
@app.post("/image/{imagename}", status_code=status.HTTP_201_CREATED)
async def upload_image(file: UploadFile, imagename: str, credentials: Annotated[HTTPBasicCredentials, Depends(security)], response: Response):
return await inventory.upload('image', file, imagename, credentials, response)
@app.get("/admin", response_class=HTMLResponse) @app.get("/admin", response_class=HTMLResponse)
async def show_admin_interface( async def show_admin_interface(credentials: Annotated[HTTPBasicCredentials, Depends(security)], request: Request):
credentials: Annotated[HTTPBasicCredentials, Depends(security)], request: Request
):
hash = inventory.ph.hash(inventory.admin_salt + inventory.admin_password) hash = inventory.ph.hash(inventory.admin_salt + inventory.admin_password)
if ( if not inventory.validate_credentials(credentials, inventory.admin_salt, hash) and credentials.username != 'admin':
not inventory.validate_credentials(credentials, inventory.admin_salt, hash) return templates.TemplateResponse('unauthorized.html', {"request": request})
and credentials.username != "admin"
):
return templates.TemplateResponse("unauthorized.html", {"request": request})
query = inventory.cursor.execute(f"SELECT * FROM reports") query = inventory.cursor.execute(f"SELECT * FROM reports")
result = query.fetchall() result = query.fetchall()
reports = list() reports = list()
for res in result: for res in result:
reports.append({ reports.append({"username": res[0],"timestamp": datetime.datetime.fromtimestamp(int(res[1])), "endpoint": res[2]})
"username": res[0], return templates.TemplateResponse("admin.html", {"request": request, "reports": reports})
"timestamp": datetime.datetime.fromtimestamp(int(res[1])),
"endpoint": res[2],
})
return templates.TemplateResponse(
"admin.html", {"request": request, "reports": reports}
)
@app.post("/admin/{endpoint}/{name}", response_class=HTMLResponse) @app.post("/admin/{endpoint}/{name}", response_class=HTMLResponse)
async def delete_report_and_reset_user( async def delete_report_and_reset_user(endpoint: str, name: str, credentials: Annotated[HTTPBasicCredentials, Depends(security)], request: Request):
endpoint: str,
name: str,
credentials: Annotated[HTTPBasicCredentials, Depends(security)],
request: Request,
):
hash = inventory.ph.hash(inventory.admin_salt + inventory.admin_password) hash = inventory.ph.hash(inventory.admin_salt + inventory.admin_password)
if ( if not inventory.validate_credentials(credentials, inventory.admin_salt, hash) and credentials.username != 'admin':
not inventory.validate_credentials(credentials, inventory.admin_salt, hash) return templates.TemplateResponse('unauthorized.html', {"request": request})
and credentials.username != "admin" inventory.cursor.execute(f"DELETE FROM reports where username='{name}' and endpoint='{endpoint}'")
):
return templates.TemplateResponse("unauthorized.html", {"request": request})
inventory.cursor.execute(
f"DELETE FROM reports where username='{name}' and endpoint='{endpoint}'"
)
inventory.db.commit() inventory.db.commit()
inventory.cursor.execute( inventory.cursor.execute(f"DELETE FROM users where username='{name}' and endpoint='{endpoint}'")
f"DELETE FROM users where username='{name}' and endpoint='{endpoint}'"
)
inventory.db.commit() inventory.db.commit()
return templates.TemplateResponse( return templates.TemplateResponse('delete_report_and_reset_user.html', {"request": request, "username": name, "endpoint": endpoint})
"delete_report_and_reset_user.html",
{"request": request, "username": name, "endpoint": endpoint},
)
@app.post("/host/{hostname}", status_code=status.HTTP_201_CREATED)
async def upload_host(
file: UploadFile,
hostname: str,
credentials: Annotated[HTTPBasicCredentials, Depends(security)],
response: Response,
):
return await inventory.upload("host", file, hostname, credentials, response)
@app.post("/image/{imagename}", status_code=status.HTTP_201_CREATED)
async def upload_image(
file: UploadFile,
imagename: str,
credentials: Annotated[HTTPBasicCredentials, Depends(security)],
response: Response,
):
return await inventory.upload("image", file, imagename, credentials, response)
@app.post("/network/{equipmenttype}", status_code=status.HTTP_201_CREATED)
async def upload_network(
file: UploadFile,
equipmenttype: str,
credentials: Annotated[HTTPBasicCredentials, Depends(security)],
response: Response,
):
return await inventory.upload("network", file, equipmenttype, credentials, response)

Loading…
Cancel
Save