Begin refactor

kano-refactor-lib
Micke Nordin 2 weeks ago
parent 33a65181a5
commit d1aec6bbb0
Signed by: micke
GPG Key ID: 0DA0A7A5708FE257

@ -2,86 +2,40 @@
import argparse import argparse
import getpass import getpass
import json
import os import os
import sys import sys
import urllib.parse
from os import environ, mkdir
from os.path import isdir, isfile, join
from typing import Union from typing import Union
from urllib.parse import urlparse
import argcomplete import argcomplete
import openstack
import openstack.config.loader
import requests import requests
import yaml
from requests.models import HTTPBasicAuth from requests.models import HTTPBasicAuth
from simplejson.errors import JSONDecodeError as SimplejsonJSONDecodeError from simplejson.errors import JSONDecodeError as SimplejsonJSONDecodeError
from .config import Config
from .openstack import get_openstack_addresses
from .utils import error, output, setup_url, split_url
try: try:
from requests.exceptions import JSONDecodeError as RequestsJSONDecodeError from requests.exceptions import JSONDecodeError as RequestsJSONDecodeError
except ImportError: except ImportError:
from requests.exceptions import InvalidJSONError as RequestsJSONDecodeError from requests.exceptions import InvalidJSONError as RequestsJSONDecodeError
# Helper functions class Knotctl:
def error(description: str, error: str) -> list[dict]:
response = [] def __init__(self):
reply = {} self.conf = Config()
# https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/406 self.config = self.get_config()
reply["Code"] = 406 self.config_filename = self.conf.config_filename
reply["Description"] = description
reply["Error"] = error def get_config(self):
response.append(reply) config = self.conf.get_config()
return response if not config:
print("You need to configure knotctl before proceeding")
run_config()
def get_config(config_filename: str):
if not isfile(config_filename): return config
print("You need to configure knotctl before proceeding")
run_config(config_filename)
with open(config_filename, "r") as fh:
return yaml.safe_load(fh.read())
def get_openstack_addresses(cloud: str, name: str):
conn = openstack.connect(cloud=cloud)
# List the servers
server = conn.compute.find_server(name)
if server is None:
print("Server not found")
exit(1)
openstack_addresses = []
for network in server.addresses:
for address in server.addresses[network]:
openstack_addresses.append(address)
return openstack_addresses
def nested_out(input, tabs="") -> str:
string = ""
if isinstance(input, str) or isinstance(input, int):
string += "{}\n".format(input)
elif isinstance(input, dict):
for key, value in input.items():
string += "{}{}: {}".format(tabs, key,
nested_out(value, tabs + " "))
elif isinstance(input, list):
for entry in input:
string += "{}\n{}".format(tabs, nested_out(entry, tabs + " "))
return string
def output(response: list[dict], jsonout: bool = False):
try:
if jsonout:
print(json.dumps(response))
else:
print(nested_out(response))
except BrokenPipeError:
pass
# Define the runner for each command # Define the runner for each command
@ -152,7 +106,6 @@ def run_complete(shell: Union[None, str]):
def run_config( def run_config(
config_filename: str,
context: Union[None, str] = None, context: Union[None, str] = None,
baseurl: Union[None, str] = None, baseurl: Union[None, str] = None,
list_config: bool = False, list_config: bool = False,
@ -160,29 +113,18 @@ def run_config(
password: Union[None, str] = None, password: Union[None, str] = None,
current: Union[None, str] = None, current: Union[None, str] = None,
): ):
conf = Config()
if current: if current:
if os.path.islink(config_filename): print(conf.get_current())
actual_path = os.readlink(config_filename)
print(actual_path.split("-")[-1])
else:
print("none")
return return
config = {"baseurl": baseurl, "username": username, "password": password} config = {"baseurl": baseurl, "username": username, "password": password}
needed = [] needed = []
if context: if context:
symlink = f"{config_filename}-{context}" found = conf.set_context(context)
found = os.path.isfile(symlink)
if os.path.islink(config_filename):
os.remove(config_filename)
elif os.path.isfile(config_filename):
os.rename(config_filename, symlink)
os.symlink(symlink, config_filename)
config_filename = symlink
if found: if found:
return return
if list_config: if list_config:
config_data = get_config(config_filename) config_data = conf.get_config_data()
config_data.pop("password", None)
output(config_data) output(config_data)
return return
if not baseurl: if not baseurl:
@ -206,8 +148,7 @@ def run_config(
output(error("Can not configure without password", "No password")) output(error("Can not configure without password", "No password"))
sys.exit(1) sys.exit(1)
with open(config_filename, "w") as fh: conf.set_config(config)
fh.write(yaml.dump(config))
def run_delete(url: str, jsonout: bool, headers: dict): def run_delete(url: str, jsonout: bool, headers: dict):
@ -280,7 +221,7 @@ def run_openstack_sync(cloud: str, name: str, zone: str, headers: dict,
elif address.version == 6: elif address.version == 6:
rtype = "AAAA" rtype = "AAAA"
curripv6 = True curripv6 = True
if rtype and recor.type == rtype: if rtype and record.type == rtype:
if record.data == address.addr: if record.data == address.addr:
continue continue
else: else:
@ -361,91 +302,6 @@ def run_zone(url: str,
output(string, jsonout) output(string, jsonout)
# Set up the url
def setup_url(
baseurl: str,
arguments: Union[None, list[str]],
data: Union[None, str],
name: Union[None, str],
rtype: Union[None, str],
ttl: Union[None, str],
zone: Union[None, str],
) -> str:
url = baseurl + "/zones"
if zone:
if not zone.endswith("."):
zone += "."
url += "/{}".format(zone)
if name and zone:
if name.endswith(zone.rstrip(".")):
name += "."
url += "/records/{}".format(name)
if zone and name and rtype:
url += "/{}".format(rtype)
if data and zone and name and rtype:
url += "/{}".format(data)
if ttl and data and zone and name and rtype:
url += "/{}".format(ttl)
if data and zone and name and rtype and arguments:
url += "?"
for arg in arguments:
if not url.endswith("?"):
url += "&"
key, value = arg.split("=")
url += key + "=" + urllib.parse.quote_plus(value)
if ttl and (not rtype or not name or not zone):
output(
error(
"ttl only makes sense with rtype, name and zone",
"Missing parameter",
))
sys.exit(1)
if rtype and (not name or not zone):
output(
error(
"rtype only makes sense with name and zone",
"Missing parameter",
))
sys.exit(1)
if name and not zone:
output(error("name only makes sense with a zone", "Missing parameter"))
sys.exit(1)
return url
def split_url(url: str) -> dict:
parsed = urlparse(url, allow_fragments=False)
path = parsed.path
query = parsed.query
arguments: Union[None, list[str]] = query.split("&")
path_arr = path.split("/")
data: Union[None, str] = None
name: Union[None, str] = None
rtype: Union[None, str] = None
ttl: Union[None, str] = None
zone: Union[None, str] = None
if len(path_arr) > 2:
zone = path_arr[2]
if len(path_arr) > 4:
name = path_arr[4]
if len(path_arr) > 5:
rtype = path_arr[5]
if len(path_arr) > 6:
data = path_arr[6]
if len(path_arr) > 7:
ttl = path_arr[7]
return {
"arguments": arguments,
"data": data,
"name": name,
"rtype": rtype,
"ttl": ttl,
"zone": zone,
}
def get_parser() -> dict: def get_parser() -> dict:
description = """Manage DNS records with knot dns rest api: description = """Manage DNS records with knot dns rest api:
* https://gitlab.nic.cz/knot/knot-dns-rest""" * https://gitlab.nic.cz/knot/knot-dns-rest"""
@ -665,16 +521,10 @@ def main() -> int:
run_complete(args.shell) run_complete(args.shell)
return 0 return 0
# Make sure we have config knotctl = Knotctl()
config_basepath = join(environ["HOME"], ".knot")
config_filename = join(config_basepath, "config")
if not isdir(config_basepath):
mkdir(config_basepath)
if args.command == "config": if args.command == "config":
run_config( run_config(
config_filename,
args.context, args.context,
args.baseurl, args.baseurl,
args.list_config, args.list_config,
@ -684,7 +534,7 @@ def main() -> int:
) )
return 0 return 0
config = get_config(config_filename) config = knotctl.get_config()
baseurl = config["baseurl"] baseurl = config["baseurl"]
token = get_token(config) token = get_token(config)
if token == "": if token == "":

@ -0,0 +1,61 @@
#!/usr/bin/env python3
import os
from os import mkdir
from os.path import isdir, isfile, join
from typing import Union
import yaml
class Config:
def __init__(self):
# Make sure we have config
self.config_basepath = join(os.environ["HOME"], ".knot")
self.config_filename = join(self.config_basepath, "config")
if not isdir(self.config_basepath):
mkdir(self.config_basepath)
def get_config(self) -> Union[None, dict]:
if not isfile(self.config_filename):
return None
with open(self.config_filename, "r") as fh:
return yaml.safe_load(fh.read())
def get_config_data(self) -> dict:
config_data = self.get_config()
config_data.pop("password", None)
return config_data
def get_current(self) -> str:
if os.path.islink(self.config_filename):
actual_path = os.readlink(self.config_filename)
return actual_path.split("-")[-1]
else:
return "none"
def set_context(self, context) -> bool:
symlink = f"{self.config_filename}-{context}"
found = os.path.isfile(symlink)
if os.path.islink(self.config_filename):
os.remove(self.config_filename)
elif os.path.isfile(self.config_filename):
os.rename(self.config_filename, symlink)
os.symlink(symlink, self.config_filename)
self.config_filename = symlink
return found
def set_config(
self,
baseurl: str,
username: str,
password: str,
):
config = {
"baseurl": baseurl,
"username": username,
"password": password
}
with open(self.config_filename, "w") as fh:
fh.write(yaml.dump(config))

@ -0,0 +1,17 @@
import openstack
import openstack.config.loader
def get_openstack_addresses(cloud: str, name: str):
conn = openstack.connect(cloud=cloud)
# List the servers
server = conn.compute.find_server(name)
if server is None:
print("Server not found")
exit(1)
openstack_addresses = []
for network in server.addresses:
for address in server.addresses[network]:
openstack_addresses.append(address)
return openstack_addresses

@ -0,0 +1,123 @@
import json
import sys
import urllib.parse
from typing import Union
from urllib.parse import urlparse
def error(description: str, error: str):
response = []
reply = {}
# https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/406
reply["Code"] = 406
reply["Description"] = description
reply["Error"] = error
response.append(reply)
return response
def nested_out(input, tabs="") -> str:
string = ""
if isinstance(input, str) or isinstance(input, int):
string += f"{input}\n"
elif isinstance(input, dict):
for key, value in input.items():
string += f"{tabs}{key}: {nested_out(value, tabs + " ")}"
elif isinstance(input, list):
for entry in input:
string += f"{tabs}\n{nested_out(entry, tabs + ' ')}"
return string
def output(response: list[dict], jsonout: bool = False):
try:
if jsonout:
print(json.dumps(response))
else:
print(nested_out(response))
except BrokenPipeError:
pass
def setup_url(
baseurl: str,
arguments: Union[None, list[str]],
data: Union[None, str],
name: Union[None, str],
rtype: Union[None, str],
ttl: Union[None, str],
zone: Union[None, str],
) -> str:
url = baseurl + "/zones"
if zone:
if not zone.endswith("."):
zone += "."
url += "/{}".format(zone)
if name and zone:
if name.endswith(zone.rstrip(".")):
name += "."
url += "/records/{}".format(name)
if zone and name and rtype:
url += "/{}".format(rtype)
if data and zone and name and rtype:
url += "/{}".format(data)
if ttl and data and zone and name and rtype:
url += "/{}".format(ttl)
if data and zone and name and rtype and arguments:
url += "?"
for arg in arguments:
if not url.endswith("?"):
url += "&"
key, value = arg.split("=")
url += key + "=" + urllib.parse.quote_plus(value)
if ttl and (not rtype or not name or not zone):
output(
error(
"ttl only makes sense with rtype, name and zone",
"Missing parameter",
))
sys.exit(1)
if rtype and (not name or not zone):
output(
error(
"rtype only makes sense with name and zone",
"Missing parameter",
))
sys.exit(1)
if name and not zone:
output(error("name only makes sense with a zone", "Missing parameter"))
sys.exit(1)
return url
def split_url(url: str) -> dict:
parsed = urlparse(url, allow_fragments=False)
path = parsed.path
query = parsed.query
arguments: Union[None, list[str]] = query.split("&")
path_arr = path.split("/")
data: Union[None, str] = None
name: Union[None, str] = None
rtype: Union[None, str] = None
ttl: Union[None, str] = None
zone: Union[None, str] = None
if len(path_arr) > 2:
zone = path_arr[2]
if len(path_arr) > 4:
name = path_arr[4]
if len(path_arr) > 5:
rtype = path_arr[5]
if len(path_arr) > 6:
data = path_arr[6]
if len(path_arr) > 7:
ttl = path_arr[7]
return {
"arguments": arguments,
"data": data,
"name": name,
"rtype": rtype,
"ttl": ttl,
"zone": zone,
}
Loading…
Cancel
Save