From d1aec6bbb030fee6d747f43cbd5906ee69194104 Mon Sep 17 00:00:00 2001
From: Micke Nordin <kano@sunet.se>
Date: Fri, 17 Jan 2025 11:44:01 +0100
Subject: [PATCH] Begin refactor

---
 knotctl/__init__.py           | 198 +++++-----------------------------
 knotctl/config/__init__.py    |  61 +++++++++++
 knotctl/openstack/__init__.py |  17 +++
 knotctl/utils/__init__.py     | 123 +++++++++++++++++++++
 4 files changed, 225 insertions(+), 174 deletions(-)
 create mode 100755 knotctl/config/__init__.py
 create mode 100644 knotctl/openstack/__init__.py
 create mode 100644 knotctl/utils/__init__.py

diff --git a/knotctl/__init__.py b/knotctl/__init__.py
index a768fd2..399dc98 100755
--- a/knotctl/__init__.py
+++ b/knotctl/__init__.py
@@ -2,86 +2,40 @@
 
 import argparse
 import getpass
-import json
 import os
 import sys
-import urllib.parse
-from os import environ, mkdir
-from os.path import isdir, isfile, join
 from typing import Union
-from urllib.parse import urlparse
 
 import argcomplete
-import openstack
-import openstack.config.loader
 import requests
-import yaml
 from requests.models import HTTPBasicAuth
 from simplejson.errors import JSONDecodeError as SimplejsonJSONDecodeError
 
+from .config import Config
+
+from .openstack import get_openstack_addresses
+from .utils import error, output, setup_url, split_url
+
 try:
     from requests.exceptions import JSONDecodeError as RequestsJSONDecodeError
 except ImportError:
     from requests.exceptions import InvalidJSONError as RequestsJSONDecodeError
 
 
-# Helper functions
-def error(description: str, error: str) -> list[dict]:
-    response = []
-    reply = {}
-    # https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/406
-    reply["Code"] = 406
-    reply["Description"] = description
-    reply["Error"] = error
-    response.append(reply)
-    return response
+class Knotctl:
 
+    def __init__(self):
+        self.conf = Config()
+        self.config = self.get_config()
+        self.config_filename = self.conf.config_filename
 
-def get_config(config_filename: str):
-    if not isfile(config_filename):
-        print("You need to configure knotctl before proceeding")
-        run_config(config_filename)
-    with open(config_filename, "r") as fh:
-        return yaml.safe_load(fh.read())
+    def get_config(self):
+        config = self.conf.get_config()
+        if not config:
+            print("You need to configure knotctl before proceeding")
+            run_config()
 
-
-def get_openstack_addresses(cloud: str, name: str):
-    conn = openstack.connect(cloud=cloud)
-
-    # List the servers
-    server = conn.compute.find_server(name)
-    if server is None:
-        print("Server not found")
-        exit(1)
-    openstack_addresses = []
-    for network in server.addresses:
-        for address in server.addresses[network]:
-            openstack_addresses.append(address)
-    return openstack_addresses
-
-
-def nested_out(input, tabs="") -> str:
-    string = ""
-    if isinstance(input, str) or isinstance(input, int):
-        string += "{}\n".format(input)
-    elif isinstance(input, dict):
-        for key, value in input.items():
-            string += "{}{}: {}".format(tabs, key,
-                                        nested_out(value, tabs + " "))
-    elif isinstance(input, list):
-        for entry in input:
-            string += "{}\n{}".format(tabs, nested_out(entry, tabs + " "))
-    return string
-
-
-def output(response: list[dict], jsonout: bool = False):
-    try:
-        if jsonout:
-            print(json.dumps(response))
-        else:
-            print(nested_out(response))
-    except BrokenPipeError:
-        pass
+        return config
 
 
 # Define the runner for each command
