diff --git a/knotctl/__init__.py b/knotctl/__init__.py index a768fd2..399dc98 100755 --- a/knotctl/__init__.py +++ b/knotctl/__init__.py @@ -2,86 +2,40 @@ import argparse import getpass -import json import os import sys -import urllib.parse -from os import environ, mkdir -from os.path import isdir, isfile, join from typing import Union -from urllib.parse import urlparse import argcomplete -import openstack -import openstack.config.loader import requests -import yaml from requests.models import HTTPBasicAuth from simplejson.errors import JSONDecodeError as SimplejsonJSONDecodeError +from .config import Config + +from .openstack import get_openstack_addresses +from .utils import error, output, setup_url, split_url + try: from requests.exceptions import JSONDecodeError as RequestsJSONDecodeError except ImportError: from requests.exceptions import InvalidJSONError as RequestsJSONDecodeError -# Helper functions -def error(description: str, error: str) -> list[dict]: - response = [] - reply = {} - # https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/406 - reply["Code"] = 406 - reply["Description"] = description - reply["Error"] = error - response.append(reply) - return response - - -def get_config(config_filename: str): - if not isfile(config_filename): - print("You need to configure knotctl before proceeding") - run_config(config_filename) - with open(config_filename, "r") as fh: - return yaml.safe_load(fh.read()) - - -def get_openstack_addresses(cloud: str, name: str): - conn = openstack.connect(cloud=cloud) - - # List the servers - server = conn.compute.find_server(name) - if server is None: - print("Server not found") - exit(1) - openstack_addresses = [] - for network in server.addresses: - for address in server.addresses[network]: - openstack_addresses.append(address) - return openstack_addresses - - -def nested_out(input, tabs="") -> str: - string = "" - if isinstance(input, str) or isinstance(input, int): - string += "{}\n".format(input) - elif isinstance(input, dict): - for key, value in input.items(): - string += "{}{}: {}".format(tabs, key, - nested_out(value, tabs + " ")) - elif isinstance(input, list): - for entry in input: - string += "{}\n{}".format(tabs, nested_out(entry, tabs + " ")) - return string - - -def output(response: list[dict], jsonout: bool = False): - try: - if jsonout: - print(json.dumps(response)) - else: - print(nested_out(response)) - except BrokenPipeError: - pass +class Knotctl: + + def __init__(self): + self.conf = Config() + self.config = self.get_config() + self.config_filename = self.conf.config_filename + + def get_config(self): + config = self.conf.get_config() + if not config: + print("You need to configure knotctl before proceeding") + run_config() + + return config # Define the runner for each command @@ -152,7 +106,6 @@ def run_complete(shell: Union[None, str]): def run_config( - config_filename: str, context: Union[None, str] = None, baseurl: Union[None, str] = None, list_config: bool = False, @@ -160,29 +113,18 @@ def run_config( password: Union[None, str] = None, current: Union[None, str] = None, ): + conf = Config() if current: - if os.path.islink(config_filename): - actual_path = os.readlink(config_filename) - print(actual_path.split("-")[-1]) - else: - print("none") + print(conf.get_current()) return config = {"baseurl": baseurl, "username": username, "password": password} needed = [] if context: - symlink = f"{config_filename}-{context}" - found = os.path.isfile(symlink) - if os.path.islink(config_filename): - os.remove(config_filename) - elif os.path.isfile(config_filename): - os.rename(config_filename, symlink) - os.symlink(symlink, config_filename) - config_filename = symlink + found = conf.set_context(context) if found: return if list_config: - config_data = get_config(config_filename) - config_data.pop("password", None) + config_data = conf.get_config_data() output(config_data) return if not baseurl: @@ -206,8 +148,7 @@ def run_config( output(error("Can not configure without password", "No password")) sys.exit(1) - with open(config_filename, "w") as fh: - fh.write(yaml.dump(config)) + conf.set_config(config) def run_delete(url: str, jsonout: bool, headers: dict): @@ -280,7 +221,7 @@ def run_openstack_sync(cloud: str, name: str, zone: str, headers: dict, elif address.version == 6: rtype = "AAAA" curripv6 = True - if rtype and recor.type == rtype: + if rtype and record.type == rtype: if record.data == address.addr: continue else: @@ -361,91 +302,6 @@ def run_zone(url: str, output(string, jsonout) -# Set up the url -def setup_url( - baseurl: str, - arguments: Union[None, list[str]], - data: Union[None, str], - name: Union[None, str], - rtype: Union[None, str], - ttl: Union[None, str], - zone: Union[None, str], -) -> str: - url = baseurl + "/zones" - if zone: - if not zone.endswith("."): - zone += "." - url += "/{}".format(zone) - if name and zone: - if name.endswith(zone.rstrip(".")): - name += "." - url += "/records/{}".format(name) - if zone and name and rtype: - url += "/{}".format(rtype) - if data and zone and name and rtype: - url += "/{}".format(data) - if ttl and data and zone and name and rtype: - url += "/{}".format(ttl) - if data and zone and name and rtype and arguments: - url += "?" - for arg in arguments: - if not url.endswith("?"): - url += "&" - key, value = arg.split("=") - url += key + "=" + urllib.parse.quote_plus(value) - - if ttl and (not rtype or not name or not zone): - output( - error( - "ttl only makes sense with rtype, name and zone", - "Missing parameter", - )) - sys.exit(1) - if rtype and (not name or not zone): - output( - error( - "rtype only makes sense with name and zone", - "Missing parameter", - )) - sys.exit(1) - if name and not zone: - output(error("name only makes sense with a zone", "Missing parameter")) - sys.exit(1) - return url - - -def split_url(url: str) -> dict: - parsed = urlparse(url, allow_fragments=False) - path = parsed.path - query = parsed.query - arguments: Union[None, list[str]] = query.split("&") - path_arr = path.split("/") - data: Union[None, str] = None - name: Union[None, str] = None - rtype: Union[None, str] = None - ttl: Union[None, str] = None - zone: Union[None, str] = None - if len(path_arr) > 2: - zone = path_arr[2] - if len(path_arr) > 4: - name = path_arr[4] - if len(path_arr) > 5: - rtype = path_arr[5] - if len(path_arr) > 6: - data = path_arr[6] - if len(path_arr) > 7: - ttl = path_arr[7] - - return { - "arguments": arguments, - "data": data, - "name": name, - "rtype": rtype, - "ttl": ttl, - "zone": zone, - } - - def get_parser() -> dict: description = """Manage DNS records with knot dns rest api: * https://gitlab.nic.cz/knot/knot-dns-rest""" @@ -665,16 +521,10 @@ def main() -> int: run_complete(args.shell) return 0 - # Make sure we have config - config_basepath = join(environ["HOME"], ".knot") - config_filename = join(config_basepath, "config") - - if not isdir(config_basepath): - mkdir(config_basepath) + knotctl = Knotctl() if args.command == "config": run_config( - config_filename, args.context, args.baseurl, args.list_config, @@ -684,7 +534,7 @@ def main() -> int: ) return 0 - config = get_config(config_filename) + config = knotctl.get_config() baseurl = config["baseurl"] token = get_token(config) if token == "": diff --git a/knotctl/config/__init__.py b/knotctl/config/__init__.py new file mode 100755 index 0000000..7c2faff --- /dev/null +++ b/knotctl/config/__init__.py @@ -0,0 +1,61 @@ +#!/usr/bin/env python3 +import os +from os import mkdir +from os.path import isdir, isfile, join +from typing import Union + +import yaml + + +class Config: + + def __init__(self): + # Make sure we have config + self.config_basepath = join(os.environ["HOME"], ".knot") + self.config_filename = join(self.config_basepath, "config") + if not isdir(self.config_basepath): + mkdir(self.config_basepath) + + def get_config(self) -> Union[None, dict]: + if not isfile(self.config_filename): + return None + with open(self.config_filename, "r") as fh: + return yaml.safe_load(fh.read()) + + def get_config_data(self) -> dict: + config_data = self.get_config() + config_data.pop("password", None) + return config_data + + def get_current(self) -> str: + if os.path.islink(self.config_filename): + actual_path = os.readlink(self.config_filename) + return actual_path.split("-")[-1] + else: + return "none" + + def set_context(self, context) -> bool: + symlink = f"{self.config_filename}-{context}" + found = os.path.isfile(symlink) + if os.path.islink(self.config_filename): + os.remove(self.config_filename) + elif os.path.isfile(self.config_filename): + os.rename(self.config_filename, symlink) + os.symlink(symlink, self.config_filename) + self.config_filename = symlink + return found + + def set_config( + self, + baseurl: str, + username: str, + password: str, + ): + config = { + "baseurl": baseurl, + "username": username, + "password": password + } + + with open(self.config_filename, "w") as fh: + fh.write(yaml.dump(config)) diff --git a/knotctl/openstack/__init__.py b/knotctl/openstack/__init__.py new file mode 100644 index 0000000..537dde3 --- /dev/null +++ b/knotctl/openstack/__init__.py @@ -0,0 +1,17 @@ +import openstack +import openstack.config.loader + + +def get_openstack_addresses(cloud: str, name: str): + conn = openstack.connect(cloud=cloud) + + # List the servers + server = conn.compute.find_server(name) + if server is None: + print("Server not found") + exit(1) + openstack_addresses = [] + for network in server.addresses: + for address in server.addresses[network]: + openstack_addresses.append(address) + return openstack_addresses diff --git a/knotctl/utils/__init__.py b/knotctl/utils/__init__.py new file mode 100644 index 0000000..80e53f6 --- /dev/null +++ b/knotctl/utils/__init__.py @@ -0,0 +1,123 @@ +import json +import sys +import urllib.parse +from typing import Union +from urllib.parse import urlparse + + +def error(description: str, error: str): + response = [] + reply = {} + # https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/406 + reply["Code"] = 406 + reply["Description"] = description + reply["Error"] = error + response.append(reply) + return response + + +def nested_out(input, tabs="") -> str: + string = "" + if isinstance(input, str) or isinstance(input, int): + string += f"{input}\n" + elif isinstance(input, dict): + for key, value in input.items(): + string += f"{tabs}{key}: {nested_out(value, tabs + " ")}" + elif isinstance(input, list): + for entry in input: + string += f"{tabs}\n{nested_out(entry, tabs + ' ')}" + return string + + +def output(response: list[dict], jsonout: bool = False): + try: + if jsonout: + print(json.dumps(response)) + else: + print(nested_out(response)) + except BrokenPipeError: + pass + + +def setup_url( + baseurl: str, + arguments: Union[None, list[str]], + data: Union[None, str], + name: Union[None, str], + rtype: Union[None, str], + ttl: Union[None, str], + zone: Union[None, str], +) -> str: + url = baseurl + "/zones" + if zone: + if not zone.endswith("."): + zone += "." + url += "/{}".format(zone) + if name and zone: + if name.endswith(zone.rstrip(".")): + name += "." + url += "/records/{}".format(name) + if zone and name and rtype: + url += "/{}".format(rtype) + if data and zone and name and rtype: + url += "/{}".format(data) + if ttl and data and zone and name and rtype: + url += "/{}".format(ttl) + if data and zone and name and rtype and arguments: + url += "?" + for arg in arguments: + if not url.endswith("?"): + url += "&" + key, value = arg.split("=") + url += key + "=" + urllib.parse.quote_plus(value) + + if ttl and (not rtype or not name or not zone): + output( + error( + "ttl only makes sense with rtype, name and zone", + "Missing parameter", + )) + sys.exit(1) + if rtype and (not name or not zone): + output( + error( + "rtype only makes sense with name and zone", + "Missing parameter", + )) + sys.exit(1) + if name and not zone: + output(error("name only makes sense with a zone", "Missing parameter")) + sys.exit(1) + return url + + +def split_url(url: str) -> dict: + parsed = urlparse(url, allow_fragments=False) + path = parsed.path + query = parsed.query + arguments: Union[None, list[str]] = query.split("&") + path_arr = path.split("/") + data: Union[None, str] = None + name: Union[None, str] = None + rtype: Union[None, str] = None + ttl: Union[None, str] = None + zone: Union[None, str] = None + if len(path_arr) > 2: + zone = path_arr[2] + if len(path_arr) > 4: + name = path_arr[4] + if len(path_arr) > 5: + rtype = path_arr[5] + if len(path_arr) > 6: + data = path_arr[6] + if len(path_arr) > 7: + ttl = path_arr[7] + + return { + "arguments": arguments, + "data": data, + "name": name, + "rtype": rtype, + "ttl": ttl, + "zone": zone, + }