Unfinished session work
This commit is contained in:
parent
3f541a2dcb
commit
4ae27db51e
5 changed files with 53 additions and 12 deletions
31
app.py
31
app.py
|
@ -1,11 +1,14 @@
|
||||||
import os
|
import os
|
||||||
|
import time
|
||||||
|
from base64 import b64decode, b64encode
|
||||||
from typing import Union
|
from typing import Union
|
||||||
|
from urllib.parse import quote, unquote
|
||||||
|
|
||||||
from flask import (Flask, flash, redirect, render_template, request,
|
from flask import (Flask, flash, redirect, render_template, request,
|
||||||
send_from_directory, url_for)
|
send_from_directory, url_for)
|
||||||
from flask_login import LoginManager, login_required, login_user, logout_user
|
from flask_login import LoginManager, login_required, login_user, logout_user
|
||||||
from urllib.parse import unquote
|
|
||||||
|
|
||||||
|
from auth import Auth
|
||||||
from forms import LoginForm
|
from forms import LoginForm
|
||||||
from latosa import LaToSa
|
from latosa import LaToSa
|
||||||
from user import User
|
from user import User
|
||||||
|
@ -34,18 +37,36 @@ def load_user(user_id) -> Union[User, None]:
|
||||||
@app.route('/', methods=['GET', 'POST'])
|
@app.route('/', methods=['GET', 'POST'])
|
||||||
def index():
|
def index():
|
||||||
i18n = latosa.get_i18n(request)
|
i18n = latosa.get_i18n(request)
|
||||||
redirect_to = request.args.get('redirect_to', None)
|
session = request.args.get('session', None)
|
||||||
|
client_id = request.args.get('client_id', None)
|
||||||
form = LoginForm()
|
form = LoginForm()
|
||||||
form.redirect_to.data = redirect_to
|
form.session.data = session
|
||||||
|
form.session.data = client_id
|
||||||
if request.method == 'POST':
|
if request.method == 'POST':
|
||||||
username = form.username.data
|
username = form.username.data
|
||||||
password = form.password.data
|
password = form.password.data
|
||||||
redirect_to = url_for('admin')
|
redirect_to = url_for('admin')
|
||||||
if form.redirect_to.data:
|
|
||||||
redirect_to = unquote(form.redirect_to.data)
|
|
||||||
user = latosa.login_user(username, password)
|
user = latosa.login_user(username, password)
|
||||||
if user:
|
if user:
|
||||||
login_user(user)
|
login_user(user)
|
||||||
|
session = unquote(b64decode(form.session.data))
|
||||||
|
client_id = form.client_id.data
|
||||||
|
if session and client_id:
|
||||||
|
session_auth = Auth()
|
||||||
|
session_data = session_auth.decrypt(client_id, session)
|
||||||
|
if 'redirect_to' in session_data:
|
||||||
|
response_data = {
|
||||||
|
'status': 'success',
|
||||||
|
'uid': user.uid,
|
||||||
|
'email': user.email,
|
||||||
|
'display_name': user.display_name,
|
||||||
|
'groups': user.groups,
|
||||||
|
'timestamp': time.time()
|
||||||
|
}
|
||||||
|
cyphertext = session_auth.encrypt(client_id, response_data)
|
||||||
|
b64 = quote(b64encode(cyphertext).decode('utf-8'))
|
||||||
|
url = session_data['redirect_to']
|
||||||
|
redirect_to = f'{url}?session={b64}'
|
||||||
return redirect(redirect_to)
|
return redirect(redirect_to)
|
||||||
flash(i18n['login']['failed'])
|
flash(i18n['login']['failed'])
|
||||||
return render_template('index.html', i18n=i18n, form=form)
|
return render_template('index.html', i18n=i18n, form=form)
|
||||||
|
|
17
auth.py
Normal file
17
auth.py
Normal file
|
@ -0,0 +1,17 @@
|
||||||
|
import json
|
||||||
|
|
||||||
|
from cryptography.fernet import Fernet
|
||||||
|
|
||||||
|
|
||||||
|
class Auth:
|
||||||
|
|
||||||
|
def get_key(self, client_id):
|
||||||
|
return Fernet.generate_key()
|
||||||
|
|
||||||
|
def decrypt(self, cyphertext, client_id) -> dict:
|
||||||
|
return json.loads(
|
||||||
|
Fernet(self.get_key(client_id)).decrypt(cyphertext).decode())
|
||||||
|
|
||||||
|
def encrypt(self, data, client_id) -> bytes:
|
||||||
|
return Fernet(self.get_key(client_id)).encrypt(
|
||||||
|
json.dumps(data).encode('utf-8'))
|
3
forms.py
3
forms.py
|
@ -6,4 +6,5 @@ class LoginForm(FlaskForm):
|
||||||
username = StringField('Username', validators=[DataRequired()])
|
username = StringField('Username', validators=[DataRequired()])
|
||||||
password = PasswordField('Password', validators=[DataRequired()])
|
password = PasswordField('Password', validators=[DataRequired()])
|
||||||
submit = SubmitField('Submit')
|
submit = SubmitField('Submit')
|
||||||
redirect_to = HiddenField('redirect_to')
|
session = HiddenField('session')
|
||||||
|
client_id = HiddenField('client_id')
|
||||||
|
|
|
@ -1,5 +1,6 @@
|
||||||
flask==3.0.2
|
cryptography==42.0.5
|
||||||
flask-login==0.6.3
|
|
||||||
flask-bcrypt==1.0.1
|
flask-bcrypt==1.0.1
|
||||||
|
flask-login==0.6.3
|
||||||
flask-wtf==1.2.1
|
flask-wtf==1.2.1
|
||||||
|
flask==3.0.2
|
||||||
pyyaml==6.0.1
|
pyyaml==6.0.1
|
||||||
|
|
9
user.py
9
user.py
|
@ -1,14 +1,15 @@
|
||||||
from flask import Flask
|
from flask import Flask
|
||||||
from flask_bcrypt import Bcrypt
|
from flask_bcrypt import Bcrypt
|
||||||
class User:
|
class User:
|
||||||
def __init__(self, app: Flask, uid: str, display_name: str, email:str, password: str, admin: bool = False):
|
def __init__(self, app: Flask, uid: str, display_name: str, email:str, password: str, admin: bool = False, groups: list[str] = []):
|
||||||
self.uid = uid
|
|
||||||
self.display_name = display_name
|
self.display_name = display_name
|
||||||
self.email = email
|
self.email = email
|
||||||
self.is_admin = admin
|
self.groups = groups
|
||||||
self.is_authenticated = False
|
|
||||||
self.is_active = False
|
self.is_active = False
|
||||||
|
self.is_admin = admin
|
||||||
self.is_anonymous = False
|
self.is_anonymous = False
|
||||||
|
self.is_authenticated = False
|
||||||
|
self.uid = uid
|
||||||
self.bcrypt = Bcrypt(app)
|
self.bcrypt = Bcrypt(app)
|
||||||
self.salt = self.get_salt()
|
self.salt = self.get_salt()
|
||||||
self.password_hash = self.bcrypt.generate_password_hash(password + self.salt).decode('utf-8')
|
self.password_hash = self.bcrypt.generate_password_hash(password + self.salt).decode('utf-8')
|
||||||
|
|
Loading…
Add table
Reference in a new issue