@@ -152,7 +106,6 @@ def run_complete(shell: Union[None, str]):
 
 
 def run_config(
-    config_filename: str,
     context: Union[None, str] = None,
     baseurl: Union[None, str] = None,
     list_config: bool = False,
@@ -160,29 +113,18 @@ def run_config(
     password: Union[None, str] = None,
     current: Union[None, str] = None,
 ):
+    conf = Config()
     if current:
-        if os.path.islink(config_filename):
-            actual_path = os.readlink(config_filename)
-            print(actual_path.split("-")[-1])
-        else:
-            print("none")
+        print(conf.get_current())
         return
     config = {"baseurl": baseurl, "username": username, "password": password}
     needed = []
     if context:
-        symlink = f"{config_filename}-{context}"
-        found = os.path.isfile(symlink)
-        if os.path.islink(config_filename):
-            os.remove(config_filename)
-        elif os.path.isfile(config_filename):
-            os.rename(config_filename, symlink)
-        os.symlink(symlink, config_filename)
-        config_filename = symlink
+        found = conf.set_context(context)
         if found:
             return
     if list_config:
-        config_data = get_config(config_filename)
-        config_data.pop("password", None)
+        config_data = conf.get_config_data()
         output(config_data)
         return
     if not baseurl:
@@ -206,8 +148,7 @@ def run_config(
             output(error("Can not configure without password", "No password"))
             sys.exit(1)
 
-    with open(config_filename, "w") as fh:
-        fh.write(yaml.dump(config))
+    conf.set_config(config)
 
 
 def run_delete(url: str, jsonout: bool, headers: dict):
@@ -280,7 +221,7 @@ def run_openstack_sync(cloud: str, name: str, zone: str, headers: dict,
                 elif address.version == 6:
                     rtype = "AAAA"
                     curripv6 = True
-                if rtype and recor.type == rtype:
+                if rtype and record.type == rtype:
                     if record.data == address.addr:
                         continue
                     else:
@@ -361,91 +302,6 @@ def run_zone(url: str,
         output(string, jsonout)
 
 
-# Set up the url
-def setup_url(
-    baseurl: str,
-    arguments: Union[None, list[str]],
-    data: Union[None, str],
-    name: Union[None, str],
-    rtype: Union[None, str],
-    ttl: Union[None, str],
-    zone: Union[None, str],
-) -> str:
-    url = baseurl + "/zones"
-    if zone:
-        if not zone.endswith("."):
-            zone += "."
-        url += "/{}".format(zone)
-    if name and zone:
-        if name.endswith(zone.rstrip(".")):
-            name += "."
-        url += "/records/{}".format(name)
-    if zone and name and rtype:
-        url += "/{}".format(rtype)
-    if data and zone and name and rtype:
-        url += "/{}".format(data)
-    if ttl and data and zone and name and rtype:
-        url += "/{}".format(ttl)
-    if data and zone and name and rtype and arguments:
-        url += "?"
-        for arg in arguments:
-            if not url.endswith("?"):
-                url += "&"
-            key, value = arg.split("=")
-            url += key + "=" + urllib.parse.quote_plus(value)
-
-    if ttl and (not rtype or not name or not zone):
-        output(
-            error(
-                "ttl only makes sense with rtype, name and zone",
-                "Missing parameter",
-            ))
-        sys.exit(1)
-    if rtype and (not name or not zone):
-        output(
-            error(
-                "rtype only makes sense with name and zone",
-                "Missing parameter",
-            ))
-        sys.exit(1)
-    if name and not zone:
-        output(error("name only makes sense with a zone", "Missing parameter"))
-        sys.exit(1)
-    return url
-
-
-def split_url(url: str) -> dict:
-    parsed = urlparse(url, allow_fragments=False)
-    path = parsed.path
-    query = parsed.query
-    arguments: Union[None, list[str]] = query.split("&")
-    path_arr = path.split("/")
-    data: Union[None, str] = None
-    name: Union[None, str] = None
-    rtype: Union[None, str] = None
-    ttl: Union[None, str] = None
-    zone: Union[None, str] = None
-    if len(path_arr) > 2:
-        zone = path_arr[2]
-    if len(path_arr) > 4:
-        name = path_arr[4]
-    if len(path_arr) > 5:
-        rtype = path_arr[5]
-    if len(path_arr) > 6:
-        data = path_arr[6]
-    if len(path_arr) > 7:
-        ttl = path_arr[7]
-
-    return {
-        "arguments": arguments,
-        "data": data,
-        "name": name,
-        "rtype": rtype,
-        "ttl": ttl,
-        "zone": zone,
-    }
-
-
 def get_parser() -> dict:
     description = """Manage DNS records with knot dns rest api:
         * https://gitlab.nic.cz/knot/knot-dns-rest"""
@@ -665,16 +521,10 @@ def main() -> int:
         run_complete(args.shell)
         return 0
 
-    # Make sure we have config
-    config_basepath = join(environ["HOME"], ".knot")
-    config_filename = join(config_basepath, "config")
-
-    if not isdir(config_basepath):
-        mkdir(config_basepath)
+    knotctl = Knotctl()
 
     if args.command == "config":
         run_config(
-            config_filename,
             args.context,
             args.baseurl,
             args.list_config,
@@ -684,7 +534,7 @@ def main() -> int:
         )
         return 0
 
-    config = get_config(config_filename)
+    config = knotctl.get_config()
     baseurl = config["baseurl"]
     token = get_token(config)
     if token == "":
diff --git a/knotctl/config/__init__.py b/knotctl/config/__init__.py
new file mode 100755
index 0000000..7c2faff
--- /dev/null
+++ b/knotctl/config/__init__.py
@@ -0,0 +1,61 @@
+#!/usr/bin/env python3
+import os
+from os import mkdir
+from os.path import isdir, isfile, join
+from typing import Union
+
+import yaml
+
+
+class Config:
+
+    def __init__(self):
+        # Make sure we have config
+        self.config_basepath = join(os.environ["HOME"], ".knot")
+        self.config_filename = join(self.config_basepath, "config")
+        if not isdir(self.config_basepath):
+            mkdir(self.config_basepath)
+
+    def get_config(self) -> Union[None, dict]:
+        if not isfile(self.config_filename):
+            return None
+        with open(self.config_filename, "r") as fh:
+            return yaml.safe_load(fh.read())
+
+    def get_config_data(self) -> dict:
+        config_data = self.get_config()
+        config_data.pop("password", None)
+        return config_data
+
+    def get_current(self) -> str:
+        if os.path.islink(self.config_filename):
+            actual_path = os.readlink(self.config_filename)
+            return actual_path.split("-")[-1]
+        else:
+            return "none"
+
+    def set_context(self, context) -> bool:
+        symlink = f"{self.config_filename}-{context}"
+        found = os.path.isfile(symlink)
+        if os.path.islink(self.config_filename):
+            os.remove(self.config_filename)
+        elif os.path.isfile(self.config_filename):
+            os.rename(self.config_filename, symlink)
+        os.symlink(symlink, self.config_filename)
+        self.config_filename = symlink
+        return found
+
+    def set_config(
+        self,
+        baseurl: str,
+        username: str,
+        password: str,
+    ):
+        config = {
+            "baseurl": baseurl,
+            "username": username,
+            "password": password
+        }
+
+        with open(self.config_filename, "w") as fh:
+            fh.write(yaml.dump(config))
diff --git a/knotctl/openstack/__init__.py b/knotctl/openstack/__init__.py
new file mode 100644
index 0000000..537dde3
--- /dev/null
+++ b/knotctl/openstack/__init__.py
@@ -0,0 +1,17 @@
+import openstack
+import openstack.config.loader
+
+
+def get_openstack_addresses(cloud: str, name: str):
+    conn = openstack.connect(cloud=cloud)
+
+    # List the servers
+    server = conn.compute.find_server(name)
+    if server is None:
+        print("Server not found")
+        exit(1)
+    openstack_addresses = []
+    for network in server.addresses:
+        for address in server.addresses[network]:
+            openstack_addresses.append(address)
+    return openstack_addresses
diff --git a/knotctl/utils/__init__.py b/knotctl/utils/__init__.py
new file mode 100644
index 0000000..80e53f6
--- /dev/null
+++ b/knotctl/utils/__init__.py
@@ -0,0 +1,123 @@
+import json
+import sys
+import urllib.parse
+from typing import Union
+from urllib.parse import urlparse
+
+
+def error(description: str, error: str):
+    response = []
+    reply = {}
+    # https://developer.mozilla.org/en-US/docs/Web/HTTP/Status/406
+    reply["Code"] = 406
+    reply["Description"] = description
+    reply["Error"] = error
+    response.append(reply)
+    return response
+
+
+def nested_out(input, tabs="") -> str:
+    string = ""
+    if isinstance(input, str) or isinstance(input, int):
+        string += f"{input}\n"
+    elif isinstance(input, dict):
+        for key, value in input.items():
+            string += f"{tabs}{key}: {nested_out(value, tabs + " ")}"
+    elif isinstance(input, list):
+        for entry in input:
+            string += f"{tabs}\n{nested_out(entry, tabs + ' ')}"
+    return string
+
+
+def output(response: list[dict], jsonout: bool = False):
+    try:
+        if jsonout:
+            print(json.dumps(response))
+        else:
+            print(nested_out(response))
+    except BrokenPipeError:
+        pass
+
+
+def setup_url(
+    baseurl: str,
+    arguments: Union[None, list[str]],
+    data: Union[None, str],
+    name: Union[None, str],
+    rtype: Union[None, str],
+    ttl: Union[None, str],
+    zone: Union[None, str],
+) -> str:
+    url = baseurl + "/zones"
+    if zone:
+        if not zone.endswith("."):
+            zone += "."
+        url += "/{}".format(zone)
+    if name and zone:
+        if name.endswith(zone.rstrip(".")):
+            name += "."
+        url += "/records/{}".format(name)
+    if zone and name and rtype:
+        url += "/{}".format(rtype)
+    if data and zone and name and rtype:
+        url += "/{}".format(data)
+    if ttl and data and zone and name and rtype:
+        url += "/{}".format(ttl)
+    if data and zone and name and rtype and arguments:
+        url += "?"
+        for arg in arguments:
+            if not url.endswith("?"):
+                url += "&"
+            key, value = arg.split("=")
+            url += key + "=" + urllib.parse.quote_plus(value)
+
+    if ttl and (not rtype or not name or not zone):
+        output(
+            error(
+                "ttl only makes sense with rtype, name and zone",
+                "Missing parameter",
+            ))
+        sys.exit(1)
+    if rtype and (not name or not zone):
+        output(
+            error(
+                "rtype only makes sense with name and zone",
+                "Missing parameter",
+            ))
+        sys.exit(1)
+    if name and not zone:
+        output(error("name only makes sense with a zone", "Missing parameter"))
+        sys.exit(1)
+    return url
+
+
+def split_url(url: str) -> dict:
+    parsed = urlparse(url, allow_fragments=False)
+    path = parsed.path
+    query = parsed.query
+    arguments: Union[None, list[str]] = query.split("&")
+    path_arr = path.split("/")
+    data: Union[None, str] = None
+    name: Union[None, str] = None
+    rtype: Union[None, str] = None
+    ttl: Union[None, str] = None
+    zone: Union[None, str] = None
+    if len(path_arr) > 2:
+        zone = path_arr[2]
+    if len(path_arr) > 4:
+        name = path_arr[4]
+    if len(path_arr) > 5:
+        rtype = path_arr[5]
+    if len(path_arr) > 6:
+        data = path_arr[6]
+    if len(path_arr) > 7:
+        ttl = path_arr[7]
+
+    return {
+        "arguments": arguments,
+        "data": data,
+        "name": name,
+        "rtype": rtype,
+        "ttl": ttl,
+        "zone": zone,
+    }